# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Code generated by the Google Gen AI SDK generator DO NOT EDIT.

import asyncio
import json
import logging
import time
from typing import Any, AsyncIterator, Iterator, Optional, Union
from urllib.parse import urlencode

from google.genai import _api_module
from google.genai import _common
from google.genai import operations
from google.genai import types as genai_types
from google.genai._common import get_value_by_path as getv
from google.genai._common import set_value_by_path as setv
from google.genai.pagers import AsyncPager, Pager

from . import _prompt_management_utils
from . import types


# Module-level logger, registered under the fixed name "vertexai_genai.prompts".
logger = logging.getLogger("vertexai_genai.prompts")


def _CreateDatasetParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Builds the Vertex request dict for a create-dataset call.

    Each field that is set (not None) on `from_object` is copied to its
    destination path in the result; unset fields are left out.

    Args:
      from_object: Parameters object or dict the fields are read from.
      parent_object: Unused by this converter.

    Returns:
      A new dict holding the converted request fields.
    """
    # (source field, destination path) pairs, in the order they are written.
    field_routes = (
        ("name", ["name"]),
        ("display_name", ["displayName"]),
        ("metadata_schema_uri", ["metadataSchemaUri"]),
        ("metadata", ["metadata"]),
        ("description", ["description"]),
        ("encryption_spec", ["encryptionSpec"]),
        ("model_reference", ["modelReference"]),
        ("config", ["config"]),
    )

    request: dict[str, Any] = {}
    for source_field, target_path in field_routes:
        value = getv(from_object, [source_field])
        if value is not None:
            setv(request, target_path, value)
    return request


def _CreateDatasetVersionParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Builds the Vertex request dict for a create-dataset-version call.

    `dataset_name` is stored under `_url.name`; the remaining set fields are
    copied to top-level keys. Fields that are None are left out.

    Args:
      from_object: Parameters object or dict the fields are read from.
      parent_object: Unused by this converter.

    Returns:
      A new dict holding the converted request fields.
    """
    # (source field, destination path) pairs, in the order they are written.
    field_routes = (
        ("dataset_name", ["_url", "name"]),
        ("metadata", ["metadata"]),
        ("model_reference", ["modelReference"]),
        ("parent", ["parent"]),
        ("display_name", ["displayName"]),
        ("config", ["config"]),
    )

    request: dict[str, Any] = {}
    for source_field, target_path in field_routes:
        value = getv(from_object, [source_field])
        if value is not None:
            setv(request, target_path, value)
    return request


def _DeleteDatasetRequestParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Builds the Vertex request dict for a delete-dataset call.

    `prompt_id` is stored under `_url.dataset_id` and `config` under
    `config`. Fields that are None are left out.

    Args:
      from_object: Parameters object or dict the fields are read from.
      parent_object: Unused by this converter.

    Returns:
      A new dict holding the converted request fields.
    """
    field_routes = (
        ("prompt_id", ["_url", "dataset_id"]),
        ("config", ["config"]),
    )

    request: dict[str, Any] = {}
    for source_field, target_path in field_routes:
        value = getv(from_object, [source_field])
        if value is not None:
            setv(request, target_path, value)
    return request


def _DeletePromptVersionRequestParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Builds the Vertex request dict for a delete-prompt-version call.

    `prompt_id` is stored under `_url.dataset_id`, `version_id` under
    `_url.version_id` and `config` under `config`. Fields that are None are
    left out.

    Args:
      from_object: Parameters object or dict the fields are read from.
      parent_object: Unused by this converter.

    Returns:
      A new dict holding the converted request fields.
    """
    field_routes = (
        ("prompt_id", ["_url", "dataset_id"]),
        ("version_id", ["_url", "version_id"]),
        ("config", ["config"]),
    )

    request: dict[str, Any] = {}
    for source_field, target_path in field_routes:
        value = getv(from_object, [source_field])
        if value is not None:
            setv(request, target_path, value)
    return request


def _GetDatasetOperationParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Builds the Vertex request dict for a get-dataset-operation call.

    `dataset_id` and `operation_id` are stored under `_url` with the same
    names, and `config` under `config`. Fields that are None are left out.

    Args:
      from_object: Parameters object or dict the fields are read from.
      parent_object: Unused by this converter.

    Returns:
      A new dict holding the converted request fields.
    """
    field_routes = (
        ("dataset_id", ["_url", "dataset_id"]),
        ("operation_id", ["_url", "operation_id"]),
        ("config", ["config"]),
    )

    request: dict[str, Any] = {}
    for source_field, target_path in field_routes:
        value = getv(from_object, [source_field])
        if value is not None:
            setv(request, target_path, value)
    return request


def _GetDatasetParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Builds the Vertex request dict for a get-dataset call.

    `name` is stored under `_url.name` and `config` under `config`. Fields
    that are None are left out.

    Args:
      from_object: Parameters object or dict the fields are read from.
      parent_object: Unused by this converter.

    Returns:
      A new dict holding the converted request fields.
    """
    field_routes = (
        ("name", ["_url", "name"]),
        ("config", ["config"]),
    )

    request: dict[str, Any] = {}
    for source_field, target_path in field_routes:
        value = getv(from_object, [source_field])
        if value is not None:
            setv(request, target_path, value)
    return request


def _GetDatasetVersionParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Builds the Vertex request dict for a get-dataset-version call.

    `dataset_id` and `dataset_version_id` are stored under `_url` with the
    same names, and `config` under `config`. Fields that are None are left
    out.

    Args:
      from_object: Parameters object or dict the fields are read from.
      parent_object: Unused by this converter.

    Returns:
      A new dict holding the converted request fields.
    """
    field_routes = (
        ("dataset_id", ["_url", "dataset_id"]),
        ("dataset_version_id", ["_url", "dataset_version_id"]),
        ("config", ["config"]),
    )

    request: dict[str, Any] = {}
    for source_field, target_path in field_routes:
        value = getv(from_object, [source_field])
        if value is not None:
            setv(request, target_path, value)
    return request


def _ListDatasetVersionsRequestParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Builds the Vertex request dict for a list-dataset-versions call.

    `read_mask` and `dataset_id` are stored under `_url` with the same names.
    When `config` is set it is run through `_ListPromptsConfig_to_vertex`,
    which receives the dict being built as its parent object; the value it
    returns is then stored under `config`.

    Args:
      from_object: Parameters object or dict the fields are read from.
      parent_object: Unused by this converter.

    Returns:
      A new dict holding the converted request fields.
    """
    request: dict[str, Any] = {}

    # NOTE(review): read_mask is stored under "_url", but the list-versions
    # path template used in this file ("datasets/{dataset_id}/datasetVersions")
    # has no {read_mask} placeholder - confirm whether it was meant to be a
    # query parameter.
    for url_field in ("read_mask", "dataset_id"):
        value = getv(from_object, [url_field])
        if value is not None:
            setv(request, ["_url", url_field], value)

    list_config = getv(from_object, ["config"])
    if list_config is not None:
        # The nested converter is handed `request` as its parent object.
        converted_config = _ListPromptsConfig_to_vertex(list_config, request)
        setv(request, ["config"], converted_config)

    return request


def _ListDatasetsRequestParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Builds the Vertex request dict for a list-datasets call.

    When `config` is set it is run through `_ListPromptsConfig_to_vertex`,
    which receives the dict being built as its parent object; the value it
    returns is then stored under `config`.

    Args:
      from_object: Parameters object or dict the fields are read from.
      parent_object: Unused by this converter.

    Returns:
      A new dict holding the converted request fields.
    """
    request: dict[str, Any] = {}

    list_config = getv(from_object, ["config"])
    if list_config is not None:
        # The nested converter is handed `request` as its parent object.
        converted_config = _ListPromptsConfig_to_vertex(list_config, request)
        setv(request, ["config"], converted_config)

    return request


def _ListPromptsConfig_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Copies list-config fields into the parent object's `_query` entry.

    `page_size`, `page_token` and `filter` are written to `parent_object`
    under `_query.pageSize`, `_query.pageToken` and `_query.filter` when they
    are not None. Nothing is written to the returned dict.

    Args:
      from_object: Config object or dict the fields are read from.
      parent_object: Dict that receives the `_query` entries.

    Returns:
      A new, empty dict.
    """
    # (config field, query parameter name) pairs, in the order they are written.
    query_routes = (
        ("page_size", "pageSize"),
        ("page_token", "pageToken"),
        ("filter", "filter"),
    )

    for config_field, query_name in query_routes:
        value = getv(from_object, [config_field])
        if value is not None:
            setv(parent_object, ["_query", query_name], value)

    converted: dict[str, Any] = {}
    return converted


def _RestoreVersionRequestParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Builds the Vertex request dict for a restore-version call.

    `dataset_id` and `version_id` are stored under `_url` with the same
    names, and `config` under `config`. Fields that are None are left out.

    Args:
      from_object: Parameters object or dict the fields are read from.
      parent_object: Unused by this converter.

    Returns:
      A new dict holding the converted request fields.
    """
    field_routes = (
        ("dataset_id", ["_url", "dataset_id"]),
        ("version_id", ["_url", "version_id"]),
        ("config", ["config"]),
    )

    request: dict[str, Any] = {}
    for source_field, target_path in field_routes:
        value = getv(from_object, [source_field])
        if value is not None:
            setv(request, target_path, value)
    return request


def _UpdateDatasetParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Builds the Vertex request dict for an update-dataset call.

    `dataset_id` is stored under `_url.dataset_id`; the remaining set fields
    are copied to top-level keys. Fields that are None are left out.

    Args:
      from_object: Parameters object or dict the fields are read from.
      parent_object: Unused by this converter.

    Returns:
      A new dict holding the converted request fields.
    """
    # (source field, destination path) pairs, in the order they are written.
    field_routes = (
        ("name", ["name"]),
        ("dataset_id", ["_url", "dataset_id"]),
        ("display_name", ["displayName"]),
        ("metadata", ["metadata"]),
        ("description", ["description"]),
        ("encryption_spec", ["encryptionSpec"]),
        ("model_reference", ["modelReference"]),
        ("config", ["config"]),
    )

    request: dict[str, Any] = {}
    for source_field, target_path in field_routes:
        value = getv(from_object, [source_field])
        if value is not None:
            setv(request, target_path, value)
    return request


