# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Code generated by the Google Gen AI SDK generator DO NOT EDIT.

import functools
import importlib
import json
import logging
from typing import Any, Iterator, Optional, Union
from urllib.parse import urlencode

from google.genai import _api_module
from google.genai import _common
from google.genai._common import get_value_by_path as getv
from google.genai._common import set_value_by_path as setv
from google.genai.pagers import AsyncPager, Pager

from . import _agent_engines_utils
from . import types


# Module-level logger for this file, registered under the name
# "vertexai_genai.memories".
logger = logging.getLogger("vertexai_genai.memories")

# The logger's level is set to INFO as a side effect of importing this module.
logger.setLevel(logging.INFO)


def _AgentEngineMemoryConfig_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Copy the set fields of a memory config onto the parent request dict.

    Each field of `from_object` that is not None is written to
    `parent_object` under its camelCase key. The dict returned by this
    function is never populated.

    Args:
        from_object: Source config (dict or object) read through `getv`.
        parent_object: Request dict that receives the converted fields.

    Returns:
        An empty dict.
    """
    converted: dict[str, Any] = {}

    # (source field, destination key) pairs, applied in this order.
    scalar_fields = (
        ("display_name", "displayName"),
        ("description", "description"),
        ("ttl", "ttl"),
        ("expire_time", "expireTime"),
        ("revision_expire_time", "revisionExpireTime"),
        ("revision_ttl", "revisionTtl"),
        ("disable_memory_revisions", "disableMemoryRevisions"),
    )
    for source_key, target_key in scalar_fields:
        value = getv(from_object, [source_key])
        if value is not None:
            setv(parent_object, [target_key], value)

    topics = getv(from_object, ["topics"])
    if topics is not None:
        # A new list is built so the source sequence itself is not passed on.
        setv(parent_object, ["topics"], list(topics))

    return converted


def _CreateAgentEngineMemoryRequestParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Build the request dict for a create-memory call.

    `name` is stored under `_url.name`; `fact` and `scope` are stored as
    top-level keys. When a config is present it is converted with
    `_AgentEngineMemoryConfig_to_vertex`, which writes onto the request
    dict, and the converter's return value is stored under `config`.

    Args:
        from_object: Source parameters (dict or object) read through `getv`.
        parent_object: Not used by this converter.

    Returns:
        The populated request dict.
    """
    request: dict[str, Any] = {}

    name = getv(from_object, ["name"])
    if name is not None:
        setv(request, ["_url", "name"], name)

    fact = getv(from_object, ["fact"])
    if fact is not None:
        setv(request, ["fact"], fact)

    scope = getv(from_object, ["scope"])
    if scope is not None:
        setv(request, ["scope"], scope)

    config = getv(from_object, ["config"])
    if config is not None:
        converted_config = _AgentEngineMemoryConfig_to_vertex(config, request)
        setv(request, ["config"], converted_config)

    return request


def _DeleteAgentEngineMemoryRequestParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Build the request dict for a delete-memory call.

    Args:
        from_object: Source parameters (dict or object) read through `getv`.
        parent_object: Not used by this converter.

    Returns:
        A dict holding `_url.name` and `config` for whichever of `name`
        and `config` is not None.
    """
    request: dict[str, Any] = {}

    name = getv(from_object, ["name"])
    if name is not None:
        setv(request, ["_url", "name"], name)

    config = getv(from_object, ["config"])
    if config is not None:
        setv(request, ["config"], config)

    return request


def _GenerateAgentEngineMemoriesConfig_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Copy the set fields of a generate-memories config onto the parent.

    Each field of `from_object` that is not None is written to
    `parent_object` under its camelCase key. The dict returned by this
    function is never populated.

    Args:
        from_object: Source config (dict or object) read through `getv`.
        parent_object: Request dict that receives the converted fields.

    Returns:
        An empty dict.
    """
    converted: dict[str, Any] = {}

    # (source field, destination key) pairs, applied in this order.
    field_map = (
        ("disable_consolidation", "disableConsolidation"),
        ("revision_labels", "revisionLabels"),
        ("revision_expire_time", "revisionExpireTime"),
        ("revision_ttl", "revisionTtl"),
        ("disable_memory_revisions", "disableMemoryRevisions"),
    )
    for source_key, target_key in field_map:
        value = getv(from_object, [source_key])
        if value is not None:
            setv(parent_object, [target_key], value)

    return converted


def _GenerateAgentEngineMemoriesRequestParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Build the request dict for a generate-memories call.

    `name` is stored under `_url.name`; the three source fields and
    `scope` are stored as top-level keys. When a config is present it is
    converted with `_GenerateAgentEngineMemoriesConfig_to_vertex`, which
    writes onto the request dict, and the converter's return value is
    stored under `config`.

    Args:
        from_object: Source parameters (dict or object) read through `getv`.
        parent_object: Not used by this converter.

    Returns:
        The populated request dict.
    """
    request: dict[str, Any] = {}

    # (source field, destination path) pairs, applied in this order.
    field_map = (
        ("name", ["_url", "name"]),
        ("vertex_session_source", ["vertexSessionSource"]),
        ("direct_contents_source", ["directContentsSource"]),
        ("direct_memories_source", ["directMemoriesSource"]),
        ("scope", ["scope"]),
    )
    for source_key, target_path in field_map:
        value = getv(from_object, [source_key])
        if value is not None:
            setv(request, target_path, value)

    config = getv(from_object, ["config"])
    if config is not None:
        converted_config = _GenerateAgentEngineMemoriesConfig_to_vertex(
            config, request
        )
        setv(request, ["config"], converted_config)

    return request


def _GetAgentEngineGenerateMemoriesOperationParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Build the request dict for fetching a generate-memories operation.

    Args:
        from_object: Source parameters (dict or object) read through `getv`.
        parent_object: Not used by this converter.

    Returns:
        A dict holding `_url.operationName` and `config` for whichever of
        `operation_name` and `config` is not None.
    """
    request: dict[str, Any] = {}

    operation_name = getv(from_object, ["operation_name"])
    if operation_name is not None:
        setv(request, ["_url", "operationName"], operation_name)

    config = getv(from_object, ["config"])
    if config is not None:
        setv(request, ["config"], config)

    return request


def _GetAgentEngineMemoryOperationParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Build the request dict for fetching a memory operation.

    Args:
        from_object: Source parameters (dict or object) read through `getv`.
        parent_object: Not used by this converter.

    Returns:
        A dict holding `_url.operationName` and `config` for whichever of
        `operation_name` and `config` is not None.
    """
    request: dict[str, Any] = {}

    operation_name = getv(from_object, ["operation_name"])
    if operation_name is not None:
        setv(request, ["_url", "operationName"], operation_name)

    config = getv(from_object, ["config"])
    if config is not None:
        setv(request, ["config"], config)

    return request


def _GetAgentEngineMemoryRequestParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Build the request dict for a get-memory call.

    Args:
        from_object: Source parameters (dict or object) read through `getv`.
        parent_object: Not used by this converter.

    Returns:
        A dict holding `_url.name` and `config` for whichever of `name`
        and `config` is not None.
    """
    request: dict[str, Any] = {}

    name = getv(from_object, ["name"])
    if name is not None:
        setv(request, ["_url", "name"], name)

    config = getv(from_object, ["config"])
    if config is not None:
        setv(request, ["config"], config)

    return request


def _ListAgentEngineMemoryConfig_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Copy the set fields of a list config into the parent's `_query` entry.

    Each field of `from_object` that is not None is written to
    `parent_object` under `_query` with a camelCase key. The dict returned
    by this function is never populated.

    Args:
        from_object: Source config (dict or object) read through `getv`.
        parent_object: Request dict that receives the query parameters.

    Returns:
        An empty dict.
    """
    converted: dict[str, Any] = {}

    # (source field, query parameter name) pairs, applied in this order.
    query_fields = (
        ("page_size", "pageSize"),
        ("page_token", "pageToken"),
        ("filter", "filter"),
        ("order_by", "orderBy"),
    )
    for source_key, query_key in query_fields:
        value = getv(from_object, [source_key])
        if value is not None:
            setv(parent_object, ["_query", query_key], value)

    return converted


def _ListAgentEngineMemoryRequestParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Build the request dict for a list-memories call.

    `name` is stored under `_url.name`. When a config is present it is
    converted with `_ListAgentEngineMemoryConfig_to_vertex`, which writes
    onto the request dict, and the converter's return value is stored
    under `config`.

    Args:
        from_object: Source parameters (dict or object) read through `getv`.
        parent_object: Not used by this converter.

    Returns:
        The populated request dict.
    """
    request: dict[str, Any] = {}

    name = getv(from_object, ["name"])
    if name is not None:
        setv(request, ["_url", "name"], name)

    config = getv(from_object, ["config"])
    if config is not None:
        converted_config = _ListAgentEngineMemoryConfig_to_vertex(config, request)
        setv(request, ["config"], converted_config)

    return request


def _PurgeAgentEngineMemoriesRequestParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Build the request dict for a purge-memories call.

    Args:
        from_object: Source parameters (dict or object) read through `getv`.
        parent_object: Not used by this converter.

    Returns:
        A dict holding `_url.name`, `filter`, `force` and `config` for
        whichever of the corresponding source fields is not None.
    """
    request: dict[str, Any] = {}

    # (source field, destination path) pairs, applied in this order.
    field_map = (
        ("name", ["_url", "name"]),
        ("filter", ["filter"]),
        ("force", ["force"]),
        ("config", ["config"]),
    )
    for source_key, target_path in field_map:
        value = getv(from_object, [source_key])
        if value is not None:
            setv(request, target_path, value)

    return request


def _RetrieveAgentEngineMemoriesConfig_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Copy a retrieve config's `filter`, when set, onto the parent dict.

    Args:
        from_object: Source config (dict or object) read through `getv`.
        parent_object: Request dict that receives the `filter` key.

    Returns:
        An empty dict; this function never populates it.
    """
    converted: dict[str, Any] = {}

    filter_value = getv(from_object, ["filter"])
    if filter_value is not None:
        setv(parent_object, ["filter"], filter_value)

    return converted


