# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Code generated by the Google Gen AI SDK generator DO NOT EDIT.

import functools
import importlib
import json
import logging
from typing import Any, Iterator, Optional, Union
from urllib.parse import urlencode

from google.genai import _api_module
from google.genai import _common
from google.genai._common import get_value_by_path as getv
from google.genai._common import set_value_by_path as setv
from google.genai.pagers import AsyncPager, Pager

from . import _agent_engines_utils
from . import types


# Module-level logger for this sessions module, registered under a fixed name.
logger = logging.getLogger("vertexai_genai.sessions")

# Set this logger's threshold to INFO at import time.
logger.setLevel(logging.INFO)


def _CreateAgentEngineSessionConfig_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Flatten a create-session config onto the enclosing request payload.

    Every config field that is not ``None`` is written to ``parent_object``
    under its camelCase key. Nothing is ever added to the returned dict.

    Args:
        from_object: The config (dict or object) whose fields are read.
        parent_object: The request dict that receives the converted fields.

    Returns:
        A new, empty dict.
    """
    # (snake_case source field, camelCase destination key), in emission order.
    renames = (
        ("display_name", "displayName"),
        ("session_state", "sessionState"),
        ("ttl", "ttl"),
        ("expire_time", "expireTime"),
        ("labels", "labels"),
    )
    for source_field, target_key in renames:
        field_value = getv(from_object, [source_field])
        if field_value is not None:
            setv(parent_object, [target_key], field_value)

    to_object: dict[str, Any] = {}
    return to_object


def _CreateAgentEngineSessionRequestParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Build the request dict for a create-session call.

    Args:
        from_object: The request parameters (dict or object) to convert.
        parent_object: Accepted for signature parity with the other
            converters; not read by this function.

    Returns:
        A dict holding ``_url.name`` and ``userId`` when those inputs are set,
        plus a ``config`` entry produced by the config converter. The config
        converter also receives this dict as its parent, so it can write the
        config fields directly onto it.
    """
    request: dict[str, Any] = {}

    resource_name = getv(from_object, ["name"])
    if resource_name is not None:
        setv(request, ["_url", "name"], resource_name)

    session_user = getv(from_object, ["user_id"])
    if session_user is not None:
        setv(request, ["userId"], session_user)

    session_config = getv(from_object, ["config"])
    if session_config is not None:
        converted_config = _CreateAgentEngineSessionConfig_to_vertex(
            session_config, request
        )
        setv(request, ["config"], converted_config)

    return request


def _DeleteAgentEngineSessionRequestParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Build the request dict for a delete-session call.

    Args:
        from_object: The request parameters (dict or object) to convert.
        parent_object: Accepted for signature parity with the other
            converters; not read by this function.

    Returns:
        A dict with ``_url.name`` and ``config`` populated from the inputs
        that are not ``None``.
    """
    request: dict[str, Any] = {}

    # (source field, destination path), copied in this order.
    for source_field, target_path in (
        ("name", ["_url", "name"]),
        ("config", ["config"]),
    ):
        field_value = getv(from_object, [source_field])
        if field_value is not None:
            setv(request, target_path, field_value)

    return request


def _GetAgentEngineSessionOperationParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Build the request dict for fetching a session operation.

    Args:
        from_object: The request parameters (dict or object) to convert.
        parent_object: Accepted for signature parity with the other
            converters; not read by this function.

    Returns:
        A dict with ``_url.operationName`` and ``config`` populated from the
        inputs that are not ``None``.
    """
    request: dict[str, Any] = {}

    # (source field, destination path), copied in this order.
    for source_field, target_path in (
        ("operation_name", ["_url", "operationName"]),
        ("config", ["config"]),
    ):
        field_value = getv(from_object, [source_field])
        if field_value is not None:
            setv(request, target_path, field_value)

    return request


def _GetAgentEngineSessionRequestParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Build the request dict for a get-session call.

    Args:
        from_object: The request parameters (dict or object) to convert.
        parent_object: Accepted for signature parity with the other
            converters; not read by this function.

    Returns:
        A dict with ``_url.name`` and ``config`` populated from the inputs
        that are not ``None``.
    """
    request: dict[str, Any] = {}

    resource_name = getv(from_object, ["name"])
    if resource_name is not None:
        setv(request, ["_url", "name"], resource_name)

    raw_config = getv(from_object, ["config"])
    if raw_config is not None:
        setv(request, ["config"], raw_config)

    return request


def _ListAgentEngineSessionsConfig_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Copy list-sessions paging and filter options into the query section.

    Every option that is not ``None`` is written to ``parent_object`` under
    ``_query``. Nothing is ever added to the returned dict.

    Args:
        from_object: The list config (dict or object) whose fields are read.
        parent_object: The request dict that receives the query parameters.

    Returns:
        A new, empty dict.
    """
    # (snake_case source field, camelCase query parameter), in emission order.
    query_fields = (
        ("page_size", "pageSize"),
        ("page_token", "pageToken"),
        ("filter", "filter"),
    )
    for source_field, query_key in query_fields:
        field_value = getv(from_object, [source_field])
        if field_value is not None:
            setv(parent_object, ["_query", query_key], field_value)

    to_object: dict[str, Any] = {}
    return to_object


def _ListAgentEngineSessionsRequestParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Build the request dict for a list-sessions call.

    Args:
        from_object: The request parameters (dict or object) to convert.
        parent_object: Accepted for signature parity with the other
            converters; not read by this function.

    Returns:
        A dict holding ``_url.name`` when a name is set, plus a ``config``
        entry produced by the list-config converter. That converter also
        receives this dict as its parent, so it can add ``_query`` values to
        it.
    """
    request: dict[str, Any] = {}

    resource_name = getv(from_object, ["name"])
    if resource_name is not None:
        setv(request, ["_url", "name"], resource_name)

    list_config = getv(from_object, ["config"])
    if list_config is not None:
        converted_config = _ListAgentEngineSessionsConfig_to_vertex(
            list_config, request
        )
        setv(request, ["config"], converted_config)

    return request


def _UpdateAgentEngineSessionConfig_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Flatten an update-session config onto the enclosing request payload.

    Every config field that is not ``None`` is written to ``parent_object``.
    ``update_mask`` goes under ``_query``; the other fields are written at
    the top level under their camelCase keys. Nothing is ever added to the
    returned dict.

    Args:
        from_object: The config (dict or object) whose fields are read.
        parent_object: The request dict that receives the converted fields.

    Returns:
        A new, empty dict.
    """
    # (snake_case source field, destination path), in emission order.
    placements = (
        ("display_name", ["displayName"]),
        ("session_state", ["sessionState"]),
        ("ttl", ["ttl"]),
        ("expire_time", ["expireTime"]),
        ("labels", ["labels"]),
        ("update_mask", ["_query", "updateMask"]),
        ("user_id", ["userId"]),
    )
    for source_field, target_path in placements:
        field_value = getv(from_object, [source_field])
        if field_value is not None:
            setv(parent_object, target_path, field_value)

    to_object: dict[str, Any] = {}
    return to_object


def _UpdateAgentEngineSessionRequestParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Build the request dict for an update-session call.

    Args:
        from_object: The request parameters (dict or object) to convert.
        parent_object: Accepted for signature parity with the other
            converters; not read by this function.

    Returns:
        A dict holding ``_url.name`` when a name is set, plus a ``config``
        entry produced by the update-config converter. That converter also
        receives this dict as its parent, so it can write the config fields
        directly onto it.
    """
    request: dict[str, Any] = {}

    resource_name = getv(from_object, ["name"])
    if resource_name is not None:
        setv(request, ["_url", "name"], resource_name)

    update_config = getv(from_object, ["config"])
    if update_config is not None:
        converted_config = _UpdateAgentEngineSessionConfig_to_vertex(
            update_config, request
        )
        setv(request, ["config"], converted_config)

    return request


class Sessions(_api_module.BaseModule):
    """Synchronous client for Agent Engine session resources.

    Every request-issuing method requires an API client configured for
    Vertex AI and raises ``ValueError`` otherwise.
    """

    def _create(
        self,
        *,
        name: str,
        user_id: str,
        config: Optional[types.CreateAgentEngineSessionConfigOrDict] = None,
    ) -> types.AgentEngineSessionOperation:
        """
        Creates a new session in the Agent Engine.

        Issues a single POST to `{name}/sessions` and returns the resulting
        operation without waiting for it; see `create` for the waiting variant.

        Args:
            name (str): Required. The name of the Agent Engine to create the session under. Format:
                `projects/{project}/locations/{location}/reasoningEngines/{resource_id}`.
            user_id (str): Required. The user ID of the session.
            config (CreateAgentEngineSessionConfig):
                Optional. Additional configurations for creating the Agent Engine session.

        Returns:
            AgentEngineSessionOperation: The operation for creating the Agent Engine session.

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """

        parameter_model = types._CreateAgentEngineSessionRequestParameters(
            name=name,
            user_id=user_id,
            config=config,
        )

        request_url_dict: Optional[dict[str, str]]
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")
        else:
            request_dict = _CreateAgentEngineSessionRequestParameters_to_vertex(
                parameter_model
            )
            request_url_dict = request_dict.get("_url")
            if request_url_dict:
                path = "{name}/sessions".format_map(request_url_dict)
            else:
                path = "{name}/sessions"

        # Query parameters collected by the converter are appended to the path.
        query_params = request_dict.get("_query")
        if query_params:
            path = f"{path}?{urlencode(query_params)}"
        # TODO: remove the hack that pops config.
        request_dict.pop("config", None)

        http_options: Optional[types.HttpOptions] = None
        if (
            parameter_model.config is not None
            and parameter_model.config.http_options is not None
        ):
            http_options = parameter_model.config.http_options

        request_dict = _common.convert_to_dict(request_dict)
        request_dict = _common.encode_unserializable_types(request_dict)

        response = self._api_client.request("post", path, request_dict, http_options)

        # An empty response body is treated as an empty JSON object.
        response_dict = {} if not response.body else json.loads(response.body)

        return_value = types.AgentEngineSessionOperation._from_response(
            response=response_dict, kwargs=parameter_model.model_dump()
        )

        self._api_client._verify_response(return_value)
        return return_value

    def delete(
        self,
        *,
        name: str,
        config: Optional[types.DeleteAgentEngineSessionConfigOrDict] = None,
    ) -> types.DeleteAgentEngineSessionOperation:
        """
        Delete an Agent Engine session.

        Issues a single DELETE to `{name}`.

        Args:
            name (str): Required. The name of the Agent Engine session to be deleted. Format:
                `projects/{project}/locations/{location}/reasoningEngines/{resource_id}/sessions/{session_id}`.
            config (DeleteAgentEngineSessionConfig):
                Optional. Additional configurations for deleting the Agent Engine session.

        Returns:
            DeleteAgentEngineSessionOperation: The operation for deleting the Agent Engine session.

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """

        parameter_model = types._DeleteAgentEngineSessionRequestParameters(
            name=name,
            config=config,
        )

        request_url_dict: Optional[dict[str, str]]
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")
        else:
            request_dict = _DeleteAgentEngineSessionRequestParameters_to_vertex(
                parameter_model
            )
            request_url_dict = request_dict.get("_url")
            if request_url_dict:
                path = "{name}".format_map(request_url_dict)
            else:
                path = "{name}"

        # Query parameters collected by the converter are appended to the path.
        query_params = request_dict.get("_query")
        if query_params:
            path = f"{path}?{urlencode(query_params)}"
        # TODO: remove the hack that pops config.
        request_dict.pop("config", None)

        http_options: Optional[types.HttpOptions] = None
        if (
            parameter_model.config is not None
            and parameter_model.config.http_options is not None
        ):
            http_options = parameter_model.config.http_options

        request_dict = _common.convert_to_dict(request_dict)
        request_dict = _common.encode_unserializable_types(request_dict)

        response = self._api_client.request("delete", path, request_dict, http_options)

        # An empty response body is treated as an empty JSON object.
        response_dict = {} if not response.body else json.loads(response.body)

        return_value = types.DeleteAgentEngineSessionOperation._from_response(
            response=response_dict, kwargs=parameter_model.model_dump()
        )

        self._api_client._verify_response(return_value)
        return return_value

    def get(
        self,
        *,
        name: str,
        config: Optional[types.GetAgentEngineSessionConfigOrDict] = None,
    ) -> types.Session:
        """
        Gets an agent engine session.

        Issues a single GET to `{name}`.

        Args:
            name (str): Required. The name of the Agent Engine session to get. Format:
                `projects/{project}/locations/{location}/reasoningEngines/{resource_id}/sessions/{session_id}`.
            config (GetAgentEngineSessionConfig):
                Optional. Additional configurations for getting the Agent Engine session.

        Returns:
            Session: The requested Agent Engine session.

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """

        parameter_model = types._GetAgentEngineSessionRequestParameters(
            name=name,
            config=config,
        )

        request_url_dict: Optional[dict[str, str]]
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")
        else:
            request_dict = _GetAgentEngineSessionRequestParameters_to_vertex(
                parameter_model
            )
            request_url_dict = request_dict.get("_url")
            if request_url_dict:
                path = "{name}".format_map(request_url_dict)
            else:
                path = "{name}"

        # Query parameters collected by the converter are appended to the path.
        query_params = request_dict.get("_query")
        if query_params:
            path = f"{path}?{urlencode(query_params)}"
        # TODO: remove the hack that pops config.
        request_dict.pop("config", None)

        http_options: Optional[types.HttpOptions] = None
        if (
            parameter_model.config is not None
            and parameter_model.config.http_options is not None
        ):
            http_options = parameter_model.config.http_options

        request_dict = _common.convert_to_dict(request_dict)
        request_dict = _common.encode_unserializable_types(request_dict)

        response = self._api_client.request("get", path, request_dict, http_options)

        # An empty response body is treated as an empty JSON object.
        response_dict = {} if not response.body else json.loads(response.body)

        return_value = types.Session._from_response(
            response=response_dict, kwargs=parameter_model.model_dump()
        )

        self._api_client._verify_response(return_value)
        return return_value

    def _list(
        self,
        *,
        name: str,
        config: Optional[types.ListAgentEngineSessionsConfigOrDict] = None,
    ) -> types.ListReasoningEnginesSessionsResponse:
        """
        Lists Agent Engine sessions.

        Issues a single GET to `{name}/sessions` and returns that one response;
        see `list` for the paging wrapper.

        Args:
            name (str): Required. The name of the Agent Engine to list sessions for. Format:
                `projects/{project}/locations/{location}/reasoningEngines/{resource_id}`.
            config (ListAgentEngineSessionsConfig):
                Optional. Additional configurations for listing the Agent Engine sessions.

        Returns:
            ListReasoningEnginesSessionsResponse: The requested Agent Engine sessions.

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """

        parameter_model = types._ListAgentEngineSessionsRequestParameters(
            name=name,
            config=config,
        )

        request_url_dict: Optional[dict[str, str]]
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")
        else:
            request_dict = _ListAgentEngineSessionsRequestParameters_to_vertex(
                parameter_model
            )
            request_url_dict = request_dict.get("_url")
            if request_url_dict:
                path = "{name}/sessions".format_map(request_url_dict)
            else:
                path = "{name}/sessions"

        # Paging and filter options arrive here as `_query` entries.
        query_params = request_dict.get("_query")
        if query_params:
            path = f"{path}?{urlencode(query_params)}"
        # TODO: remove the hack that pops config.
        request_dict.pop("config", None)

        http_options: Optional[types.HttpOptions] = None
        if (
            parameter_model.config is not None
            and parameter_model.config.http_options is not None
        ):
            http_options = parameter_model.config.http_options

        request_dict = _common.convert_to_dict(request_dict)
        request_dict = _common.encode_unserializable_types(request_dict)

        response = self._api_client.request("get", path, request_dict, http_options)

        # An empty response body is treated as an empty JSON object.
        response_dict = {} if not response.body else json.loads(response.body)

        return_value = types.ListReasoningEnginesSessionsResponse._from_response(
            response=response_dict, kwargs=parameter_model.model_dump()
        )

        self._api_client._verify_response(return_value)
        return return_value

    def _get_session_operation(
        self,
        *,
        operation_name: str,
        config: Optional[types.GetAgentEngineOperationConfigOrDict] = None,
    ) -> types.AgentEngineSessionOperation:
        """
        Gets the current state of a session operation.

        Issues a single GET to `{operationName}`. `create` passes this method
        to the operation-polling helper.

        Args:
            operation_name (str): Required. The name of the operation to fetch.
            config (GetAgentEngineOperationConfig):
                Optional. Additional configurations for getting the operation.

        Returns:
            AgentEngineSessionOperation: The operation parsed from the response.

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """
        parameter_model = types._GetAgentEngineSessionOperationParameters(
            operation_name=operation_name,
            config=config,
        )

        request_url_dict: Optional[dict[str, str]]
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")
        else:
            request_dict = _GetAgentEngineSessionOperationParameters_to_vertex(
                parameter_model
            )
            request_url_dict = request_dict.get("_url")
            if request_url_dict:
                path = "{operationName}".format_map(request_url_dict)
            else:
                path = "{operationName}"

        # Query parameters collected by the converter are appended to the path.
        query_params = request_dict.get("_query")
        if query_params:
            path = f"{path}?{urlencode(query_params)}"
        # TODO: remove the hack that pops config.
        request_dict.pop("config", None)

        http_options: Optional[types.HttpOptions] = None
        if (
            parameter_model.config is not None
            and parameter_model.config.http_options is not None
        ):
            http_options = parameter_model.config.http_options

        request_dict = _common.convert_to_dict(request_dict)
        request_dict = _common.encode_unserializable_types(request_dict)

        response = self._api_client.request("get", path, request_dict, http_options)

        # An empty response body is treated as an empty JSON object.
        response_dict = {} if not response.body else json.loads(response.body)

        return_value = types.AgentEngineSessionOperation._from_response(
            response=response_dict, kwargs=parameter_model.model_dump()
        )

        self._api_client._verify_response(return_value)
        return return_value

    def _update(
        self,
        *,
        name: str,
        config: Optional[types.UpdateAgentEngineSessionConfigOrDict] = None,
    ) -> types.AgentEngineSessionOperation:
        """
        Updates an Agent Engine session.

        Issues a single PATCH to `{name}`.

        Args:
            name (str): Required. The name of the Agent Engine session to be updated. Format:
                `projects/{project}/locations/{location}/reasoningEngines/{resource_id}/sessions/{session_id}`.
            config (UpdateAgentEngineSessionConfig):
                Optional. Additional configurations for updating the Agent Engine session.

        Returns:
            AgentEngineSessionOperation: The operation for updating the Agent Engine session.

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """

        parameter_model = types._UpdateAgentEngineSessionRequestParameters(
            name=name,
            config=config,
        )

        request_url_dict: Optional[dict[str, str]]
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")
        else:
            request_dict = _UpdateAgentEngineSessionRequestParameters_to_vertex(
                parameter_model
            )
            request_url_dict = request_dict.get("_url")
            if request_url_dict:
                path = "{name}".format_map(request_url_dict)
            else:
                path = "{name}"

        # The update mask, when set, arrives here as a `_query` entry.
        query_params = request_dict.get("_query")
        if query_params:
            path = f"{path}?{urlencode(query_params)}"
        # TODO: remove the hack that pops config.
        request_dict.pop("config", None)

        http_options: Optional[types.HttpOptions] = None
        if (
            parameter_model.config is not None
            and parameter_model.config.http_options is not None
        ):
            http_options = parameter_model.config.http_options

        request_dict = _common.convert_to_dict(request_dict)
        request_dict = _common.encode_unserializable_types(request_dict)

        response = self._api_client.request("patch", path, request_dict, http_options)

        # An empty response body is treated as an empty JSON object.
        response_dict = {} if not response.body else json.loads(response.body)

        return_value = types.AgentEngineSessionOperation._from_response(
            response=response_dict, kwargs=parameter_model.model_dump()
        )

        self._api_client._verify_response(return_value)
        return return_value

    # Holds the lazily imported `session_events` module once `events` has
    # loaded it; None until then.
    _events = None

    @property
    @_common.experimental_warning(
        "The Vertex SDK GenAI agent_engines.sessions.events module is "
        "experimental, and may change in future versions."
    )
    def events(self):
        """Returns a `SessionEvents` client bound to this module's API client.

        The `session_events` module is imported on first access and then
        reused; a new `SessionEvents` instance is built on every access.

        Raises:
            ImportError: If the `session_events` module cannot be imported.
        """
        if self._events is None:
            try:
                # We need to lazy load the sessions.events module to handle the
                # possibility of ImportError when dependencies are not installed.
                self._events = importlib.import_module(".session_events", __package__)
            except ImportError as e:
                # The trailing space after "requires" keeps the adjacent string
                # literals from fusing into "requiresadditional".
                raise ImportError(
                    "The 'agent_engines.sessions.events' module requires "
                    "additional packages. Please install them using pip install "
                    "google-cloud-aiplatform[agent_engines]"
                ) from e
        return self._events.SessionEvents(self._api_client)

    def create(
        self,
        *,
        name: str,
        user_id: str,
        config: Optional[types.CreateAgentEngineSessionConfigOrDict] = None,
    ) -> types.AgentEngineSessionOperation:
        """Creates a new session in the Agent Engine.

        When `config.wait_for_completion` is set and the operation returned by
        the create call is not yet done, the operation is polled until the
        polling helper returns, and its response is then replaced with the
        session fetched via `get`. An operation that is already done is
        returned as-is.

        Args:
            name (str):
                Required. The name of the agent engine to create the session for.
            user_id (str):
                Required. The user ID of the session.
            config (CreateAgentEngineSessionConfig):
                Optional. The configuration for the session to create.

        Returns:
            AgentEngineSessionOperation: The operation for creating the session.

        Raises:
            RuntimeError: If the awaited operation carries an error, or carries
                neither a response nor an error.
        """
        # Normalize config to a model instance so attribute access works below.
        if config is None:
            config = types.CreateAgentEngineSessionConfig()
        elif isinstance(config, dict):
            config = types.CreateAgentEngineSessionConfig.model_validate(config)
        operation = self._create(
            name=name,
            user_id=user_id,
            config=config,
        )
        if config.wait_for_completion and not operation.done:
            operation = _agent_engines_utils._await_operation(
                operation_name=operation.name,
                get_operation_fn=self._get_session_operation,
                poll_interval_seconds=0.5,
            )
            if operation.response:
                # Re-fetch the session by name and store that as the response.
                operation.response = self.get(name=operation.response.name)
            elif operation.error:
                raise RuntimeError(f"Failed to create session: {operation.error}")
            else:
                raise RuntimeError(
                    "Error retrieving session from the operation response. "
                    f"Operation name: {operation.name}"
                )
        return operation

    def list(
        self,
        *,
        name: str,
        config: Optional[types.ListAgentEngineSessionsConfigOrDict] = None,
    ) -> Iterator[types.Session]:
        """Lists Agent Engine sessions.

        The first page is fetched eagerly when this method is called; the
        returned pager is given `_list` (bound to `name`) for later requests.

        Args:
            name (str): Required. The name of the agent engine to list sessions
                for.
            config (ListAgentEngineSessionsConfig): Optional. The configuration
                for the sessions to list.

        Returns:
            Iterator[Session]: A pager over the sessions.
        """

        return Pager(
            "sessions",
            functools.partial(self._list, name=name),
            self._list(name=name, config=config),
            config,
        )


class AsyncSessions(_api_module.BaseModule):

    async def _create(
        self,
        *,
        name: str,
        user_id: str,
        config: Optional[types.CreateAgentEngineSessionConfigOrDict] = None,
    ) -> types.AgentEngineSessionOperation:
        """
        Asynchronously creates a new session in the Agent Engine.

        Sends one POST to `{name}/sessions` and returns the parsed operation.

        Args:
            name (str): Required. The name of the Agent Engine to create the session under. Format:
                `projects/{project}/locations/{location}/reasoningEngines/{resource_id}`.
            user_id (str): Required. The user ID of the session.
            config (CreateAgentEngineSessionConfig):
                Optional. Additional configurations for creating the Agent Engine session.

        Returns:
            AgentEngineSessionOperation: The operation for creating the Agent Engine session.

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """
        params = types._CreateAgentEngineSessionRequestParameters(
            name=name,
            user_id=user_id,
            config=config,
        )

        # Guard clause: only the Vertex AI backend supports this call.
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        body = _CreateAgentEngineSessionRequestParameters_to_vertex(params)

        # Fill the path template from the `_url` section when one was produced.
        url_fields: Optional[dict[str, str]] = body.get("_url")
        endpoint = "{name}/sessions"
        if url_fields:
            endpoint = endpoint.format_map(url_fields)

        query = body.get("_query")
        if query:
            endpoint = f"{endpoint}?{urlencode(query)}"
        # TODO: remove the hack that pops config.
        body.pop("config", None)

        # Per-request HTTP options come from the config, when one is present.
        request_config = params.config
        http_options: Optional[types.HttpOptions] = (
            None if request_config is None else request_config.http_options
        )

        payload = _common.encode_unserializable_types(_common.convert_to_dict(body))

        response = await self._api_client.async_request(
            "post", endpoint, payload, http_options
        )

        # An empty response body is treated as an empty JSON object.
        parsed = json.loads(response.body) if response.body else {}

        operation = types.AgentEngineSessionOperation._from_response(
            response=parsed, kwargs=params.model_dump()
        )
        self._api_client._verify_response(operation)
        return operation

    async def delete(
        self,
        *,
        name: str,
        config: Optional[types.DeleteAgentEngineSessionConfigOrDict] = None,
    ) -> types.DeleteAgentEngineSessionOperation:
        """
        Asynchronously deletes an Agent Engine session.

        Sends one DELETE to `{name}` and returns the parsed operation.

        Args:
            name (str): Required. The name of the Agent Engine session to be deleted. Format:
                `projects/{project}/locations/{location}/reasoningEngines/{resource_id}/sessions/{session_id}`.
            config (DeleteAgentEngineSessionConfig):
                Optional. Additional configurations for deleting the Agent Engine session.

        Returns:
            DeleteAgentEngineSessionOperation: The operation for deleting the Agent Engine session.

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """
        params = types._DeleteAgentEngineSessionRequestParameters(
            name=name,
            config=config,
        )

        # Guard clause: only the Vertex AI backend supports this call.
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        body = _DeleteAgentEngineSessionRequestParameters_to_vertex(params)

        # Fill the path template from the `_url` section when one was produced.
        url_fields: Optional[dict[str, str]] = body.get("_url")
        endpoint = "{name}"
        if url_fields:
            endpoint = endpoint.format_map(url_fields)

        query = body.get("_query")
        if query:
            endpoint = f"{endpoint}?{urlencode(query)}"
        # TODO: remove the hack that pops config.
        body.pop("config", None)

        # Per-request HTTP options come from the config, when one is present.
        request_config = params.config
        http_options: Optional[types.HttpOptions] = (
            None if request_config is None else request_config.http_options
        )

        payload = _common.encode_unserializable_types(_common.convert_to_dict(body))

        response = await self._api_client.async_request(
            "delete", endpoint, payload, http_options
        )

        # An empty response body is treated as an empty JSON object.
        parsed = json.loads(response.body) if response.body else {}

        operation = types.DeleteAgentEngineSessionOperation._from_response(
            response=parsed, kwargs=params.model_dump()
        )
        self._api_client._verify_response(operation)
        return operation

    async def get(
        self,
        *,
        name: str,
        config: Optional[types.GetAgentEngineSessionConfigOrDict] = None,
    ) -> types.Session:
        """
        Asynchronously gets an Agent Engine session.

        Sends one GET to `{name}` and returns the parsed session.

        Args:
            name (str): Required. The name of the Agent Engine session to get. Format:
                `projects/{project}/locations/{location}/reasoningEngines/{resource_id}/sessions/{session_id}`.
            config (GetAgentEngineSessionConfig):
                Optional. Additional configurations for getting the Agent Engine session.

        Returns:
            Session: The requested Agent Engine session.

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """
        params = types._GetAgentEngineSessionRequestParameters(
            name=name,
            config=config,
        )

        # Guard clause: only the Vertex AI backend supports this call.
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        body = _GetAgentEngineSessionRequestParameters_to_vertex(params)

        # Fill the path template from the `_url` section when one was produced.
        url_fields: Optional[dict[str, str]] = body.get("_url")
        endpoint = "{name}"
        if url_fields:
            endpoint = endpoint.format_map(url_fields)

        query = body.get("_query")
        if query:
            endpoint = f"{endpoint}?{urlencode(query)}"
        # TODO: remove the hack that pops config.
        body.pop("config", None)

        # Per-request HTTP options come from the config, when one is present.
        request_config = params.config
        http_options: Optional[types.HttpOptions] = (
            None if request_config is None else request_config.http_options
        )

        payload = _common.encode_unserializable_types(_common.convert_to_dict(body))

        response = await self._api_client.async_request(
            "get", endpoint, payload, http_options
        )

        # An empty response body is treated as an empty JSON object.
        parsed = json.loads(response.body) if response.body else {}

        session = types.Session._from_response(
            response=parsed, kwargs=params.model_dump()
        )
        self._api_client._verify_response(session)
        return session

    async def _list(
        self,
        *,
        name: str,
        config: Optional[types.ListAgentEngineSessionsConfigOrDict] = None,
    ) -> types.ListReasoningEnginesSessionsResponse:
        """Fetches one page of sessions belonging to an Agent Engine.

        Issues a GET request against `{name}/sessions`, appending any query
        parameters produced by the request converter.

        Args:
            name (str): Required. The name of the Agent Engine to list sessions for. Format:
                `projects/{project}/locations/{location}/reasoningEngines/{resource_id}`.
            config (ListAgentEngineSessionsConfig):
                Optional. Additional configurations for listing the Agent Engine sessions.

        Returns:
            ListReasoningEnginesSessionsResponse: The response parsed from the
            body of the list request (an empty response when the body is empty).

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """
        params = types._ListAgentEngineSessionsRequestParameters(
            name=name,
            config=config,
        )

        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        request_dict = _ListAgentEngineSessionsRequestParameters_to_vertex(params)

        # Fill the URL template only when the converter produced URL values;
        # otherwise the raw template is used as the path.
        url_values: Optional[dict[str, str]] = request_dict.get("_url")
        path = "{name}/sessions"
        if url_values:
            path = path.format_map(url_values)

        query = request_dict.get("_query")
        if query:
            path = f"{path}?{urlencode(query)}"

        # TODO: remove the hack that pops config.
        request_dict.pop("config", None)

        # Per-request HTTP options, when given, travel on the config object.
        request_config = params.config
        http_options: Optional[types.HttpOptions] = (
            None if request_config is None else request_config.http_options
        )

        body = _common.encode_unserializable_types(
            _common.convert_to_dict(request_dict)
        )

        response = await self._api_client.async_request(
            "get", path, body, http_options
        )

        payload = json.loads(response.body) if response.body else {}

        result = types.ListReasoningEnginesSessionsResponse._from_response(
            response=payload, kwargs=params.model_dump()
        )

        self._api_client._verify_response(result)
        return result

    async def _get_session_operation(
        self,
        *,
        operation_name: str,
        config: Optional[types.GetAgentEngineOperationConfigOrDict] = None,
    ) -> types.AgentEngineSessionOperation:
        """Fetches the current state of an Agent Engine session operation.

        Issues a GET request against the operation name, appending any query
        parameters produced by the request converter.

        Args:
            operation_name (str): Required. The name of the operation to fetch.
            config (GetAgentEngineOperationConfig):
                Optional. Additional configurations for getting the operation.

        Returns:
            AgentEngineSessionOperation: The operation parsed from the body of
            the get request (an empty operation when the body is empty).

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """
        params = types._GetAgentEngineSessionOperationParameters(
            operation_name=operation_name,
            config=config,
        )

        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        request_dict = _GetAgentEngineSessionOperationParameters_to_vertex(params)

        # Fill the URL template only when the converter produced URL values;
        # otherwise the raw template is used as the path.
        url_values: Optional[dict[str, str]] = request_dict.get("_url")
        path = "{operationName}"
        if url_values:
            path = path.format_map(url_values)

        query = request_dict.get("_query")
        if query:
            path = f"{path}?{urlencode(query)}"

        # TODO: remove the hack that pops config.
        request_dict.pop("config", None)

        # Per-request HTTP options, when given, travel on the config object.
        request_config = params.config
        http_options: Optional[types.HttpOptions] = (
            None if request_config is None else request_config.http_options
        )

        body = _common.encode_unserializable_types(
            _common.convert_to_dict(request_dict)
        )

        response = await self._api_client.async_request(
            "get", path, body, http_options
        )

        payload = json.loads(response.body) if response.body else {}

        result = types.AgentEngineSessionOperation._from_response(
            response=payload, kwargs=params.model_dump()
        )

        self._api_client._verify_response(result)
        return result

    async def _update(
        self,
        *,
        name: str,
        config: Optional[types.UpdateAgentEngineSessionConfigOrDict] = None,
    ) -> types.AgentEngineSessionOperation:
        """Sends an update request for an Agent Engine session.

        Issues a PATCH request against the session name, appending any query
        parameters produced by the request converter.

        Args:
            name (str): Required. The name of the Agent Engine session to be updated. Format:
                `projects/{project}/locations/{location}/reasoningEngines/{resource_id}/sessions/{session_id}`.
            config (UpdateAgentEngineSessionConfig):
                Optional. Additional configurations for updating the Agent Engine session.

        Returns:
            AgentEngineSessionOperation: The operation parsed from the body of
            the update request (an empty operation when the body is empty).

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """
        params = types._UpdateAgentEngineSessionRequestParameters(
            name=name,
            config=config,
        )

        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        request_dict = _UpdateAgentEngineSessionRequestParameters_to_vertex(params)

        # Fill the URL template only when the converter produced URL values;
        # otherwise the raw template is used as the path.
        url_values: Optional[dict[str, str]] = request_dict.get("_url")
        path = "{name}"
        if url_values:
            path = path.format_map(url_values)

        query = request_dict.get("_query")
        if query:
            path = f"{path}?{urlencode(query)}"

        # TODO: remove the hack that pops config.
        request_dict.pop("config", None)

        # Per-request HTTP options, when given, travel on the config object.
        request_config = params.config
        http_options: Optional[types.HttpOptions] = (
            None if request_config is None else request_config.http_options
        )

        body = _common.encode_unserializable_types(
            _common.convert_to_dict(request_dict)
        )

        response = await self._api_client.async_request(
            "patch", path, body, http_options
        )

        payload = json.loads(response.body) if response.body else {}

        result = types.AgentEngineSessionOperation._from_response(
            response=payload, kwargs=params.model_dump()
        )

        self._api_client._verify_response(result)
        return result

    # Holds the lazily imported `.session_events` module; stays None until the
    # `events` property imports it on first access.
    _events = None

    @property
    @_common.experimental_warning(
        "The Vertex SDK GenAI agent_engines.sessions.events module is "
        "experimental, and may change in future versions."
    )
    def events(self):
        """Returns the async session-events helper bound to this API client.

        The `.session_events` module is imported on first access and cached on
        `self._events`, so a failure to import it only surfaces when this
        property is actually used. A new `AsyncSessionEvents` object is
        constructed on every access.

        Returns:
            An `AsyncSessionEvents` instance constructed with this module's
            API client.

        Raises:
            ImportError: If the `.session_events` module cannot be imported.
        """
        if self._events is None:
            try:
                # We need to lazy load the sessions.events module to handle the
                # possibility of ImportError when dependencies are not installed.
                self._events = importlib.import_module(".session_events", __package__)
            except ImportError as e:
                # Adjacent string literals are concatenated verbatim, so each
                # fragment must carry its own trailing space.
                raise ImportError(
                    "The 'agent_engines.sessions.events' module requires "
                    "additional packages. Please install them using pip install "
                    "google-cloud-aiplatform[agent_engines]"
                ) from e
        return self._events.AsyncSessionEvents(self._api_client)

    async def create(
        self,
        *,
        name: str,
        user_id: str,
        config: Optional[types.CreateAgentEngineSessionConfigOrDict] = None,
    ) -> types.AgentEngineSessionOperation:
        """Creates a session on an Agent Engine, optionally waiting for it.

        When `config.wait_for_completion` is set and the create operation is
        not yet done, the operation is polled every 0.5 seconds until it
        finishes, and its `response` is replaced by the session fetched via
        `get`.

        Args:
            name (str):
                Required. The name of the agent engine to create the session for.
            user_id (str):
                Required. The user ID of the session.
            config (CreateAgentEngineSessionConfig):
                Optional. The configuration for the session to create.

        Returns:
            AgentEngineSessionOperation: The operation for creating the session.

        Raises:
            RuntimeError: If the awaited operation finished with an error, or
                finished with neither a response nor an error.
        """
        # Normalize the config to a model instance so attribute access works
        # regardless of whether the caller passed None, a dict, or a model.
        if config is None:
            config = types.CreateAgentEngineSessionConfig()
        elif isinstance(config, dict):
            config = types.CreateAgentEngineSessionConfig.model_validate(config)

        operation = await self._create(name=name, user_id=user_id, config=config)

        # Nothing more to do unless the caller asked to wait and the operation
        # is still running.
        if not config.wait_for_completion or operation.done:
            return operation

        operation = await _agent_engines_utils._await_async_operation(
            operation_name=operation.name,
            get_operation_fn=self._get_session_operation,
            poll_interval_seconds=0.5,
        )

        if operation.response:
            # Swap the operation's response for the session fetched by name.
            operation.response = await self.get(name=operation.response.name)
            return operation

        if operation.error:
            raise RuntimeError(f"Failed to create session: {operation.error}")

        raise RuntimeError(
            "Error retrieving session from the operation response. "
            f"Operation name: {operation.name}"
        )

    async def list(
        self,
        *,
        name: str,
        config: Optional[types.ListAgentEngineSessionsConfigOrDict] = None,
    ) -> AsyncPager[types.Session]:
        """Returns an async pager over the sessions of an Agent Engine.

        The first page is fetched eagerly with `_list`; the pager is also
        handed a `_list` callable with `name` already bound.

        Args:
            name (str): Required. The name of the agent engine to list sessions
                for.
            config (ListAgentEngineSessionsConfig): Optional. The configuration
                for the sessions to list.

        Returns:
            AsyncPager[Session]: An async pager of sessions.
        """
        # `name` is bound up front; the pager receives the config separately.
        fetch_page = functools.partial(self._list, name=name)
        first_page = await self._list(name=name, config=config)
        return AsyncPager("sessions", fetch_page, first_page, config)