class Prompts(_api_module.BaseModule):

    def _create_dataset_resource(
        self,
        *,
        name: Optional[str] = None,
        display_name: Optional[str] = None,
        metadata_schema_uri: Optional[str] = None,
        metadata: Optional[types.SchemaTextPromptDatasetMetadataOrDict] = None,
        description: Optional[str] = None,
        encryption_spec: Optional[genai_types.EncryptionSpecOrDict] = None,
        model_reference: Optional[str] = None,
        config: Optional[types.CreateDatasetConfigOrDict] = None,
    ) -> types.DatasetOperation:
        """Creates a dataset resource to store prompts.

        Issues a POST request against the `datasets` path.

        Args:
          name: Sent as `name` in the request.
          display_name: Sent as `displayName` in the request.
          metadata_schema_uri: Sent as `metadataSchemaUri` in the request.
          metadata: Sent as `metadata` in the request.
          description: Sent as `description` in the request.
          encryption_spec: Sent as `encryptionSpec` in the request.
          model_reference: Sent as `modelReference` in the request.
          config: Optional request configuration; its `http_options`, when
            set, are forwarded to the API client.

        Returns:
          A types.DatasetOperation built from the JSON response body.

        Raises:
          ValueError: If the API client is not a Vertex AI client.
        """
        params = types._CreateDatasetParameters(
            name=name,
            display_name=display_name,
            metadata_schema_uri=metadata_schema_uri,
            metadata=metadata,
            description=description,
            encryption_spec=encryption_spec,
            model_reference=model_reference,
            config=config,
        )

        # This method is Vertex-AI-only; reject any other client up front.
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _CreateDatasetParameters_to_vertex(params)
        path = "datasets"
        url_fields = payload.get("_url")
        if url_fields:
            path = path.format_map(url_fields)

        query_fields = payload.get("_query")
        if query_fields:
            path = f"{path}?{urlencode(query_fields)}"
        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        # Per-request HTTP options are only available through config.
        request_config = params.config
        http_options: Optional[types.HttpOptions] = (
            None if request_config is None else request_config.http_options
        )

        payload = _common.encode_unserializable_types(
            _common.convert_to_dict(payload)
        )
        response = self._api_client.request("post", path, payload, http_options)

        # An empty response body is treated as an empty JSON object.
        body = json.loads(response.body) if response.body else {}
        result = types.DatasetOperation._from_response(
            response=body, kwargs=params.model_dump()
        )
        self._api_client._verify_response(result)
        return result

    def _create_dataset_version_resource(
        self,
        *,
        dataset_name: Optional[str] = None,
        metadata: Optional[types.SchemaTextPromptDatasetMetadataOrDict] = None,
        model_reference: Optional[str] = None,
        parent: Optional[str] = None,
        display_name: Optional[str] = None,
        config: Optional[types.CreateDatasetVersionConfigOrDict] = None,
    ) -> types.DatasetOperation:
        """Creates a dataset version resource to store prompts.

        Issues a POST request against `datasets/{name}/datasetVersions`.

        Args:
          dataset_name: Substituted for `{name}` in the request path.
          metadata: Sent as `metadata` in the request.
          model_reference: Sent as `modelReference` in the request.
          parent: Sent as `parent` in the request.
          display_name: Sent as `displayName` in the request.
          config: Optional request configuration; its `http_options`, when
            set, are forwarded to the API client.

        Returns:
          A types.DatasetOperation built from the JSON response body.

        Raises:
          ValueError: If the API client is not a Vertex AI client.
        """
        params = types._CreateDatasetVersionParameters(
            dataset_name=dataset_name,
            metadata=metadata,
            model_reference=model_reference,
            parent=parent,
            display_name=display_name,
            config=config,
        )

        # This method is Vertex-AI-only; reject any other client up front.
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _CreateDatasetVersionParameters_to_vertex(params)
        path = "datasets/{name}/datasetVersions"
        url_fields = payload.get("_url")
        if url_fields:
            # Without URL fields the template is left as-is, placeholders included.
            path = path.format_map(url_fields)

        query_fields = payload.get("_query")
        if query_fields:
            path = f"{path}?{urlencode(query_fields)}"
        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        # Per-request HTTP options are only available through config.
        request_config = params.config
        http_options: Optional[types.HttpOptions] = (
            None if request_config is None else request_config.http_options
        )

        payload = _common.encode_unserializable_types(
            _common.convert_to_dict(payload)
        )
        response = self._api_client.request("post", path, payload, http_options)

        # An empty response body is treated as an empty JSON object.
        body = json.loads(response.body) if response.body else {}
        result = types.DatasetOperation._from_response(
            response=body, kwargs=params.model_dump()
        )
        self._api_client._verify_response(result)
        return result

    def _get_dataset_resource(
        self,
        *,
        name: Optional[str] = None,
        config: Optional[types.VertexBaseConfigOrDict] = None,
    ) -> types.Dataset:
        """Gets a dataset resource that stores prompts.

        Issues a GET request against `datasets/{name}`.

        Args:
          name: Substituted for `{name}` in the request path.
          config: Optional request configuration; its `http_options`, when
            set, are forwarded to the API client.

        Returns:
          A types.Dataset built from the JSON response body.

        Raises:
          ValueError: If the API client is not a Vertex AI client.
        """
        params = types._GetDatasetParameters(
            name=name,
            config=config,
        )

        # This method is Vertex-AI-only; reject any other client up front.
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _GetDatasetParameters_to_vertex(params)
        path = "datasets/{name}"
        url_fields = payload.get("_url")
        if url_fields:
            # Without URL fields the template is left as-is, placeholders included.
            path = path.format_map(url_fields)

        query_fields = payload.get("_query")
        if query_fields:
            path = f"{path}?{urlencode(query_fields)}"
        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        # Per-request HTTP options are only available through config.
        request_config = params.config
        http_options: Optional[types.HttpOptions] = (
            None if request_config is None else request_config.http_options
        )

        payload = _common.encode_unserializable_types(
            _common.convert_to_dict(payload)
        )
        response = self._api_client.request("get", path, payload, http_options)

        # An empty response body is treated as an empty JSON object.
        body = json.loads(response.body) if response.body else {}
        result = types.Dataset._from_response(
            response=body, kwargs=params.model_dump()
        )
        self._api_client._verify_response(result)
        return result

    def _get_dataset_version_resource(
        self,
        *,
        dataset_id: Optional[str] = None,
        dataset_version_id: Optional[str] = None,
        config: Optional[types.VertexBaseConfigOrDict] = None,
    ) -> types.DatasetVersion:
        """Gets a dataset version resource that stores prompts.

        Issues a GET request against
        `datasets/{dataset_id}/datasetVersions/{dataset_version_id}`.

        Args:
          dataset_id: Substituted for `{dataset_id}` in the request path.
          dataset_version_id: Substituted for `{dataset_version_id}` in the
            request path.
          config: Optional request configuration; its `http_options`, when
            set, are forwarded to the API client.

        Returns:
          A types.DatasetVersion built from the JSON response body.

        Raises:
          ValueError: If the API client is not a Vertex AI client.
        """
        params = types._GetDatasetVersionParameters(
            dataset_id=dataset_id,
            dataset_version_id=dataset_version_id,
            config=config,
        )

        # This method is Vertex-AI-only; reject any other client up front.
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _GetDatasetVersionParameters_to_vertex(params)
        path = "datasets/{dataset_id}/datasetVersions/{dataset_version_id}"
        url_fields = payload.get("_url")
        if url_fields:
            # Without URL fields the template is left as-is, placeholders included.
            path = path.format_map(url_fields)

        query_fields = payload.get("_query")
        if query_fields:
            path = f"{path}?{urlencode(query_fields)}"
        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        # Per-request HTTP options are only available through config.
        request_config = params.config
        http_options: Optional[types.HttpOptions] = (
            None if request_config is None else request_config.http_options
        )

        payload = _common.encode_unserializable_types(
            _common.convert_to_dict(payload)
        )
        response = self._api_client.request("get", path, payload, http_options)

        # An empty response body is treated as an empty JSON object.
        body = json.loads(response.body) if response.body else {}
        result = types.DatasetVersion._from_response(
            response=body, kwargs=params.model_dump()
        )
        self._api_client._verify_response(result)
        return result

    def _get_dataset_operation(
        self,
        *,
        dataset_id: Optional[str] = None,
        operation_id: Optional[str] = None,
        config: Optional[types.GetDatasetOperationConfigOrDict] = None,
    ) -> types.DatasetOperation:
        """Gets a dataset operation.

        Issues a GET request against
        `datasets/{dataset_id}/operations/{operation_id}`.

        Args:
          dataset_id: Substituted for `{dataset_id}` in the request path.
          operation_id: Substituted for `{operation_id}` in the request path.
          config: Optional request configuration; its `http_options`, when
            set, are forwarded to the API client.

        Returns:
          A types.DatasetOperation built from the JSON response body.

        Raises:
          ValueError: If the API client is not a Vertex AI client.
        """
        params = types._GetDatasetOperationParameters(
            dataset_id=dataset_id,
            operation_id=operation_id,
            config=config,
        )

        # This method is Vertex-AI-only; reject any other client up front.
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _GetDatasetOperationParameters_to_vertex(params)
        path = "datasets/{dataset_id}/operations/{operation_id}"
        url_fields = payload.get("_url")
        if url_fields:
            # Without URL fields the template is left as-is, placeholders included.
            path = path.format_map(url_fields)

        query_fields = payload.get("_query")
        if query_fields:
            path = f"{path}?{urlencode(query_fields)}"
        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        # Per-request HTTP options are only available through config.
        request_config = params.config
        http_options: Optional[types.HttpOptions] = (
            None if request_config is None else request_config.http_options
        )

        payload = _common.encode_unserializable_types(
            _common.convert_to_dict(payload)
        )
        response = self._api_client.request("get", path, payload, http_options)

        # An empty response body is treated as an empty JSON object.
        body = json.loads(response.body) if response.body else {}
        result = types.DatasetOperation._from_response(
            response=body, kwargs=params.model_dump()
        )
        self._api_client._verify_response(result)
        return result

    def _list_prompts(
        self, *, config: Optional[types.ListPromptsConfigOrDict] = None
    ) -> types.ListDatasetsResponse:
        """Lists dataset resources.

        Issues a GET request against the `datasets` path. The `page_size`,
        `page_token` and `filter` values of `config`, when set, are sent as
        the `pageSize`, `pageToken` and `filter` query parameters.

        Args:
          config: Optional list configuration; its `http_options`, when set,
            are forwarded to the API client.

        Returns:
          A types.ListDatasetsResponse built from the JSON response body.

        Raises:
          ValueError: If the API client is not a Vertex AI client.
        """
        params = types._ListDatasetsRequestParameters(
            config=config,
        )

        # This method is Vertex-AI-only; reject any other client up front.
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _ListDatasetsRequestParameters_to_vertex(params)
        path = "datasets"
        url_fields = payload.get("_url")
        if url_fields:
            path = path.format_map(url_fields)

        query_fields = payload.get("_query")
        if query_fields:
            path = f"{path}?{urlencode(query_fields)}"
        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        # Per-request HTTP options are only available through config.
        request_config = params.config
        http_options: Optional[types.HttpOptions] = (
            None if request_config is None else request_config.http_options
        )

        payload = _common.encode_unserializable_types(
            _common.convert_to_dict(payload)
        )
        response = self._api_client.request("get", path, payload, http_options)

        # An empty response body is treated as an empty JSON object.
        body = json.loads(response.body) if response.body else {}
        result = types.ListDatasetsResponse._from_response(
            response=body, kwargs=params.model_dump()
        )
        self._api_client._verify_response(result)
        return result

    def _list_versions(
        self,
        *,
        read_mask: Optional[str] = None,
        dataset_id: Optional[str] = None,
        config: Optional[types.ListPromptsConfigOrDict] = None,
    ) -> types.ListDatasetVersionsResponse:
        """Lists the versions of a dataset resource.

        Issues a GET request against `datasets/{dataset_id}/datasetVersions`.
        The `page_size`, `page_token` and `filter` values of `config`, when
        set, are sent as the `pageSize`, `pageToken` and `filter` query
        parameters.

        Args:
          read_mask: Passed to the request converter together with
            `dataset_id`.
          dataset_id: Substituted for `{dataset_id}` in the request path.
          config: Optional list configuration; its `http_options`, when set,
            are forwarded to the API client.

        Returns:
          A types.ListDatasetVersionsResponse built from the JSON response
          body.

        Raises:
          ValueError: If the API client is not a Vertex AI client.
        """
        params = types._ListDatasetVersionsRequestParameters(
            read_mask=read_mask,
            dataset_id=dataset_id,
            config=config,
        )

        # This method is Vertex-AI-only; reject any other client up front.
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _ListDatasetVersionsRequestParameters_to_vertex(params)
        path = "datasets/{dataset_id}/datasetVersions"
        url_fields = payload.get("_url")
        if url_fields:
            # Without URL fields the template is left as-is, placeholders included.
            path = path.format_map(url_fields)

        query_fields = payload.get("_query")
        if query_fields:
            path = f"{path}?{urlencode(query_fields)}"
        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        # Per-request HTTP options are only available through config.
        request_config = params.config
        http_options: Optional[types.HttpOptions] = (
            None if request_config is None else request_config.http_options
        )

        payload = _common.encode_unserializable_types(
            _common.convert_to_dict(payload)
        )
        response = self._api_client.request("get", path, payload, http_options)

        # An empty response body is treated as an empty JSON object.
        body = json.loads(response.body) if response.body else {}
        result = types.ListDatasetVersionsResponse._from_response(
            response=body, kwargs=params.model_dump()
        )
        self._api_client._verify_response(result)
        return result

    def _delete_dataset(
        self, *, prompt_id: str, config: Optional[types.DeletePromptConfigOrDict] = None
    ) -> types.DeletePromptOperation:
        """Deletes a dataset resource.

        Issues a DELETE request against `datasets/{dataset_id}`.

        Args:
          prompt_id: Substituted for `{dataset_id}` in the request path.
          config: Optional request configuration; its `http_options`, when
            set, are forwarded to the API client.

        Returns:
          A types.DeletePromptOperation built from the JSON response body.

        Raises:
          ValueError: If the API client is not a Vertex AI client.
        """
        params = types._DeleteDatasetRequestParameters(
            prompt_id=prompt_id,
            config=config,
        )

        # This method is Vertex-AI-only; reject any other client up front.
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _DeleteDatasetRequestParameters_to_vertex(params)
        path = "datasets/{dataset_id}"
        url_fields = payload.get("_url")
        if url_fields:
            # Without URL fields the template is left as-is, placeholders included.
            path = path.format_map(url_fields)

        query_fields = payload.get("_query")
        if query_fields:
            path = f"{path}?{urlencode(query_fields)}"
        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        # Per-request HTTP options are only available through config.
        request_config = params.config
        http_options: Optional[types.HttpOptions] = (
            None if request_config is None else request_config.http_options
        )

        payload = _common.encode_unserializable_types(
            _common.convert_to_dict(payload)
        )
        response = self._api_client.request("delete", path, payload, http_options)

        # An empty response body is treated as an empty JSON object.
        body = json.loads(response.body) if response.body else {}
        result = types.DeletePromptOperation._from_response(
            response=body, kwargs=params.model_dump()
        )
        self._api_client._verify_response(result)
        return result

    def _delete_dataset_version(
        self,
        *,
        prompt_id: str,
        version_id: str,
        config: Optional[types.DeletePromptConfigOrDict] = None,
    ) -> types.DeletePromptVersionOperation:
        """Deletes one version of a dataset resource.

        Issues a DELETE request against
        `datasets/{dataset_id}/datasetVersions/{version_id}`.

        Args:
          prompt_id: Substituted for `{dataset_id}` in the request path.
          version_id: Substituted for `{version_id}` in the request path.
          config: Optional request configuration; its `http_options`, when
            set, are forwarded to the API client.

        Returns:
          A types.DeletePromptVersionOperation built from the JSON response
          body.

        Raises:
          ValueError: If the API client is not a Vertex AI client.
        """
        params = types._DeletePromptVersionRequestParameters(
            prompt_id=prompt_id,
            version_id=version_id,
            config=config,
        )

        # This method is Vertex-AI-only; reject any other client up front.
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _DeletePromptVersionRequestParameters_to_vertex(params)
        path = "datasets/{dataset_id}/datasetVersions/{version_id}"
        url_fields = payload.get("_url")
        if url_fields:
            # Without URL fields the template is left as-is, placeholders included.
            path = path.format_map(url_fields)

        query_fields = payload.get("_query")
        if query_fields:
            path = f"{path}?{urlencode(query_fields)}"
        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        # Per-request HTTP options are only available through config.
        request_config = params.config
        http_options: Optional[types.HttpOptions] = (
            None if request_config is None else request_config.http_options
        )

        payload = _common.encode_unserializable_types(
            _common.convert_to_dict(payload)
        )
        response = self._api_client.request("delete", path, payload, http_options)

        # An empty response body is treated as an empty JSON object.
        body = json.loads(response.body) if response.body else {}
        result = types.DeletePromptVersionOperation._from_response(
            response=body, kwargs=params.model_dump()
        )
        self._api_client._verify_response(result)
        return result

    def _restore_version(
        self,
        *,
        dataset_id: str,
        version_id: str,
        config: Optional[types.RestoreVersionConfigOrDict] = None,
    ) -> types.RestoreVersionOperation:
        """Restores the provided prompt version to the latest version.

        Issues a GET request against
        `datasets/{dataset_id}/datasetVersions/{version_id}:restore`.

        Args:
          dataset_id: Substituted for `{dataset_id}` in the request path.
          version_id: Substituted for `{version_id}` in the request path.
          config: Optional request configuration; its `http_options`, when
            set, are forwarded to the API client.

        Returns:
          A types.RestoreVersionOperation built from the JSON response body.

        Raises:
          ValueError: If the API client is not a Vertex AI client.
        """
        params = types._RestoreVersionRequestParameters(
            dataset_id=dataset_id,
            version_id=version_id,
            config=config,
        )

        # This method is Vertex-AI-only; reject any other client up front.
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _RestoreVersionRequestParameters_to_vertex(params)
        path = "datasets/{dataset_id}/datasetVersions/{version_id}:restore"
        url_fields = payload.get("_url")
        if url_fields:
            # Without URL fields the template is left as-is, placeholders included.
            path = path.format_map(url_fields)

        query_fields = payload.get("_query")
        if query_fields:
            path = f"{path}?{urlencode(query_fields)}"
        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        # Per-request HTTP options are only available through config.
        request_config = params.config
        http_options: Optional[types.HttpOptions] = (
            None if request_config is None else request_config.http_options
        )

        payload = _common.encode_unserializable_types(
            _common.convert_to_dict(payload)
        )
        response = self._api_client.request("get", path, payload, http_options)

        # An empty response body is treated as an empty JSON object.
        body = json.loads(response.body) if response.body else {}
        result = types.RestoreVersionOperation._from_response(
            response=body, kwargs=params.model_dump()
        )
        self._api_client._verify_response(result)
        return result

    def _update_dataset_resource(
        self,
        *,
        name: Optional[str] = None,
        dataset_id: Optional[str] = None,
        display_name: Optional[str] = None,
        metadata: Optional[types.SchemaTextPromptDatasetMetadataOrDict] = None,
        description: Optional[str] = None,
        encryption_spec: Optional[genai_types.EncryptionSpecOrDict] = None,
        model_reference: Optional[str] = None,
        config: Optional[types.UpdateDatasetConfigOrDict] = None,
    ) -> types.Dataset:
        """Updates a dataset resource that stores prompts.

        Issues a PATCH request against `datasets/{dataset_id}`.

        Args:
          name: Sent as `name` in the request.
          dataset_id: Substituted for `{dataset_id}` in the request path.
          display_name: Sent as `displayName` in the request.
          metadata: Sent as `metadata` in the request.
          description: Sent as `description` in the request.
          encryption_spec: Sent as `encryptionSpec` in the request.
          model_reference: Sent as `modelReference` in the request.
          config: Optional request configuration; its `http_options`, when
            set, are forwarded to the API client.

        Returns:
          A types.Dataset built from the JSON response body.

        Raises:
          ValueError: If the API client is not a Vertex AI client.
        """
        params = types._UpdateDatasetParameters(
            name=name,
            dataset_id=dataset_id,
            display_name=display_name,
            metadata=metadata,
            description=description,
            encryption_spec=encryption_spec,
            model_reference=model_reference,
            config=config,
        )

        # This method is Vertex-AI-only; reject any other client up front.
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _UpdateDatasetParameters_to_vertex(params)
        path = "datasets/{dataset_id}"
        url_fields = payload.get("_url")
        if url_fields:
            # Without URL fields the template is left as-is, placeholders included.
            path = path.format_map(url_fields)

        query_fields = payload.get("_query")
        if query_fields:
            path = f"{path}?{urlencode(query_fields)}"
        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        # Per-request HTTP options are only available through config.
        request_config = params.config
        http_options: Optional[types.HttpOptions] = (
            None if request_config is None else request_config.http_options
        )

        payload = _common.encode_unserializable_types(
            _common.convert_to_dict(payload)
        )
        response = self._api_client.request("patch", path, payload, http_options)

        # An empty response body is treated as an empty JSON object.
        body = json.loads(response.body) if response.body else {}
        result = types.Dataset._from_response(
            response=body, kwargs=params.model_dump()
        )
        self._api_client._verify_response(result)
        return result

    def create(
        self,
        *,
        prompt: types.PromptOrDict,
        config: Optional[types.CreatePromptConfigOrDict] = None,
    ) -> types.Prompt:
        """Creates a new prompt backed by a Vertex Dataset resource.

        The call blocks until the dataset creation operation has finished.

        Args:
          prompt: The prompt to create, as a types.Prompt or an equivalent
            dict.
          config: Optional configuration for creating the prompt, as a
            types.CreatePromptConfig or an equivalent dict. Supplies the
            display name, encryption spec and wait timeout.

        Returns:
          The prompt that was passed in (converted to types.Prompt when a dict
          was given) with the created Dataset resource attached to it.
        """
        if isinstance(prompt, dict):
            prompt = types.Prompt(**prompt)
        config = (
            types.CreatePromptConfig(**config)
            if isinstance(config, dict)
            else config or types.CreatePromptConfig()
        )

        _prompt_management_utils._raise_for_invalid_prompt(prompt)

        dataset_metadata = (
            _prompt_management_utils._create_dataset_metadata_from_prompt(
                prompt,
                variables=(prompt.prompt_data and prompt.prompt_data.variables)
                or None,
            )
        )

        # Step 1: start the dataset creation and block until it has finished.
        # A timestamped display name is generated when none was configured.
        creation_operation = self._create_dataset_resource(
            display_name=(config and config.prompt_display_name)
            or f"prompt_{time.strftime('%Y%m%d-%H%M%S')}",
            name=f"projects/{self._api_client.project}/locations/{self._api_client.location}",
            metadata_schema_uri=_prompt_management_utils.PROMPT_SCHEMA_URI,
            metadata=dataset_metadata,
            model_reference=prompt.prompt_data.model,
            encryption_spec=(config and config.encryption_spec) or None,
        )
        created_resource_name = self._wait_for_operation(
            operation=creation_operation,
            timeout=config.timeout if config else 90,
        )

        # Step 2: fetch the created dataset by its id (the last segment of the
        # resource name) and attach it to the prompt.
        created_dataset_id = created_resource_name.split("/")[-1]
        prompt._dataset = self._get_dataset_resource(name=created_dataset_id)
        return prompt

    def create_version(
        self,
        *,
        prompt_id: str,
        prompt: types.PromptOrDict,
        config: Optional[types.CreatePromptVersionConfigOrDict] = None,
    ) -> types.Prompt:
        """Creates a new version of the prompt identified by prompt_id.

        The dataset behind the prompt is first updated with the new prompt
        content, then a dataset version is created from it. The call blocks
        until the version creation operation has finished.

        Args:
          prompt_id: The ID of the prompt to create a version for.
          prompt: The prompt content for the new version, as a types.Prompt or
            an equivalent dict.
          config: Optional configuration for creating the prompt version, as a
            types.CreatePromptVersionConfig or an equivalent dict.

        Returns:
          A types.Prompt built from the new Dataset Version resource, with the
          updated Dataset and the Dataset Version resources attached.
        """
        if isinstance(prompt, dict):
            prompt = types.Prompt(**prompt)
        config = (
            types.CreatePromptVersionConfig(**config)
            if isinstance(config, dict)
            else config or types.CreatePromptVersionConfig()
        )

        _prompt_management_utils._raise_for_invalid_prompt(prompt)

        requested_version_name = (config and config.version_display_name) or None

        # Step 1: read the current dataset so its display name can be kept.
        current_dataset = self._get_dataset_resource(name=prompt_id)

        # Step 2: write the new prompt content into the dataset.
        new_metadata = _prompt_management_utils._create_dataset_metadata_from_prompt(
            prompt,
            variables=(prompt.prompt_data and prompt.prompt_data.variables) or None,
        )
        updated_dataset = self._update_dataset_resource(
            dataset_id=prompt_id,
            display_name=current_dataset.display_name,
            metadata=new_metadata,
            model_reference=prompt.prompt_data.model,
        )

        # Step 3: create the dataset version and wait for the operation. A
        # timestamped display name is generated when none was configured.
        version_operation = self._create_dataset_version_resource(
            dataset_name=prompt_id,
            display_name=requested_version_name
            or f"prompt_version_{time.strftime('%Y%m%d-%H%M%S')}",
        )
        version_resource_name = self._wait_for_operation(
            operation=version_operation,
            timeout=config.timeout if config else 90,
        )

        # Step 4: fetch the new version and rebuild the prompt from it.
        version_resource = self._get_dataset_version_resource(
            dataset_id=prompt_id,
            dataset_version_id=version_resource_name.split("/")[-1],
        )
        versioned_prompt = (
            _prompt_management_utils._create_prompt_from_dataset_metadata(
                version_resource
            )
        )
        versioned_prompt._dataset = updated_dataset
        versioned_prompt._dataset_version = version_resource
        return versioned_prompt

    def _wait_for_operation(
        self,
        operation: types.DatasetOperation,
        timeout: int,
    ) -> str:
        """Waits for a dataset operation to complete.

        The operation is polled through ``_get_dataset_operation`` until it
        reports ``done``. The wait between polls starts at 5 seconds and is
        doubled (capped at 60 seconds) once a full wait interval has elapsed.

        Args:
          operation: The dataset operation to wait for. Its name must contain
            a ``/datasets/<dataset_id>/`` segment and end with the operation
            id.
          timeout: The maximum time, in seconds, to wait for the operation to
            complete. It is checked before each wait, so the last wait may run
            past it.

        Returns:
          The ``name`` entry of the finished operation's response.

        Raises:
          TimeoutError: If the operation does not complete within the timeout.
          ValueError: If the operation finished with an error, or finished
            without a response containing a ``name``.
        """
        done = False
        prompt_dataset_operation: Optional[types.DatasetOperation] = None

        response_operation_name = operation.name
        dataset_id = response_operation_name.split("/datasets/")[1].split("/")[0]
        operation_id = response_operation_name.split("/")[-1]

        start_time = time.time()
        sleep_duration = 5
        wait_multiplier = 2
        max_wait_time = 60
        previous_time = time.time()

        while not done:
            if (time.time() - start_time) > timeout:
                raise TimeoutError(
                    "Create prompt operation did not complete within the"
                    f" specified timeout of {timeout} seconds."
                )
            current_time = time.time()
            # Back off: lengthen the wait once the previous interval elapsed.
            if current_time - previous_time >= sleep_duration:
                sleep_duration = min(sleep_duration * wait_multiplier, max_wait_time)
                previous_time = current_time
            time.sleep(sleep_duration)
            prompt_dataset_operation = self._get_dataset_operation(
                dataset_id=dataset_id,
                operation_id=operation_id,
            )
            done = getattr(prompt_dataset_operation, "done", False)

        # Check the error before the response: a failed operation carries an
        # error and no response, so testing the response first would always
        # hide the error detail behind the generic message below.
        operation_error = getattr(prompt_dataset_operation, "error", None)
        if operation_error is not None:
            raise ValueError(
                f"Error creating prompt version resource: {operation_error}"
            )
        if (
            not prompt_dataset_operation
            or prompt_dataset_operation.response is None
            or prompt_dataset_operation.response.get("name") is None
        ):
            raise ValueError("Error creating prompt version resource.")
        return prompt_dataset_operation.response.get("name")

    def get(
        self,
        *,
        prompt_id: str,
        config: Optional[types.GetPromptConfig] = None,
    ) -> types.Prompt:
        """Fetches a prompt stored in a Vertex Dataset.

        Args:
          prompt_id: The id of the Vertex Dataset resource containing the prompt. For example, if the prompt resource name is "projects/123/locations/us-central1/datasets/456", then the prompt_id is "456".
          config: Optional configuration for getting the prompt. It is not
            read by this method at present.

        Returns:
            A types.Prompt object built from the Dataset metadata, with the Dataset resource attached.
        """
        dataset = self._get_dataset_resource(name=prompt_id)
        fetched_prompt = _prompt_management_utils._create_prompt_from_dataset_metadata(
            dataset,
        )
        fetched_prompt._dataset = dataset
        return fetched_prompt

    def get_version(
        self,
        *,
        prompt_id: str,
        version_id: str,
        config: Optional[types.GetPromptConfig] = None,
    ) -> types.Prompt:
        """Gets a specific version of a prompt stored in a Vertex Dataset.

        Args:
          prompt_id: The id of the Vertex Dataset resource containing the prompt. For example, if the prompt resource name is "projects/123/locations/us-central1/datasets/456", then the prompt_id is "456".
          version_id: The id of the Vertex Dataset Version resource containing the prompt version. For example, if the prompt version resource name is "projects/123/locations/us-central1/datasets/456/datasetVersions/1", then the version_id is "1".
          config: Optional configuration for getting the prompt. It is not
            read by this method at present.

        Returns:
            A types.Prompt object built from the requested Dataset Version, with its associated Dataset and Dataset Version resources attached.
        """
        prompt_dataset_resource = self._get_dataset_resource(name=prompt_id)
        prompt_version_resource = self._get_dataset_version_resource(
            dataset_id=prompt_id,
            dataset_version_id=version_id,
        )

        # Build the prompt from the version resource, as create_version and
        # restore_version do. Building it from the dataset would return the
        # dataset's current content regardless of the requested version_id.
        prompt = _prompt_management_utils._create_prompt_from_dataset_metadata(
            prompt_version_resource,
        )
        prompt._dataset = prompt_dataset_resource
        prompt._dataset_version = prompt_version_resource

        return prompt

    def _list_prompts_pager(
        self,
        *,
        config: Optional[types.ListPromptsConfigOrDict] = None,
    ) -> Pager[types.Dataset]:
        """Returns a Pager over the datasets returned by ``_list_prompts``.

        The first page is requested here; the Pager is given ``_list_prompts``
        and the config so that it can request further pages.
        """
        first_page = self._list_prompts(config=config)
        return Pager("datasets", self._list_prompts, first_page, config)

    def _list_versions_pager(
        self,
        *,
        prompt_id: str,
        config: Optional[types.ListPromptsConfigOrDict] = None,
    ) -> Pager[types.DatasetVersion]:
        """Returns a Pager over the dataset versions of one prompt.

        Args:
          prompt_id: The id of the dataset whose versions are listed.
          config: Optional configuration for the list request.

        Returns:
          A Pager whose first page is already loaded.
        """

        def _list_versions_for_prompt(**kwargs: Any) -> Any:
            # The pager re-invokes this callable to load follow-up pages and
            # does not know about the dataset id (presumably it passes only
            # the config -- confirm against google.genai.pagers). Bind the id
            # here so later pages query the same dataset as the first one.
            kwargs.setdefault("dataset_id", prompt_id)
            return self._list_versions(**kwargs)

        return Pager(
            "dataset_versions",
            _list_versions_for_prompt,
            self._list_versions(config=config, dataset_id=prompt_id),
            config,
        )

    def list(
        self,
        *,
        config: Optional[types.ListPromptsConfigOrDict] = None,
    ) -> Iterator[types.PromptRef]:
        """Lists prompt resources in a project.

        Lazily yields one prompt reference per dataset returned by the list
        request. Each reference carries the prompt_id and the model of the
        prompt. To load the full types.Prompt for a reference, call get() with
        its prompt_id.

        Example usage, where ``prompts`` is an instance of this class:

        ```
        # Using an iterator
        for prompt_ref in prompts.list():
          prompts.get(prompt_id=prompt_ref.prompt_id)

        # Materializing all references first
        prompt_refs = [ref for ref in prompts.list()]
        prompts.get(prompt_id=prompt_refs[0].prompt_id)
        ```

        Args:
          config: Optional configuration for listing prompts, as a
            types.ListPromptsConfig or an equivalent dict.

        Returns:
            An iterator of types.PromptRef objects.
        """
        config = (
            types.ListPromptsConfig(**config)
            if isinstance(config, dict)
            else config or types.ListPromptsConfig()
        )
        pager = self._list_prompts_pager(config=config)
        for dataset in pager:
            # The prompt id is the last segment of the dataset resource name.
            yield types.PromptRef(
                model=dataset.model_reference,
                prompt_id=dataset.name.rsplit("/", 1)[-1],
            )

    def list_versions(
        self,
        *,
        prompt_id: str,
        config: Optional[types.ListPromptsConfigOrDict] = None,
    ) -> Iterator[types.PromptVersionRef]:
        """Lists prompt version resources for a provided prompt_id.

        Lazily yields one reference per dataset version returned by the list
        request. To load the full types.Prompt for a reference, call
        get_version() with its prompt_id and version_id.

        Example usage, where ``prompts`` is an instance of this class:

        ```
        # Using an iterator
        for version_ref in prompts.list_versions(prompt_id="123"):
          prompts.get_version(prompt_id=version_ref.prompt_id, version_id=version_ref.version_id)

        # Materializing all references first
        version_refs = [ref for ref in prompts.list_versions(prompt_id="123")]
        prompts.get_version(prompt_id=version_refs[0].prompt_id, version_id=version_refs[0].version_id)
        ```

        Args:
          prompt_id: The id of the Vertex Dataset resource containing the prompt. For example, if the prompt resource name is "projects/123/locations/us-central1/datasets/456", then the prompt_id is "456".
          config: Optional configuration for listing prompts, as a
            types.ListPromptsConfig or an equivalent dict.

        Returns:
            An iterator of types.PromptVersionRef objects for the provided prompt_id.
        """
        config = (
            types.ListPromptsConfig(**config)
            if isinstance(config, dict)
            else config or types.ListPromptsConfig()
        )
        pager = self._list_versions_pager(config=config, prompt_id=prompt_id)
        for version in pager:
            # The version id is the last segment of the version resource name.
            yield types.PromptVersionRef(
                model=version.model_reference,
                version_id=version.name.rsplit("/", 1)[-1],
                prompt_id=prompt_id,
            )

    def _wait_for_project_operation(
        self,
        operation: types.DatasetOperation,
        timeout: int,
    ) -> None:
        """Waits for a project-level operation to complete.

        Delete operations are project level operations and are separate from dataset resource operations, for example: projects/123/locations/us-central1/operations/789.

        The operation is polled until it reports ``done``. The wait between
        polls starts at 5 seconds and is doubled (capped at 60 seconds) once a
        full wait interval has elapsed.

        Args:
          operation: The project operation to wait for.
          timeout: The maximum time, in seconds, to wait for the operation to
            complete. It is checked before each wait, so the last wait may run
            past it.
        Raises:
          TimeoutError: If the operation does not complete within the timeout.
          ValueError: If the operation fails.
        """
        # Both values are loop-invariant. Taking the id from the operation we
        # were given (rather than from each polled response) also keeps the
        # loop working if a polled response comes back without a name.
        operations_module = operations.Operations(api_client_=self._api_client)
        operation_id = operation.name.split("/")[-1]

        done = False
        latest_operation = operation

        start_time = time.time()
        sleep_duration = 5
        wait_multiplier = 2
        max_wait_time = 60
        previous_time = time.time()
        while not done:
            if (time.time() - start_time) > timeout:
                raise TimeoutError(
                    "Delete operation did not complete within the"
                    f" specified timeout of {timeout} seconds."
                )
            current_time = time.time()
            # Back off: lengthen the wait once the previous interval elapsed.
            if current_time - previous_time >= sleep_duration:
                sleep_duration = min(sleep_duration * wait_multiplier, max_wait_time)
                previous_time = current_time
            time.sleep(sleep_duration)

            latest_operation = operations_module._get(
                operation_id=operation_id,
            )
            done = getattr(latest_operation, "done", False)

        operation_error = getattr(latest_operation, "error", None)
        if operation_error is not None:
            raise ValueError(f"Error in delete operation: {operation_error}")

    def delete(
        self,
        *,
        prompt_id: str,
        config: Optional[types.DeletePromptConfig] = None,
    ) -> None:
        """Deletes a prompt resource and waits for the deletion to finish.

        Args:
          prompt_id: The id of the prompt resource to delete.
          config: Optional configuration for the deletion. It is forwarded to
            the delete request, and its timeout bounds the wait; 90 seconds
            are used when no config is given.

        Raises:
          TimeoutError: If the delete operation does not complete within the timeout.
          ValueError: If the delete operation fails.
        """
        pending_deletion = self._delete_dataset(prompt_id=prompt_id, config=config)
        wait_seconds = config.timeout if config else 90
        self._wait_for_project_operation(
            operation=pending_deletion,
            timeout=wait_seconds,
        )
        logger.info(f"Deleted prompt with id: {prompt_id}")

    def delete_version(
        self,
        *,
        prompt_id: str,
        version_id: str,
        config: Optional[types.DeletePromptConfig] = None,
    ) -> None:
        """Deletes a prompt version resource and waits for the deletion to finish.

        Args:
          prompt_id: The id of the prompt resource that owns the version.
          version_id: The id of the prompt version resource to delete.
          config: Optional configuration for the deletion. It is forwarded to
            the delete request, and its timeout bounds the wait; 90 seconds
            are used when no config is given.

        Raises:
          TimeoutError: If the delete operation does not complete within the timeout.
          ValueError: If the delete operation fails.
        """
        pending_deletion = self._delete_dataset_version(
            prompt_id=prompt_id,
            version_id=version_id,
            config=config,
        )
        wait_seconds = config.timeout if config else 90
        self._wait_for_project_operation(
            operation=pending_deletion,
            timeout=wait_seconds,
        )
        logger.info(
            f"Deleted prompt version {version_id} from prompt with id: {prompt_id}"
        )

    def restore_version(
        self,
        *,
        prompt_id: str,
        version_id: str,
        config: Optional[types.RestoreVersionConfig] = None,
    ) -> types.Prompt:
        """Restores the provided prompt version to the latest version.

        Starts the restore, waits for the operation to finish, then reads the
        Dataset Version back and builds the prompt from it.

        Args:
          prompt_id: The id of the Vertex Dataset resource containing the prompt. For example, if the prompt resource name is "projects/123/locations/us-central1/datasets/456", then the prompt_id is "456".
          version_id: The id of the Vertex Dataset Version resource to restore. For example, if the version resource name is "projects/123/locations/us-central1/datasets/456/datasetVersions/789", then the version_id is "789".
          config: Optional configuration for restoring the prompt version.
            Only its timeout is read here; 90 seconds are used when no config
            is given.

        Returns:
            A types.Prompt object built from the Dataset Version resource, with that resource attached.
        """
        pending_restore = self._restore_version(
            dataset_id=prompt_id,
            version_id=version_id,
        )
        wait_seconds = config.timeout if config else 90
        self._wait_for_project_operation(
            operation=pending_restore,
            timeout=wait_seconds,
        )

        restored_version = self._get_dataset_version_resource(
            dataset_id=prompt_id,
            dataset_version_id=version_id,
        )
        restored_prompt = _prompt_management_utils._create_prompt_from_dataset_metadata(
            restored_version,
        )
        restored_prompt._dataset_version = restored_version
        return restored_prompt