def _RetrieveAgentEngineMemoriesRequestParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Build the request dict for a retrieve-memories call.

    `name` is stored under `_url.name`; `scope` and the two retrieval
    parameter fields are stored as top-level keys. When a config is
    present it is converted with
    `_RetrieveAgentEngineMemoriesConfig_to_vertex`, which writes onto the
    request dict, and the converter's return value is stored under
    `config`.

    Args:
        from_object: Source parameters (dict or object) read through `getv`.
        parent_object: Not used by this converter.

    Returns:
        The populated request dict.
    """
    request: dict[str, Any] = {}

    # (source field, destination path) pairs, applied in this order.
    field_map = (
        ("name", ["_url", "name"]),
        ("scope", ["scope"]),
        ("similarity_search_params", ["similaritySearchParams"]),
        ("simple_retrieval_params", ["simpleRetrievalParams"]),
    )
    for source_key, target_path in field_map:
        value = getv(from_object, [source_key])
        if value is not None:
            setv(request, target_path, value)

    config = getv(from_object, ["config"])
    if config is not None:
        converted_config = _RetrieveAgentEngineMemoriesConfig_to_vertex(
            config, request
        )
        setv(request, ["config"], converted_config)

    return request


def _RollbackAgentEngineMemoryRequestParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Build the request dict for a rollback-memory call.

    Args:
        from_object: Source parameters (dict or object) read through `getv`.
        parent_object: Not used by this converter.

    Returns:
        A dict holding `_url.name`, `targetRevisionId` and `config` for
        whichever of the corresponding source fields is not None.
    """
    request: dict[str, Any] = {}

    # (source field, destination path) pairs, applied in this order.
    field_map = (
        ("name", ["_url", "name"]),
        ("target_revision_id", ["targetRevisionId"]),
        ("config", ["config"]),
    )
    for source_key, target_path in field_map:
        value = getv(from_object, [source_key])
        if value is not None:
            setv(request, target_path, value)

    return request


def _UpdateAgentEngineMemoryConfig_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Copy the set fields of an update-memory config onto the parent dict.

    Each field of `from_object` that is not None is written to
    `parent_object` under its camelCase key, except `update_mask`, which
    is written under `_query.updateMask`. The dict returned by this
    function is never populated.

    Args:
        from_object: Source config (dict or object) read through `getv`.
        parent_object: Request dict that receives the converted fields.

    Returns:
        An empty dict.
    """
    converted: dict[str, Any] = {}

    # (source field, destination key) pairs, applied in this order.
    scalar_fields = (
        ("display_name", "displayName"),
        ("description", "description"),
        ("ttl", "ttl"),
        ("expire_time", "expireTime"),
        ("revision_expire_time", "revisionExpireTime"),
        ("revision_ttl", "revisionTtl"),
        ("disable_memory_revisions", "disableMemoryRevisions"),
    )
    for source_key, target_key in scalar_fields:
        value = getv(from_object, [source_key])
        if value is not None:
            setv(parent_object, [target_key], value)

    topics = getv(from_object, ["topics"])
    if topics is not None:
        # A new list is built so the source sequence itself is not passed on.
        setv(parent_object, ["topics"], list(topics))

    update_mask = getv(from_object, ["update_mask"])
    if update_mask is not None:
        setv(parent_object, ["_query", "updateMask"], update_mask)

    return converted


def _UpdateAgentEngineMemoryRequestParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Build the request dict for an update-memory call.

    `name` is stored under `_url.name`; `fact` and `scope` are stored as
    top-level keys. When a config is present it is converted with
    `_UpdateAgentEngineMemoryConfig_to_vertex`, which writes onto the
    request dict, and the converter's return value is stored under
    `config`.

    Args:
        from_object: Source parameters (dict or object) read through `getv`.
        parent_object: Not used by this converter.

    Returns:
        The populated request dict.
    """
    request: dict[str, Any] = {}

    name = getv(from_object, ["name"])
    if name is not None:
        setv(request, ["_url", "name"], name)

    fact = getv(from_object, ["fact"])
    if fact is not None:
        setv(request, ["fact"], fact)

    scope = getv(from_object, ["scope"])
    if scope is not None:
        setv(request, ["scope"], scope)

    config = getv(from_object, ["config"])
    if config is not None:
        converted_config = _UpdateAgentEngineMemoryConfig_to_vertex(config, request)
        setv(request, ["config"], converted_config)

    return request


