# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Code generated by the Google Gen AI SDK generator DO NOT EDIT.

import datetime
import json
import logging
import time
from typing import Any, Optional, Union
from urllib.parse import urlencode

from google.genai import _api_module
from google.genai import _common
from google.genai import types as genai_types
from google.genai._common import get_value_by_path as getv
from google.genai._common import set_value_by_path as setv

from . import _prompt_optimizer_utils
from . import types


logger = logging.getLogger("vertexai_genai.promptoptimizer")


def _CustomJobParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    to_object: dict[str, Any] = {}
    if getv(from_object, ["custom_job"]) is not None:
        setv(
            parent_object,
            ["customJob"],
            _CustomJob_to_vertex(getv(from_object, ["custom_job"]), to_object),
        )

    if getv(from_object, ["config"]) is not None:
        setv(to_object, ["config"], getv(from_object, ["config"]))

    return to_object


def _CustomJob_from_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    to_object: dict[str, Any] = {}
    if getv(parent_object, ["displayName"]) is not None:
        setv(to_object, ["display_name"], getv(parent_object, ["displayName"]))

    if getv(parent_object, ["jobSpec"]) is not None:
        setv(to_object, ["job_spec"], getv(parent_object, ["jobSpec"]))

    if getv(parent_object, ["encryptionSpec"]) is not None:
        setv(to_object, ["encryption_spec"], getv(parent_object, ["encryptionSpec"]))

    if getv(from_object, ["state"]) is not None:
        setv(to_object, ["state"], getv(from_object, ["state"]))

    if getv(parent_object, ["error"]) is not None:
        setv(to_object, ["error"], getv(parent_object, ["error"]))

    if getv(from_object, ["createTime"]) is not None:
        setv(to_object, ["create_time"], getv(from_object, ["createTime"]))

    if getv(from_object, ["endTime"]) is not None:
        setv(to_object, ["end_time"], getv(from_object, ["endTime"]))

    if getv(from_object, ["labels"]) is not None:
        setv(to_object, ["labels"], getv(from_object, ["labels"]))

    if getv(from_object, ["name"]) is not None:
        setv(to_object, ["name"], getv(from_object, ["name"]))

    if getv(from_object, ["satisfiesPzi"]) is not None:
        setv(to_object, ["satisfies_pzi"], getv(from_object, ["satisfiesPzi"]))

    if getv(from_object, ["satisfiesPzs"]) is not None:
        setv(to_object, ["satisfies_pzs"], getv(from_object, ["satisfiesPzs"]))

    if getv(from_object, ["startTime"]) is not None:
        setv(to_object, ["start_time"], getv(from_object, ["startTime"]))

    if getv(from_object, ["updateTime"]) is not None:
        setv(to_object, ["update_time"], getv(from_object, ["updateTime"]))

    if getv(from_object, ["webAccessUris"]) is not None:
        setv(to_object, ["web_access_uris"], getv(from_object, ["webAccessUris"]))

    return to_object


def _CustomJob_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    to_object: dict[str, Any] = {}
    if getv(from_object, ["display_name"]) is not None:
        setv(parent_object, ["displayName"], getv(from_object, ["display_name"]))

    if getv(from_object, ["job_spec"]) is not None:
        setv(parent_object, ["jobSpec"], getv(from_object, ["job_spec"]))

    if getv(from_object, ["encryption_spec"]) is not None:
        setv(parent_object, ["encryptionSpec"], getv(from_object, ["encryption_spec"]))

    if getv(from_object, ["state"]) is not None:
        setv(to_object, ["state"], getv(from_object, ["state"]))

    if getv(from_object, ["error"]) is not None:
        setv(parent_object, ["error"], getv(from_object, ["error"]))

    if getv(from_object, ["create_time"]) is not None:
        setv(to_object, ["createTime"], getv(from_object, ["create_time"]))

    if getv(from_object, ["end_time"]) is not None:
        setv(to_object, ["endTime"], getv(from_object, ["end_time"]))

    if getv(from_object, ["labels"]) is not None:
        setv(to_object, ["labels"], getv(from_object, ["labels"]))

    if getv(from_object, ["name"]) is not None:
        setv(to_object, ["name"], getv(from_object, ["name"]))

    if getv(from_object, ["satisfies_pzi"]) is not None:
        setv(to_object, ["satisfiesPzi"], getv(from_object, ["satisfies_pzi"]))

    if getv(from_object, ["satisfies_pzs"]) is not None:
        setv(to_object, ["satisfiesPzs"], getv(from_object, ["satisfies_pzs"]))

    if getv(from_object, ["start_time"]) is not None:
        setv(to_object, ["startTime"], getv(from_object, ["start_time"]))

    if getv(from_object, ["update_time"]) is not None:
        setv(to_object, ["updateTime"], getv(from_object, ["update_time"]))

    if getv(from_object, ["web_access_uris"]) is not None:
        setv(to_object, ["webAccessUris"], getv(from_object, ["web_access_uris"]))

    return to_object