class AsyncPrompts(_api_module.BaseModule):

    async def _create_dataset_resource(
        self,
        *,
        name: Optional[str] = None,
        display_name: Optional[str] = None,
        metadata_schema_uri: Optional[str] = None,
        metadata: Optional[types.SchemaTextPromptDatasetMetadataOrDict] = None,
        description: Optional[str] = None,
        encryption_spec: Optional[genai_types.EncryptionSpecOrDict] = None,
        model_reference: Optional[str] = None,
        config: Optional[types.CreateDatasetConfigOrDict] = None,
    ) -> types.DatasetOperation:
        """Starts the creation of a dataset resource to store a prompt.

        Sends an asynchronous POST request to ``datasets`` and parses the JSON
        response body into a ``types.DatasetOperation``.

        Args:
          name, display_name, metadata_schema_uri, metadata, description,
          encryption_spec, model_reference: Forwarded unchanged to
            ``types._CreateDatasetParameters``.
          config: Optional request configuration. Its ``http_options``, when
            present, are handed to the API client.

        Returns:
          The ``types.DatasetOperation`` built from the response.

        Raises:
          ValueError: If the underlying client is not a Vertex AI client.
        """
        params = types._CreateDatasetParameters(
            name=name,
            display_name=display_name,
            metadata_schema_uri=metadata_schema_uri,
            metadata=metadata,
            description=description,
            encryption_spec=encryption_spec,
            model_reference=model_reference,
            config=config,
        )

        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _CreateDatasetParameters_to_vertex(params)

        path_template = "datasets"
        url_params: Optional[dict[str, str]] = payload.get("_url")
        path = path_template.format_map(url_params) if url_params else path_template

        query = payload.get("_query")
        if query:
            path = f"{path}?{urlencode(query)}"
        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        http_options: Optional[types.HttpOptions] = None
        if params.config is not None:
            http_options = params.config.http_options

        payload = _common.encode_unserializable_types(_common.convert_to_dict(payload))

        response = await self._api_client.async_request(
            "post", path, payload, http_options
        )
        body = json.loads(response.body) if response.body else {}

        dataset_operation = types.DatasetOperation._from_response(
            response=body, kwargs=params.model_dump()
        )
        self._api_client._verify_response(dataset_operation)
        return dataset_operation

    async def _create_dataset_version_resource(
        self,
        *,
        dataset_name: Optional[str] = None,
        metadata: Optional[types.SchemaTextPromptDatasetMetadataOrDict] = None,
        model_reference: Optional[str] = None,
        parent: Optional[str] = None,
        display_name: Optional[str] = None,
        config: Optional[types.CreateDatasetVersionConfigOrDict] = None,
    ) -> types.DatasetOperation:
        """Starts the creation of a dataset version resource for a prompt.

        Sends an asynchronous POST request to
        ``datasets/{name}/datasetVersions`` and parses the JSON response body
        into a ``types.DatasetOperation``.

        Args:
          dataset_name, metadata, model_reference, parent, display_name:
            Forwarded unchanged to ``types._CreateDatasetVersionParameters``.
          config: Optional request configuration. Its ``http_options``, when
            present, are handed to the API client.

        Returns:
          The ``types.DatasetOperation`` built from the response.

        Raises:
          ValueError: If the underlying client is not a Vertex AI client.
        """
        params = types._CreateDatasetVersionParameters(
            dataset_name=dataset_name,
            metadata=metadata,
            model_reference=model_reference,
            parent=parent,
            display_name=display_name,
            config=config,
        )

        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _CreateDatasetVersionParameters_to_vertex(params)

        # Path placeholders are only substituted when the converter produced
        # URL parameters; otherwise the raw template is used as the path.
        path_template = "datasets/{name}/datasetVersions"
        url_params: Optional[dict[str, str]] = payload.get("_url")
        path = path_template.format_map(url_params) if url_params else path_template

        query = payload.get("_query")
        if query:
            path = f"{path}?{urlencode(query)}"
        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        http_options: Optional[types.HttpOptions] = None
        if params.config is not None:
            http_options = params.config.http_options

        payload = _common.encode_unserializable_types(_common.convert_to_dict(payload))

        response = await self._api_client.async_request(
            "post", path, payload, http_options
        )
        body = json.loads(response.body) if response.body else {}

        dataset_operation = types.DatasetOperation._from_response(
            response=body, kwargs=params.model_dump()
        )
        self._api_client._verify_response(dataset_operation)
        return dataset_operation

    async def _get_dataset_resource(
        self,
        *,
        name: Optional[str] = None,
        config: Optional[types.VertexBaseConfigOrDict] = None,
    ) -> types.Dataset:
        """Fetches a dataset resource that stores a prompt.

        Sends an asynchronous GET request to ``datasets/{name}`` and parses
        the JSON response body into a ``types.Dataset``.

        Args:
          name: Forwarded unchanged to ``types._GetDatasetParameters``.
          config: Optional request configuration. Its ``http_options``, when
            present, are handed to the API client.

        Returns:
          The ``types.Dataset`` built from the response.

        Raises:
          ValueError: If the underlying client is not a Vertex AI client.
        """
        params = types._GetDatasetParameters(name=name, config=config)

        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _GetDatasetParameters_to_vertex(params)

        # Path placeholders are only substituted when the converter produced
        # URL parameters; otherwise the raw template is used as the path.
        path_template = "datasets/{name}"
        url_params: Optional[dict[str, str]] = payload.get("_url")
        path = path_template.format_map(url_params) if url_params else path_template

        query = payload.get("_query")
        if query:
            path = f"{path}?{urlencode(query)}"
        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        http_options: Optional[types.HttpOptions] = None
        if params.config is not None:
            http_options = params.config.http_options

        payload = _common.encode_unserializable_types(_common.convert_to_dict(payload))

        response = await self._api_client.async_request(
            "get", path, payload, http_options
        )
        body = json.loads(response.body) if response.body else {}

        dataset = types.Dataset._from_response(
            response=body, kwargs=params.model_dump()
        )
        self._api_client._verify_response(dataset)
        return dataset

    async def _get_dataset_version_resource(
        self,
        *,
        dataset_id: Optional[str] = None,
        dataset_version_id: Optional[str] = None,
        config: Optional[types.VertexBaseConfigOrDict] = None,
    ) -> types.DatasetVersion:
        """Fetches a dataset version resource that stores a prompt version.

        Sends an asynchronous GET request to
        ``datasets/{dataset_id}/datasetVersions/{dataset_version_id}`` and
        parses the JSON response body into a ``types.DatasetVersion``.

        Args:
          dataset_id, dataset_version_id: Forwarded unchanged to
            ``types._GetDatasetVersionParameters``.
          config: Optional request configuration. Its ``http_options``, when
            present, are handed to the API client.

        Returns:
          The ``types.DatasetVersion`` built from the response.

        Raises:
          ValueError: If the underlying client is not a Vertex AI client.
        """
        params = types._GetDatasetVersionParameters(
            dataset_id=dataset_id,
            dataset_version_id=dataset_version_id,
            config=config,
        )

        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _GetDatasetVersionParameters_to_vertex(params)

        # Path placeholders are only substituted when the converter produced
        # URL parameters; otherwise the raw template is used as the path.
        path_template = "datasets/{dataset_id}/datasetVersions/{dataset_version_id}"
        url_params: Optional[dict[str, str]] = payload.get("_url")
        path = path_template.format_map(url_params) if url_params else path_template

        query = payload.get("_query")
        if query:
            path = f"{path}?{urlencode(query)}"
        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        http_options: Optional[types.HttpOptions] = None
        if params.config is not None:
            http_options = params.config.http_options

        payload = _common.encode_unserializable_types(_common.convert_to_dict(payload))

        response = await self._api_client.async_request(
            "get", path, payload, http_options
        )
        body = json.loads(response.body) if response.body else {}

        dataset_version = types.DatasetVersion._from_response(
            response=body, kwargs=params.model_dump()
        )
        self._api_client._verify_response(dataset_version)
        return dataset_version

    async def _get_dataset_operation(
        self,
        *,
        dataset_id: Optional[str] = None,
        operation_id: Optional[str] = None,
        config: Optional[types.GetDatasetOperationConfigOrDict] = None,
    ) -> types.DatasetOperation:
        """Fetches the current state of a dataset operation.

        Sends an asynchronous GET request to
        ``datasets/{dataset_id}/operations/{operation_id}`` and parses the
        JSON response body into a ``types.DatasetOperation``.

        Args:
          dataset_id, operation_id: Forwarded unchanged to
            ``types._GetDatasetOperationParameters``.
          config: Optional request configuration. Its ``http_options``, when
            present, are handed to the API client.

        Returns:
          The ``types.DatasetOperation`` built from the response.

        Raises:
          ValueError: If the underlying client is not a Vertex AI client.
        """
        params = types._GetDatasetOperationParameters(
            dataset_id=dataset_id,
            operation_id=operation_id,
            config=config,
        )

        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _GetDatasetOperationParameters_to_vertex(params)

        # Path placeholders are only substituted when the converter produced
        # URL parameters; otherwise the raw template is used as the path.
        path_template = "datasets/{dataset_id}/operations/{operation_id}"
        url_params: Optional[dict[str, str]] = payload.get("_url")
        path = path_template.format_map(url_params) if url_params else path_template

        query = payload.get("_query")
        if query:
            path = f"{path}?{urlencode(query)}"
        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        http_options: Optional[types.HttpOptions] = None
        if params.config is not None:
            http_options = params.config.http_options

        payload = _common.encode_unserializable_types(_common.convert_to_dict(payload))

        response = await self._api_client.async_request(
            "get", path, payload, http_options
        )
        body = json.loads(response.body) if response.body else {}

        dataset_operation = types.DatasetOperation._from_response(
            response=body, kwargs=params.model_dump()
        )
        self._api_client._verify_response(dataset_operation)
        return dataset_operation

    async def _list_prompts(
        self, *, config: Optional[types.ListPromptsConfigOrDict] = None
    ) -> types.ListDatasetsResponse:
        """Sends a GET request to the ``datasets`` endpoint and parses the reply.

        Args:
          config: Optional list configuration. Its ``http_options``, when set,
            are passed along with the request.

        Returns:
          The response body parsed into a types.ListDatasetsResponse.

        Raises:
          ValueError: If the client is not a Vertex AI client.
        """
        params = types._ListDatasetsRequestParameters(config=config)

        # Guard clause: this endpoint is only wired up for the Vertex AI backend.
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        body = _ListDatasetsRequestParameters_to_vertex(params)
        url_values: Optional[dict[str, str]] = body.get("_url")
        path = "datasets"
        if url_values:
            path = path.format_map(url_values)

        query = body.get("_query")
        if query:
            path = f"{path}?{urlencode(query)}"
        # TODO: remove the hack that pops config.
        body.pop("config", None)

        request_config = params.config
        http_options: Optional[types.HttpOptions] = (
            request_config.http_options if request_config is not None else None
        )

        body = _common.encode_unserializable_types(_common.convert_to_dict(body))

        response = await self._api_client.async_request(
            "get", path, body, http_options
        )
        # An empty body is treated as an empty JSON object.
        payload = json.loads(response.body) if response.body else {}

        result = types.ListDatasetsResponse._from_response(
            response=payload, kwargs=params.model_dump()
        )
        self._api_client._verify_response(result)
        return result

    async def _list_versions(
        self,
        *,
        read_mask: Optional[str] = None,
        dataset_id: Optional[str] = None,
        config: Optional[types.ListPromptsConfigOrDict] = None,
    ) -> types.ListDatasetVersionsResponse:
        """Sends a GET request to ``datasets/{dataset_id}/datasetVersions``.

        Args:
          read_mask: Passed through to the request parameters.
          dataset_id: Passed through to the request parameters; the URL
            placeholder is filled from the converted request's ``_url`` values.
          config: Optional list configuration. Its ``http_options``, when set,
            are passed along with the request.

        Returns:
          The response body parsed into a types.ListDatasetVersionsResponse.

        Raises:
          ValueError: If the client is not a Vertex AI client.
        """
        params = types._ListDatasetVersionsRequestParameters(
            read_mask=read_mask,
            dataset_id=dataset_id,
            config=config,
        )

        # Guard clause: this endpoint is only wired up for the Vertex AI backend.
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        body = _ListDatasetVersionsRequestParameters_to_vertex(params)
        url_values: Optional[dict[str, str]] = body.get("_url")
        path = "datasets/{dataset_id}/datasetVersions"
        if url_values:
            path = path.format_map(url_values)

        query = body.get("_query")
        if query:
            path = f"{path}?{urlencode(query)}"
        # TODO: remove the hack that pops config.
        body.pop("config", None)

        request_config = params.config
        http_options: Optional[types.HttpOptions] = (
            request_config.http_options if request_config is not None else None
        )

        body = _common.encode_unserializable_types(_common.convert_to_dict(body))

        response = await self._api_client.async_request(
            "get", path, body, http_options
        )
        # An empty body is treated as an empty JSON object.
        payload = json.loads(response.body) if response.body else {}

        result = types.ListDatasetVersionsResponse._from_response(
            response=payload, kwargs=params.model_dump()
        )
        self._api_client._verify_response(result)
        return result

    async def _delete_dataset(
        self, *, prompt_id: str, config: Optional[types.DeletePromptConfigOrDict] = None
    ) -> types.DeletePromptOperation:
        """Sends a DELETE request to ``datasets/{dataset_id}``.

        Args:
          prompt_id: Passed through to the request parameters; the URL
            placeholder is filled from the converted request's ``_url`` values.
          config: Optional delete configuration. Its ``http_options``, when
            set, are passed along with the request.

        Returns:
          The response body parsed into a types.DeletePromptOperation.

        Raises:
          ValueError: If the client is not a Vertex AI client.
        """
        params = types._DeleteDatasetRequestParameters(
            prompt_id=prompt_id,
            config=config,
        )

        # Guard clause: this endpoint is only wired up for the Vertex AI backend.
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        body = _DeleteDatasetRequestParameters_to_vertex(params)
        url_values: Optional[dict[str, str]] = body.get("_url")
        path = "datasets/{dataset_id}"
        if url_values:
            path = path.format_map(url_values)

        query = body.get("_query")
        if query:
            path = f"{path}?{urlencode(query)}"
        # TODO: remove the hack that pops config.
        body.pop("config", None)

        request_config = params.config
        http_options: Optional[types.HttpOptions] = (
            request_config.http_options if request_config is not None else None
        )

        body = _common.encode_unserializable_types(_common.convert_to_dict(body))

        response = await self._api_client.async_request(
            "delete", path, body, http_options
        )
        # An empty body is treated as an empty JSON object.
        payload = json.loads(response.body) if response.body else {}

        result = types.DeletePromptOperation._from_response(
            response=payload, kwargs=params.model_dump()
        )
        self._api_client._verify_response(result)
        return result

    async def _delete_dataset_version(
        self,
        *,
        prompt_id: str,
        version_id: str,
        config: Optional[types.DeletePromptConfigOrDict] = None,
    ) -> types.DeletePromptVersionOperation:
        """Sends a DELETE request to ``datasets/{dataset_id}/datasetVersions/{version_id}``.

        Args:
          prompt_id: Passed through to the request parameters.
          version_id: Passed through to the request parameters.
          config: Optional delete configuration. Its ``http_options``, when
            set, are passed along with the request.

        Returns:
          The response body parsed into a types.DeletePromptVersionOperation.

        Raises:
          ValueError: If the client is not a Vertex AI client.
        """
        params = types._DeletePromptVersionRequestParameters(
            prompt_id=prompt_id,
            version_id=version_id,
            config=config,
        )

        # Guard clause: this endpoint is only wired up for the Vertex AI backend.
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        body = _DeletePromptVersionRequestParameters_to_vertex(params)
        # URL placeholders are filled from the converted request's "_url" values.
        url_values: Optional[dict[str, str]] = body.get("_url")
        path = "datasets/{dataset_id}/datasetVersions/{version_id}"
        if url_values:
            path = path.format_map(url_values)

        query = body.get("_query")
        if query:
            path = f"{path}?{urlencode(query)}"
        # TODO: remove the hack that pops config.
        body.pop("config", None)

        request_config = params.config
        http_options: Optional[types.HttpOptions] = (
            request_config.http_options if request_config is not None else None
        )

        body = _common.encode_unserializable_types(_common.convert_to_dict(body))

        response = await self._api_client.async_request(
            "delete", path, body, http_options
        )
        # An empty body is treated as an empty JSON object.
        payload = json.loads(response.body) if response.body else {}

        result = types.DeletePromptVersionOperation._from_response(
            response=payload, kwargs=params.model_dump()
        )
        self._api_client._verify_response(result)
        return result

    async def _restore_version(
        self,
        *,
        dataset_id: str,
        version_id: str,
        config: Optional[types.RestoreVersionConfigOrDict] = None,
    ) -> types.RestoreVersionOperation:
        """Sends the ``:restore`` request for a dataset version.

        The request is a GET to
        ``datasets/{dataset_id}/datasetVersions/{version_id}:restore``.

        Args:
          dataset_id: Passed through to the request parameters.
          version_id: Passed through to the request parameters.
          config: Optional restore configuration. Its ``http_options``, when
            set, are passed along with the request.

        Returns:
          The response body parsed into a types.RestoreVersionOperation.

        Raises:
          ValueError: If the client is not a Vertex AI client.
        """
        params = types._RestoreVersionRequestParameters(
            dataset_id=dataset_id,
            version_id=version_id,
            config=config,
        )

        # Guard clause: this endpoint is only wired up for the Vertex AI backend.
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        body = _RestoreVersionRequestParameters_to_vertex(params)
        # URL placeholders are filled from the converted request's "_url" values.
        url_values: Optional[dict[str, str]] = body.get("_url")
        path = "datasets/{dataset_id}/datasetVersions/{version_id}:restore"
        if url_values:
            path = path.format_map(url_values)

        query = body.get("_query")
        if query:
            path = f"{path}?{urlencode(query)}"
        # TODO: remove the hack that pops config.
        body.pop("config", None)

        request_config = params.config
        http_options: Optional[types.HttpOptions] = (
            request_config.http_options if request_config is not None else None
        )

        body = _common.encode_unserializable_types(_common.convert_to_dict(body))

        response = await self._api_client.async_request(
            "get", path, body, http_options
        )
        # An empty body is treated as an empty JSON object.
        payload = json.loads(response.body) if response.body else {}

        result = types.RestoreVersionOperation._from_response(
            response=payload, kwargs=params.model_dump()
        )
        self._api_client._verify_response(result)
        return result

    async def _update_dataset_resource(
        self,
        *,
        name: Optional[str] = None,
        dataset_id: Optional[str] = None,
        display_name: Optional[str] = None,
        metadata: Optional[types.SchemaTextPromptDatasetMetadataOrDict] = None,
        description: Optional[str] = None,
        encryption_spec: Optional[genai_types.EncryptionSpecOrDict] = None,
        model_reference: Optional[str] = None,
        config: Optional[types.UpdateDatasetConfigOrDict] = None,
    ) -> types.Dataset:
        """Sends a PATCH request to ``datasets/{dataset_id}``.

        Args:
          name: Passed through to the request parameters.
          dataset_id: Passed through to the request parameters; the URL
            placeholder is filled from the converted request's ``_url`` values.
          display_name: Passed through to the request parameters.
          metadata: Passed through to the request parameters.
          description: Passed through to the request parameters.
          encryption_spec: Passed through to the request parameters.
          model_reference: Passed through to the request parameters.
          config: Optional update configuration. Its ``http_options``, when
            set, are passed along with the request.

        Returns:
          The response body parsed into a types.Dataset.

        Raises:
          ValueError: If the client is not a Vertex AI client.
        """
        params = types._UpdateDatasetParameters(
            name=name,
            dataset_id=dataset_id,
            display_name=display_name,
            metadata=metadata,
            description=description,
            encryption_spec=encryption_spec,
            model_reference=model_reference,
            config=config,
        )

        # Guard clause: this endpoint is only wired up for the Vertex AI backend.
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        body = _UpdateDatasetParameters_to_vertex(params)
        url_values: Optional[dict[str, str]] = body.get("_url")
        path = "datasets/{dataset_id}"
        if url_values:
            path = path.format_map(url_values)

        query = body.get("_query")
        if query:
            path = f"{path}?{urlencode(query)}"
        # TODO: remove the hack that pops config.
        body.pop("config", None)

        request_config = params.config
        http_options: Optional[types.HttpOptions] = (
            request_config.http_options if request_config is not None else None
        )

        body = _common.encode_unserializable_types(_common.convert_to_dict(body))

        response = await self._api_client.async_request(
            "patch", path, body, http_options
        )
        # An empty body is treated as an empty JSON object.
        payload = json.loads(response.body) if response.body else {}

        result = types.Dataset._from_response(
            response=payload, kwargs=params.model_dump()
        )
        self._api_client._verify_response(result)
        return result

    async def create(
        self,
        *,
        prompt: types.PromptOrDict,
        config: Optional[types.CreatePromptConfigOrDict] = None,
    ) -> types.Prompt:
        """Creates a new prompt backed by a Vertex Dataset resource.

        The call blocks (asynchronously) until the dataset create operation
        has finished, then fetches the created dataset and attaches it to the
        prompt.

        Args:
          prompt: The prompt to create, as a types.Prompt or a dict of its
            fields.
          config: Optional configuration for creating the prompt (display
            name, encryption spec, timeout), as a types.CreatePromptConfig or
            a dict of its fields.

        Returns:
          The input types.Prompt object with its Dataset resource attached.
        """
        # Normalize dict inputs into their model types.
        if isinstance(prompt, dict):
            prompt = types.Prompt(**prompt)
        if isinstance(config, dict):
            config = types.CreatePromptConfig(**config)
        elif not config:
            config = types.CreatePromptConfig()

        _prompt_management_utils._raise_for_invalid_prompt(prompt)

        variables = None
        if prompt.prompt_data and prompt.prompt_data.variables:
            variables = prompt.prompt_data.variables
        prompt_metadata = _prompt_management_utils._create_dataset_metadata_from_prompt(
            prompt,
            variables=variables,
        )

        # Fall back to a timestamped display name when none was configured.
        if config and config.prompt_display_name:
            display_name = config.prompt_display_name
        else:
            display_name = f"prompt_{time.strftime('%Y%m%d-%H%M%S')}"
        parent = (
            f"projects/{self._api_client.project}/locations/{self._api_client.location}"
        )
        encryption_spec = None
        if config and config.encryption_spec:
            encryption_spec = config.encryption_spec

        # Step 1: Create the dataset resource for the prompt and wait for the operation to complete.
        create_operation = await self._create_dataset_resource(
            display_name=display_name,
            name=parent,
            metadata_schema_uri=_prompt_management_utils.PROMPT_SCHEMA_URI,
            metadata=prompt_metadata,
            model_reference=prompt.prompt_data.model,
            encryption_spec=encryption_spec,
        )
        wait_timeout = config.timeout if config else 90
        dataset_resource_name = await self._wait_for_operation(
            operation=create_operation,
            timeout=wait_timeout,
        )
        # The dataset id is the last path segment of the resource name.
        dataset_id = dataset_resource_name.split("/")[-1]

        # Step 2: Get the dataset resource
        prompt._dataset = await self._get_dataset_resource(
            name=dataset_id,
        )
        return prompt

    async def create_version(
        self,
        *,
        prompt_id: str,
        prompt: types.PromptOrDict,
        config: Optional[types.CreatePromptVersionConfigOrDict] = None,
    ) -> types.Prompt:
        """Creates a new version of the prompt identified by prompt_id.

        The dataset is first updated with the new prompt metadata, then a
        dataset version is created from it. The call waits for the version
        create operation to complete before returning.

        Args:
          prompt_id: The ID of the prompt to create a version for.
          prompt: The prompt content for the new version, as a types.Prompt or
            a dict of its fields.
          config: Optional configuration for creating the prompt version, as a
            types.CreatePromptVersionConfig or a dict of its fields.

        Returns:
          A types.Prompt object built from the new Dataset Version resource,
          with the updated Dataset and the Dataset Version resources attached.
        """
        # Normalize dict inputs into their model types.
        if isinstance(prompt, dict):
            prompt = types.Prompt(**prompt)
        if isinstance(config, dict):
            config = types.CreatePromptVersionConfig(**config)
        elif not config:
            config = types.CreatePromptVersionConfig()

        _prompt_management_utils._raise_for_invalid_prompt(prompt)

        version_name = (
            config.version_display_name
            if config and config.version_display_name
            else None
        )

        # Step 1: Get the dataset resource
        dataset_resource = await self._get_dataset_resource(name=prompt_id)

        # Step 2: Update the dataset with the new prompt metadata
        variables = None
        if prompt.prompt_data and prompt.prompt_data.variables:
            variables = prompt.prompt_data.variables
        new_metadata = _prompt_management_utils._create_dataset_metadata_from_prompt(
            prompt,
            variables=variables,
        )
        updated_dataset_resource = await self._update_dataset_resource(
            dataset_id=prompt_id,
            display_name=dataset_resource.display_name,
            metadata=new_metadata,
            model_reference=prompt.prompt_data.model,
        )

        # Step 3: Create the dataset version
        # A timestamped display name is used when none was configured.
        create_version_operation = await self._create_dataset_version_resource(
            dataset_name=prompt_id,
            display_name=(
                version_name or f"prompt_version_{time.strftime('%Y%m%d-%H%M%S')}"
            ),
        )
        wait_timeout = config.timeout if config else 90
        version_resource_name = await self._wait_for_operation(
            operation=create_version_operation,
            timeout=wait_timeout,
        )

        # Step 4: Get the dataset version resource and return it with the prompt
        # The version id is the last path segment of the resource name.
        version_id = version_resource_name.split("/")[-1]
        dataset_version_resource = await self._get_dataset_version_resource(
            dataset_id=prompt_id,
            dataset_version_id=version_id,
        )
        result = _prompt_management_utils._create_prompt_from_dataset_metadata(
            dataset_version_resource
        )
        result._dataset = updated_dataset_resource
        result._dataset_version = dataset_version_resource
        return result

    async def _wait_for_operation(
        self,
        operation: types.DatasetOperation,
        timeout: int,
    ) -> str:
        """Polls a dataset operation until it is done and returns its result name.

        The dataset id and operation id are parsed from ``operation.name``,
        which is expected to contain a ``/datasets/<id>/`` segment and to end
        with the operation id. Polling starts after an initial 5 second sleep;
        the sleep interval doubles between polls up to 60 seconds and is
        capped so that a single sleep never runs past the timeout.

        Args:
          operation: The dataset operation to wait for.
          timeout: The maximum time, in seconds, to wait for the operation to
            complete.

        Returns:
          The ``name`` entry of the finished operation's response.

        Raises:
          TimeoutError: If the operation does not complete within the timeout.
          ValueError: If the operation reports an error, or finishes without a
            response containing a ``name``.
        """
        done = False
        prompt_dataset_operation: Optional[types.DatasetOperation] = None

        response_operation_name = operation.name
        dataset_id = response_operation_name.split("/datasets/")[1].split("/")[0]
        operation_id = response_operation_name.split("/")[-1]

        start_time = time.time()
        sleep_duration = 5
        wait_multiplier = 2
        max_wait_time = 60
        previous_time = time.time()

        while not done:
            if (time.time() - start_time) > timeout:
                raise TimeoutError(
                    "Create prompt operation did not complete within the"
                    f" specified timeout of {timeout} seconds."
                )
            current_time = time.time()
            if current_time - previous_time >= sleep_duration:
                sleep_duration = min(sleep_duration * wait_multiplier, max_wait_time)
                previous_time = current_time
            # Cap the sleep at the time left so a long backoff interval cannot
            # push the total wait far beyond the requested timeout.
            remaining = max(timeout - (current_time - start_time), 0)
            await asyncio.sleep(min(sleep_duration, remaining))
            prompt_dataset_operation = await self._get_dataset_operation(
                dataset_id=dataset_id,
                operation_id=operation_id,
            )
            done = (
                prompt_dataset_operation.done
                if hasattr(prompt_dataset_operation, "done")
                else False
            )

        # Check the error first: a failed operation has no usable response, and
        # checking the response first would hide the reported error detail.
        operation_error = getattr(prompt_dataset_operation, "error", None)
        if operation_error is not None:
            raise ValueError(
                f"Error creating prompt version resource: {operation_error}"
            )
        if (
            not prompt_dataset_operation
            or prompt_dataset_operation.response is None
            or prompt_dataset_operation.response.get("name") is None
        ):
            raise ValueError("Error creating prompt version resource.")
        return prompt_dataset_operation.response.get("name")

    async def get(
        self,
        *,
        prompt_id: str,
        config: Optional[types.GetPromptConfig] = None,
    ) -> types.Prompt:
        """Gets a prompt resource from a Vertex Dataset.

        Args:
          prompt_id: The id of the Vertex Dataset resource containing the prompt. For example, if the prompt resource name is "projects/123/locations/us-central1/datasets/456", then the prompt_id is "456".
          config: Optional configuration for getting the prompt. When it carries a version_id, the matching Dataset Version resource is also fetched and attached to the returned prompt.

        Returns:
            A types.Prompt object built from the Dataset resource, with the Dataset attached and, if a version_id was given, the Dataset Version attached as well.
        """
        dataset = await self._get_dataset_resource(name=prompt_id)
        result = _prompt_management_utils._create_prompt_from_dataset_metadata(
            dataset,
        )
        result._dataset = dataset

        # Nothing more to fetch unless a specific version was requested.
        if not (config and config.version_id):
            return result

        # NOTE(review): the prompt content above comes from the dataset, not
        # from this version resource - confirm that is intended.
        result._dataset_version = await self._get_dataset_version_resource(
            dataset_id=prompt_id,
            dataset_version_id=config.version_id,
        )
        return result

    async def get_version(
        self,
        *,
        prompt_id: str,
        version_id: str,
        config: Optional[types.GetPromptConfig] = None,
    ) -> types.Prompt:
        """Gets a prompt together with one of its Dataset Version resources.

        Args:
          prompt_id: The id of the Vertex Dataset resource containing the prompt. For example, if the prompt resource name is "projects/123/locations/us-central1/datasets/456", then the prompt_id is "456".
          version_id: The id of the Vertex Dataset Version resource containing the prompt version. For example, if the prompt version resource name is "projects/123/locations/us-central1/datasets/456/datasetVersions/1", then the version_id is "1".
          config: Optional configuration for getting the prompt. It is accepted but not read by this method.

        Returns:
            A types.Prompt object built from the Dataset resource, with both the Dataset and the Dataset Version resources attached.
        """
        dataset = await self._get_dataset_resource(name=prompt_id)
        # NOTE(review): the prompt content is built from the dataset, not from
        # the version resource fetched below - confirm that is intended.
        result = _prompt_management_utils._create_prompt_from_dataset_metadata(
            dataset,
        )
        result._dataset = dataset

        result._dataset_version = await self._get_dataset_version_resource(
            dataset_id=prompt_id,
            dataset_version_id=version_id,
        )
        return result

    async def _wait_for_project_operation(
        self,
        operation: types.DatasetOperation,
        timeout: int,
    ) -> None:
        """Polls a project-level operation until it reports done.

        Project-level operations are separate from dataset resource operations, for example: projects/123/locations/us-central1/operations/789. The operation id used for polling is the last path segment of ``operation.name``.

        Polling starts after an initial 5 second sleep; the sleep interval
        doubles between polls up to 60 seconds and is capped so that a single
        sleep never runs past the timeout.

        Args:
          operation: The project operation to wait for.
          timeout: The maximum time, in seconds, to wait for the operation to
            complete.
        Raises:
          TimeoutError: If the operation does not complete within the timeout.
          ValueError: If the finished operation reports an error.
        """
        done = False

        # The operations module does not change between polls; build it once
        # instead of on every loop iteration.
        operations_module = operations.AsyncOperations(api_client_=self._api_client)

        start_time = time.time()
        sleep_duration = 5
        wait_multiplier = 2
        max_wait_time = 60
        previous_time = time.time()
        while not done:
            if (time.time() - start_time) > timeout:
                raise TimeoutError(
                    "Delete operation did not complete within the"
                    f" specified timeout of {timeout} seconds."
                )
            current_time = time.time()
            if current_time - previous_time >= sleep_duration:
                sleep_duration = min(sleep_duration * wait_multiplier, max_wait_time)
                previous_time = current_time
            # Cap the sleep at the time left so a long backoff interval cannot
            # push the total wait far beyond the requested timeout.
            remaining = max(timeout - (current_time - start_time), 0)
            await asyncio.sleep(min(sleep_duration, remaining))

            # Each poll replaces `operation` with the freshly fetched state.
            operation = await operations_module._get(
                operation_id=operation.name.split("/")[-1],
            )
            done = operation.done if hasattr(operation, "done") else False
        if hasattr(operation, "error") and operation.error is not None:
            raise ValueError(f"Error in delete operation: {operation.error}")

    async def delete(
        self,
        *,
        prompt_id: str,
        config: Optional[types.DeletePromptConfig] = None,
    ) -> None:
        """Deletes a prompt resource and waits for the deletion to finish.

        Args:
          prompt_id: The id of the prompt resource to delete.
          config: Optional configuration for the delete request. Its timeout
            bounds the wait; 90 seconds is used when no config is given.

        Raises:
          TimeoutError: If the delete operation does not complete within the timeout.
          ValueError: If the delete operation fails.
        """
        pending_operation = await self._delete_dataset(
            prompt_id=prompt_id,
            config=config,
        )
        wait_timeout = config.timeout if config else 90
        await self._wait_for_project_operation(
            operation=pending_operation,
            timeout=wait_timeout,
        )
        logger.info(f"Deleted prompt with id: {prompt_id}")

    async def delete_version(
        self,
        *,
        prompt_id: str,
        version_id: str,
        config: Optional[types.DeletePromptConfig] = None,
    ) -> None:
        """Deletes a prompt version resource and waits for the deletion to finish.

        Args:
          prompt_id: The id of the prompt resource that owns the version.
          version_id: The id of the prompt version resource to delete.
          config: Optional configuration for the delete request. Its timeout
            bounds the wait; 90 seconds is used when no config is given.

        Raises:
          TimeoutError: If the delete operation does not complete within the timeout.
          ValueError: If the delete operation fails.
        """
        pending_operation = await self._delete_dataset_version(
            prompt_id=prompt_id,
            version_id=version_id,
            config=config,
        )
        wait_timeout = config.timeout if config else 90
        await self._wait_for_project_operation(
            operation=pending_operation,
            timeout=wait_timeout,
        )
        logger.info(
            f"Deleted prompt version {version_id} from prompt with id: {prompt_id}"
        )

    async def _list_prompts_pager(
        self,
        *,
        config: Optional[types.ListPromptsConfigOrDict] = None,
    ) -> AsyncPager[types.Dataset]:
        """Fetches the first page of datasets and wraps it in an AsyncPager.

        Args:
          config: Optional list configuration, used for the first request and
            handed to the pager.

        Returns:
          An AsyncPager over the "datasets" field that uses ``_list_prompts``
          as its request callable.
        """
        fetch_page = self._list_prompts
        first_page = await self._list_prompts(config=config)
        return AsyncPager("datasets", fetch_page, first_page, config)

    async def _list_versions_pager(
        self,
        *,
        prompt_id: str,
        config: Optional[types.ListPromptsConfigOrDict] = None,
    ) -> AsyncPager[types.DatasetVersion]:
        """Fetches the first page of dataset versions and wraps it in an AsyncPager.

        Args:
          prompt_id: The id of the dataset whose versions are listed.
          config: Optional list configuration, used for the first request and
            handed to the pager.

        Returns:
          An AsyncPager over the "dataset_versions" field whose request
          callable always targets the dataset identified by prompt_id.
        """

        # The pager is handed a callable that already carries the dataset id.
        # Passing the bare `self._list_versions` would leave follow-up page
        # requests without `dataset_id`, assuming the pager re-invokes its
        # callable with only `config`.
        async def _fetch_page(**kwargs: Any) -> types.ListDatasetVersionsResponse:
            return await self._list_versions(dataset_id=prompt_id, **kwargs)

        return AsyncPager(
            "dataset_versions",
            _fetch_page,
            await _fetch_page(config=config),
            config,
        )

    async def list(
        self,
        *,
        config: Optional[types.ListPromptsConfigOrDict] = None,
    ) -> AsyncIterator[types.PromptRef]:
        """Lists prompt resources in a project.

        This method iterates over the datasets returned for the client's
        project and yields a prompt reference, containing the prompt_id and
        model, for each one. Datasets without a model reference or a name are
        skipped.

        To get the full types.Prompt resource for a PromptRef after calling this method, use the get() method with the prompt_id as the prompt_id argument.
        Example usage:

        ```
        prompt_refs = client.aio.prompt_management.list()
        async for prompt_ref in prompt_refs:
          await client.aio.prompt_management.get(prompt_id=prompt_ref.prompt_id)
        ```

        Args:
          config: Optional configuration for listing prompts, as a
            types.ListPromptsConfig or a dict of its fields.

        Returns:
            An async iterator of types.PromptRef objects.
        """
        # Normalize the config into a model instance before paging.
        if isinstance(config, dict):
            config = types.ListPromptsConfig(**config)
        elif not config:
            config = types.ListPromptsConfig()

        pager = await self._list_prompts_pager(config=config)
        async for dataset in pager:
            if dataset and dataset.model_reference and dataset.name:
                # The prompt id is the last path segment of the dataset name.
                yield types.PromptRef(
                    model=dataset.model_reference,
                    prompt_id=dataset.name.split("/")[-1],
                )

    async def list_versions(
        self,
        *,
        prompt_id: str,
        config: Optional[types.ListPromptsConfigOrDict] = None,
    ) -> AsyncIterator[types.PromptVersionRef]:
        """Lists prompt version resources for a provided prompt_id.

        This method iterates over the dataset versions returned for the
        provided prompt_id and yields a reference for each one. Versions
        without a model reference or a name are skipped.

        To get the full types.Prompt resource for a PromptVersionRef after calling this method, use the get_version() method with the returned prompt_id and version_id.
        Example usage:

        ```
        prompt_version_refs = client.aio.prompt_management.list_versions(prompt_id="123")
        async for version_ref in prompt_version_refs:
          await client.aio.prompt_management.get_version(prompt_id=version_ref.prompt_id, version_id=version_ref.version_id)
        ```

        Args:
          prompt_id: The id of the Vertex Dataset resource containing the prompt. For example, if the prompt resource name is "projects/123/locations/us-central1/datasets/456", then the prompt_id is "456".
          config: Optional configuration for listing prompts, as a
            types.ListPromptsConfig or a dict of its fields.

        Returns:
            An async iterator of types.PromptVersionRef objects representing the prompt version resources for the provided prompt_id.

        """
        # Normalize the config into a model instance before paging.
        if isinstance(config, dict):
            config = types.ListPromptsConfig(**config)
        elif not config:
            config = types.ListPromptsConfig()

        pager = await self._list_versions_pager(config=config, prompt_id=prompt_id)
        async for dataset_version in pager:
            if (
                dataset_version
                and dataset_version.model_reference
                and dataset_version.name
            ):
                # The version id is the last path segment of the version name.
                yield types.PromptVersionRef(
                    model=dataset_version.model_reference,
                    version_id=dataset_version.name.split("/")[-1],
                    prompt_id=prompt_id,
                )

    async def restore_version(
        self,
        *,
        prompt_id: str,
        version_id: str,
        config: Optional[types.RestoreVersionConfig] = None,
    ) -> types.Prompt:
        """Restores the provided prompt version to the latest version.

        The restore request is sent, the resulting operation is polled until
        it is done, and the restored Dataset Version resource is then fetched.

        Args:
          prompt_id: The id of the Vertex Dataset resource containing the prompt. For example, if the prompt resource name is "projects/123/locations/us-central1/datasets/456", then the prompt_id is "456".
          version_id: The id of the Vertex Dataset Version resource to restore. For example, if the version resource name is "projects/123/locations/us-central1/datasets/456/datasetVersions/789", then the version_id is "789".
          config: Optional configuration for restoring the prompt version. It is forwarded to the restore request, and its timeout bounds the wait; 90 seconds is used when no config is given. A dict of the config's fields is also accepted.

        Returns:
            A types.Prompt object built from the Dataset Version resource, with that Dataset Version attached.

        Raises:
          TimeoutError: If the restore operation does not complete within the timeout.
          ValueError: If the restore operation fails.
        """
        # Accept a plain dict as well, so the `.timeout` access below works.
        if isinstance(config, dict):
            config = types.RestoreVersionConfig(**config)

        # Forward the config so that request-level settings such as
        # http_options reach the restore call, as the delete methods do.
        restore_prompt_operation = await self._restore_version(
            dataset_id=prompt_id,
            version_id=version_id,
            config=config,
        )
        await self._wait_for_project_operation(
            operation=restore_prompt_operation,
            timeout=config.timeout if config else 90,
        )
        dataset_version_resource = await self._get_dataset_version_resource(
            dataset_id=prompt_id,
            dataset_version_id=version_id,
        )
        updated_prompt = _prompt_management_utils._create_prompt_from_dataset_metadata(
            dataset_version_resource,
        )
        updated_prompt._dataset_version = dataset_version_resource
        return updated_prompt