class Memories(_api_module.BaseModule):

    def _create(
        self,
        *,
        name: str,
        fact: str,
        scope: dict[str, str],
        config: Optional[types.AgentEngineMemoryConfigOrDict] = None,
    ) -> types.AgentEngineMemoryOperation:
        """Send the request that creates a memory.

        Issues a POST to `{name}/memories` and parses the response body
        into an `AgentEngineMemoryOperation`.

        Args:
            name: Value substituted for `{name}` in the request path.
            fact: Sent as the `fact` field of the request.
            scope: Sent as the `scope` field of the request.
            config: Optional request settings. Its `http_options`, when
                set, are passed to the HTTP call.

        Returns:
            The operation built from the response.

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """
        parameter_model = types._CreateAgentEngineMemoryRequestParameters(
            name=name,
            fact=fact,
            scope=scope,
            config=config,
        )

        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _CreateAgentEngineMemoryRequestParameters_to_vertex(parameter_model)

        # Fill in the path template only when URL parameters were produced.
        path_template = "{name}/memories"
        url_values: Optional[dict[str, str]] = payload.get("_url")
        path = path_template.format_map(url_values) if url_values else path_template

        query_values = payload.get("_query")
        if query_values:
            path = f"{path}?{urlencode(query_values)}"
        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        request_config = parameter_model.config
        http_options: Optional[types.HttpOptions] = (
            request_config.http_options if request_config is not None else None
        )

        payload = _common.encode_unserializable_types(
            _common.convert_to_dict(payload)
        )

        response = self._api_client.request("post", path, payload, http_options)

        # An empty body is treated as an empty JSON object.
        parsed_body = json.loads(response.body) if response.body else {}

        operation = types.AgentEngineMemoryOperation._from_response(
            response=parsed_body, kwargs=parameter_model.model_dump()
        )

        self._api_client._verify_response(operation)
        return operation

    def delete(
        self,
        *,
        name: str,
        config: Optional[types.DeleteAgentEngineMemoryConfigOrDict] = None,
    ) -> types.DeleteAgentEngineMemoryOperation:
        """Delete an Agent Engine memory.

        Issues a DELETE to `{name}` and parses the response body into a
        `DeleteAgentEngineMemoryOperation`.

        Args:
            name (str):
                Required. The name of the Agent Engine memory to be deleted. Format:
                `projects/{project}/locations/{location}/reasoningEngines/{resource_id}/memories/{memory}`.
            config (DeleteAgentEngineMemoryConfig):
                Optional. Additional configurations for the delete request.
                Its `http_options`, when set, are passed to the HTTP call.

        Returns:
            The operation built from the response.

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """
        parameter_model = types._DeleteAgentEngineMemoryRequestParameters(
            name=name,
            config=config,
        )

        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _DeleteAgentEngineMemoryRequestParameters_to_vertex(parameter_model)

        # Fill in the path template only when URL parameters were produced.
        path_template = "{name}"
        url_values: Optional[dict[str, str]] = payload.get("_url")
        path = path_template.format_map(url_values) if url_values else path_template

        query_values = payload.get("_query")
        if query_values:
            path = f"{path}?{urlencode(query_values)}"
        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        request_config = parameter_model.config
        http_options: Optional[types.HttpOptions] = (
            request_config.http_options if request_config is not None else None
        )

        payload = _common.encode_unserializable_types(
            _common.convert_to_dict(payload)
        )

        response = self._api_client.request("delete", path, payload, http_options)

        # An empty body is treated as an empty JSON object.
        parsed_body = json.loads(response.body) if response.body else {}

        operation = types.DeleteAgentEngineMemoryOperation._from_response(
            response=parsed_body, kwargs=parameter_model.model_dump()
        )

        self._api_client._verify_response(operation)
        return operation

    def _generate(
        self,
        *,
        name: str,
        vertex_session_source: Optional[
            types.GenerateMemoriesRequestVertexSessionSourceOrDict
        ] = None,
        direct_contents_source: Optional[
            types.GenerateMemoriesRequestDirectContentsSourceOrDict
        ] = None,
        direct_memories_source: Optional[
            types.GenerateMemoriesRequestDirectMemoriesSourceOrDict
        ] = None,
        scope: Optional[dict[str, str]] = None,
        config: Optional[types.GenerateAgentEngineMemoriesConfigOrDict] = None,
    ) -> types.AgentEngineGenerateMemoriesOperation:
        """Send the request that generates memories for an Agent Engine.

        Issues a POST to `{name}/memories:generate` and parses the response
        body into an `AgentEngineGenerateMemoriesOperation`.

        Args:
            name: Value substituted for `{name}` in the request path.
            vertex_session_source: Optional; sent as `vertexSessionSource`.
            direct_contents_source: Optional; sent as `directContentsSource`.
            direct_memories_source: Optional; sent as `directMemoriesSource`.
            scope: Optional; sent as the `scope` field of the request.
            config: Optional request settings. Its `http_options`, when
                set, are passed to the HTTP call.

        Returns:
            The operation built from the response.

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """
        parameter_model = types._GenerateAgentEngineMemoriesRequestParameters(
            name=name,
            vertex_session_source=vertex_session_source,
            direct_contents_source=direct_contents_source,
            direct_memories_source=direct_memories_source,
            scope=scope,
            config=config,
        )

        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _GenerateAgentEngineMemoriesRequestParameters_to_vertex(
            parameter_model
        )

        # Fill in the path template only when URL parameters were produced.
        path_template = "{name}/memories:generate"
        url_values: Optional[dict[str, str]] = payload.get("_url")
        path = path_template.format_map(url_values) if url_values else path_template

        query_values = payload.get("_query")
        if query_values:
            path = f"{path}?{urlencode(query_values)}"
        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        request_config = parameter_model.config
        http_options: Optional[types.HttpOptions] = (
            request_config.http_options if request_config is not None else None
        )

        payload = _common.encode_unserializable_types(
            _common.convert_to_dict(payload)
        )

        response = self._api_client.request("post", path, payload, http_options)

        # An empty body is treated as an empty JSON object.
        parsed_body = json.loads(response.body) if response.body else {}

        operation = types.AgentEngineGenerateMemoriesOperation._from_response(
            response=parsed_body, kwargs=parameter_model.model_dump()
        )

        self._api_client._verify_response(operation)
        return operation

    def get(
        self,
        *,
        name: str,
        config: Optional[types.GetAgentEngineMemoryConfigOrDict] = None,
    ) -> types.Memory:
        """Get an Agent Engine memory.

        Issues a GET to `{name}` and parses the response body into a
        `Memory`.

        Args:
            name (str): Required. A fully-qualified resource name or ID such as
              "projects/123/locations/us-central1/reasoningEngines/456/memories/789"
              or a shortened name such as "reasoningEngines/456/memories/789".
            config: Optional request settings. Its `http_options`, when
              set, are passed to the HTTP call.

        Returns:
            The memory built from the response.

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """
        parameter_model = types._GetAgentEngineMemoryRequestParameters(
            name=name,
            config=config,
        )

        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _GetAgentEngineMemoryRequestParameters_to_vertex(parameter_model)

        # Fill in the path template only when URL parameters were produced.
        path_template = "{name}"
        url_values: Optional[dict[str, str]] = payload.get("_url")
        path = path_template.format_map(url_values) if url_values else path_template

        query_values = payload.get("_query")
        if query_values:
            path = f"{path}?{urlencode(query_values)}"
        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        request_config = parameter_model.config
        http_options: Optional[types.HttpOptions] = (
            request_config.http_options if request_config is not None else None
        )

        payload = _common.encode_unserializable_types(
            _common.convert_to_dict(payload)
        )

        response = self._api_client.request("get", path, payload, http_options)

        # An empty body is treated as an empty JSON object.
        parsed_body = json.loads(response.body) if response.body else {}

        memory = types.Memory._from_response(
            response=parsed_body, kwargs=parameter_model.model_dump()
        )

        self._api_client._verify_response(memory)
        return memory

    def _list(
        self,
        *,
        name: str,
        config: Optional[types.ListAgentEngineMemoryConfigOrDict] = None,
    ) -> types.ListReasoningEnginesMemoriesResponse:
        """Send the request that lists Agent Engine memories.

        Issues a GET to `{name}/memories`, appending any `_query` entries
        from the converted request as a URL query string, and parses the
        response body into a `ListReasoningEnginesMemoriesResponse`.

        Args:
            name: Value substituted for `{name}` in the request path.
            config: Optional request settings. Its `http_options`, when
                set, are passed to the HTTP call.

        Returns:
            The list response built from the response body.

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """
        parameter_model = types._ListAgentEngineMemoryRequestParameters(
            name=name,
            config=config,
        )

        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _ListAgentEngineMemoryRequestParameters_to_vertex(parameter_model)

        # Fill in the path template only when URL parameters were produced.
        path_template = "{name}/memories"
        url_values: Optional[dict[str, str]] = payload.get("_url")
        path = path_template.format_map(url_values) if url_values else path_template

        query_values = payload.get("_query")
        if query_values:
            path = f"{path}?{urlencode(query_values)}"
        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        request_config = parameter_model.config
        http_options: Optional[types.HttpOptions] = (
            request_config.http_options if request_config is not None else None
        )

        payload = _common.encode_unserializable_types(
            _common.convert_to_dict(payload)
        )

        response = self._api_client.request("get", path, payload, http_options)

        # An empty body is treated as an empty JSON object.
        parsed_body = json.loads(response.body) if response.body else {}

        listing = types.ListReasoningEnginesMemoriesResponse._from_response(
            response=parsed_body, kwargs=parameter_model.model_dump()
        )

        self._api_client._verify_response(listing)
        return listing

    def _get_memory_operation(
        self,
        *,
        operation_name: str,
        config: Optional[types.GetAgentEngineOperationConfigOrDict] = None,
    ) -> types.AgentEngineMemoryOperation:
        """Fetch a memory operation by name.

        Issues a GET to `{operationName}` and parses the response body
        into an `AgentEngineMemoryOperation`.

        Args:
            operation_name: Value substituted for `{operationName}` in the
                request path.
            config: Optional request settings. Its `http_options`, when
                set, are passed to the HTTP call.

        Returns:
            The operation built from the response.

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """
        parameter_model = types._GetAgentEngineMemoryOperationParameters(
            operation_name=operation_name,
            config=config,
        )

        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _GetAgentEngineMemoryOperationParameters_to_vertex(parameter_model)

        # Fill in the path template only when URL parameters were produced.
        path_template = "{operationName}"
        url_values: Optional[dict[str, str]] = payload.get("_url")
        path = path_template.format_map(url_values) if url_values else path_template

        query_values = payload.get("_query")
        if query_values:
            path = f"{path}?{urlencode(query_values)}"
        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        request_config = parameter_model.config
        http_options: Optional[types.HttpOptions] = (
            request_config.http_options if request_config is not None else None
        )

        payload = _common.encode_unserializable_types(
            _common.convert_to_dict(payload)
        )

        response = self._api_client.request("get", path, payload, http_options)

        # An empty body is treated as an empty JSON object.
        parsed_body = json.loads(response.body) if response.body else {}

        operation = types.AgentEngineMemoryOperation._from_response(
            response=parsed_body, kwargs=parameter_model.model_dump()
        )

        self._api_client._verify_response(operation)
        return operation

    def _get_generate_memories_operation(
        self,
        *,
        operation_name: str,
        config: Optional[types.GetAgentEngineOperationConfigOrDict] = None,
    ) -> types.AgentEngineGenerateMemoriesOperation:
        """Fetch a generate-memories operation by name.

        Issues a GET to `{operationName}` and parses the response body
        into an `AgentEngineGenerateMemoriesOperation`.

        Args:
            operation_name: Value substituted for `{operationName}` in the
                request path.
            config: Optional request settings. Its `http_options`, when
                set, are passed to the HTTP call.

        Returns:
            The operation built from the response.

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """
        parameter_model = types._GetAgentEngineGenerateMemoriesOperationParameters(
            operation_name=operation_name,
            config=config,
        )

        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _GetAgentEngineGenerateMemoriesOperationParameters_to_vertex(
            parameter_model
        )

        # Fill in the path template only when URL parameters were produced.
        path_template = "{operationName}"
        url_values: Optional[dict[str, str]] = payload.get("_url")
        path = path_template.format_map(url_values) if url_values else path_template

        query_values = payload.get("_query")
        if query_values:
            path = f"{path}?{urlencode(query_values)}"
        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        request_config = parameter_model.config
        http_options: Optional[types.HttpOptions] = (
            request_config.http_options if request_config is not None else None
        )

        payload = _common.encode_unserializable_types(
            _common.convert_to_dict(payload)
        )

        response = self._api_client.request("get", path, payload, http_options)

        # An empty body is treated as an empty JSON object.
        parsed_body = json.loads(response.body) if response.body else {}

        operation = types.AgentEngineGenerateMemoriesOperation._from_response(
            response=parsed_body, kwargs=parameter_model.model_dump()
        )

        self._api_client._verify_response(operation)
        return operation

    def _retrieve(
        self,
        *,
        name: str,
        scope: dict[str, str],
        similarity_search_params: Optional[
            types.RetrieveMemoriesRequestSimilaritySearchParamsOrDict
        ] = None,
        simple_retrieval_params: Optional[
            types.RetrieveMemoriesRequestSimpleRetrievalParamsOrDict
        ] = None,
        config: Optional[types.RetrieveAgentEngineMemoriesConfigOrDict] = None,
    ) -> types.RetrieveMemoriesResponse:
        """Send the request that retrieves memories for an Agent Engine.

        Issues a POST to `{name}/memories:retrieve` and parses the response
        body into a `RetrieveMemoriesResponse`.

        Args:
            name: Value substituted for `{name}` in the request path.
            scope: Sent as the `scope` field of the request.
            similarity_search_params: Optional; sent as
                `similaritySearchParams`.
            simple_retrieval_params: Optional; sent as
                `simpleRetrievalParams`.
            config: Optional request settings. Its `http_options`, when
                set, are passed to the HTTP call.

        Returns:
            The retrieve response built from the response body.

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """
        parameter_model = types._RetrieveAgentEngineMemoriesRequestParameters(
            name=name,
            scope=scope,
            similarity_search_params=similarity_search_params,
            simple_retrieval_params=simple_retrieval_params,
            config=config,
        )

        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _RetrieveAgentEngineMemoriesRequestParameters_to_vertex(
            parameter_model
        )

        # Fill in the path template only when URL parameters were produced.
        path_template = "{name}/memories:retrieve"
        url_values: Optional[dict[str, str]] = payload.get("_url")
        path = path_template.format_map(url_values) if url_values else path_template

        query_values = payload.get("_query")
        if query_values:
            path = f"{path}?{urlencode(query_values)}"
        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        request_config = parameter_model.config
        http_options: Optional[types.HttpOptions] = (
            request_config.http_options if request_config is not None else None
        )

        payload = _common.encode_unserializable_types(
            _common.convert_to_dict(payload)
        )

        response = self._api_client.request("post", path, payload, http_options)

        # An empty body is treated as an empty JSON object.
        parsed_body = json.loads(response.body) if response.body else {}

        retrieved = types.RetrieveMemoriesResponse._from_response(
            response=parsed_body, kwargs=parameter_model.model_dump()
        )

        self._api_client._verify_response(retrieved)
        return retrieved

    def _rollback(
        self,
        *,
        name: str,
        target_revision_id: str,
        config: Optional[types.RollbackAgentEngineMemoryConfigOrDict] = None,
    ) -> types.AgentEngineRollbackMemoryOperation:
        """Issues the raw rollback request for a memory.

        Builds the request parameters, POSTs them to the ``{name}:rollback``
        path and parses the JSON response body into an operation object.

        Args:
            name (str):
                Required. The name forwarded to the request parameters.
            target_revision_id (str):
                Required. The revision ID forwarded to the request parameters.
            config (RollbackAgentEngineMemoryConfigOrDict):
                Optional. Request configuration; its ``http_options`` (if any)
                are passed to the API client.

        Returns:
            AgentEngineRollbackMemoryOperation: The operation built from the
                response body (an empty dict when the body is empty).

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """
        params = types._RollbackAgentEngineMemoryRequestParameters(
            name=name,
            target_revision_id=target_revision_id,
            config=config,
        )

        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _RollbackAgentEngineMemoryRequestParameters_to_vertex(params)

        # The template is only filled in when the converter produced URL fields.
        path = "{name}:rollback"
        url_fields: Optional[dict[str, str]] = payload.get("_url")
        if url_fields:
            path = path.format_map(url_fields)

        query = payload.get("_query")
        if query:
            path = f"{path}?{urlencode(query)}"

        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        request_config = params.config
        http_options: Optional[types.HttpOptions] = (
            None if request_config is None else request_config.http_options
        )

        payload = _common.encode_unserializable_types(_common.convert_to_dict(payload))

        response = self._api_client.request("post", path, payload, http_options)
        body = json.loads(response.body) if response.body else {}

        result = types.AgentEngineRollbackMemoryOperation._from_response(
            response=body, kwargs=params.model_dump()
        )
        self._api_client._verify_response(result)
        return result

    def _update(
        self,
        *,
        name: str,
        fact: Optional[str] = None,
        scope: Optional[dict[str, str]] = None,
        config: Optional[types.UpdateAgentEngineMemoryConfigOrDict] = None,
    ) -> types.AgentEngineMemoryOperation:
        """Issues the raw update request for an Agent Engine memory.

        Builds the request parameters, sends them with PATCH to the ``{name}``
        path and parses the JSON response body into an operation object.

        Args:
            name (str):
                Required. The name forwarded to the request parameters.
            fact (str):
                Optional. The fact forwarded to the request parameters.
            scope (dict[str, str]):
                Optional. The scope forwarded to the request parameters.
            config (UpdateAgentEngineMemoryConfigOrDict):
                Optional. Request configuration; its ``http_options`` (if any)
                are passed to the API client.

        Returns:
            AgentEngineMemoryOperation: The operation built from the response
                body (an empty dict when the body is empty).

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """
        params = types._UpdateAgentEngineMemoryRequestParameters(
            name=name,
            fact=fact,
            scope=scope,
            config=config,
        )

        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _UpdateAgentEngineMemoryRequestParameters_to_vertex(params)

        # The template is only filled in when the converter produced URL fields.
        path = "{name}"
        url_fields: Optional[dict[str, str]] = payload.get("_url")
        if url_fields:
            path = path.format_map(url_fields)

        query = payload.get("_query")
        if query:
            path = f"{path}?{urlencode(query)}"

        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        request_config = params.config
        http_options: Optional[types.HttpOptions] = (
            None if request_config is None else request_config.http_options
        )

        payload = _common.encode_unserializable_types(_common.convert_to_dict(payload))

        response = self._api_client.request("patch", path, payload, http_options)
        body = json.loads(response.body) if response.body else {}

        result = types.AgentEngineMemoryOperation._from_response(
            response=body, kwargs=params.model_dump()
        )
        self._api_client._verify_response(result)
        return result

    def _purge(
        self,
        *,
        name: str,
        filter: str,
        force: Optional[bool] = None,
        config: Optional[types.PurgeAgentEngineMemoriesConfigOrDict] = None,
    ) -> types.AgentEnginePurgeMemoriesOperation:
        """Issues the raw purge request for an Agent Engine's memories.

        Builds the request parameters, POSTs them to the
        ``{name}/memories:purge`` path and parses the JSON response body into
        an operation object.

        Args:
            name (str):
                Required. The name forwarded to the request parameters.
            filter (str):
                Required. The filter forwarded to the request parameters.
            force (bool):
                Optional. The force flag forwarded to the request parameters.
            config (PurgeAgentEngineMemoriesConfigOrDict):
                Optional. Request configuration; its ``http_options`` (if any)
                are passed to the API client.

        Returns:
            AgentEnginePurgeMemoriesOperation: The operation built from the
                response body (an empty dict when the body is empty).

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """
        params = types._PurgeAgentEngineMemoriesRequestParameters(
            name=name,
            filter=filter,
            force=force,
            config=config,
        )

        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _PurgeAgentEngineMemoriesRequestParameters_to_vertex(params)

        # The template is only filled in when the converter produced URL fields.
        path = "{name}/memories:purge"
        url_fields: Optional[dict[str, str]] = payload.get("_url")
        if url_fields:
            path = path.format_map(url_fields)

        query = payload.get("_query")
        if query:
            path = f"{path}?{urlencode(query)}"

        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        request_config = params.config
        http_options: Optional[types.HttpOptions] = (
            None if request_config is None else request_config.http_options
        )

        payload = _common.encode_unserializable_types(_common.convert_to_dict(payload))

        response = self._api_client.request("post", path, payload, http_options)
        body = json.loads(response.body) if response.body else {}

        result = types.AgentEnginePurgeMemoriesOperation._from_response(
            response=body, kwargs=params.model_dump()
        )
        self._api_client._verify_response(result)
        return result

    # Holds the lazily imported ``memory_revisions`` module once it is loaded.
    _revisions = None

    @property
    def revisions(self):
        """Returns a ``MemoryRevisions`` helper bound to this API client.

        The ``memory_revisions`` module is imported on first access and kept
        on the instance; a new ``MemoryRevisions`` object is built per access.

        Raises:
            ImportError: If the ``memory_revisions`` module cannot be imported.
        """
        module = self._revisions
        if module is None:
            # Import lazily so a missing optional dependency only surfaces
            # when this property is actually used.
            try:
                module = importlib.import_module(".memory_revisions", __package__)
            except ImportError as import_error:
                raise ImportError(
                    "The 'agent_engines.memories.revisions' module requires "
                    "additional packages. Please install them using pip install "
                    "google-cloud-aiplatform[agent_engines]"
                ) from import_error
            self._revisions = module
        return module.MemoryRevisions(self._api_client)

    def create(
        self,
        *,
        name: str,
        fact: str,
        scope: dict[str, str],
        config: Optional[types.AgentEngineMemoryConfigOrDict] = None,
    ) -> types.AgentEngineMemoryOperation:
        """Creates a new memory in the Agent Engine.

        Args:
            name (str):
                Required. The name of the memory to create.
            fact (str):
                Required. The fact to be stored in the memory.
            scope (dict[str, str]):
                Required. The scope of the memory. For example, {"user_id": "123"}.
            config (AgentEngineMemoryConfigOrDict):
                Optional. The configuration for the memory. A dict is
                validated into an ``AgentEngineMemoryConfig``.

        Returns:
            AgentEngineMemoryOperation: The operation for creating the memory.
                When ``config.wait_for_completion`` is set, its ``response`` is
                replaced by the memory fetched with ``get``.

        Raises:
            RuntimeError: If ``config.wait_for_completion`` is set and the
                operation has no response.
        """
        if isinstance(config, dict):
            config = types.AgentEngineMemoryConfig.model_validate(config)
        elif config is None:
            config = types.AgentEngineMemoryConfig()

        operation = self._create(
            name=name,
            fact=fact,
            scope=scope,
            config=config,
        )
        if not config.wait_for_completion:
            return operation

        if not operation.done:
            operation = _agent_engines_utils._await_operation(
                operation_name=operation.name,
                get_operation_fn=self._get_memory_operation,
                poll_interval_seconds=0.5,
            )

        if operation.response:
            # We need to make a call to get the memory because the operation
            # response might not contain the relevant fields.
            operation.response = self.get(name=operation.response.name)
            return operation
        if operation.error:
            raise RuntimeError(f"Failed to create memory: {operation.error}")
        raise RuntimeError("Error creating memory.")

    def generate(
        self,
        *,
        name: str,
        vertex_session_source: Optional[
            types.GenerateMemoriesRequestVertexSessionSourceOrDict
        ] = None,
        direct_contents_source: Optional[
            types.GenerateMemoriesRequestDirectContentsSourceOrDict
        ] = None,
        direct_memories_source: Optional[
            types.GenerateMemoriesRequestDirectMemoriesSourceOrDict
        ] = None,
        scope: Optional[dict[str, str]] = None,
        config: Optional[types.GenerateAgentEngineMemoriesConfigOrDict] = None,
    ) -> types.AgentEngineGenerateMemoriesOperation:
        """Generates memories for the agent engine.

        Args:
            name (str):
                Required. The name of the agent engine to generate memories for.
            vertex_session_source (GenerateMemoriesRequestVertexSessionSource):
                Optional. The vertex session source to use for generating
                memories. Only one of vertex_session_source,
                direct_contents_source, or direct_memories_source can be
                specified.
            direct_contents_source (GenerateMemoriesRequestDirectContentsSource):
                Optional. The direct contents source to use for generating
                memories. Only one of vertex_session_source, direct_contents_source,
                or direct_memories_source can be specified.
            direct_memories_source (GenerateMemoriesRequestDirectMemoriesSource):
                Optional. The direct memories source to use for generating
                memories. Only one of vertex_session_source, direct_contents_source,
                or direct_memories_source can be specified.
            scope (dict[str, str]):
                Optional. The scope of the memories to generate. This is optional
                if vertex_session_source is used, otherwise it must be specified.
            config (GenerateAgentEngineMemoriesConfig):
                Optional. The configuration for the memories to generate. A
                dict is validated into a ``GenerateAgentEngineMemoriesConfig``.

        Returns:
            AgentEngineGenerateMemoriesOperation:
                The operation for generating the memories.

        Raises:
            RuntimeError: If ``config.wait_for_completion`` is set and the
                operation carries an error.
        """
        if config is None:
            config = types.GenerateAgentEngineMemoriesConfig()
        elif isinstance(config, dict):
            config = types.GenerateAgentEngineMemoriesConfig.model_validate(config)
        operation = self._generate(
            name=name,
            vertex_session_source=vertex_session_source,
            direct_contents_source=direct_contents_source,
            direct_memories_source=direct_memories_source,
            scope=scope,
            config=config,
        )
        if config.wait_for_completion:
            if not operation.done:
                operation = _agent_engines_utils._await_operation(
                    operation_name=operation.name,
                    get_operation_fn=self._get_generate_memories_operation,
                    poll_interval_seconds=0.5,
                )
            # Check the error outside the polling branch so that an operation
            # which was already done (and failed) on the first response is
            # reported as well, matching how `create` surfaces failures.
            if operation.error:
                raise RuntimeError(f"Failed to generate memory: {operation.error}")
        return operation

    def list(
        self,
        *,
        name: str,
        config: Optional[types.ListAgentEngineMemoryConfigOrDict] = None,
    ) -> Iterator[types.Memory]:
        """Lists Agent Engine memories.

        Args:
            name (str):
                Required. The name of the agent engine to list memories for.
            config (ListAgentEngineMemoryConfig):
                Optional. The configuration for the memories to list.

        Returns:
            Iterator[Memory]: A ``Pager`` over the "memories" field, seeded
                with the first ``_list`` response.
        """
        # The pager re-invokes this callable for later pages; the first page
        # is requested eagerly right here.
        fetch_page = functools.partial(self._list, name=name)
        first_page = self._list(name=name, config=config)
        return Pager("memories", fetch_page, first_page, config)

    def retrieve(
        self,
        *,
        name: str,
        scope: dict[str, str],
        similarity_search_params: Optional[
            types.RetrieveMemoriesRequestSimilaritySearchParamsOrDict
        ] = None,
        simple_retrieval_params: Optional[
            types.RetrieveMemoriesRequestSimpleRetrievalParamsOrDict
        ] = None,
        config: Optional[types.RetrieveAgentEngineMemoriesConfigOrDict] = None,
    ) -> Iterator[types.RetrieveMemoriesResponseRetrievedMemory]:
        """Retrieves memories for the agent.

        Args:
            name (str):
                Required. The name of the agent engine to retrieve memories for.
            scope (dict[str, str]):
                Required. The scope of the memories to retrieve. For example,
                {"user_id": "123"}.
            similarity_search_params (RetrieveMemoriesRequestSimilaritySearchParams):
                Optional. The similarity search parameters to use for retrieving
                memories.
            simple_retrieval_params (RetrieveMemoriesRequestSimpleRetrievalParams):
                Optional. The simple retrieval parameters to use for retrieving
                memories.
            config (RetrieveAgentEngineMemoriesConfig):
                Optional. The configuration for the memories to retrieve.

        Returns:
            Iterator[RetrieveMemoriesResponseRetrievedMemory]: A ``Pager`` over
                the "retrieved_memories" field, seeded with the first
                ``_retrieve`` response.
        """

        def _fetch_page(config):
            # Same request every time; only the config differs between pages.
            return self._retrieve(
                name=name,
                scope=scope,
                similarity_search_params=similarity_search_params,
                simple_retrieval_params=simple_retrieval_params,
                config=config,
            )

        first_page = _fetch_page(config)
        return Pager("retrieved_memories", _fetch_page, first_page, config)

    def rollback(
        self,
        *,
        name: str,
        target_revision_id: str,
        config: Optional[types.RollbackAgentEngineMemoryConfigOrDict] = None,
    ) -> types.AgentEngineRollbackMemoryOperation:
        """Rolls back a memory to a previous revision.

        Args:
            name (str):
                Required. The name of the memory to rollback.
            target_revision_id (str):
                Required. The revision ID to roll back to.
            config (RollbackAgentEngineMemoryConfig):
                Optional. The configuration for the rollback. A dict is
                validated into a ``RollbackAgentEngineMemoryConfig``.

        Returns:
            AgentEngineRollbackMemoryOperation:
                The operation for rolling back the memory.

        Raises:
            RuntimeError: If ``config.wait_for_completion`` is set and the
                operation carries an error.
        """
        if config is None:
            config = types.RollbackAgentEngineMemoryConfig()
        elif isinstance(config, dict):
            config = types.RollbackAgentEngineMemoryConfig.model_validate(config)
        operation = self._rollback(
            name=name,
            target_revision_id=target_revision_id,
            config=config,
        )
        if config.wait_for_completion:
            if not operation.done:
                operation = _agent_engines_utils._await_operation(
                    operation_name=operation.name,
                    get_operation_fn=self._get_memory_operation,
                    poll_interval_seconds=0.5,
                )
            # Check the error outside the polling branch so that an operation
            # which was already done (and failed) on the first response is
            # reported as well, matching how `create` surfaces failures.
            if operation.error:
                raise RuntimeError(f"Failed to rollback memory: {operation.error}")
        return operation

    def purge(
        self,
        *,
        name: str,
        filter: str,
        force: bool = False,
        config: Optional[types.PurgeAgentEngineMemoriesConfigOrDict] = None,
    ) -> types.AgentEnginePurgeMemoriesOperation:
        """Purges memories from an Agent Engine.

        Args:
            name (str):
                Required. The name of the Agent Engine to purge memories from.
            filter (str):
                Required. The standard list filter to determine which memories to purge.
            force (bool):
                Optional. Whether to force the purge operation. If false, the
                operation will be staged but not executed.
            config (PurgeAgentEngineMemoriesConfig):
                Optional. The configuration for the purge operation. A dict is
                validated into a ``PurgeAgentEngineMemoriesConfig``.

        Returns:
            AgentEnginePurgeMemoriesOperation:
                The operation for purging the memories.

        Raises:
            RuntimeError: If ``config.wait_for_completion`` is set and the
                operation carries an error.
        """
        if config is None:
            config = types.PurgeAgentEngineMemoriesConfig()
        elif isinstance(config, dict):
            config = types.PurgeAgentEngineMemoriesConfig.model_validate(config)
        operation = self._purge(
            name=name,
            filter=filter,
            force=force,
            config=config,
        )
        if config.wait_for_completion:
            if not operation.done:
                operation = _agent_engines_utils._await_operation(
                    operation_name=operation.name,
                    get_operation_fn=self._get_memory_operation,
                    poll_interval_seconds=0.5,
                )
            # Check the error outside the polling branch so that an operation
            # which was already done (and failed) on the first response is
            # reported as well, matching how `create` surfaces failures.
            if operation.error:
                raise RuntimeError(f"Failed to purge memories: {operation.error}")
        return operation