def _GetCustomJobParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    to_object: dict[str, Any] = {}
    if getv(from_object, ["name"]) is not None:
        setv(to_object, ["_url", "name"], getv(from_object, ["name"]))

    if getv(from_object, ["config"]) is not None:
        setv(to_object, ["config"], getv(from_object, ["config"]))

    return to_object


def _OptimizeConfig_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    to_object: dict[str, Any] = {}

    if getv(from_object, ["optimization_target"]) is not None:
        setv(
            parent_object,
            ["optimizationTarget"],
            getv(from_object, ["optimization_target"]),
        )

    return to_object


def _OptimizeRequestParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    to_object: dict[str, Any] = {}
    if getv(from_object, ["content"]) is not None:
        setv(to_object, ["content"], getv(from_object, ["content"]))

    if getv(from_object, ["config"]) is not None:
        setv(
            to_object,
            ["config"],
            _OptimizeConfig_to_vertex(getv(from_object, ["config"]), to_object),
        )

    return to_object


class PromptOptimizer(_api_module.BaseModule):
    """Prompt Optimizer"""

    def _optimize_prompt(
        self,
        *,
        content: Optional[genai_types.ContentOrDict] = None,
        config: Optional[types.OptimizeConfigOrDict] = None,
    ) -> types.OptimizeResponseEndpoint:
        """
        Optimize a single prompt.
        """

        parameter_model = types._OptimizeRequestParameters(
            content=content,
            config=config,
        )

        request_url_dict: Optional[dict[str, str]]
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")
        else:
            request_dict = _OptimizeRequestParameters_to_vertex(parameter_model)
            request_url_dict = request_dict.get("_url")
            if request_url_dict:
                path = "tuningJobs:optimizePrompt".format_map(request_url_dict)
            else:
                path = "tuningJobs:optimizePrompt"

        query_params = request_dict.get("_query")
        if query_params:
            path = f"{path}?{urlencode(query_params)}"
        # TODO: remove the hack that pops config.
        request_dict.pop("config", None)

        http_options: Optional[types.HttpOptions] = None
        if (
            parameter_model.config is not None
            and parameter_model.config.http_options is not None
        ):
            http_options = parameter_model.config.http_options

        request_dict = _common.convert_to_dict(request_dict)
        request_dict = _common.encode_unserializable_types(request_dict)

        response = self._api_client.request("post", path, request_dict, http_options)

        response_dict = {} if not response.body else json.loads(response.body)

        return_value = types.OptimizeResponseEndpoint._from_response(
            response=response_dict, kwargs=parameter_model.model_dump()
        )

        self._api_client._verify_response(return_value)
        return return_value

    def _create_custom_job_resource(
        self,
        *,
        custom_job: types.CustomJobOrDict,
        config: Optional[types.VertexBaseConfigOrDict] = None,
    ) -> types.CustomJob:
        """
        Creates a custom job.
        """

        parameter_model = types._CustomJobParameters(
            custom_job=custom_job,
            config=config,
        )

        request_url_dict: Optional[dict[str, str]]
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")
        else:
            request_dict = _CustomJobParameters_to_vertex(parameter_model)
            request_url_dict = request_dict.get("_url")
            if request_url_dict:
                path = "customJobs".format_map(request_url_dict)
            else:
                path = "customJobs"

        query_params = request_dict.get("_query")
        if query_params:
            path = f"{path}?{urlencode(query_params)}"
        # TODO: remove the hack that pops config.
        request_dict.pop("config", None)

        http_options: Optional[types.HttpOptions] = None
        if (
            parameter_model.config is not None
            and parameter_model.config.http_options is not None
        ):
            http_options = parameter_model.config.http_options

        request_dict = _common.convert_to_dict(request_dict)
        request_dict = _common.encode_unserializable_types(request_dict)

        response = self._api_client.request("post", path, request_dict, http_options)

        response_dict = {} if not response.body else json.loads(response.body)

        if self._api_client.vertexai:
            response_dict = _CustomJob_from_vertex(response_dict)

        return_value = types.CustomJob._from_response(
            response=response_dict, kwargs=parameter_model.model_dump()
        )

        self._api_client._verify_response(return_value)
        return return_value

    def _get_custom_job(
        self, *, name: str, config: Optional[types.VertexBaseConfigOrDict] = None
    ) -> types.CustomJob:
        """
        Gets a custom job.
        """

        parameter_model = types._GetCustomJobParameters(
            name=name,
            config=config,
        )

        request_url_dict: Optional[dict[str, str]]
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")
        else:
            request_dict = _GetCustomJobParameters_to_vertex(parameter_model)
            request_url_dict = request_dict.get("_url")
            if request_url_dict:
                path = "customJobs/{name}".format_map(request_url_dict)
            else:
                path = "customJobs/{name}"

        query_params = request_dict.get("_query")
        if query_params:
            path = f"{path}?{urlencode(query_params)}"
        # TODO: remove the hack that pops config.
        request_dict.pop("config", None)

        http_options: Optional[types.HttpOptions] = None
        if (
            parameter_model.config is not None
            and parameter_model.config.http_options is not None
        ):
            http_options = parameter_model.config.http_options

        request_dict = _common.convert_to_dict(request_dict)
        request_dict = _common.encode_unserializable_types(request_dict)

        response = self._api_client.request("get", path, request_dict, http_options)

        response_dict = {} if not response.body else json.loads(response.body)

        if self._api_client.vertexai:
            response_dict = _CustomJob_from_vertex(response_dict)

        return_value = types.CustomJob._from_response(
            response=response_dict, kwargs=parameter_model.model_dump()
        )

        self._api_client._verify_response(return_value)
        return return_value

    """Prompt Optimizer PO-Data."""

    def _wait_for_completion(self, job_name: str) -> types.CustomJob:

        JOB_COMPLETE_STATES = [
            genai_types.JobState.JOB_STATE_SUCCEEDED,
            genai_types.JobState.JOB_STATE_FAILED,
            genai_types.JobState.JOB_STATE_CANCELLED,
            genai_types.JobState.JOB_STATE_PAUSED,
        ]
        JOB_ERROR_STATES = [
            genai_types.JobState.JOB_STATE_FAILED,
            genai_types.JobState.JOB_STATE_CANCELLED,
        ]

        log_wait = 5
        wait_multiplier = 2
        max_wait_time = 60
        previous_time = time.time()

        job = self._get_custom_job(name=job_name)

        while job.state not in JOB_COMPLETE_STATES:
            current_time = time.time()
            if current_time - previous_time >= log_wait:
                logger.info(f"Waiting for job to complete. Current state: {job.state}")
                log_wait = min(log_wait * wait_multiplier, max_wait_time)
                previous_time = current_time
            time.sleep(log_wait)
            job = self._get_custom_job(name=job_name)

        logger.info(f"Job state: {job.state}")

        if job.state in JOB_ERROR_STATES:
            raise RuntimeError(f"Job failed with state: {job.state}")
        else:
            logger.info(f"Job completed with state: {job.state}")
        return job

    def optimize(
        self,
        method: types.PromptOptimizerMethod,
        config: types.PromptOptimizerConfigOrDict,
    ) -> types.CustomJob:
        """Call PO-Data optimizer.

        Args:
          method: The method for optimizing multiple prompts. Supported methods:
            VAPO, OPTIMIZATION_TARGET_GEMINI_NANO.
          config: PromptOptimizerConfig instance containing the
              configuration for prompt optimization.
        Returns:
          The custom job that was created.
        """

        if isinstance(config, dict):
            config = types.PromptOptimizerConfig(**config)

        if not config.config_path:
            raise ValueError("Config path is required.")

        _OPTIMIZER_METHOD_TO_CONTAINER_URI = {
            types.PromptOptimizerMethod.VAPO: "us-docker.pkg.dev/vertex-ai/cair/vaipo:preview_v1_0",
            types.PromptOptimizerMethod.OPTIMIZATION_TARGET_GEMINI_NANO: "us-docker.pkg.dev/vertex-ai/cair/vaipo:preview_android_v1_0",
        }
        container_uri = _OPTIMIZER_METHOD_TO_CONTAINER_URI.get(method)
        if not container_uri:
            raise ValueError(
                'Only "VAPO" and "OPTIMIZATION_TARGET_GEMINI_NANO" '
                "methods are currently supported."
            )

        if config.optimizer_job_display_name:
            display_name = config.optimizer_job_display_name
        else:
            timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
            display_name = f"{method.value.lower()}-optimizer-{timestamp}"

        wait_for_completion = config.wait_for_completion
        bucket = "/".join(config.config_path.split("/")[:-1])

        region = self._api_client.location
        project = self._api_client.project
        container_args = {
            "config": config.config_path,
        }
        args = ["--%s=%s" % (k, v) for k, v in container_args.items()]
        worker_pool_specs = [
            types.WorkerPoolSpec(
                replica_count=1,
                machine_spec=types.MachineSpec(machine_type="n1-standard-4"),
                container_spec=types.ContainerSpec(
                    image_uri=container_uri,
                    args=args,
                ),
            )
        ]

        service_account = _prompt_optimizer_utils._get_service_account(config)

        job_spec = types.CustomJobSpec(
            worker_pool_specs=worker_pool_specs,
            base_output_directory=genai_types.GcsDestination(output_uri_prefix=bucket),
            service_account=service_account,
        )

        custom_job = types.CustomJob(
            display_name=display_name,
            job_spec=job_spec,
        )

        job = self._create_custom_job_resource(
            custom_job=custom_job,
        )

        # Get the job resource name
        job_resource_name = job.name
        if not job_resource_name:
            raise ValueError(f"Error creating job: {job}")
        job_id = job_resource_name.split("/")[-1]
        logger.info("Job created: %s", job.name)

        # Construct the dashboard URL
        dashboard_url = f"https://console.cloud.google.com/vertex-ai/locations/{region}/training/{job_id}/cpu?project={project}"
        logger.info("View the job status at: %s", dashboard_url)

        if wait_for_completion:
            job = self._wait_for_completion(job_id)
        return job

    def optimize_prompt(
        self,
        *,
        prompt: str,
        config: Optional[types.OptimizeConfig] = None,
    ) -> types.OptimizeResponse:
        """Makes an API request to _optimize_prompt and returns the parsed response.

        Example usage:
        client = vertexai.Client(project=PROJECT_NAME, location='us-central1')
        prompt = "Generate system instructions for analyzing medical articles"
        response = client.prompt_optimizer.optimize_prompt(prompt=prompt)
        print(response.suggested_prompt)

        Args:
          prompt: The prompt to optimize.
          config: Optional.The configuration for prompt optimization. To optimize
            prompts from Android API provide
            types.OptimizeConfig(
                optimization_target=types.OptimizeTarget.OPTIMIZATION_TARGET_GEMINI_NANO
            )
        Returns:
          The parsed response from the API request.
        """

        prompt = genai_types.Content(parts=[genai_types.Part(text=prompt)], role="user")
        # TODO: b/435653980 - replace the custom method with a generated method.
        return self._custom_optimize_prompt(
            content=prompt,
            config=config,
        )

    def _custom_optimize_prompt(
        self,
        *,
        content: Optional[genai_types.ContentOrDict] = None,
        config: Optional[types.OptimizeConfigOrDict] = None,
    ) -> types.OptimizeResponse:
        """Optimize a single prompt.

        Sends a request to the tuningJobs:optimizePrompt streaming endpoint.
        Then gathers the response, concatenates into one string and returns
        the parsed response.
        """

        parameter_model = types._OptimizeRequestParameters(
            content=content,
            config=config,
        )
        request_url_dict: Optional[dict[str, str]]
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")
        else:
            request_dict = _OptimizeRequestParameters_to_vertex(parameter_model)
            request_url_dict = request_dict.get("_url")
            if request_url_dict:
                path = "tuningJobs:optimizePrompt".format_map(request_url_dict)
            else:
                path = "tuningJobs:optimizePrompt"

        query_params = request_dict.get("_query")
        if query_params:
            path = f"{path}?{urlencode(query_params)}"
        # TODO: remove the hack that pops config.
        request_dict.pop("config", None)

        http_options: Optional[genai_types.HttpOptions] = None
        if (
            parameter_model.config is not None
            and parameter_model.config.http_options is not None
        ):
            http_options = parameter_model.config.http_options

        request_dict = _common.convert_to_dict(request_dict)
        request_dict = _common.encode_unserializable_types(request_dict)

        response = self._api_client.request("post", path, request_dict, http_options)

        response_list = "" if not response.body else json.loads(response.body)

        return_value = []

        for response_dict in response_list:
            response_value = types.OptimizeResponseEndpoint._from_response(
                response=response_dict, kwargs=parameter_model.model_dump()
            )
            self._api_client._verify_response(response_value)
            if (
                response_value.content is not None
                and len(response_value.content.parts) > 0
                and response_value.content.parts[0].text is not None
            ):
                return_value.append(response_value.content.parts[0].text)

        output = "".join(return_value)
        final_response = types.OptimizeResponse(raw_text_response=output)
        try:
            final_response.parsed_response = _prompt_optimizer_utils._parse(output)
        except Exception as e:
            logger.warning(
                f"Failed to parse response: {e}. Returning only raw_text_response."
            )
        return final_response


