# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Code generated by the Google Gen AI SDK generator DO NOT EDIT.

import datetime
import importlib
import json
import logging
from typing import Any, AsyncIterator, Iterator, Optional, Sequence, Tuple, Union
from urllib.parse import urlencode
import warnings

from google.genai import _api_module
from google.genai import _common
from google.genai import types as genai_types
from google.genai._common import get_value_by_path as getv
from google.genai._common import set_value_by_path as setv
from google.genai.pagers import Pager

from . import _agent_engines_utils
from . import types


# Module-level logger used for the informational messages emitted in this file.
logger = logging.getLogger("vertexai_genai.agentengines")

# The logger's own level is set to INFO at import time, so INFO records logged
# through it are not dropped by this logger's level check.
logger.setLevel(logging.INFO)


def _CreateAgentEngineConfig_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Copies the populated fields of a create config onto the parent request.

    Each config field whose value is not None is written to `parent_object`
    under its camelCase key. Nothing is written to the returned dict.

    Args:
        from_object: The create config to read fields from.
        parent_object: The request dict that receives the converted fields.

    Returns:
        An empty dict; every converted field is placed on `parent_object`.
    """
    converted: dict[str, Any] = {}

    # (config field, request key) pairs, listed in the order they are copied.
    field_map = (
        ("display_name", "displayName"),
        ("description", "description"),
        ("spec", "spec"),
        ("context_spec", "contextSpec"),
        ("psc_interface_config", "pscInterfaceConfig"),
        ("encryption_spec", "encryptionSpec"),
        ("labels", "labels"),
        ("source_packages", "sourcePackages"),
        ("entrypoint_module", "entrypointModule"),
        ("entrypoint_object", "entrypointObject"),
        ("requirements_file", "requirementsFile"),
        ("agent_framework", "agentFramework"),
        ("python_version", "pythonVersion"),
    )
    for source_field, target_key in field_map:
        value = getv(from_object, [source_field])
        if value is not None:
            setv(parent_object, [target_key], value)

    return converted


def _CreateAgentEngineRequestParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Builds the request dict for creating an Agent Engine.

    Args:
        from_object: The create request parameters.
        parent_object: Unused by this converter.

    Returns:
        The request dict. When a config is present, the config converter
        writes its fields directly onto this dict and its own return value
        is stored under the "config" key.
    """
    request: dict[str, Any] = {}

    config = getv(from_object, ["config"])
    if config is not None:
        # The converter receives `request` as its parent and fills it in place.
        converted_config = _CreateAgentEngineConfig_to_vertex(config, request)
        setv(request, ["config"], converted_config)

    return request


def _DeleteAgentEngineRequestParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Builds the request dict for deleting an Agent Engine.

    Args:
        from_object: The delete request parameters.
        parent_object: Unused by this converter.

    Returns:
        The request dict, holding `name` under ["_url", "name"] and `force`
        and `config` under their own keys, each only when it is not None.
    """
    request: dict[str, Any] = {}

    name = getv(from_object, ["name"])
    if name is not None:
        setv(request, ["_url", "name"], name)

    force = getv(from_object, ["force"])
    if force is not None:
        setv(request, ["force"], force)

    config = getv(from_object, ["config"])
    if config is not None:
        setv(request, ["config"], config)

    return request


def _GetAgentEngineOperationParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Builds the request dict for fetching an Agent Engine operation.

    Args:
        from_object: The get-operation request parameters.
        parent_object: Unused by this converter.

    Returns:
        The request dict, holding `operation_name` under
        ["_url", "operationName"] and `config` under "config", each only
        when it is not None.
    """
    request: dict[str, Any] = {}

    operation_name = getv(from_object, ["operation_name"])
    if operation_name is not None:
        setv(request, ["_url", "operationName"], operation_name)

    config = getv(from_object, ["config"])
    if config is not None:
        setv(request, ["config"], config)

    return request


def _GetAgentEngineRequestParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Builds the request dict for fetching an Agent Engine.

    Args:
        from_object: The get request parameters.
        parent_object: Unused by this converter.

    Returns:
        The request dict, holding `name` under ["_url", "name"] and `config`
        under "config", each only when it is not None.
    """
    request: dict[str, Any] = {}

    name = getv(from_object, ["name"])
    if name is not None:
        setv(request, ["_url", "name"], name)

    config = getv(from_object, ["config"])
    if config is not None:
        setv(request, ["config"], config)

    return request


def _ListAgentEngineConfig_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Copies the populated list options into the parent's "_query" section.

    Args:
        from_object: The list config to read options from.
        parent_object: The request dict that receives the query parameters.

    Returns:
        An empty dict; every converted option is placed on `parent_object`.
    """
    converted: dict[str, Any] = {}

    # (config field, query parameter) pairs, in the order they are copied.
    query_fields = (
        ("page_size", "pageSize"),
        ("page_token", "pageToken"),
        ("filter", "filter"),
    )
    for source_field, query_key in query_fields:
        value = getv(from_object, [source_field])
        if value is not None:
            setv(parent_object, ["_query", query_key], value)

    return converted


def _ListAgentEngineRequestParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Builds the request dict for listing Agent Engines.

    Args:
        from_object: The list request parameters.
        parent_object: Unused by this converter.

    Returns:
        The request dict. When a config is present, the config converter
        writes its query parameters directly onto this dict and its own
        return value is stored under the "config" key.
    """
    request: dict[str, Any] = {}

    config = getv(from_object, ["config"])
    if config is not None:
        # The converter receives `request` as its parent and fills it in place.
        converted_config = _ListAgentEngineConfig_to_vertex(config, request)
        setv(request, ["config"], converted_config)

    return request


def _QueryAgentEngineConfig_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Converts a query config, splitting fields between parent and result.

    `class_method` and `input` are written to `parent_object`, while
    `include_all_fields` is written to the returned dict. Fields that are
    None are skipped.

    Args:
        from_object: The query config to read fields from.
        parent_object: The request dict that receives `classMethod` and
            `input`.

    Returns:
        A dict holding `includeAllFields` when that option is set, otherwise
        an empty dict.
    """
    converted: dict[str, Any] = {}

    class_method = getv(from_object, ["class_method"])
    if class_method is not None:
        setv(parent_object, ["classMethod"], class_method)

    query_input = getv(from_object, ["input"])
    if query_input is not None:
        setv(parent_object, ["input"], query_input)

    # Unlike the fields above, this one stays on the config-level dict.
    include_all_fields = getv(from_object, ["include_all_fields"])
    if include_all_fields is not None:
        setv(converted, ["includeAllFields"], include_all_fields)

    return converted


def _QueryAgentEngineRequestParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Builds the request dict for querying an Agent Engine.

    Args:
        from_object: The query request parameters.
        parent_object: Unused by this converter.

    Returns:
        The request dict, holding `name` under ["_url", "name"] when set.
        When a config is present, the config converter writes onto this dict
        and its own return value is stored under the "config" key.
    """
    request: dict[str, Any] = {}

    name = getv(from_object, ["name"])
    if name is not None:
        setv(request, ["_url", "name"], name)

    config = getv(from_object, ["config"])
    if config is not None:
        # The converter receives `request` as its parent and fills it in place.
        converted_config = _QueryAgentEngineConfig_to_vertex(config, request)
        setv(request, ["config"], converted_config)

    return request


def _UpdateAgentEngineConfig_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Copies the populated fields of an update config onto the parent request.

    Each config field whose value is not None is written to `parent_object`
    under its camelCase key; `update_mask` goes to ["_query", "updateMask"].
    Nothing is written to the returned dict.

    Args:
        from_object: The update config to read fields from.
        parent_object: The request dict that receives the converted fields.

    Returns:
        An empty dict; every converted field is placed on `parent_object`.
    """
    converted: dict[str, Any] = {}

    # (config field, request key) pairs, listed in the order they are copied.
    field_map = (
        ("display_name", "displayName"),
        ("description", "description"),
        ("spec", "spec"),
        ("context_spec", "contextSpec"),
        ("psc_interface_config", "pscInterfaceConfig"),
        ("encryption_spec", "encryptionSpec"),
        ("labels", "labels"),
        ("source_packages", "sourcePackages"),
        ("entrypoint_module", "entrypointModule"),
        ("entrypoint_object", "entrypointObject"),
        ("requirements_file", "requirementsFile"),
        ("agent_framework", "agentFramework"),
        ("python_version", "pythonVersion"),
    )
    for source_field, target_key in field_map:
        value = getv(from_object, [source_field])
        if value is not None:
            setv(parent_object, [target_key], value)

    # The update mask is the only field routed to the query parameters.
    update_mask = getv(from_object, ["update_mask"])
    if update_mask is not None:
        setv(parent_object, ["_query", "updateMask"], update_mask)

    return converted


def _UpdateAgentEngineRequestParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Builds the request dict for updating an Agent Engine.

    Args:
        from_object: The update request parameters.
        parent_object: Unused by this converter.

    Returns:
        The request dict, holding `name` under ["_url", "name"] when set.
        When a config is present, the config converter writes its fields
        directly onto this dict and its own return value is stored under the
        "config" key.
    """
    request: dict[str, Any] = {}

    name = getv(from_object, ["name"])
    if name is not None:
        setv(request, ["_url", "name"], name)

    config = getv(from_object, ["config"])
    if config is not None:
        # The converter receives `request` as its parent and fills it in place.
        converted_config = _UpdateAgentEngineConfig_to_vertex(config, request)
        setv(request, ["config"], converted_config)

    return request


class AgentEngines(_api_module.BaseModule):

    def _create(
        self, *, config: Optional[types.CreateAgentEngineConfigOrDict] = None
    ) -> types.AgentEngineOperation:
        """Sends the request that creates a new Agent Engine.

        Args:
            config (CreateAgentEngineConfig):
                Optional. The configuration of the Agent Engine to create.

        Returns:
            AgentEngineOperation: The operation parsed from the API response.

        Raises:
            ValueError: If the client is not a Vertex AI client.
        """
        params = types._CreateAgentEngineRequestParameters(
            config=config,
        )

        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        body = _CreateAgentEngineRequestParameters_to_vertex(params)
        url_fields: Optional[dict[str, str]] = body.get("_url")
        # This path has no placeholders, so formatting leaves it unchanged.
        path = "reasoningEngines"
        if url_fields:
            path = path.format_map(url_fields)

        query = body.get("_query")
        if query:
            path = f"{path}?{urlencode(query)}"
        # TODO: remove the hack that pops config.
        body.pop("config", None)

        request_config = params.config
        http_options: Optional[types.HttpOptions] = None
        if request_config is not None and request_config.http_options is not None:
            http_options = request_config.http_options

        body = _common.encode_unserializable_types(_common.convert_to_dict(body))

        response = self._api_client.request("post", path, body, http_options)

        # An empty response body is treated as an empty JSON object.
        payload = json.loads(response.body) if response.body else {}

        operation = types.AgentEngineOperation._from_response(
            response=payload, kwargs=params.model_dump()
        )

        self._api_client._verify_response(operation)
        return operation

    def _delete(
        self,
        *,
        name: str,
        force: Optional[bool] = None,
        config: Optional[types.DeleteAgentEngineConfigOrDict] = None,
    ) -> types.DeleteAgentEngineOperation:
        """Sends the request that deletes an Agent Engine resource.

        Args:
            name (str):
                Required. The name of the Agent Engine to be deleted. Format:
                `projects/{project}/locations/{location}/reasoningEngines/{resource_id}`
                or `reasoningEngines/{resource_id}`.
            force (bool):
                Optional. If set to True, child resources will also be deleted.
                Otherwise, the request will fail with FAILED_PRECONDITION error
                when the Agent Engine has undeleted child resources. Defaults
                to False.
            config (DeleteAgentEngineConfig):
                Optional. Additional configurations for deleting the Agent
                Engine.

        Returns:
            DeleteAgentEngineOperation: The operation parsed from the API
            response.

        Raises:
            ValueError: If the client is not a Vertex AI client.
        """
        params = types._DeleteAgentEngineRequestParameters(
            name=name,
            force=force,
            config=config,
        )

        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        body = _DeleteAgentEngineRequestParameters_to_vertex(params)
        url_fields: Optional[dict[str, str]] = body.get("_url")
        # The placeholder is only substituted when URL fields were produced.
        path = "{name}"
        if url_fields:
            path = path.format_map(url_fields)

        query = body.get("_query")
        if query:
            path = f"{path}?{urlencode(query)}"
        # TODO: remove the hack that pops config.
        body.pop("config", None)

        request_config = params.config
        http_options: Optional[types.HttpOptions] = None
        if request_config is not None and request_config.http_options is not None:
            http_options = request_config.http_options

        body = _common.encode_unserializable_types(_common.convert_to_dict(body))

        response = self._api_client.request("delete", path, body, http_options)

        # An empty response body is treated as an empty JSON object.
        payload = json.loads(response.body) if response.body else {}

        operation = types.DeleteAgentEngineOperation._from_response(
            response=payload, kwargs=params.model_dump()
        )

        self._api_client._verify_response(operation)
        return operation

    def _get(
        self, *, name: str, config: Optional[types.GetAgentEngineConfigOrDict] = None
    ) -> types.ReasoningEngine:
        """Sends the request that fetches an Agent Engine resource.

        Args:
            name (str):
                Required. The resource name substituted into the request path.
            config (GetAgentEngineConfig):
                Optional. Additional configurations for the request.

        Returns:
            ReasoningEngine: The resource parsed from the API response.

        Raises:
            ValueError: If the client is not a Vertex AI client.
        """
        params = types._GetAgentEngineRequestParameters(
            name=name,
            config=config,
        )

        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        body = _GetAgentEngineRequestParameters_to_vertex(params)
        url_fields: Optional[dict[str, str]] = body.get("_url")
        # The placeholder is only substituted when URL fields were produced.
        path = "{name}"
        if url_fields:
            path = path.format_map(url_fields)

        query = body.get("_query")
        if query:
            path = f"{path}?{urlencode(query)}"
        # TODO: remove the hack that pops config.
        body.pop("config", None)

        request_config = params.config
        http_options: Optional[types.HttpOptions] = None
        if request_config is not None and request_config.http_options is not None:
            http_options = request_config.http_options

        body = _common.encode_unserializable_types(_common.convert_to_dict(body))

        response = self._api_client.request("get", path, body, http_options)

        # An empty response body is treated as an empty JSON object.
        payload = json.loads(response.body) if response.body else {}

        resource = types.ReasoningEngine._from_response(
            response=payload, kwargs=params.model_dump()
        )

        self._api_client._verify_response(resource)
        return resource

    def _list(
        self, *, config: Optional[types.ListAgentEngineConfigOrDict] = None
    ) -> types.ListReasoningEnginesResponse:
        """Sends the request that lists Agent Engines.

        Args:
            config (ListAgentEngineConfig):
                Optional. Listing options; the page size, page token and
                filter are sent as query parameters.

        Returns:
            ListReasoningEnginesResponse: The response parsed from the API
            reply.

        Raises:
            ValueError: If the client is not a Vertex AI client.
        """
        params = types._ListAgentEngineRequestParameters(
            config=config,
        )

        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        body = _ListAgentEngineRequestParameters_to_vertex(params)
        url_fields: Optional[dict[str, str]] = body.get("_url")
        # This path has no placeholders, so formatting leaves it unchanged.
        path = "reasoningEngines"
        if url_fields:
            path = path.format_map(url_fields)

        query = body.get("_query")
        if query:
            path = f"{path}?{urlencode(query)}"
        # TODO: remove the hack that pops config.
        body.pop("config", None)

        request_config = params.config
        http_options: Optional[types.HttpOptions] = None
        if request_config is not None and request_config.http_options is not None:
            http_options = request_config.http_options

        body = _common.encode_unserializable_types(_common.convert_to_dict(body))

        response = self._api_client.request("get", path, body, http_options)

        # An empty response body is treated as an empty JSON object.
        payload = json.loads(response.body) if response.body else {}

        listing = types.ListReasoningEnginesResponse._from_response(
            response=payload, kwargs=params.model_dump()
        )

        self._api_client._verify_response(listing)
        return listing

    def _get_agent_operation(
        self,
        *,
        operation_name: str,
        config: Optional[types.GetAgentEngineOperationConfigOrDict] = None,
    ) -> types.AgentEngineOperation:
        """Sends the request that fetches an Agent Engine operation.

        Args:
            operation_name (str):
                Required. The operation name substituted into the request path.
            config (GetAgentEngineOperationConfig):
                Optional. Additional configurations for the request.

        Returns:
            AgentEngineOperation: The operation parsed from the API response.

        Raises:
            ValueError: If the client is not a Vertex AI client.
        """
        params = types._GetAgentEngineOperationParameters(
            operation_name=operation_name,
            config=config,
        )

        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        body = _GetAgentEngineOperationParameters_to_vertex(params)
        url_fields: Optional[dict[str, str]] = body.get("_url")
        # The placeholder is only substituted when URL fields were produced.
        path = "{operationName}"
        if url_fields:
            path = path.format_map(url_fields)

        query = body.get("_query")
        if query:
            path = f"{path}?{urlencode(query)}"
        # TODO: remove the hack that pops config.
        body.pop("config", None)

        request_config = params.config
        http_options: Optional[types.HttpOptions] = None
        if request_config is not None and request_config.http_options is not None:
            http_options = request_config.http_options

        body = _common.encode_unserializable_types(_common.convert_to_dict(body))

        response = self._api_client.request("get", path, body, http_options)

        # An empty response body is treated as an empty JSON object.
        payload = json.loads(response.body) if response.body else {}

        operation = types.AgentEngineOperation._from_response(
            response=payload, kwargs=params.model_dump()
        )

        self._api_client._verify_response(operation)
        return operation

    def _query(
        self, *, name: str, config: Optional[types.QueryAgentEngineConfigOrDict] = None
    ) -> types.QueryReasoningEngineResponse:
        """Sends a query request to an Agent Engine.

        Args:
            name (str):
                Required. The resource name substituted into the request path.
            config (QueryAgentEngineConfig):
                Optional. The query configuration, including the class method
                and input to send.

        Returns:
            QueryReasoningEngineResponse: The response parsed from the API
            reply.

        Raises:
            ValueError: If the client is not a Vertex AI client.
        """
        params = types._QueryAgentEngineRequestParameters(
            name=name,
            config=config,
        )

        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        body = _QueryAgentEngineRequestParameters_to_vertex(params)
        url_fields: Optional[dict[str, str]] = body.get("_url")
        # The placeholder is only substituted when URL fields were produced.
        path = "{name}:query"
        if url_fields:
            path = path.format_map(url_fields)

        query = body.get("_query")
        if query:
            path = f"{path}?{urlencode(query)}"
        # TODO: remove the hack that pops config.
        body.pop("config", None)

        request_config = params.config
        http_options: Optional[types.HttpOptions] = None
        if request_config is not None and request_config.http_options is not None:
            http_options = request_config.http_options

        body = _common.encode_unserializable_types(_common.convert_to_dict(body))

        response = self._api_client.request("post", path, body, http_options)

        # An empty response body is treated as an empty JSON object.
        payload = json.loads(response.body) if response.body else {}

        query_response = types.QueryReasoningEngineResponse._from_response(
            response=payload, kwargs=params.model_dump()
        )

        self._api_client._verify_response(query_response)
        return query_response

    def _update(
        self, *, name: str, config: Optional[types.UpdateAgentEngineConfigOrDict] = None
    ) -> types.AgentEngineOperation:
        """Sends the request that updates an Agent Engine.

        Args:
            name (str):
                Required. The resource name substituted into the request path.
            config (UpdateAgentEngineConfig):
                Optional. The fields to update; the update mask is sent as a
                query parameter.

        Returns:
            AgentEngineOperation: The operation parsed from the API response.

        Raises:
            ValueError: If the client is not a Vertex AI client.
        """
        params = types._UpdateAgentEngineRequestParameters(
            name=name,
            config=config,
        )

        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        body = _UpdateAgentEngineRequestParameters_to_vertex(params)
        url_fields: Optional[dict[str, str]] = body.get("_url")
        # The placeholder is only substituted when URL fields were produced.
        path = "{name}"
        if url_fields:
            path = path.format_map(url_fields)

        query = body.get("_query")
        if query:
            path = f"{path}?{urlencode(query)}"
        # TODO: remove the hack that pops config.
        body.pop("config", None)

        request_config = params.config
        http_options: Optional[types.HttpOptions] = None
        if request_config is not None and request_config.http_options is not None:
            http_options = request_config.http_options

        body = _common.encode_unserializable_types(_common.convert_to_dict(body))

        response = self._api_client.request("patch", path, body, http_options)

        # An empty response body is treated as an empty JSON object.
        payload = json.loads(response.body) if response.body else {}

        operation = types.AgentEngineOperation._from_response(
            response=payload, kwargs=params.model_dump()
        )

        self._api_client._verify_response(operation)
        return operation

    # Slots for the lazily imported submodules. Each starts as None and is
    # filled in by the property of the matching name on first access.
    _memories = None
    _sandboxes = None
    _sessions = None

    @property
    def memories(self) -> Any:
        """Returns a `Memories` module built with this module's API client.

        The `.memories` submodule is imported on first access and then kept
        on `self._memories` for later accesses.

        Raises:
            ImportError: If the `.memories` submodule cannot be imported.
        """
        module = self._memories
        if module is None:
            try:
                # We need to lazy load the memories module to handle the
                # possibility of ImportError when dependencies are not installed.
                module = importlib.import_module(".memories", __package__)
            except ImportError as e:
                raise ImportError(
                    "The 'agent_engines.memories' module requires additional "
                    "packages. Please install them using pip install "
                    "google-cloud-aiplatform[agent_engines]"
                ) from e
            self._memories = module
        return module.Memories(self._api_client)

    @property
    @_common.experimental_warning(
        "The Vertex SDK GenAI agent_engines.sandboxes module is experimental, "
        "and may change in future versions."
    )
    def sandboxes(self) -> Any:
        """Returns a `Sandboxes` module built with this module's API client.

        The `.sandboxes` submodule is imported on first access and then kept
        on `self._sandboxes` for later accesses.

        Raises:
            ImportError: If the `.sandboxes` submodule cannot be imported.
        """
        module = self._sandboxes
        if module is None:
            try:
                # We need to lazy load the sandboxes module to handle the
                # possibility of ImportError when dependencies are not installed.
                module = importlib.import_module(".sandboxes", __package__)
            except ImportError as e:
                raise ImportError(
                    "The agent_engines.sandboxes module requires additional packages. "
                    "Please install them using pip install "
                    "google-cloud-aiplatform[agent_engines]"
                ) from e
            self._sandboxes = module
        return module.Sandboxes(self._api_client)

    @property
    def sessions(self) -> Any:
        """Returns a `Sessions` module built with this module's API client.

        The `.sessions` submodule is imported on first access and then kept
        on `self._sessions` for later accesses.

        Raises:
            ImportError: If the `.sessions` submodule cannot be imported.
        """
        module = self._sessions
        if module is None:
            try:
                # We need to lazy load the sessions module to handle the
                # possibility of ImportError when dependencies are not installed.
                module = importlib.import_module(".sessions", __package__)
            except ImportError as e:
                raise ImportError(
                    "The agent_engines.sessions module requires additional packages. "
                    "Please install them using pip install "
                    "google-cloud-aiplatform[agent_engines]"
                ) from e
            self._sessions = module
        return module.Sessions(self._api_client)

    def _list_pager(
        self, *, config: Optional[types.ListAgentEngineConfigOrDict] = None
    ) -> Pager[types.ReasoningEngine]:
        """Returns a pager over Agent Engines, seeded with the first page.

        Args:
            config (ListAgentEngineConfig):
                Optional. The listing options used for the first request and
                handed to the pager.

        Returns:
            Pager[ReasoningEngine]: A pager constructed from the
            "reasoning_engines" field name, the `_list` method, the first
            response and `config`.
        """
        first_page = self._list(config=config)
        return Pager("reasoning_engines", self._list, first_page, config)

    def get(
        self,
        *,
        name: str,
        config: Optional[types.GetAgentEngineConfigOrDict] = None,
    ) -> types.AgentEngine:
        """Gets an agent engine.

        Args:
            name (str):
                Required. A fully-qualified resource name or ID such as
                "projects/123/locations/us-central1/reasoningEngines/456" or
                a shortened name such as "reasoningEngines/456".
            config (GetAgentEngineConfig):
                Optional. Additional configurations passed to the get request.

        Returns:
            AgentEngine: A wrapper holding the fetched resource together with
            this module and an async counterpart built on the same API
            client. `_register_api_methods` is called on it when the resource
            has a truthy `spec`.
        """
        resource = self._get(name=name, config=config)
        async_module = AsyncAgentEngines(api_client_=self._api_client)
        engine = types.AgentEngine(
            api_client=self,
            api_async_client=async_module,
            api_resource=resource,
        )
        if not resource.spec:
            return engine
        self._register_api_methods(agent_engine=engine)
        return engine

    def delete(
        self,
        *,
        name: str,
        force: Optional[bool] = None,
        config: Optional[types.DeleteAgentEngineConfigOrDict] = None,
    ) -> types.DeleteAgentEngineOperation:
        """Deletes an Agent Engine resource, logging before and after the call.

        Args:
            name (str):
                Required. The name of the Agent Engine to be deleted. Format:
                `projects/{project}/locations/{location}/reasoningEngines/{resource_id}`
                or `reasoningEngines/{resource_id}`.
            force (bool):
                Optional. If set to True, child resources will also be deleted.
                Otherwise, the request will fail with FAILED_PRECONDITION error
                when the Agent Engine has undeleted child resources. Defaults
                to False.
            config (DeleteAgentEngineConfig):
                Optional. Additional configurations for deleting the Agent
                Engine.

        Returns:
            DeleteAgentEngineOperation: The operation returned by the delete
            request.
        """
        logger.info(f"Deleting AgentEngine resource: {name}")
        delete_operation = self._delete(name=name, force=force, config=config)
        logger.info(f"Started AgentEngine delete operation: {delete_operation.name}")
        return delete_operation

    def create(
        self,
        *,
        agent_engine: Any = None,
        agent: Any = None,
        config: Optional[types.AgentEngineConfigOrDict] = None,
    ) -> types.AgentEngine:
        """Creates an agent engine.

        The Agent Engine will be an instance of the `agent_engine` that
        was passed in, running remotely on Vertex AI.

        Sample ``src_dir`` contents (e.g. ``./user_src_dir``):

        .. code-block:: python

            user_src_dir/
            |-- main.py
            |-- requirements.txt
            |-- user_code/
            |   |-- utils.py
            |   |-- ...
            |-- ...

        To build an Agent Engine with the above files, run:

        .. code-block:: python

            client = vertexai.Client(
                project="your-project",
                location="us-central1",
            )
            remote_agent = client.agent_engines.create(
                agent=local_agent,
                config=dict(
                    requirements=[
                        # I.e. the PyPI dependencies listed in requirements.txt
                        "google-cloud-aiplatform[agent_engines,adk]",
                        ...
                    ],
                    extra_packages=[
                        "./user_src_dir/main.py", # a single file
                        "./user_src_dir/user_code", # a directory
                        ...
                    ],
                ),
            )

        Args:
            agent (Any):
                Optional. The Agent to be created. If not specified, this will
                correspond to a lightweight instance that cannot be queried
                (but can be updated to future instances that can be queried).
            agent_engine (Any):
                Optional. This is deprecated. Please use `agent` instead.
            config (AgentEngineConfig):
                Optional. The configurations to use for creating the Agent Engine.

        Returns:
            AgentEngine: The created Agent Engine instance.

        Raises:
            ValueError: If the `project` was not set using `client.Client`.
            ValueError: If the `location` was not set using `client.Client`.
            ValueError: If `config.staging_bucket` was not set when `agent`
            is specified.
            ValueError: If `config.staging_bucket` does not start with "gs://".
            ValueError: If `config.extra_packages` is specified but `agent`
            is None.
            ValueError: If `config.requirements` is specified but `agent` is None.
            ValueError: If `config.env_vars` has a dictionary entry that does not
            correspond to an environment variable value or a SecretRef.
            TypeError: If `config.env_vars` is not a dictionary.
            FileNotFoundError: If `config.extra_packages` includes a file or
            directory that does not exist.
            IOError: If `config.requirements` is a string that corresponds to a
            nonexistent file.
        """
        if config is None:
            config = {}
        if isinstance(config, dict):
            config = types.AgentEngineConfig.model_validate(config)
        elif not isinstance(config, types.AgentEngineConfig):
            raise TypeError(
                f"config must be a dict or AgentEngineConfig, but got {type(config)}."
            )
        context_spec = config.context_spec
        if context_spec is not None:
            # Conversion to a dict for _create_config
            context_spec = context_spec.model_dump()
        if agent and agent_engine:
            raise ValueError("Please specify only one of `agent` or `agent_engine`.")
        elif agent_engine:
            raise DeprecationWarning(
                "The `agent_engine` argument is deprecated. Please use `agent` instead."
            )
        agent = agent or agent_engine
        api_config = self._create_config(
            mode="create",
            agent=agent,
            identity_type=config.identity_type,
            staging_bucket=config.staging_bucket,
            requirements=config.requirements,
            display_name=config.display_name,
            description=config.description,
            gcs_dir_name=config.gcs_dir_name,
            extra_packages=config.extra_packages,
            env_vars=config.env_vars,
            service_account=config.service_account,
            context_spec=context_spec,
            psc_interface_config=config.psc_interface_config,
            min_instances=config.min_instances,
            max_instances=config.max_instances,
            resource_limits=config.resource_limits,
            container_concurrency=config.container_concurrency,
            encryption_spec=config.encryption_spec,
            agent_server_mode=config.agent_server_mode,
            labels=config.labels,
            class_methods=config.class_methods,
            source_packages=config.source_packages,
            entrypoint_module=config.entrypoint_module,
            entrypoint_object=config.entrypoint_object,
            requirements_file=config.requirements_file,
            agent_framework=config.agent_framework,
            python_version=config.python_version,
            build_options=config.build_options,
        )
        operation = self._create(config=api_config)
        reasoning_engine_id = _agent_engines_utils._get_reasoning_engine_id(
            operation_name=operation.name
        )
        logger.info(
            "View progress and logs at https://console.cloud.google.com/logs/query?"
            f"project={self._api_client.project}"
            "&query=resource.type%3D%22aiplatform.googleapis.com%2FReasoningEngine%22%0A"
            f"resource.labels.reasoning_engine_id%3D%22{reasoning_engine_id}%22."
        )
        if agent is not None or config.source_packages is not None:
            poll_interval_seconds = 10
        else:
            poll_interval_seconds = 1  # Lightweight agent engine resource creation.
        operation = _agent_engines_utils._await_operation(
            operation_name=operation.name,
            get_operation_fn=self._get_agent_operation,
            poll_interval_seconds=poll_interval_seconds,
        )

        agent_engine = types.AgentEngine(
            api_client=self,
            api_async_client=AsyncAgentEngines(api_client_=self._api_client),
            api_resource=operation.response,
        )
        if agent_engine.api_resource:
            logger.info("Agent Engine created. To use it in another session:")
            logger.info(
                f"agent_engine=client.agent_engines.get(name='{agent_engine.api_resource.name}')"
            )
        elif operation.error:
            raise RuntimeError(f"Failed to create Agent Engine: {operation.error}")
        else:
            logger.warning("The operation returned an empty response.")
        if agent is not None or config.source_packages is not None:
            # If the user did not provide an agent_engine (e.g. lightweight
            # provisioning), it will not have any API methods registered.
            agent_engine = self._register_api_methods(agent_engine=agent_engine)
        return agent_engine

    def _create_config(
        self,
        *,
        mode: str,
        agent: Any = None,
        identity_type: Optional[types.IdentityType] = None,
        staging_bucket: Optional[str] = None,
        requirements: Optional[Union[str, Sequence[str]]] = None,
        display_name: Optional[str] = None,
        description: Optional[str] = None,
        gcs_dir_name: Optional[str] = None,
        extra_packages: Optional[Sequence[str]] = None,
        env_vars: Optional[dict[str, Union[str, Any]]] = None,
        service_account: Optional[str] = None,
        context_spec: Optional[types.ReasoningEngineContextSpecDict] = None,
        psc_interface_config: Optional[types.PscInterfaceConfigDict] = None,
        min_instances: Optional[int] = None,
        max_instances: Optional[int] = None,
        resource_limits: Optional[dict[str, str]] = None,
        container_concurrency: Optional[int] = None,
        encryption_spec: Optional[genai_types.EncryptionSpecDict] = None,
        labels: Optional[dict[str, str]] = None,
        agent_server_mode: Optional[types.AgentServerMode] = None,
        class_methods: Optional[Sequence[dict[str, Any]]] = None,
        source_packages: Optional[Sequence[str]] = None,
        entrypoint_module: Optional[str] = None,
        entrypoint_object: Optional[str] = None,
        requirements_file: Optional[str] = None,
        agent_framework: Optional[str] = None,
        python_version: Optional[str] = None,
        build_options: Optional[dict[str, list[str]]] = None,
    ) -> types.UpdateAgentEngineConfigDict:
        """Builds the request config used by both `create` and `update`.

        The method collects the provided settings into a config dict and, in
        parallel, records the field paths that were set. The recorded paths are
        only emitted (as a comma-joined `update_mask`) when `mode` is "update".

        The spec can be populated from one of two mutually exclusive sources:

        * `agent`: an in-memory object. It is validated and handed to
          `_agent_engines_utils._prepare` together with `requirements` and
          `extra_packages`, and the resulting spec points at files under
          `staging_bucket/gcs_dir_name` (a `package_spec`).
        * `source_packages`: local sources that are sent inline as a base64
          encoded tarball (a `source_code_spec`). This path requires
          `entrypoint_module`, `entrypoint_object` and `class_methods`.

        Deployment settings (`env_vars`, `psc_interface_config`,
        `min_instances`, `max_instances`, `resource_limits`,
        `container_concurrency`, `agent_server_mode`) and `agent_framework` are
        only applied when one of the two sources above produced a spec;
        otherwise they are not included in the returned config.

        Args:
            mode: Either "create" or "update".
            agent: Optional. The agent object to package and deploy.
            identity_type: Optional. Stored as `spec.identity_type`.
            staging_bucket: Optional. Validated and used only when `agent` is
                given.
            requirements: Optional. Only allowed when `agent` is given.
            display_name: Optional. Display name of the resource.
            description: Optional. Description of the resource.
            gcs_dir_name: Optional. Directory under the staging bucket; falls
                back to `_agent_engines_utils._DEFAULT_GCS_DIR_NAME`.
            extra_packages: Optional. Only allowed when `agent` is given.
            env_vars: Optional. Environment variables for the deployment.
            service_account: Optional. Stored as `spec.service_account`; an
                empty string only adds the mask entry so that the field is
                cleared on update.
            context_spec: Optional. Stored as `context_spec`.
            psc_interface_config: Optional. Deployment setting.
            min_instances: Optional. Deployment setting.
            max_instances: Optional. Deployment setting.
            resource_limits: Optional. Deployment setting.
            container_concurrency: Optional. Deployment setting.
            encryption_spec: Optional. Stored as `encryption_spec`.
            labels: Optional. Stored as `labels`.
            agent_server_mode: Optional. Stored inside the deployment spec.
            class_methods: Optional with `agent` (generated from the agent when
                omitted); required with `source_packages`.
            source_packages: Optional. Must not be combined with `agent`.
            entrypoint_module: Required when `source_packages` is given.
            entrypoint_object: Required when `source_packages` is given.
            requirements_file: Optional. Only used with `source_packages`.
            agent_framework: Optional. Passed to
                `_agent_engines_utils._get_agent_framework`; the value
                "google-adk" additionally routes `env_vars` through the
                telemetry-enablement helper.
            python_version: Optional. Defaults to the running interpreter's
                "major.minor" version.
            build_options: Optional. Forwarded to package validation.

        Returns:
            The config dict. It contains an `update_mask` entry only when
            `mode` is "update" and at least one field path was recorded.

        Raises:
            ValueError: If `mode` is not "create" or "update".
            ValueError: If `requirements` or `extra_packages` is given while
                `agent` is None.
            ValueError: If both `agent` and `source_packages` are given.
            ValueError: If `agent` is given but the client has no project or
                location.
            ValueError: If `source_packages` is given without
                `entrypoint_module`, `entrypoint_object` or `class_methods`.

        The `_agent_engines_utils` validation helpers and
        `_generate_deployment_spec_or_raise` may raise further errors.
        """
        import sys

        config: types.UpdateAgentEngineConfigDict = {}
        # Field paths that were set; only turned into `update_mask` for updates.
        update_masks = []
        if mode not in ["create", "update"]:
            raise ValueError(f"Unsupported mode: {mode}")
        if agent is None:
            # These two only make sense as companions of a packaged agent.
            if requirements is not None:
                raise ValueError("requirements must be None if agent is None.")
            if extra_packages is not None:
                raise ValueError("extra_packages must be None if agent is None.")
        # Top-level (non-spec) fields are copied through whenever they are set.
        if display_name is not None:
            update_masks.append("display_name")
            config["display_name"] = display_name
        if description is not None:
            update_masks.append("description")
            config["description"] = description
        if context_spec is not None:
            update_masks.append("context_spec")
            config["context_spec"] = context_spec
        if encryption_spec is not None:
            update_masks.append("encryption_spec")
            config["encryption_spec"] = encryption_spec
        if labels is not None:
            update_masks.append("labels")
            config["labels"] = labels

        # NOTE(review): the same helper is applied again below when `agent` is
        # detected as an ADK agent, so it can run twice on the same env_vars;
        # presumably it is idempotent - confirm.
        if agent_framework == "google-adk":
            env_vars = _agent_engines_utils._add_telemetry_enablement_env(env_vars)

        # Python version recorded in the spec: explicit value, else the version
        # of the interpreter running this client.
        if python_version:
            sys_version = python_version
        else:
            sys_version = f"{sys.version_info.major}.{sys.version_info.minor}"
        # Stays None unless `agent` or `source_packages` populates it below.
        agent_engine_spec: Any = None
        if agent is not None:
            if source_packages is not None:
                raise ValueError(
                    "If you have provided `source_packages` in `config`, please "
                    "do not specify `agent` in `agent_engines.create()` or "
                    "`agent_engines.update()`."
                )

            project = self._api_client.project
            if project is None:
                raise ValueError("project must be set using `vertexai.Client`.")
            location = self._api_client.location
            if location is None:
                raise ValueError("location must be set using `vertexai.Client`.")
            gcs_dir_name = gcs_dir_name or _agent_engines_utils._DEFAULT_GCS_DIR_NAME
            agent = _agent_engines_utils._validate_agent_or_raise(agent=agent)
            if _agent_engines_utils._is_adk_agent(agent):
                env_vars = _agent_engines_utils._add_telemetry_enablement_env(env_vars)
            staging_bucket = _agent_engines_utils._validate_staging_bucket_or_raise(
                staging_bucket=staging_bucket,
            )
            requirements = _agent_engines_utils._validate_requirements_or_raise(
                agent=agent,
                requirements=requirements,
            )
            extra_packages = _agent_engines_utils._validate_packages_or_raise(
                packages=extra_packages,
                build_options=build_options,
            )
            # Prepares the Agent Engine for creation/update in Vertex AI. This
            # involves packaging and uploading the artifacts for agent_engine,
            # requirements and extra_packages to `staging_bucket/gcs_dir_name`.
            _agent_engines_utils._prepare(
                agent=agent,
                requirements=requirements,
                project=project,
                location=location,
                staging_bucket=staging_bucket,
                gcs_dir_name=gcs_dir_name,
                extra_packages=extra_packages,
                credentials=self._api_client._credentials,
            )
            # Update the package spec.
            update_masks.append("spec.package_spec.pickle_object_gcs_uri")
            package_spec = {
                "python_version": sys_version,
                "pickle_object_gcs_uri": "{}/{}/{}".format(
                    staging_bucket,
                    gcs_dir_name,
                    _agent_engines_utils._BLOB_FILENAME,
                ),
            }
            # The optional URIs are only referenced when the validated inputs
            # are non-empty.
            if extra_packages:
                update_masks.append("spec.package_spec.dependency_files_gcs_uri")
                package_spec["dependency_files_gcs_uri"] = "{}/{}/{}".format(
                    staging_bucket,
                    gcs_dir_name,
                    _agent_engines_utils._EXTRA_PACKAGES_FILE,
                )
            if requirements:
                update_masks.append("spec.package_spec.requirements_gcs_uri")
                package_spec["requirements_gcs_uri"] = "{}/{}/{}".format(
                    staging_bucket,
                    gcs_dir_name,
                    _agent_engines_utils._REQUIREMENTS_FILE,
                )

            # Class methods: use the caller-provided description when present,
            # otherwise derive it from the operations registered on the agent.
            update_masks.append("spec.class_methods")
            class_methods_spec = []
            if class_methods is not None:
                class_methods_spec = (
                    _agent_engines_utils._class_methods_to_class_methods_spec(
                        class_methods=class_methods
                    )
                )
            else:
                class_methods_spec = (
                    _agent_engines_utils._generate_class_methods_spec_or_raise(
                        agent=agent,
                        operations=_agent_engines_utils._get_registered_operations(
                            agent=agent
                        ),
                    )
                )

            agent_engine_spec: types.ReasoningEngineSpecDict = {
                "package_spec": package_spec,
                "class_methods": [
                    _agent_engines_utils._to_dict(class_method_spec)
                    for class_method_spec in class_methods_spec
                ],
            }

        # Source-based deployment. Because `agent` + `source_packages` raised
        # above, this branch is only reached when `agent` is None.
        if source_packages is not None:
            source_packages = _agent_engines_utils._validate_packages_or_raise(
                packages=source_packages,
                build_options=build_options,
            )
            update_masks.append("spec.source_code_spec.inline_source.source_archive")
            source_code_spec = {
                "inline_source": {
                    "source_archive": _agent_engines_utils._create_base64_encoded_tarball(
                        source_packages=source_packages
                    )
                }
            }

            update_masks.append("spec.source_code_spec.python_spec.version")
            python_spec = {
                "version": sys_version,
            }
            # Note: the required-argument checks below run after the tarball
            # above has already been built.
            if not entrypoint_module:
                raise ValueError(
                    "entrypoint_module must be specified if source_packages is specified."
                )
            update_masks.append("spec.source_code_spec.python_spec.entrypoint_module")
            python_spec["entrypoint_module"] = entrypoint_module
            if not entrypoint_object:
                raise ValueError(
                    "entrypoint_object must be specified if source_packages is specified."
                )
            update_masks.append("spec.source_code_spec.python_spec.entrypoint_object")
            python_spec["entrypoint_object"] = entrypoint_object
            if requirements_file is not None:
                update_masks.append(
                    "spec.source_code_spec.python_spec.requirements_file"
                )
                python_spec["requirements_file"] = requirements_file
            source_code_spec["python_spec"] = python_spec

            # There is no agent object to inspect here, so the class methods
            # must be supplied by the caller.
            if class_methods is None:
                raise ValueError(
                    "class_methods must be specified if source_packages is specified."
                )
            update_masks.append("spec.class_methods")
            class_methods_spec = (
                _agent_engines_utils._class_methods_to_class_methods_spec(
                    class_methods=class_methods
                )
            )

            agent_engine_spec: types.ReasoningEngineSpecDict = {
                "source_code_spec": source_code_spec,
                "class_methods": [
                    _agent_engines_utils._to_dict(class_method_spec)
                    for class_method_spec in class_methods_spec
                ],
            }

        # Deployment settings and the agent framework are attached only when a
        # spec was produced above (i.e. `agent` or `source_packages` was given).
        if agent_engine_spec is not None:
            if (
                env_vars is not None
                or psc_interface_config is not None
                or min_instances is not None
                or max_instances is not None
                or resource_limits is not None
                or container_concurrency is not None
            ):
                (
                    deployment_spec,
                    deployment_update_masks,
                ) = self._generate_deployment_spec_or_raise(
                    env_vars=env_vars,
                    psc_interface_config=psc_interface_config,
                    min_instances=min_instances,
                    max_instances=max_instances,
                    resource_limits=resource_limits,
                    container_concurrency=container_concurrency,
                )
                update_masks.extend(deployment_update_masks)
                agent_engine_spec["deployment_spec"] = deployment_spec

            if agent_server_mode:
                # Create the deployment spec on demand; no mask entry is
                # recorded for `agent_server_mode`.
                if not agent_engine_spec.get("deployment_spec"):
                    agent_engine_spec["deployment_spec"] = (
                        types.ReasoningEngineSpecDeploymentSpecDict()
                    )
                agent_engine_spec["deployment_spec"][
                    "agent_server_mode"
                ] = agent_server_mode

            agent_engine_spec["agent_framework"] = (
                _agent_engines_utils._get_agent_framework(
                    agent_framework=agent_framework,
                    agent=agent,
                )
            )
            update_masks.append("spec.agent_framework")

        # Identity settings can be sent without an agent or source packages, so
        # an empty spec is created for them when needed.
        if identity_type is not None or service_account is not None:
            if agent_engine_spec is None:
                agent_engine_spec = {}

            if identity_type is not None:
                agent_engine_spec["identity_type"] = identity_type
                update_masks.append("spec.identity_type")
            if service_account is not None:
                # Clear the field in case of empty service_account.
                if service_account:
                    agent_engine_spec["service_account"] = service_account
                update_masks.append("spec.service_account")

        if agent_engine_spec is not None:
            config["spec"] = agent_engine_spec

        # The mask is only meaningful for updates; it is omitted for creates.
        if update_masks and mode == "update":
            config["update_mask"] = ",".join(update_masks)
        return config

    def _generate_deployment_spec_or_raise(
        self,
        *,
        env_vars: Optional[dict[str, Union[str, Any]]] = None,
        psc_interface_config: Optional[types.PscInterfaceConfigDict] = None,
        min_instances: Optional[int] = None,
        max_instances: Optional[int] = None,
        resource_limits: Optional[dict[str, str]] = None,
        container_concurrency: Optional[int] = None,
    ) -> Tuple[dict[str, Any], Sequence[str]]:
        """Assembles the deployment spec and the field paths it populates.

        Args:
            env_vars: Optional. Mapping of variable names to either a string
                value or a dict (stored as a secret reference). Ignored when
                empty.
            psc_interface_config: Optional. Copied into the spec when truthy.
            min_instances: Optional. Must lie in the range [0, 10].
            max_instances: Optional. Must lie in [1, 100] when
                `psc_interface_config` is truthy, and in [1, 1000] otherwise.
            resource_limits: Optional. Checked with
                `_agent_engines_utils._validate_resource_limits_or_raise` and
                copied into the spec when truthy.
            container_concurrency: Optional. Copied into the spec when truthy.

        Returns:
            A `(deployment_spec, update_masks)` pair, where `update_masks`
            lists the `spec.deployment_spec.*` paths that were populated.

        Raises:
            TypeError: If `env_vars` is non-empty but not a dict, or if one of
                its values is neither a str nor a dict.
            ValueError: If `min_instances` or `max_instances` is out of range.
        """
        spec: dict[str, Any] = {}
        masks = []

        if env_vars:
            if not isinstance(env_vars, dict):
                raise TypeError(f"env_vars must be a dict, but got {type(env_vars)}.")
            # Both lists are always present once env_vars is given; only the
            # non-empty ones are reported in the masks.
            spec["env"] = []
            spec["secret_env"] = []
            self._update_deployment_spec_with_env_vars_dict_or_raise(
                deployment_spec=spec,
                env_vars=env_vars,
            )
            if spec.get("env"):
                masks.append("spec.deployment_spec.env")
            if spec.get("secret_env"):
                masks.append("spec.deployment_spec.secret_env")

        if psc_interface_config:
            spec["psc_interface_config"] = psc_interface_config
            masks.append("spec.deployment_spec.psc_interface_config")

        if min_instances is not None:
            if not 0 <= min_instances <= 10:
                raise ValueError(
                    f"min_instances must be between 0 and 10. Got {min_instances}"
                )
            spec["min_instances"] = min_instances
            masks.append("spec.deployment_spec.min_instances")

        if max_instances is not None:
            # The upper bound is tighter when a PSC interface is configured.
            if psc_interface_config:
                if not 1 <= max_instances <= 100:
                    raise ValueError(
                        f"max_instances must be between 1 and 100 when PSC-I is enabled. Got {max_instances}"
                    )
            elif not 1 <= max_instances <= 1000:
                raise ValueError(
                    f"max_instances must be between 1 and 1000. Got {max_instances}"
                )
            spec["max_instances"] = max_instances
            masks.append("spec.deployment_spec.max_instances")

        if resource_limits:
            _agent_engines_utils._validate_resource_limits_or_raise(
                resource_limits=resource_limits
            )
            spec["resource_limits"] = resource_limits
            masks.append("spec.deployment_spec.resource_limits")

        if container_concurrency:
            spec["container_concurrency"] = container_concurrency
            masks.append("spec.deployment_spec.container_concurrency")

        return spec, masks

    def _update_deployment_spec_with_env_vars_dict_or_raise(
        self,
        *,
        deployment_spec: dict[str, Any],
        env_vars: dict[str, Any],
    ) -> None:
        for key, value in env_vars.items():
            if isinstance(value, dict):
                if "secret_env" not in deployment_spec:
                    deployment_spec["secret_env"] = []
                deployment_spec["secret_env"].append({"name": key, "secret_ref": value})
            elif isinstance(value, str):
                if "env" not in deployment_spec:
                    deployment_spec["env"] = []
                deployment_spec["env"].append({"name": key, "value": value})
            else:
                raise TypeError(
                    f"Unknown value type in env_vars for {key}. "
                    f"Must be a str or SecretRef: {value}"
                )

    def _register_api_methods(
        self,
        *,
        agent_engine: types.AgentEngine,
    ) -> types.AgentEngine:
        """Attaches the query API methods to `agent_engine` on a best-effort basis.

        Registration failures are logged as a warning rather than raised, so
        the (possibly method-less) agent engine is always returned.

        Args:
            agent_engine: The agent engine to register the methods on.

        Returns:
            The same `agent_engine` object that was passed in.
        """
        try:
            # Maps each API mode to the function used to wrap its operations.
            wrappers_by_mode = {
                "": _agent_engines_utils._wrap_query_operation,
                "async": _agent_engines_utils._wrap_async_query_operation,
                "stream": _agent_engines_utils._wrap_stream_query_operation,
                "async_stream": _agent_engines_utils._wrap_async_stream_query_operation,
                "a2a_extension": _agent_engines_utils._wrap_a2a_operation,
            }
            _agent_engines_utils._register_api_methods_or_raise(
                agent_engine=agent_engine,
                wrap_operation_fn=wrappers_by_mode,
            )
        except Exception as registration_error:
            logger.warning(
                _agent_engines_utils._FAILED_TO_REGISTER_API_METHODS_WARNING_TEMPLATE,
                registration_error,
            )
        return agent_engine

    def list(
        self, *, config: Optional[types.ListAgentEngineConfigOrDict] = None
    ) -> Iterator[types.AgentEngine]:
        """Lazily yields the Agent Engines that match the given config.

        Example Usage:

        .. code-block:: python

            import vertexai

            client = vertexai.Client(project="my_project", location="us-central1")
            for agent in client.agent_engines.list(
                config={"filter": 'display_name="My Custom Agent"'},
            ):
                print(agent.api_resource.name)

        Args:
            config (ListAgentEngineConfig):
                Optional. The config (e.g. filter) for the agents to be listed.

        Yields:
            AgentEngine: One wrapper per resource returned by the pager, each
            with its own async client.
        """
        for resource in self._list_pager(config=config):
            wrapped = types.AgentEngine(
                api_client=self,
                api_async_client=AsyncAgentEngines(api_client_=self._api_client),
                api_resource=resource,
            )
            yield wrapped

    def update(
        self,
        *,
        name: str,
        agent: Any = None,
        agent_engine: Any = None,
        config: types.AgentEngineConfigOrDict,
    ) -> types.AgentEngine:
        """Updates an existing Agent Engine.

        This method updates the configuration of an existing Agent Engine running
        remotely, which is identified by its name. The update request is issued
        and the resulting operation is handed to
        `_agent_engines_utils._await_operation` before the result is returned.

        Args:
            name (str): Required. A fully-qualified resource name or ID such as
                "projects/123/locations/us-central1/reasoningEngines/456" or a
                shortened name such as "reasoningEngines/456".
            agent (Any):
                Optional. The instance to be used as the updated Agent Engine.
                If it is not specified, the existing instance will be used.
            agent_engine (Any):
                Optional. This is deprecated. Please use `agent` instead.
                Passing a truthy value raises an error.
            config (AgentEngineConfig):
                Required. The configurations to use for updating the Agent
                Engine, as an `AgentEngineConfig` or an equivalent dict.

        Returns:
            AgentEngine: The updated Agent Engine.

        Raises:
            TypeError: If `config` is neither a dict nor an `AgentEngineConfig`.
            ValueError: If both `agent` and `agent_engine` are specified.
            DeprecationWarning: If `agent_engine` is specified.
            ValueError: If `agent` is specified but the `project` or `location`
                was not set on the client.
            ValueError: If `config.requirements` or `config.extra_packages` is
                specified but `agent` is None.
            ValueError: If `config.source_packages` is combined with `agent`,
                or is given without `entrypoint_module`, `entrypoint_object`
                or `class_methods`.
            ValueError: If `config.min_instances` or `config.max_instances` is
                out of range.
            TypeError: If `config.env_vars` is not a dictionary, or has a value
                that is neither a str nor a dict.
            RuntimeError: If the update operation finished with an error.

        The validation helpers used while building the request (for example for
        `config.staging_bucket`, `config.requirements` and
        `config.extra_packages`) may raise further errors.
        """
        if isinstance(config, dict):
            config = types.AgentEngineConfig.model_validate(config)
        elif not isinstance(config, types.AgentEngineConfig):
            raise TypeError(
                f"config must be a dict or AgentEngineConfig, but got {type(config)}."
            )
        context_spec = config.context_spec
        if context_spec is not None:
            # Conversion to a dict for _create_config
            context_spec = context_spec.model_dump()
        if agent and agent_engine:
            raise ValueError("Please specify only one of `agent` or `agent_engine`.")
        elif agent_engine:
            # Kept as a raised exception (not `warnings.warn`) so that callers
            # observe the same behavior as in `create`.
            raise DeprecationWarning(
                "The `agent_engine` argument is deprecated. Please use `agent` instead."
            )
        agent = agent or agent_engine
        api_config = self._create_config(
            mode="update",
            agent=agent,
            identity_type=config.identity_type,
            staging_bucket=config.staging_bucket,
            requirements=config.requirements,
            display_name=config.display_name,
            description=config.description,
            gcs_dir_name=config.gcs_dir_name,
            extra_packages=config.extra_packages,
            env_vars=config.env_vars,
            service_account=config.service_account,
            context_spec=context_spec,
            psc_interface_config=config.psc_interface_config,
            min_instances=config.min_instances,
            max_instances=config.max_instances,
            resource_limits=config.resource_limits,
            container_concurrency=config.container_concurrency,
            labels=config.labels,
            class_methods=config.class_methods,
            source_packages=config.source_packages,
            entrypoint_module=config.entrypoint_module,
            entrypoint_object=config.entrypoint_object,
            requirements_file=config.requirements_file,
            agent_framework=config.agent_framework,
            python_version=config.python_version,
            build_options=config.build_options,
        )
        operation = self._update(name=name, config=api_config)
        reasoning_engine_id = _agent_engines_utils._get_reasoning_engine_id(
            resource_name=name
        )
        logger.info(
            "View progress and logs at https://console.cloud.google.com/logs/query?"
            f"project={self._api_client.project}"
            "&query=resource.type%3D%22aiplatform.googleapis.com%2FReasoningEngine%22%0A"
            f"resource.labels.reasoning_engine_id%3D%22{reasoning_engine_id}%22."
        )
        operation = _agent_engines_utils._await_operation(
            operation_name=operation.name,
            get_operation_fn=self._get_agent_operation,
        )
        agent_engine = types.AgentEngine(
            api_client=self,
            api_async_client=AsyncAgentEngines(api_client_=self._api_client),
            api_resource=operation.response,
        )
        if agent_engine.api_resource:
            logger.info("Agent Engine updated. To use it in another session:")
            logger.info(
                f"agent_engine=client.agent_engines.get(name='{agent_engine.api_resource.name}')"
            )
        elif operation.error:
            raise RuntimeError(f"Failed to update Agent Engine: {operation.error}")
        else:
            logger.warning("The operation returned an empty response.")
        # The resource may be missing when the operation returned neither a
        # response nor an error; only inspect its spec when it is present.
        if agent_engine.api_resource and agent_engine.api_resource.spec:
            self._register_api_methods(agent_engine=agent_engine)
        return agent_engine

    def _stream_query(
        self, *, name: str, config: Optional[types.QueryAgentEngineConfigOrDict] = None
    ) -> Iterator[Any]:
        """Streams the response of the agent engine.

        Args:
            name: The resource name of the agent engine to query.
            config: Optional. The query config; its `http_options`, when set,
                are forwarded to the streamed request.

        Yields:
            Each response produced by the API client's `request_streamed` call.
        """
        parameter_model = types._QueryAgentEngineRequestParameters(
            name=name,
            config=config,
        )
        request_dict = _QueryAgentEngineRequestParameters_to_vertex(parameter_model)
        request_url_dict = request_dict.get("_url")
        if request_url_dict:
            path = "{name}:streamQuery?alt=sse".format_map(request_url_dict)
        else:
            path = "{name}:streamQuery?alt=sse"
        query_params = request_dict.get("_query")
        if query_params:
            # The path already ends with `?alt=sse`, so additional parameters
            # must be joined with `&`; a second `?` would corrupt the query.
            separator = "&" if "?" in path else "?"
            path = f"{path}{separator}{urlencode(query_params)}"
        # TODO: remove the hack that pops config.
        request_dict.pop("config", None)
        http_options = None
        if (
            parameter_model.config is not None
            and parameter_model.config.http_options is not None
        ):
            http_options = parameter_model.config.http_options

        request_dict = _common.convert_to_dict(request_dict)
        request_dict = _common.encode_unserializable_types(request_dict)
        yield from self._api_client.request_streamed(
            "post", path, request_dict, http_options
        )

    # TODO: b/436704146 - Replace with generated methods
    # TODO: b/437129724 - Add replay test for async stream query
    async def _async_stream_query(
        self,
        *,
        name: str,
        config: Optional[types.QueryAgentEngineConfigOrDict] = None,
    ) -> AsyncIterator[Any]:
        """Streams the response of the agent engine asynchronously.

        Args:
            name: The resource name of the agent engine to query.
            config: Optional. The query config; its `http_options`, when set,
                are forwarded to the streamed request.

        Yields:
            Each response produced by the API client's
            `async_request_streamed` call.
        """
        parameter_model = types._QueryAgentEngineRequestParameters(
            name=name,
            config=config,
        )
        request_dict = _QueryAgentEngineRequestParameters_to_vertex(parameter_model)
        request_url_dict = request_dict.get("_url")
        if request_url_dict:
            path = "{name}:streamQuery?alt=sse".format_map(request_url_dict)
        else:
            path = "{name}:streamQuery?alt=sse"
        query_params = request_dict.get("_query")
        if query_params:
            # The path already ends with `?alt=sse`, so additional parameters
            # must be joined with `&`; a second `?` would corrupt the query.
            separator = "&" if "?" in path else "?"
            path = f"{path}{separator}{urlencode(query_params)}"
        # TODO: remove the hack that pops config.
        request_dict.pop("config", None)
        http_options = None
        if (
            parameter_model.config is not None
            and parameter_model.config.http_options is not None
        ):
            http_options = parameter_model.config.http_options

        request_dict = _common.convert_to_dict(request_dict)
        request_dict = _common.encode_unserializable_types(request_dict)
        async_iterator = await self._api_client.async_request_streamed(
            "post", path, request_dict, http_options
        )
        async for response in async_iterator:
            yield response

    def create_memory(
        self,
        *,
        name: str,
        fact: str,
        scope: dict[str, str],
        config: Optional[types.AgentEngineMemoryConfigOrDict] = None,
    ) -> types.AgentEngineMemoryOperation:
        """Deprecated. Use agent_engines.memories.create instead.

        Emits a `DeprecationWarning` attributed to the caller and forwards all
        arguments unchanged to `self.memories.create`.
        """
        deprecation_message = (
            "agent_engines.create_memory is deprecated. "
            "Use agent_engines.memories.create instead."
        )
        # stacklevel=2 points the warning at the caller of this method.
        warnings.warn(deprecation_message, DeprecationWarning, stacklevel=2)
        forwarded_kwargs = {
            "name": name,
            "fact": fact,
            "scope": scope,
            "config": config,
        }
        return self.memories.create(**forwarded_kwargs)

    def delete_memory(
        self,
        *,
        name: str,
        config: Optional[types.DeleteAgentEngineMemoryConfigOrDict] = None,
    ) -> types.DeleteAgentEngineMemoryOperation:
        """Deprecated. Use agent_engines.memories.delete instead.

        Emits a `DeprecationWarning` attributed to the caller and forwards
        `name` and `config` unchanged to `self.memories.delete`.
        """
        deprecation_message = (
            "agent_engines.delete_memory is deprecated. "
            "Use agent_engines.memories.delete instead."
        )
        # stacklevel=2 points the warning at the caller of this method.
        warnings.warn(deprecation_message, DeprecationWarning, stacklevel=2)
        delete_operation = self.memories.delete(name=name, config=config)
        return delete_operation

    def generate_memories(
        self,
        *,
        name: str,
        vertex_session_source: Optional[
            types.GenerateMemoriesRequestVertexSessionSourceOrDict
        ] = None,
        direct_contents_source: Optional[
            types.GenerateMemoriesRequestDirectContentsSourceOrDict
        ] = None,
        direct_memories_source: Optional[
            types.GenerateMemoriesRequestDirectMemoriesSourceOrDict
        ] = None,
        scope: Optional[dict[str, str]] = None,
        config: Optional[types.GenerateAgentEngineMemoriesConfigOrDict] = None,
    ) -> types.AgentEngineGenerateMemoriesOperation:
        """Deprecated alias for ``agent_engines.memories.generate``.

        Emits a ``DeprecationWarning`` attributed to the caller and then
        forwards every argument unchanged, under the same keyword names, to
        ``self.memories.generate``.

        Returns:
            Whatever ``self.memories.generate`` returns.
        """
        deprecation_notice = (
            "agent_engines.generate_memories is deprecated. "
            "Use agent_engines.memories.generate instead."
        )
        # stacklevel=2 makes the warning point at the caller, not this shim.
        warnings.warn(deprecation_notice, DeprecationWarning, stacklevel=2)
        memories_api = self.memories
        return memories_api.generate(
            name=name,
            vertex_session_source=vertex_session_source,
            direct_contents_source=direct_contents_source,
            direct_memories_source=direct_memories_source,
            scope=scope,
            config=config,
        )

    def get_memory(
        self,
        *,
        name: str,
        config: Optional[types.GetAgentEngineMemoryConfigOrDict] = None,
    ) -> types.Memory:
        """Deprecated alias for ``agent_engines.memories.get``.

        Emits a ``DeprecationWarning`` attributed to the caller and then
        forwards ``name`` and ``config`` to ``self.memories.get``.

        Returns:
            Whatever ``self.memories.get`` returns.
        """
        deprecation_notice = (
            "agent_engines.get_memory is deprecated. "
            "Use agent_engines.memories.get instead."
        )
        # stacklevel=2 makes the warning point at the caller, not this shim.
        warnings.warn(deprecation_notice, DeprecationWarning, stacklevel=2)
        memories_api = self.memories
        return memories_api.get(name=name, config=config)

    def list_memories(
        self,
        *,
        name: str,
        config: Optional[types.ListAgentEngineMemoryConfigOrDict] = None,
    ) -> Iterator[types.Memory]:
        """Deprecated alias for ``agent_engines.memories.list``.

        Emits a ``DeprecationWarning`` attributed to the caller and then
        forwards ``name`` and ``config`` to ``self.memories.list``.

        Returns:
            Whatever ``self.memories.list`` returns.
        """
        deprecation_notice = (
            "agent_engines.list_memories is deprecated. "
            "Use agent_engines.memories.list instead."
        )
        # stacklevel=2 makes the warning point at the caller, not this shim.
        warnings.warn(deprecation_notice, DeprecationWarning, stacklevel=2)
        memories_api = self.memories
        return memories_api.list(name=name, config=config)

    def retrieve_memories(
        self,
        *,
        name: str,
        scope: dict[str, str],
        similarity_search_params: Optional[
            types.RetrieveMemoriesRequestSimilaritySearchParamsOrDict
        ] = None,
        simple_retrieval_params: Optional[
            types.RetrieveMemoriesRequestSimpleRetrievalParamsOrDict
        ] = None,
        config: Optional[types.RetrieveAgentEngineMemoriesConfigOrDict] = None,
    ) -> Iterator[types.RetrieveMemoriesResponseRetrievedMemory]:
        """Deprecated alias for ``agent_engines.memories.retrieve``.

        Emits a ``DeprecationWarning`` attributed to the caller and then
        forwards every argument unchanged, under the same keyword names, to
        ``self.memories.retrieve``.

        Returns:
            Whatever ``self.memories.retrieve`` returns.
        """
        deprecation_notice = (
            "agent_engines.retrieve_memories is deprecated. "
            "Use agent_engines.memories.retrieve instead."
        )
        # stacklevel=2 makes the warning point at the caller, not this shim.
        warnings.warn(deprecation_notice, DeprecationWarning, stacklevel=2)
        memories_api = self.memories
        return memories_api.retrieve(
            name=name,
            scope=scope,
            similarity_search_params=similarity_search_params,
            simple_retrieval_params=simple_retrieval_params,
            config=config,
        )

    def create_session(
        self,
        *,
        name: str,
        user_id: str,
        config: Optional[types.CreateAgentEngineSessionConfigOrDict] = None,
    ) -> types.AgentEngineSessionOperation:
        """Deprecated alias for ``agent_engines.sessions.create``.

        Emits a ``DeprecationWarning`` attributed to the caller and then
        forwards ``name``, ``user_id`` and ``config`` to
        ``self.sessions.create``.

        Returns:
            Whatever ``self.sessions.create`` returns.
        """
        deprecation_notice = (
            "agent_engines.create_session is deprecated. "
            "Use agent_engines.sessions.create instead."
        )
        # stacklevel=2 makes the warning point at the caller, not this shim.
        warnings.warn(deprecation_notice, DeprecationWarning, stacklevel=2)
        sessions_api = self.sessions
        return sessions_api.create(name=name, user_id=user_id, config=config)

    def delete_session(
        self,
        *,
        name: str,
        config: Optional[types.DeleteAgentEngineSessionConfigOrDict] = None,
    ) -> types.DeleteAgentEngineSessionOperation:
        """Deprecated alias for ``agent_engines.sessions.delete``.

        Emits a ``DeprecationWarning`` attributed to the caller and then
        forwards ``name`` and ``config`` to ``self.sessions.delete``.

        Returns:
            Whatever ``self.sessions.delete`` returns.
        """
        deprecation_notice = (
            "agent_engines.delete_session is deprecated. "
            "Use agent_engines.sessions.delete instead."
        )
        # stacklevel=2 makes the warning point at the caller, not this shim.
        warnings.warn(deprecation_notice, DeprecationWarning, stacklevel=2)
        sessions_api = self.sessions
        return sessions_api.delete(name=name, config=config)

    def get_session(
        self,
        *,
        name: str,
        config: Optional[types.GetAgentEngineSessionConfigOrDict] = None,
    ) -> types.Session:
        """Deprecated alias for ``agent_engines.sessions.get``.

        Emits a ``DeprecationWarning`` attributed to the caller and then
        forwards ``name`` and ``config`` to ``self.sessions.get``.

        Returns:
            Whatever ``self.sessions.get`` returns.
        """
        deprecation_notice = (
            "agent_engines.get_session is deprecated. "
            "Use agent_engines.sessions.get instead."
        )
        # stacklevel=2 makes the warning point at the caller, not this shim.
        warnings.warn(deprecation_notice, DeprecationWarning, stacklevel=2)
        sessions_api = self.sessions
        return sessions_api.get(name=name, config=config)

    def list_sessions(
        self,
        *,
        name: str,
        config: Optional[types.ListAgentEngineSessionsConfigOrDict] = None,
    ) -> Iterator[types.Session]:
        """Deprecated alias for ``agent_engines.sessions.list``.

        Emits a ``DeprecationWarning`` attributed to the caller and then
        forwards ``name`` and ``config`` to ``self.sessions.list``.

        Returns:
            Whatever ``self.sessions.list`` returns.
        """
        deprecation_notice = (
            "agent_engines.list_sessions is deprecated. "
            "Use agent_engines.sessions.list instead."
        )
        # stacklevel=2 makes the warning point at the caller, not this shim.
        warnings.warn(deprecation_notice, DeprecationWarning, stacklevel=2)
        sessions_api = self.sessions
        return sessions_api.list(name=name, config=config)

    def append_session_event(
        self,
        *,
        name: str,
        author: str,
        invocation_id: str,
        timestamp: datetime.datetime,
        config: Optional[types.AppendAgentEngineSessionEventConfigOrDict] = None,
    ) -> types.AppendAgentEngineSessionEventResponse:
        """Deprecated alias for ``agent_engines.sessions.events.append``.

        Emits a ``DeprecationWarning`` attributed to the caller and then
        forwards every argument unchanged, under the same keyword names, to
        ``self.sessions.events.append``.

        Returns:
            Whatever ``self.sessions.events.append`` returns.
        """
        deprecation_notice = (
            "agent_engines.append_session_event is deprecated. "
            "Use agent_engines.sessions.events.append instead."
        )
        # stacklevel=2 makes the warning point at the caller, not this shim.
        warnings.warn(deprecation_notice, DeprecationWarning, stacklevel=2)
        events_api = self.sessions.events
        return events_api.append(
            name=name,
            author=author,
            invocation_id=invocation_id,
            timestamp=timestamp,
            config=config,
        )

    def list_session_events(
        self,
        *,
        name: str,
        config: Optional[types.ListAgentEngineSessionEventsConfigOrDict] = None,
    ) -> Iterator[types.SessionEvent]:
        """Deprecated alias for ``agent_engines.sessions.events.list``.

        Emits a ``DeprecationWarning`` attributed to the caller and then
        forwards ``name`` and ``config`` to ``self.sessions.events.list``.

        Returns:
            Whatever ``self.sessions.events.list`` returns.
        """
        deprecation_notice = (
            "agent_engines.list_session_events is deprecated. "
            "Use agent_engines.sessions.events.list instead."
        )
        # stacklevel=2 makes the warning point at the caller, not this shim.
        warnings.warn(deprecation_notice, DeprecationWarning, stacklevel=2)
        events_api = self.sessions.events
        return events_api.list(name=name, config=config)


class AsyncAgentEngines(_api_module.BaseModule):

    async def _create(
        self, *, config: Optional[types.CreateAgentEngineConfigOrDict] = None
    ) -> types.AgentEngineOperation:
        """Send the POST request that creates a new Agent Engine.

        Args:
            config: Optional creation settings. When it carries
                ``http_options``, those are handed to the API client for
                this call.

        Returns:
            The ``AgentEngineOperation`` built from the JSON response body,
            or from an empty dict when the body is empty.

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """
        params = types._CreateAgentEngineRequestParameters(
            config=config,
        )

        # Only the Vertex AI backend is supported; stop before converting.
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _CreateAgentEngineRequestParameters_to_vertex(params)

        # Resolve the request path, substituting URL values when present.
        url_values: Optional[dict[str, str]] = payload.get("_url")
        route = "reasoningEngines"
        path = route.format_map(url_values) if url_values else route

        query_values = payload.get("_query")
        if query_values:
            path = f"{path}?{urlencode(query_values)}"
        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        # Per-call HTTP options come from the config, when one was given.
        call_config = params.config
        http_options: Optional[types.HttpOptions] = (
            call_config.http_options if call_config is not None else None
        )

        payload = _common.convert_to_dict(payload)
        payload = _common.encode_unserializable_types(payload)

        response = await self._api_client.async_request(
            "post", path, payload, http_options
        )

        parsed_body = json.loads(response.body) if response.body else {}

        operation = types.AgentEngineOperation._from_response(
            response=parsed_body, kwargs=params.model_dump()
        )

        self._api_client._verify_response(operation)
        return operation

    async def _delete(
        self,
        *,
        name: str,
        force: Optional[bool] = None,
        config: Optional[types.DeleteAgentEngineConfigOrDict] = None,
    ) -> types.DeleteAgentEngineOperation:
        """Send the DELETE request for an Agent Engine resource.

        Args:
            name (str):
                Required. The name of the Agent Engine to be deleted. Format:
                `projects/{project}/locations/{location}/reasoningEngines/{resource_id}`
                or `reasoningEngines/{resource_id}`.
            force (bool):
                Optional. If set to True, child resources will also be deleted.
                Otherwise, the request will fail with FAILED_PRECONDITION error
                when the Agent Engine has undeleted child resources. The
                parameter is None when omitted; the effective default is
                described upstream as False.
            config (DeleteAgentEngineConfig):
                Optional. Additional configurations for deleting the Agent
                Engine. Its ``http_options``, when set, are handed to the API
                client for this call.

        Returns:
            The ``DeleteAgentEngineOperation`` built from the JSON response
            body, or from an empty dict when the body is empty.

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """
        params = types._DeleteAgentEngineRequestParameters(
            name=name,
            force=force,
            config=config,
        )

        # Only the Vertex AI backend is supported; stop before converting.
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _DeleteAgentEngineRequestParameters_to_vertex(params)

        # Resolve the request path, substituting URL values when present.
        url_values: Optional[dict[str, str]] = payload.get("_url")
        route = "{name}"
        path = route.format_map(url_values) if url_values else route

        query_values = payload.get("_query")
        if query_values:
            path = f"{path}?{urlencode(query_values)}"
        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        # Per-call HTTP options come from the config, when one was given.
        call_config = params.config
        http_options: Optional[types.HttpOptions] = (
            call_config.http_options if call_config is not None else None
        )

        payload = _common.convert_to_dict(payload)
        payload = _common.encode_unserializable_types(payload)

        response = await self._api_client.async_request(
            "delete", path, payload, http_options
        )

        parsed_body = json.loads(response.body) if response.body else {}

        operation = types.DeleteAgentEngineOperation._from_response(
            response=parsed_body, kwargs=params.model_dump()
        )

        self._api_client._verify_response(operation)
        return operation

    async def _get(
        self, *, name: str, config: Optional[types.GetAgentEngineConfigOrDict] = None
    ) -> types.ReasoningEngine:
        """Send the GET request that fetches an Agent Engine instance.

        Args:
            name: Resource name passed to the request-parameter model.
            config: Optional settings. When it carries ``http_options``,
                those are handed to the API client for this call.

        Returns:
            The ``ReasoningEngine`` built from the JSON response body, or
            from an empty dict when the body is empty.

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """
        params = types._GetAgentEngineRequestParameters(
            name=name,
            config=config,
        )

        # Only the Vertex AI backend is supported; stop before converting.
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _GetAgentEngineRequestParameters_to_vertex(params)

        # Resolve the request path, substituting URL values when present.
        url_values: Optional[dict[str, str]] = payload.get("_url")
        route = "{name}"
        path = route.format_map(url_values) if url_values else route

        query_values = payload.get("_query")
        if query_values:
            path = f"{path}?{urlencode(query_values)}"
        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        # Per-call HTTP options come from the config, when one was given.
        call_config = params.config
        http_options: Optional[types.HttpOptions] = (
            call_config.http_options if call_config is not None else None
        )

        payload = _common.convert_to_dict(payload)
        payload = _common.encode_unserializable_types(payload)

        response = await self._api_client.async_request(
            "get", path, payload, http_options
        )

        parsed_body = json.loads(response.body) if response.body else {}

        engine = types.ReasoningEngine._from_response(
            response=parsed_body, kwargs=params.model_dump()
        )

        self._api_client._verify_response(engine)
        return engine

    async def _list(
        self, *, config: Optional[types.ListAgentEngineConfigOrDict] = None
    ) -> types.ListReasoningEnginesResponse:
        """Send the GET request that lists Agent Engines.

        Args:
            config: Optional listing settings. When it carries
                ``http_options``, those are handed to the API client for
                this call.

        Returns:
            The ``ListReasoningEnginesResponse`` built from the JSON
            response body, or from an empty dict when the body is empty.

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """
        params = types._ListAgentEngineRequestParameters(
            config=config,
        )

        # Only the Vertex AI backend is supported; stop before converting.
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _ListAgentEngineRequestParameters_to_vertex(params)

        # Resolve the request path, substituting URL values when present.
        url_values: Optional[dict[str, str]] = payload.get("_url")
        route = "reasoningEngines"
        path = route.format_map(url_values) if url_values else route

        query_values = payload.get("_query")
        if query_values:
            path = f"{path}?{urlencode(query_values)}"
        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        # Per-call HTTP options come from the config, when one was given.
        call_config = params.config
        http_options: Optional[types.HttpOptions] = (
            call_config.http_options if call_config is not None else None
        )

        payload = _common.convert_to_dict(payload)
        payload = _common.encode_unserializable_types(payload)

        response = await self._api_client.async_request(
            "get", path, payload, http_options
        )

        parsed_body = json.loads(response.body) if response.body else {}

        listing = types.ListReasoningEnginesResponse._from_response(
            response=parsed_body, kwargs=params.model_dump()
        )

        self._api_client._verify_response(listing)
        return listing

    async def _get_agent_operation(
        self,
        *,
        operation_name: str,
        config: Optional[types.GetAgentEngineOperationConfigOrDict] = None,
    ) -> types.AgentEngineOperation:
        """Send the GET request that fetches an Agent Engine operation.

        Args:
            operation_name: Operation name passed to the request-parameter
                model; the request path is the ``{operationName}`` template
                filled from the converted request's URL values.
            config: Optional settings. When it carries ``http_options``,
                those are handed to the API client for this call.

        Returns:
            The ``AgentEngineOperation`` built from the JSON response body,
            or from an empty dict when the body is empty.

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """
        params = types._GetAgentEngineOperationParameters(
            operation_name=operation_name,
            config=config,
        )

        # Only the Vertex AI backend is supported; stop before converting.
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _GetAgentEngineOperationParameters_to_vertex(params)

        # Resolve the request path, substituting URL values when present.
        url_values: Optional[dict[str, str]] = payload.get("_url")
        route = "{operationName}"
        path = route.format_map(url_values) if url_values else route

        query_values = payload.get("_query")
        if query_values:
            path = f"{path}?{urlencode(query_values)}"
        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        # Per-call HTTP options come from the config, when one was given.
        call_config = params.config
        http_options: Optional[types.HttpOptions] = (
            call_config.http_options if call_config is not None else None
        )

        payload = _common.convert_to_dict(payload)
        payload = _common.encode_unserializable_types(payload)

        response = await self._api_client.async_request(
            "get", path, payload, http_options
        )

        parsed_body = json.loads(response.body) if response.body else {}

        operation = types.AgentEngineOperation._from_response(
            response=parsed_body, kwargs=params.model_dump()
        )

        self._api_client._verify_response(operation)
        return operation

    async def _query(
        self, *, name: str, config: Optional[types.QueryAgentEngineConfigOrDict] = None
    ) -> types.QueryReasoningEngineResponse:
        """Send the POST request that queries an Agent Engine.

        Args:
            name: Resource name passed to the request-parameter model; the
                request path is the ``{name}:query`` template filled from the
                converted request's URL values.
            config: Optional query settings. When it carries
                ``http_options``, those are handed to the API client for
                this call.

        Returns:
            The ``QueryReasoningEngineResponse`` built from the JSON
            response body, or from an empty dict when the body is empty.

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """
        params = types._QueryAgentEngineRequestParameters(
            name=name,
            config=config,
        )

        # Only the Vertex AI backend is supported; stop before converting.
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _QueryAgentEngineRequestParameters_to_vertex(params)

        # Resolve the request path, substituting URL values when present.
        url_values: Optional[dict[str, str]] = payload.get("_url")
        route = "{name}:query"
        path = route.format_map(url_values) if url_values else route

        query_values = payload.get("_query")
        if query_values:
            path = f"{path}?{urlencode(query_values)}"
        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        # Per-call HTTP options come from the config, when one was given.
        call_config = params.config
        http_options: Optional[types.HttpOptions] = (
            call_config.http_options if call_config is not None else None
        )

        payload = _common.convert_to_dict(payload)
        payload = _common.encode_unserializable_types(payload)

        response = await self._api_client.async_request(
            "post", path, payload, http_options
        )

        parsed_body = json.loads(response.body) if response.body else {}

        query_result = types.QueryReasoningEngineResponse._from_response(
            response=parsed_body, kwargs=params.model_dump()
        )

        self._api_client._verify_response(query_result)
        return query_result

    async def _update(
        self, *, name: str, config: Optional[types.UpdateAgentEngineConfigOrDict] = None
    ) -> types.AgentEngineOperation:
        """Send the PATCH request that updates an Agent Engine.

        Args:
            name: Resource name passed to the request-parameter model.
            config: Optional update settings. When it carries
                ``http_options``, those are handed to the API client for
                this call.

        Returns:
            The ``AgentEngineOperation`` built from the JSON response body,
            or from an empty dict when the body is empty.

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """
        params = types._UpdateAgentEngineRequestParameters(
            name=name,
            config=config,
        )

        # Only the Vertex AI backend is supported; stop before converting.
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _UpdateAgentEngineRequestParameters_to_vertex(params)

        # Resolve the request path, substituting URL values when present.
        url_values: Optional[dict[str, str]] = payload.get("_url")
        route = "{name}"
        path = route.format_map(url_values) if url_values else route

        query_values = payload.get("_query")
        if query_values:
            path = f"{path}?{urlencode(query_values)}"
        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        # Per-call HTTP options come from the config, when one was given.
        call_config = params.config
        http_options: Optional[types.HttpOptions] = (
            call_config.http_options if call_config is not None else None
        )

        payload = _common.convert_to_dict(payload)
        payload = _common.encode_unserializable_types(payload)

        response = await self._api_client.async_request(
            "patch", path, payload, http_options
        )

        parsed_body = json.loads(response.body) if response.body else {}

        operation = types.AgentEngineOperation._from_response(
            response=parsed_body, kwargs=params.model_dump()
        )

        self._api_client._verify_response(operation)
        return operation

    # Caches for the lazily imported `.memories` / `.sessions` submodules.
    # Each stays None until the matching property imports the module.
    _memories = None
    _sessions = None

    async def delete(
        self,
        *,
        name: str,
        force: Optional[bool] = None,
        config: Optional[types.DeleteAgentEngineConfigOrDict] = None,
    ) -> types.DeleteAgentEngineOperation:
        """Start deletion of an Agent Engine resource and log its progress.

        Logs the resource name, awaits ``self._delete`` with the same
        arguments, logs the name of the returned operation, and returns it.

        Args:
            name (str):
                Required. The name of the Agent Engine to be deleted. Format:
                `projects/{project}/locations/{location}/reasoningEngines/{resource_id}`
                or `reasoningEngines/{resource_id}`.
            force (bool):
                Optional. If set to True, child resources will also be deleted.
                Otherwise, the request will fail with FAILED_PRECONDITION error
                when the Agent Engine has undeleted child resources. The
                parameter is None when omitted; the effective default is
                described upstream as False.
            config (DeleteAgentEngineConfig):
                Optional. Additional configurations for deleting the Agent
                Engine.

        Returns:
            The operation object returned by ``self._delete``.
        """
        logger.info(f"Deleting AgentEngine resource: {name}")
        delete_operation = await self._delete(
            name=name,
            force=force,
            config=config,
        )
        logger.info(f"Started AgentEngine delete operation: {delete_operation.name}")
        return delete_operation

    @property
    def memories(self):
        """Return a new ``AsyncMemories`` bound to this module's API client.

        The ``.memories`` submodule is imported on first access and cached on
        ``self._memories``; each access constructs a fresh ``AsyncMemories``.

        Raises:
            ImportError: If importing the ``.memories`` submodule fails.
        """
        memories_module = self._memories
        if memories_module is None:
            # We need to lazy load the memories module to handle the
            # possibility of ImportError when dependencies are not installed.
            try:
                memories_module = importlib.import_module(".memories", __package__)
            except ImportError as import_error:
                raise ImportError(
                    "The 'agent_engines.memories' module requires additional "
                    "packages. Please install them using pip install "
                    "google-cloud-aiplatform[agent_engines]"
                ) from import_error
            self._memories = memories_module
        return memories_module.AsyncMemories(self._api_client)

    @property
    def sessions(self):
        """Return a new ``AsyncSessions`` bound to this module's API client.

        The ``.sessions`` submodule is imported on first access and cached on
        ``self._sessions``; each access constructs a fresh ``AsyncSessions``.

        Raises:
            ImportError: If importing the ``.sessions`` submodule fails.
        """
        sessions_module = self._sessions
        if sessions_module is None:
            # We need to lazy load the sessions module to handle the
            # possibility of ImportError when dependencies are not installed.
            try:
                sessions_module = importlib.import_module(".sessions", __package__)
            except ImportError as import_error:
                raise ImportError(
                    "The agent_engines.sessions module requires additional packages. "
                    "Please install them using pip install "
                    "google-cloud-aiplatform[agent_engines]"
                ) from import_error
            self._sessions = sessions_module
        return sessions_module.AsyncSessions(self._api_client)

    async def append_session_event(
        self,
        *,
        name: str,
        author: str,
        invocation_id: str,
        timestamp: datetime.datetime,
        config: Optional[types.AppendAgentEngineSessionEventConfigOrDict] = None,
    ) -> types.AppendAgentEngineSessionEventResponse:
        """Deprecated. Use agent_engines.sessions.events.append instead.

        Emits a ``DeprecationWarning`` attributed to the caller and then
        awaits ``self.sessions.events.append`` with every argument forwarded
        under the same keyword name.

        Args:
            name: Forwarded as ``name``.
            author: Forwarded as ``author``.
            invocation_id: Forwarded as ``invocation_id``.
            timestamp: Forwarded as ``timestamp``.
            config: Optional configuration, forwarded as ``config``.

        Returns:
            Whatever ``self.sessions.events.append`` resolves to.
        """
        warnings.warn(
            (
                "agent_engines.append_session_event is deprecated. "
                "Use agent_engines.sessions.events.append instead."
            ),
            DeprecationWarning,
            stacklevel=2,
        )
        # Forward author, invocation_id and timestamp as well; previously
        # they were accepted here but never passed on.
        return await self.sessions.events.append(
            name=name,
            author=author,
            invocation_id=invocation_id,
            timestamp=timestamp,
            config=config,
        )

    async def delete_memory(
        self,
        *,
        name: str,
        config: Optional[types.DeleteAgentEngineMemoryConfigOrDict] = None,
    ) -> types.DeleteAgentEngineMemoryOperation:
        """Deprecated alias for ``agent_engines.memories.delete``.

        Emits a ``DeprecationWarning`` attributed to the caller and then
        awaits ``self.memories.delete`` with ``name`` and ``config``.

        Returns:
            Whatever ``self.memories.delete`` resolves to.
        """
        deprecation_notice = (
            "agent_engines.delete_memory is deprecated. "
            "Use agent_engines.memories.delete instead."
        )
        # stacklevel=2 makes the warning point at the caller, not this shim.
        warnings.warn(deprecation_notice, DeprecationWarning, stacklevel=2)
        memories_api = self.memories
        return await memories_api.delete(name=name, config=config)

    async def delete_session(
        self,
        *,
        name: str,
        config: Optional[types.DeleteAgentEngineSessionConfigOrDict] = None,
    ) -> types.DeleteAgentEngineSessionOperation:
        """Deprecated alias for ``agent_engines.sessions.delete``.

        Emits a ``DeprecationWarning`` attributed to the caller and then
        awaits ``self.sessions.delete`` with ``name`` and ``config``.

        Returns:
            Whatever ``self.sessions.delete`` resolves to.
        """
        deprecation_notice = (
            "agent_engines.delete_session is deprecated. "
            "Use agent_engines.sessions.delete instead."
        )
        # stacklevel=2 makes the warning point at the caller, not this shim.
        warnings.warn(deprecation_notice, DeprecationWarning, stacklevel=2)
        sessions_api = self.sessions
        return await sessions_api.delete(name=name, config=config)

    async def get_memory(
        self,
        *,
        name: str,
        config: Optional[types.GetAgentEngineMemoryConfigOrDict] = None,
    ) -> types.Memory:
        """Deprecated alias for ``agent_engines.memories.get``.

        Emits a ``DeprecationWarning`` attributed to the caller and then
        awaits ``self.memories.get`` with ``name`` and ``config``.

        Returns:
            Whatever ``self.memories.get`` resolves to.
        """
        deprecation_notice = (
            "agent_engines.get_memory is deprecated. "
            "Use agent_engines.memories.get instead."
        )
        # stacklevel=2 makes the warning point at the caller, not this shim.
        warnings.warn(deprecation_notice, DeprecationWarning, stacklevel=2)
        memories_api = self.memories
        return await memories_api.get(name=name, config=config)

    async def get_session(
        self,
        *,
        name: str,
        config: Optional[types.GetAgentEngineSessionConfigOrDict] = None,
    ) -> types.Session:
        """Deprecated alias for ``agent_engines.sessions.get``.

        Emits a ``DeprecationWarning`` attributed to the caller and then
        awaits ``self.sessions.get`` with ``name`` and ``config``.

        Returns:
            Whatever ``self.sessions.get`` resolves to.
        """
        deprecation_notice = (
            "agent_engines.get_session is deprecated. "
            "Use agent_engines.sessions.get instead."
        )
        # stacklevel=2 makes the warning point at the caller, not this shim.
        warnings.warn(deprecation_notice, DeprecationWarning, stacklevel=2)
        sessions_api = self.sessions
        return await sessions_api.get(name=name, config=config)