class AsyncMemories(_api_module.BaseModule):

    async def _create(
        self,
        *,
        name: str,
        fact: str,
        scope: dict[str, str],
        config: Optional[types.AgentEngineMemoryConfigOrDict] = None,
    ) -> types.AgentEngineMemoryOperation:
        """Issues the raw asynchronous create request for a memory.

        Builds the request parameters, POSTs them to the ``{name}/memories``
        path and parses the JSON response body into an operation object.

        Args:
            name (str):
                Required. The name forwarded to the request parameters.
            fact (str):
                Required. The fact forwarded to the request parameters.
            scope (dict[str, str]):
                Required. The scope forwarded to the request parameters.
            config (AgentEngineMemoryConfigOrDict):
                Optional. Request configuration; its ``http_options`` (if any)
                are passed to the API client.

        Returns:
            AgentEngineMemoryOperation: The operation built from the response
                body (an empty dict when the body is empty).

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """
        params = types._CreateAgentEngineMemoryRequestParameters(
            name=name,
            fact=fact,
            scope=scope,
            config=config,
        )

        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _CreateAgentEngineMemoryRequestParameters_to_vertex(params)

        # The template is only filled in when the converter produced URL fields.
        path = "{name}/memories"
        url_fields: Optional[dict[str, str]] = payload.get("_url")
        if url_fields:
            path = path.format_map(url_fields)

        query = payload.get("_query")
        if query:
            path = f"{path}?{urlencode(query)}"

        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        request_config = params.config
        http_options: Optional[types.HttpOptions] = (
            None if request_config is None else request_config.http_options
        )

        payload = _common.encode_unserializable_types(_common.convert_to_dict(payload))

        response = await self._api_client.async_request(
            "post", path, payload, http_options
        )
        body = json.loads(response.body) if response.body else {}

        result = types.AgentEngineMemoryOperation._from_response(
            response=body, kwargs=params.model_dump()
        )
        self._api_client._verify_response(result)
        return result

    async def delete(
        self,
        *,
        name: str,
        config: Optional[types.DeleteAgentEngineMemoryConfigOrDict] = None,
    ) -> types.DeleteAgentEngineMemoryOperation:
        """Deletes an Agent Engine memory.

        Args:
            name (str):
                Required. The name of the Agent Engine memory to be deleted. Format:
                `projects/{project}/locations/{location}/reasoningEngines/{resource_id}/memories/{memory}`.
            config (DeleteAgentEngineMemoryConfig):
                Optional. Additional configurations for deleting the Agent
                Engine memory.

        Returns:
            DeleteAgentEngineMemoryOperation: The operation built from the
                response body (an empty dict when the body is empty).

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """
        params = types._DeleteAgentEngineMemoryRequestParameters(
            name=name,
            config=config,
        )

        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _DeleteAgentEngineMemoryRequestParameters_to_vertex(params)

        # The template is only filled in when the converter produced URL fields.
        path = "{name}"
        url_fields: Optional[dict[str, str]] = payload.get("_url")
        if url_fields:
            path = path.format_map(url_fields)

        query = payload.get("_query")
        if query:
            path = f"{path}?{urlencode(query)}"

        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        request_config = params.config
        http_options: Optional[types.HttpOptions] = (
            None if request_config is None else request_config.http_options
        )

        payload = _common.encode_unserializable_types(_common.convert_to_dict(payload))

        response = await self._api_client.async_request(
            "delete", path, payload, http_options
        )
        body = json.loads(response.body) if response.body else {}

        result = types.DeleteAgentEngineMemoryOperation._from_response(
            response=body, kwargs=params.model_dump()
        )
        self._api_client._verify_response(result)
        return result

    async def _generate(
        self,
        *,
        name: str,
        vertex_session_source: Optional[
            types.GenerateMemoriesRequestVertexSessionSourceOrDict
        ] = None,
        direct_contents_source: Optional[
            types.GenerateMemoriesRequestDirectContentsSourceOrDict
        ] = None,
        direct_memories_source: Optional[
            types.GenerateMemoriesRequestDirectMemoriesSourceOrDict
        ] = None,
        scope: Optional[dict[str, str]] = None,
        config: Optional[types.GenerateAgentEngineMemoriesConfigOrDict] = None,
    ) -> types.AgentEngineGenerateMemoriesOperation:
        """Issues the raw asynchronous generate-memories request.

        Builds the request parameters, POSTs them to the
        ``{name}/memories:generate`` path and parses the JSON response body
        into an operation object.

        Args:
            name (str):
                Required. The name forwarded to the request parameters.
            vertex_session_source (GenerateMemoriesRequestVertexSessionSourceOrDict):
                Optional. Forwarded to the request parameters.
            direct_contents_source (GenerateMemoriesRequestDirectContentsSourceOrDict):
                Optional. Forwarded to the request parameters.
            direct_memories_source (GenerateMemoriesRequestDirectMemoriesSourceOrDict):
                Optional. Forwarded to the request parameters.
            scope (dict[str, str]):
                Optional. The scope forwarded to the request parameters.
            config (GenerateAgentEngineMemoriesConfigOrDict):
                Optional. Request configuration; its ``http_options`` (if any)
                are passed to the API client.

        Returns:
            AgentEngineGenerateMemoriesOperation: The operation built from the
                response body (an empty dict when the body is empty).

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """
        params = types._GenerateAgentEngineMemoriesRequestParameters(
            name=name,
            vertex_session_source=vertex_session_source,
            direct_contents_source=direct_contents_source,
            direct_memories_source=direct_memories_source,
            scope=scope,
            config=config,
        )

        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _GenerateAgentEngineMemoriesRequestParameters_to_vertex(params)

        # The template is only filled in when the converter produced URL fields.
        path = "{name}/memories:generate"
        url_fields: Optional[dict[str, str]] = payload.get("_url")
        if url_fields:
            path = path.format_map(url_fields)

        query = payload.get("_query")
        if query:
            path = f"{path}?{urlencode(query)}"

        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        request_config = params.config
        http_options: Optional[types.HttpOptions] = (
            None if request_config is None else request_config.http_options
        )

        payload = _common.encode_unserializable_types(_common.convert_to_dict(payload))

        response = await self._api_client.async_request(
            "post", path, payload, http_options
        )
        body = json.loads(response.body) if response.body else {}

        result = types.AgentEngineGenerateMemoriesOperation._from_response(
            response=body, kwargs=params.model_dump()
        )
        self._api_client._verify_response(result)
        return result

    async def get(
        self,
        *,
        name: str,
        config: Optional[types.GetAgentEngineMemoryConfigOrDict] = None,
    ) -> types.Memory:
        """Gets an agent engine memory.

        Args:
            name (str):
                Required. A fully-qualified resource name or ID such as
                "projects/123/locations/us-central1/reasoningEngines/456/memories/789"
                or a shortened name such as "reasoningEngines/456/memories/789".
            config (GetAgentEngineMemoryConfig):
                Optional. Request configuration; its ``http_options`` (if any)
                are passed to the API client.

        Returns:
            Memory: The memory built from the response body (an empty dict
                when the body is empty).

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """
        params = types._GetAgentEngineMemoryRequestParameters(
            name=name,
            config=config,
        )

        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _GetAgentEngineMemoryRequestParameters_to_vertex(params)

        # The template is only filled in when the converter produced URL fields.
        path = "{name}"
        url_fields: Optional[dict[str, str]] = payload.get("_url")
        if url_fields:
            path = path.format_map(url_fields)

        query = payload.get("_query")
        if query:
            path = f"{path}?{urlencode(query)}"

        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        request_config = params.config
        http_options: Optional[types.HttpOptions] = (
            None if request_config is None else request_config.http_options
        )

        payload = _common.encode_unserializable_types(_common.convert_to_dict(payload))

        response = await self._api_client.async_request(
            "get", path, payload, http_options
        )
        body = json.loads(response.body) if response.body else {}

        result = types.Memory._from_response(
            response=body, kwargs=params.model_dump()
        )
        self._api_client._verify_response(result)
        return result

    async def _list(
        self,
        *,
        name: str,
        config: Optional[types.ListAgentEngineMemoryConfigOrDict] = None,
    ) -> types.ListReasoningEnginesMemoriesResponse:
        """Issues the raw asynchronous list request for Agent Engine memories.

        Builds the request parameters, sends a GET to the ``{name}/memories``
        path and parses the JSON response body into a list response.

        Args:
            name (str):
                Required. The name forwarded to the request parameters.
            config (ListAgentEngineMemoryConfigOrDict):
                Optional. Request configuration; its ``http_options`` (if any)
                are passed to the API client.

        Returns:
            ListReasoningEnginesMemoriesResponse: The response built from the
                response body (an empty dict when the body is empty).

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """
        params = types._ListAgentEngineMemoryRequestParameters(
            name=name,
            config=config,
        )

        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _ListAgentEngineMemoryRequestParameters_to_vertex(params)

        # The template is only filled in when the converter produced URL fields.
        path = "{name}/memories"
        url_fields: Optional[dict[str, str]] = payload.get("_url")
        if url_fields:
            path = path.format_map(url_fields)

        query = payload.get("_query")
        if query:
            path = f"{path}?{urlencode(query)}"

        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        request_config = params.config
        http_options: Optional[types.HttpOptions] = (
            None if request_config is None else request_config.http_options
        )

        payload = _common.encode_unserializable_types(_common.convert_to_dict(payload))

        response = await self._api_client.async_request(
            "get", path, payload, http_options
        )
        body = json.loads(response.body) if response.body else {}

        result = types.ListReasoningEnginesMemoriesResponse._from_response(
            response=body, kwargs=params.model_dump()
        )
        self._api_client._verify_response(result)
        return result

    async def _get_memory_operation(
        self,
        *,
        operation_name: str,
        config: Optional[types.GetAgentEngineOperationConfigOrDict] = None,
    ) -> types.AgentEngineMemoryOperation:
        """Fetches a memory operation asynchronously.

        Builds the request parameters, sends a GET to the ``{operationName}``
        path and parses the JSON response body into an operation object.

        Args:
            operation_name (str):
                Required. The operation name forwarded to the request
                parameters.
            config (GetAgentEngineOperationConfigOrDict):
                Optional. Request configuration; its ``http_options`` (if any)
                are passed to the API client.

        Returns:
            AgentEngineMemoryOperation: The operation built from the response
                body (an empty dict when the body is empty).

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """
        params = types._GetAgentEngineMemoryOperationParameters(
            operation_name=operation_name,
            config=config,
        )

        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _GetAgentEngineMemoryOperationParameters_to_vertex(params)

        # The template is only filled in when the converter produced URL fields.
        path = "{operationName}"
        url_fields: Optional[dict[str, str]] = payload.get("_url")
        if url_fields:
            path = path.format_map(url_fields)

        query = payload.get("_query")
        if query:
            path = f"{path}?{urlencode(query)}"

        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        request_config = params.config
        http_options: Optional[types.HttpOptions] = (
            None if request_config is None else request_config.http_options
        )

        payload = _common.encode_unserializable_types(_common.convert_to_dict(payload))

        response = await self._api_client.async_request(
            "get", path, payload, http_options
        )
        body = json.loads(response.body) if response.body else {}

        result = types.AgentEngineMemoryOperation._from_response(
            response=body, kwargs=params.model_dump()
        )
        self._api_client._verify_response(result)
        return result

    async def _get_generate_memories_operation(
        self,
        *,
        operation_name: str,
        config: Optional[types.GetAgentEngineOperationConfigOrDict] = None,
    ) -> types.AgentEngineGenerateMemoriesOperation:
        """Fetches a generate-memories operation asynchronously.

        Builds the request parameters, sends a GET to the ``{operationName}``
        path and parses the JSON response body into an operation object.

        Args:
            operation_name (str):
                Required. The operation name forwarded to the request
                parameters.
            config (GetAgentEngineOperationConfigOrDict):
                Optional. Request configuration; its ``http_options`` (if any)
                are passed to the API client.

        Returns:
            AgentEngineGenerateMemoriesOperation: The operation built from the
                response body (an empty dict when the body is empty).

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """
        params = types._GetAgentEngineGenerateMemoriesOperationParameters(
            operation_name=operation_name,
            config=config,
        )

        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _GetAgentEngineGenerateMemoriesOperationParameters_to_vertex(
            params
        )

        # The template is only filled in when the converter produced URL fields.
        path = "{operationName}"
        url_fields: Optional[dict[str, str]] = payload.get("_url")
        if url_fields:
            path = path.format_map(url_fields)

        query = payload.get("_query")
        if query:
            path = f"{path}?{urlencode(query)}"

        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        request_config = params.config
        http_options: Optional[types.HttpOptions] = (
            None if request_config is None else request_config.http_options
        )

        payload = _common.encode_unserializable_types(_common.convert_to_dict(payload))

        response = await self._api_client.async_request(
            "get", path, payload, http_options
        )
        body = json.loads(response.body) if response.body else {}

        result = types.AgentEngineGenerateMemoriesOperation._from_response(
            response=body, kwargs=params.model_dump()
        )
        self._api_client._verify_response(result)
        return result

    async def _retrieve(
        self,
        *,
        name: str,
        scope: dict[str, str],
        similarity_search_params: Optional[
            types.RetrieveMemoriesRequestSimilaritySearchParamsOrDict
        ] = None,
        simple_retrieval_params: Optional[
            types.RetrieveMemoriesRequestSimpleRetrievalParamsOrDict
        ] = None,
        config: Optional[types.RetrieveAgentEngineMemoriesConfigOrDict] = None,
    ) -> types.RetrieveMemoriesResponse:
        """Retrieves memories for an Agent Engine with a single POST request.

        Args:
            name (str):
                Required. Resource name used to build the
                `{name}/memories:retrieve` request path.
            scope (dict[str, str]):
                Required. The scope of the memories to retrieve.
            similarity_search_params (RetrieveMemoriesRequestSimilaritySearchParamsOrDict):
                Optional. Similarity search parameters for the request.
            simple_retrieval_params (RetrieveMemoriesRequestSimpleRetrievalParamsOrDict):
                Optional. Simple retrieval parameters for the request.
            config (RetrieveAgentEngineMemoriesConfigOrDict):
                Optional. Request configuration. When it carries
                `http_options`, they are forwarded to the API client.

        Returns:
            RetrieveMemoriesResponse: The response built from the JSON body
                (an empty dict when the body is empty).

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """
        params = types._RetrieveAgentEngineMemoriesRequestParameters(
            name=name,
            scope=scope,
            similarity_search_params=similarity_search_params,
            simple_retrieval_params=simple_retrieval_params,
            config=config,
        )

        # Guard clause: there is no non-Vertex implementation of this call.
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _RetrieveAgentEngineMemoriesRequestParameters_to_vertex(params)

        # The converter stores path substitutions under "_url"; without them
        # the raw template is used as the path unchanged.
        path_template = "{name}/memories:retrieve"
        url_fields: Optional[dict[str, str]] = payload.get("_url")
        path = path_template.format_map(url_fields) if url_fields else path_template

        query = payload.get("_query")
        if query:
            path = f"{path}?{urlencode(query)}"
        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        # Per-request HTTP options only exist when a config was supplied.
        request_config = params.config
        http_options: Optional[types.HttpOptions] = (
            request_config.http_options if request_config is not None else None
        )

        body = _common.encode_unserializable_types(_common.convert_to_dict(payload))

        response = await self._api_client.async_request(
            "post", path, body, http_options
        )

        parsed = json.loads(response.body) if response.body else {}

        result = types.RetrieveMemoriesResponse._from_response(
            response=parsed, kwargs=params.model_dump()
        )

        self._api_client._verify_response(result)
        return result

    async def _rollback(
        self,
        *,
        name: str,
        target_revision_id: str,
        config: Optional[types.RollbackAgentEngineMemoryConfigOrDict] = None,
    ) -> types.AgentEngineRollbackMemoryOperation:
        """Sends the rollback request for a memory and returns the operation.

        Args:
            name (str):
                Required. Resource name used to build the `{name}:rollback`
                request path.
            target_revision_id (str):
                Required. The revision ID to roll back to.
            config (RollbackAgentEngineMemoryConfigOrDict):
                Optional. Request configuration. When it carries
                `http_options`, they are forwarded to the API client.

        Returns:
            AgentEngineRollbackMemoryOperation: The operation built from the
                JSON response body (an empty dict when the body is empty).

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """
        params = types._RollbackAgentEngineMemoryRequestParameters(
            name=name,
            target_revision_id=target_revision_id,
            config=config,
        )

        # Guard clause: there is no non-Vertex implementation of this call.
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _RollbackAgentEngineMemoryRequestParameters_to_vertex(params)

        # The converter stores path substitutions under "_url"; without them
        # the raw template is used as the path unchanged.
        path_template = "{name}:rollback"
        url_fields: Optional[dict[str, str]] = payload.get("_url")
        path = path_template.format_map(url_fields) if url_fields else path_template

        query = payload.get("_query")
        if query:
            path = f"{path}?{urlencode(query)}"
        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        # Per-request HTTP options only exist when a config was supplied.
        request_config = params.config
        http_options: Optional[types.HttpOptions] = (
            request_config.http_options if request_config is not None else None
        )

        body = _common.encode_unserializable_types(_common.convert_to_dict(payload))

        response = await self._api_client.async_request(
            "post", path, body, http_options
        )

        parsed = json.loads(response.body) if response.body else {}

        result = types.AgentEngineRollbackMemoryOperation._from_response(
            response=parsed, kwargs=params.model_dump()
        )

        self._api_client._verify_response(result)
        return result

    async def _update(
        self,
        *,
        name: str,
        fact: Optional[str] = None,
        scope: Optional[dict[str, str]] = None,
        config: Optional[types.UpdateAgentEngineMemoryConfigOrDict] = None,
    ) -> types.AgentEngineMemoryOperation:
        """Sends the PATCH request that updates an Agent Engine memory.

        Args:
            name (str):
                Required. Resource name used to build the `{name}` request
                path.
            fact (str):
                Optional. Passed through to the request parameters.
            scope (dict[str, str]):
                Optional. Passed through to the request parameters.
            config (UpdateAgentEngineMemoryConfigOrDict):
                Optional. Request configuration. When it carries
                `http_options`, they are forwarded to the API client.

        Returns:
            AgentEngineMemoryOperation: The operation built from the JSON
                response body (an empty dict when the body is empty).

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """
        params = types._UpdateAgentEngineMemoryRequestParameters(
            name=name,
            fact=fact,
            scope=scope,
            config=config,
        )

        # Guard clause: there is no non-Vertex implementation of this call.
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _UpdateAgentEngineMemoryRequestParameters_to_vertex(params)

        # The converter stores path substitutions under "_url"; without them
        # the raw template is used as the path unchanged.
        path_template = "{name}"
        url_fields: Optional[dict[str, str]] = payload.get("_url")
        path = path_template.format_map(url_fields) if url_fields else path_template

        query = payload.get("_query")
        if query:
            path = f"{path}?{urlencode(query)}"
        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        # Per-request HTTP options only exist when a config was supplied.
        request_config = params.config
        http_options: Optional[types.HttpOptions] = (
            request_config.http_options if request_config is not None else None
        )

        body = _common.encode_unserializable_types(_common.convert_to_dict(payload))

        response = await self._api_client.async_request(
            "patch", path, body, http_options
        )

        parsed = json.loads(response.body) if response.body else {}

        result = types.AgentEngineMemoryOperation._from_response(
            response=parsed, kwargs=params.model_dump()
        )

        self._api_client._verify_response(result)
        return result

    async def _purge(
        self,
        *,
        name: str,
        filter: str,
        force: Optional[bool] = None,
        config: Optional[types.PurgeAgentEngineMemoriesConfigOrDict] = None,
    ) -> types.AgentEnginePurgeMemoriesOperation:
        """Sends the POST request that purges memories from an Agent Engine.

        Args:
            name (str):
                Required. Resource name used to build the
                `{name}/memories:purge` request path.
            filter (str):
                Required. Passed through to the request parameters.
            force (bool):
                Optional. Passed through to the request parameters.
            config (PurgeAgentEngineMemoriesConfigOrDict):
                Optional. Request configuration. When it carries
                `http_options`, they are forwarded to the API client.

        Returns:
            AgentEnginePurgeMemoriesOperation: The operation built from the
                JSON response body (an empty dict when the body is empty).

        Raises:
            ValueError: If the API client is not a Vertex AI client.
        """
        params = types._PurgeAgentEngineMemoriesRequestParameters(
            name=name,
            filter=filter,
            force=force,
            config=config,
        )

        # Guard clause: there is no non-Vertex implementation of this call.
        if not self._api_client.vertexai:
            raise ValueError("This method is only supported in the Vertex AI client.")

        payload = _PurgeAgentEngineMemoriesRequestParameters_to_vertex(params)

        # The converter stores path substitutions under "_url"; without them
        # the raw template is used as the path unchanged.
        path_template = "{name}/memories:purge"
        url_fields: Optional[dict[str, str]] = payload.get("_url")
        path = path_template.format_map(url_fields) if url_fields else path_template

        query = payload.get("_query")
        if query:
            path = f"{path}?{urlencode(query)}"
        # TODO: remove the hack that pops config.
        payload.pop("config", None)

        # Per-request HTTP options only exist when a config was supplied.
        request_config = params.config
        http_options: Optional[types.HttpOptions] = (
            request_config.http_options if request_config is not None else None
        )

        body = _common.encode_unserializable_types(_common.convert_to_dict(payload))

        response = await self._api_client.async_request(
            "post", path, body, http_options
        )

        parsed = json.loads(response.body) if response.body else {}

        result = types.AgentEnginePurgeMemoriesOperation._from_response(
            response=parsed, kwargs=params.model_dump()
        )

        self._api_client._verify_response(result)
        return result

    # Holds the lazily imported `.memory_revisions` module once loaded.
    _revisions = None

    @property
    def revisions(self):
        """Returns a new `AsyncMemoryRevisions` bound to this API client.

        The `.memory_revisions` module is imported on first access and stored
        on the instance, so later accesses skip the import.

        Raises:
            ImportError: If importing the `.memory_revisions` module fails.
        """
        revisions_module = self._revisions
        if revisions_module is None:
            # We need to lazy load the revisions module to handle the
            # possibility of ImportError when dependencies are not installed.
            try:
                revisions_module = importlib.import_module(
                    ".memory_revisions", __package__
                )
            except ImportError as e:
                raise ImportError(
                    "The 'agent_engines.memories.revisions' module requires "
                    "additional packages. Please install them using pip install "
                    "google-cloud-aiplatform[agent_engines]"
                ) from e
            self._revisions = revisions_module
        return revisions_module.AsyncMemoryRevisions(self._api_client)

    async def create(
        self,
        *,
        name: str,
        fact: str,
        scope: dict[str, str],
        config: Optional[types.AgentEngineMemoryConfigOrDict] = None,
    ) -> types.AgentEngineMemoryOperation:
        """Creates a new memory in the Agent Engine.

        Args:
            name (str):
                Required. The name of the memory to create.
            fact (str):
                Required. The fact to be stored in the memory.
            scope (dict[str, str]):
                Required. The scope of the memory. For example, {"user_id": "123"}.
            config (AgentEngineMemoryConfigOrDict):
                Optional. The configuration for the memory. A dict is
                validated into an `AgentEngineMemoryConfig`.

        Returns:
            AgentEngineMemoryOperation: The operation for creating the memory.
                When `config.wait_for_completion` is set, its `response` is
                replaced with the memory fetched via `get`.

        Raises:
            RuntimeError: If completion was awaited and the operation carries
                no response (with or without an error).
        """
        # Normalize `config` so attribute access below is always valid.
        if isinstance(config, dict):
            config = types.AgentEngineMemoryConfig.model_validate(config)
        elif config is None:
            config = types.AgentEngineMemoryConfig()

        operation = await self._create(
            name=name,
            fact=fact,
            scope=scope,
            config=config,
        )

        # Fire-and-forget callers get the raw operation back immediately.
        if not config.wait_for_completion:
            return operation

        if not operation.done:
            operation = await _agent_engines_utils._await_async_operation(
                operation_name=operation.name,
                get_operation_fn=self._get_memory_operation,
                poll_interval_seconds=0.5,
            )

        # We need to make a call to get the memory because the operation
        # response might not contain the relevant fields.
        if operation.response:
            operation.response = await self.get(name=operation.response.name)
            return operation
        if operation.error:
            raise RuntimeError(f"Failed to create memory: {operation.error}")
        raise RuntimeError("Error creating memory.")

    async def generate(
        self,
        *,
        name: str,
        vertex_session_source: Optional[
            types.GenerateMemoriesRequestVertexSessionSourceOrDict
        ] = None,
        direct_contents_source: Optional[
            types.GenerateMemoriesRequestDirectContentsSourceOrDict
        ] = None,
        direct_memories_source: Optional[
            types.GenerateMemoriesRequestDirectMemoriesSourceOrDict
        ] = None,
        scope: Optional[dict[str, str]] = None,
        config: Optional[types.GenerateAgentEngineMemoriesConfigOrDict] = None,
    ) -> types.AgentEngineGenerateMemoriesOperation:
        """Generates memories for the agent engine.

        Args:
            name (str):
                Required. The name of the agent engine to generate memories for.
            vertex_session_source (GenerateMemoriesRequestVertexSessionSource):
                Optional. The vertex session source to use for generating
                memories. Only one of vertex_session_source,
                direct_contents_source, or direct_memories_source can be
                specified.
            direct_contents_source(GenerateMemoriesRequestDirectContentsSource):
                Optional. The direct contents source to use for generating
                memories. Only one of vertex_session_source, direct_contents_source,
                or direct_memories_source can be specified.
            direct_memories_source (GenerateMemoriesRequestDirectMemoriesSource):
                Optional. The direct memories source to use for generating
                memories. Only one of vertex_session_source, direct_contents_source,
                or direct_memories_source can be specified.
            scope (dict[str, str]):
                Optional. The scope of the memories to generate. This is optional
                if vertex_session_source is used, otherwise it must be specified.
            config (GenerateMemoriesConfig):
                Optional. The configuration for the memories to generate.

        Returns:
            AgentEngineGenerateMemoriesOperation:
                The operation for generating the memories.

        Raises:
            RuntimeError: If `config.wait_for_completion` is set and the
                operation reports an error.
        """
        if config is None:
            config = types.GenerateAgentEngineMemoriesConfig()
        elif isinstance(config, dict):
            config = types.GenerateAgentEngineMemoriesConfig.model_validate(config)
        operation = await self._generate(
            name=name,
            vertex_session_source=vertex_session_source,
            direct_contents_source=direct_contents_source,
            direct_memories_source=direct_memories_source,
            scope=scope,
            config=config,
        )
        if config.wait_for_completion:
            if not operation.done:
                operation = await _agent_engines_utils._await_async_operation(
                    operation_name=operation.name,
                    get_operation_fn=self._get_generate_memories_operation,
                    poll_interval_seconds=0.5,
                )
            # Check the error even when the initial response was already done;
            # otherwise a failure would be raised or silently returned
            # depending only on how quickly the operation finished.
            if operation.error:
                raise RuntimeError(f"Failed to generate memory: {operation.error}")
        return operation

    async def list(
        self,
        *,
        name: str,
        config: Optional[types.ListAgentEngineMemoryConfigOrDict] = None,
    ) -> AsyncPager[types.Memory]:
        """Lists Agent Engine memories.

        The first page is fetched before the pager is returned.

        Args:
            name (str):
                Required. The name of the agent engine to list memories for.
            config (ListAgentEngineMemoryConfig):
                Optional. The configuration for the memories to list.

        Returns:
            AsyncPager[Memory]: An async pager of memories.
        """
        first_page = await self._list(name=name, config=config)
        # The pager re-invokes this callable (with `name` already bound) for
        # subsequent pages.
        fetch_page = functools.partial(self._list, name=name)
        return AsyncPager("memories", fetch_page, first_page, config)

    async def retrieve(
        self,
        *,
        name: str,
        scope: dict[str, str],
        similarity_search_params: Optional[
            types.RetrieveMemoriesRequestSimilaritySearchParamsOrDict
        ] = None,
        simple_retrieval_params: Optional[
            types.RetrieveMemoriesRequestSimpleRetrievalParamsOrDict
        ] = None,
        config: Optional[types.RetrieveAgentEngineMemoriesConfigOrDict] = None,
    ) -> AsyncPager[types.RetrieveMemoriesResponseRetrievedMemory]:
        """Retrieves memories for the agent.

        The first page is fetched before the pager is returned.

        Args:
            name (str):
                Required. The name of the agent engine to retrieve memories for.
            scope (dict[str, str]):
                Required. The scope of the memories to retrieve. For example,
                {"user_id": "123"}.
            similarity_search_params (RetrieveMemoriesRequestSimilaritySearchParams):
                Optional. The similarity search parameters to use for retrieving
                memories.
            simple_retrieval_params (RetrieveMemoriesRequestSimpleRetrievalParams):
                Optional. The simple retrieval parameters to use for retrieving
                memories.
            config (RetrieveAgentEngineMemoriesConfig):
                Optional. The configuration for the memories to retrieve.

        Returns:
            AsyncPager[RetrieveMemoriesResponseRetrievedMemory]: An async pager of
                retrieved memories.
        """
        # Arguments that stay fixed for every page; only `config` varies.
        fixed_kwargs = {
            "name": name,
            "similarity_search_params": similarity_search_params,
            "simple_retrieval_params": simple_retrieval_params,
            "scope": scope,
        }

        def fetch_page(config):
            # Returns the un-awaited coroutine, exactly like calling
            # `_retrieve` directly; the caller is responsible for awaiting it.
            return self._retrieve(**fixed_kwargs, config=config)

        first_page = await self._retrieve(**fixed_kwargs, config=config)
        return AsyncPager("retrieved_memories", fetch_page, first_page, config)

    async def rollback(
        self,
        *,
        name: str,
        target_revision_id: str,
        config: Optional[types.RollbackAgentEngineMemoryConfigOrDict] = None,
    ) -> types.AgentEngineRollbackMemoryOperation:
        """Rolls back a memory to a previous revision.

        Args:
            name (str):
                Required. The name of the memory to rollback.
            target_revision_id (str):
                Required. The revision ID to roll back to
            config (RollbackAgentEngineMemoryConfig):
                Optional. The configuration for the rollback.

        Returns:
            AgentEngineRollbackMemoryOperation:
                The operation for rolling back the memory.

        Raises:
            RuntimeError: If `config.wait_for_completion` is set and the
                operation reports an error.
        """
        if config is None:
            config = types.RollbackAgentEngineMemoryConfig()
        elif isinstance(config, dict):
            config = types.RollbackAgentEngineMemoryConfig.model_validate(config)
        operation = await self._rollback(
            name=name,
            target_revision_id=target_revision_id,
            config=config,
        )
        if config.wait_for_completion:
            if not operation.done:
                operation = await _agent_engines_utils._await_async_operation(
                    operation_name=operation.name,
                    get_operation_fn=self._get_memory_operation,
                    poll_interval_seconds=0.5,
                )
            # Check the error even when the initial response was already done;
            # otherwise a failure would be raised or silently returned
            # depending only on how quickly the operation finished.
            if operation.error:
                raise RuntimeError(f"Failed to rollback memory: {operation.error}")
        return operation

    async def purge(
        self,
        *,
        name: str,
        filter: str,
        force: bool = False,
        config: Optional[types.PurgeAgentEngineMemoriesConfigOrDict] = None,
    ) -> types.AgentEnginePurgeMemoriesOperation:
        """Purges memories from an Agent Engine.

        Args:
            name (str):
                Required. The name of the Agent Engine to purge memories from.
            filter (str):
                Required. The standard list filter to determine which memories to purge.
            force (bool):
                Optional. Whether to force the purge operation. If false, the
                operation will be staged but not executed.
            config (PurgeAgentEngineMemoriesConfig):
                Optional. The configuration for the purge operation.

        Returns:
            AgentEnginePurgeMemoriesOperation:
                The operation for purging the memories.

        Raises:
            RuntimeError: If `config.wait_for_completion` is set and the
                operation reports an error.
        """
        if config is None:
            config = types.PurgeAgentEngineMemoriesConfig()
        elif isinstance(config, dict):
            config = types.PurgeAgentEngineMemoriesConfig.model_validate(config)
        operation = await self._purge(
            name=name,
            filter=filter,
            force=force,
            config=config,
        )
        if config.wait_for_completion:
            if not operation.done:
                operation = await _agent_engines_utils._await_async_operation(
                    operation_name=operation.name,
                    get_operation_fn=self._get_memory_operation,
                    poll_interval_seconds=0.5,
                )
            # Check the error even when the initial response was already done;
            # otherwise a failure would be raised or silently returned
            # depending only on how quickly the operation finished.
            if operation.error:
                raise RuntimeError(f"Failed to purge memories: {operation.error}")
        return operation