class AsyncPromptOptimizer(_api_module.BaseModule):
    """Prompt Optimizer"""

    async def _optimize_prompt(
        self,
        *,
        content: Optional[genai_types.ContentOrDict] = None,
        config: Optional[types.OptimizeConfigOrDict] = None,
    ) -> types.OptimizeResponseEndpoint:
        """
        Optimize a single prompt.
        """

        parameter_model = types._OptimizeRequestParameters(
            content=content,
            config=config,
        )

        request_url_dict: Optional[dict[str, str]]
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")
        else:
            request_dict = _OptimizeRequestParameters_to_vertex(parameter_model)
            request_url_dict = request_dict.get("_url")
            if request_url_dict:
                path = "tuningJobs:optimizePrompt".format_map(request_url_dict)
            else:
                path = "tuningJobs:optimizePrompt"

        query_params = request_dict.get("_query")
        if query_params:
            path = f"{path}?{urlencode(query_params)}"
        # TODO: remove the hack that pops config.
        request_dict.pop("config", None)

        http_options: Optional[types.HttpOptions] = None
        if (
            parameter_model.config is not None
            and parameter_model.config.http_options is not None
        ):
            http_options = parameter_model.config.http_options

        request_dict = _common.convert_to_dict(request_dict)
        request_dict = _common.encode_unserializable_types(request_dict)

        response = await self._api_client.async_request(
            "post", path, request_dict, http_options
        )

        response_dict = {} if not response.body else json.loads(response.body)

        return_value = types.OptimizeResponseEndpoint._from_response(
            response=response_dict, kwargs=parameter_model.model_dump()
        )

        self._api_client._verify_response(return_value)
        return return_value

    async def _create_custom_job_resource(
        self,
        *,
        custom_job: types.CustomJobOrDict,
        config: Optional[types.VertexBaseConfigOrDict] = None,
    ) -> types.CustomJob:
        """
        Creates a custom job.
        """

        parameter_model = types._CustomJobParameters(
            custom_job=custom_job,
            config=config,
        )

        request_url_dict: Optional[dict[str, str]]
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")
        else:
            request_dict = _CustomJobParameters_to_vertex(parameter_model)
            request_url_dict = request_dict.get("_url")
            if request_url_dict:
                path = "customJobs".format_map(request_url_dict)
            else:
                path = "customJobs"

        query_params = request_dict.get("_query")
        if query_params:
            path = f"{path}?{urlencode(query_params)}"
        # TODO: remove the hack that pops config.
        request_dict.pop("config", None)

        http_options: Optional[types.HttpOptions] = None
        if (
            parameter_model.config is not None
            and parameter_model.config.http_options is not None
        ):
            http_options = parameter_model.config.http_options

        request_dict = _common.convert_to_dict(request_dict)
        request_dict = _common.encode_unserializable_types(request_dict)

        response = await self._api_client.async_request(
            "post", path, request_dict, http_options
        )

        response_dict = {} if not response.body else json.loads(response.body)

        if self._api_client.vertexai:
            response_dict = _CustomJob_from_vertex(response_dict)

        return_value = types.CustomJob._from_response(
            response=response_dict, kwargs=parameter_model.model_dump()
        )

        self._api_client._verify_response(return_value)
        return return_value

    async def _get_custom_job(
        self, *, name: str, config: Optional[types.VertexBaseConfigOrDict] = None
    ) -> types.CustomJob:
        """
        Gets a custom job.
        """

        parameter_model = types._GetCustomJobParameters(
            name=name,
            config=config,
        )

        request_url_dict: Optional[dict[str, str]]
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")
        else:
            request_dict = _GetCustomJobParameters_to_vertex(parameter_model)
            request_url_dict = request_dict.get("_url")
            if request_url_dict:
                path = "customJobs/{name}".format_map(request_url_dict)
            else:
                path = "customJobs/{name}"

        query_params = request_dict.get("_query")
        if query_params:
            path = f"{path}?{urlencode(query_params)}"
        # TODO: remove the hack that pops config.
        request_dict.pop("config", None)

        http_options: Optional[types.HttpOptions] = None
        if (
            parameter_model.config is not None
            and parameter_model.config.http_options is not None
        ):
            http_options = parameter_model.config.http_options

        request_dict = _common.convert_to_dict(request_dict)
        request_dict = _common.encode_unserializable_types(request_dict)

        response = await self._api_client.async_request(
            "get", path, request_dict, http_options
        )

        response_dict = {} if not response.body else json.loads(response.body)

        if self._api_client.vertexai:
            response_dict = _CustomJob_from_vertex(response_dict)

        return_value = types.CustomJob._from_response(
            response=response_dict, kwargs=parameter_model.model_dump()
        )

        self._api_client._verify_response(return_value)
        return return_value

    # Todo: b/428953357 - Add example in the README.
    async def optimize(
        self,
        method: types.PromptOptimizerMethod,
        config: types.PromptOptimizerConfigOrDict,
    ) -> types.CustomJob:
        """Call async Vertex AI Prompt Optimizer (VAPO).


        Note: The `wait_for_completion` parameter in the config will be
        ignored when using the AsyncClient, as it is not supported.

        Example usage:
        client = vertexai.Client(project=PROJECT_NAME, location='us-central1')
        vapo_config = vertexai.types.PromptOptimizerConfig(
            config_path='gs://you-bucket-name/your-config.json',
            service_account=service_account,
        )
        job = await client.aio.prompt_optimizer.optimize(
            method=types.PromptOptimizerMethod.VAPO, config=vapo_config)

        Args:
          method: The method for optimizing multiple prompts. Supported methods:
            VAPO, OPTIMIZATION_TARGET_GEMINI_NANO.
          config: PromptOptimizerConfig instance containing the
            configuration for prompt optimization.
        Returns:
          The custom job that was created.
        """
        if isinstance(config, dict):
            config = types.PromptOptimizerConfig(**config)

        if not config.config_path:
            raise ValueError("Config path is required.")

        _OPTIMIZER_METHOD_TO_CONTAINER_URI = {
            types.PromptOptimizerMethod.VAPO: "us-docker.pkg.dev/vertex-ai/cair/vaipo:preview_v1_0",
            types.PromptOptimizerMethod.OPTIMIZATION_TARGET_GEMINI_NANO: "us-docker.pkg.dev/vertex-ai/cair/vaipo:preview_android_v1_0",
        }
        container_uri = _OPTIMIZER_METHOD_TO_CONTAINER_URI.get(method)
        if not container_uri:
            raise ValueError(
                'Only "VAPO" and "OPTIMIZATION_TARGET_GEMINI_NANO" '
                "methods are currently supported."
            )

        if config.wait_for_completion:
            logger.info(
                "Ignoring wait_for_completion=True since the AsyncClient does not support it."
            )

        if config.optimizer_job_display_name:
            display_name = config.optimizer_job_display_name
        else:
            timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
            display_name = f"{method.value.lower()}-optimizer-{timestamp}"

        if not config.config_path:
            raise ValueError("Config path is required.")
        bucket = "/".join(config.config_path.split("/")[:-1])

        region = self._api_client.location
        project = self._api_client.project
        container_args = {
            "config": config.config_path,
        }
        args = ["--%s=%s" % (k, v) for k, v in container_args.items()]
        worker_pool_specs = [
            types.WorkerPoolSpec(
                replica_count=1,
                machine_spec=types.MachineSpec(machine_type="n1-standard-4"),
                container_spec=types.ContainerSpec(
                    image_uri=container_uri,
                    args=args,
                ),
            )
        ]

        service_account = _prompt_optimizer_utils._get_service_account(config)

        job_spec = types.CustomJobSpec(
            worker_pool_specs=worker_pool_specs,
            base_output_directory=genai_types.GcsDestination(output_uri_prefix=bucket),
            service_account=service_account,
        )

        custom_job = types.CustomJob(
            display_name=display_name,
            job_spec=job_spec,
        )

        job = await self._create_custom_job_resource(
            custom_job=custom_job,
        )

        # Get the job id for the dashboard url and display to the user.
        job_resource_name = job.name
        if not job_resource_name:
            raise ValueError(f"Error creating job: {job}")
        job_id = job_resource_name.split("/")[-1]
        logger.info("Job created: %s", job.name)

        # Construct the dashboard URL to show to the user.
        dashboard_url = f"https://console.cloud.google.com/vertex-ai/locations/{region}/training/{job_id}/cpu?project={project}"
        logger.info("View the job status at: %s", dashboard_url)

        return job

    async def _custom_optimize_prompt(
        self,
        *,
        content: Optional[genai_types.ContentOrDict] = None,
        config: Optional[types.OptimizeConfigOrDict] = None,
    ) -> types.OptimizeResponse:
        """Optimize a single prompt."""

        parameter_model = types._OptimizeRequestParameters(
            content=content,
            config=config,
        )
        request_url_dict: Optional[dict[str, str]]
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")
        else:
            request_dict = _OptimizeRequestParameters_to_vertex(parameter_model)
            request_url_dict = request_dict.get("_url")
            if request_url_dict:
                path = "tuningJobs:optimizePrompt".format_map(request_url_dict)
            else:
                path = "tuningJobs:optimizePrompt"

        query_params = request_dict.get("_query")
        if query_params:
            path = f"{path}?{urlencode(query_params)}"
        # TODO: remove the hack that pops config.
        request_dict.pop("config", None)

        http_options: Optional[types.HttpOptions] = None
        if (
            parameter_model.config is not None
            and parameter_model.config.http_options is not None
        ):
            http_options = parameter_model.config.http_options

        request_dict = _common.convert_to_dict(request_dict)
        request_dict = _common.encode_unserializable_types(request_dict)

        response = await self._api_client.async_request(
            "post", path, request_dict, http_options
        )

        response_list = "" if not response.body else json.loads(response.body)

        return_value = []

        for response_dict in response_list:
            response_value = types.OptimizeResponseEndpoint._from_response(
                response=response_dict, kwargs=parameter_model.model_dump()
            )
            self._api_client._verify_response(response_value)
            if (
                response_value.content is not None
                and len(response_value.content.parts) > 0
                and response_value.content.parts[0].text is not None
            ):
                return_value.append(response_value.content.parts[0].text)

        output = "".join(return_value)
        final_response = types.OptimizeResponse(raw_text_response=output)
        try:
            final_response.parsed_response = _prompt_optimizer_utils._parse(output)
        except Exception as e:
            logger.warning(
                f"Failed to parse response: {e}. Returning only raw_text_response."
            )
        return final_response

    async def optimize_prompt(
        self,
        *,
        prompt: str,
        config: Optional[types.OptimizeConfig] = None,
    ) -> types.OptimizeResponse:
        """Makes an async request to _optimize_prompt and returns an optimized prompt.

        Example usage:
        client = vertexai.Client(project=PROJECT_NAME, location='us-central1')
        prompt = "Generate system instructions for analyzing medical articles"
        response = await client.aio.prompt_optimizer.optimize_prompt(prompt=prompt)

        Args:
          prompt: The prompt to optimize.
          config: Optional.The configuration for prompt optimization. To optimize
            prompts from Android API provide
            types.OptimizeConfig(
                optimization_target=types.OptimizeTarget.OPTIMIZATION_TARGET_GEMINI_NANO
            )
        Returns:
          The parsed response from the API request.
        """

        prompt = genai_types.Content(parts=[genai_types.Part(text=prompt)], role="user")
        # TODO: b/435653980 - replace the custom method with a generated method.
        return await self._custom_optimize_prompt(
            content=prompt,
            config=config,
        )
