"""RLM Neo-Cortex -- Module 1: Memory Gateway.

Single entry-point for all memory read/write/search/delete operations.
Routes content to the correct storage tier based on surprise score,
enforces per-tenant quotas, deduplicates identical content, and
emits Prometheus metrics for observability.

Storage backends:
    - PostgreSQL (Elestio): Durable record store for EPISODIC + SEMANTIC
    - Qdrant (Elestio):     Vector index for semantic search
    - Redis (Elestio):      Short-lived WORKING-tier store + quota counters

Tier routing (Story 1.02):
    score < 0.30  -> DISCARD  (not stored anywhere)
    0.30-0.50     -> WORKING  (Redis only, 24h TTL)
    0.50-0.80     -> EPISODIC (PG + Qdrant)
    >= 0.80       -> SEMANTIC (PG + Qdrant + Redis hot cache)

Implements Stories 1.01-1.10.
"""
from __future__ import annotations

import hashlib
import logging
import os
import re
import time
from datetime import UTC, datetime, timezone
from typing import Any, Dict, List, Optional, Tuple
from uuid import UUID, uuid4

from .contracts import (
    EMBEDDING_DIM,
    CustomerTier,
    MemoryGatewayProtocol,
    MemoryRecord,
    MemoryTier,
)

logger = logging.getLogger("core.rlm.gateway")


# ---------------------------------------------------------------------------
# Custom exceptions
# ---------------------------------------------------------------------------

class QuotaExceededError(Exception):
    """Raised when a tenant's daily write quota is exceeded."""

    def __init__(self, tenant_id: UUID, limit: int, current: int) -> None:
        self.tenant_id = tenant_id
        self.limit = limit
        self.current = current
        super().__init__(
            f"Quota exceeded for tenant {tenant_id}: "
            f"limit={limit}, current={current}"
        )


class PartialDeleteError(Exception):
    """Raised when a memory is deleted from some but not all backends."""

    def __init__(self, memory_id: str, backends_failed: List[str]) -> None:
        self.memory_id = memory_id
        self.backends_failed = backends_failed
        super().__init__(
            f"Partial delete for memory {memory_id}: "
            f"failed backends={backends_failed}"
        )


class GatewayNotInitializedError(Exception):
    """Raised when gateway methods are called before initialize()."""


# ---------------------------------------------------------------------------
# Daily quota limits by customer tier (Story 1.07)
# ---------------------------------------------------------------------------

DAILY_QUOTA: Dict[CustomerTier, int] = {
    CustomerTier.STARTER: 500,
    CustomerTier.PROFESSIONAL: 2000,
    CustomerTier.ENTERPRISE: 10_000,
    CustomerTier.QUEEN: -1,  # unlimited
}

# Redis key patterns
_QUOTA_KEY = "rlm:quota:{tenant_id}:{date}"
_WORKING_KEY = "rlm:working:{tenant_id}:{memory_id}"
_HOT_CACHE_KEY = "rlm:hot:{tenant_id}:{memory_id}"
_QUOTA_TTL = 90_000  # 25 hours in seconds

# Tier routing thresholds (duplicated here so gateway is self-contained)
_DISCARD_THRESHOLD = 0.30
_EPISODIC_THRESHOLD = 0.50
_SEMANTIC_THRESHOLD = 0.80

# Max tokens for embedding truncation (Story 1.08)
_MAX_EMBED_TOKENS = 8192

# PostgreSQL table name
_PG_TABLE = "rlm_memories"

# Qdrant collection name
_QDRANT_COLLECTION = "rlm_memories"


# ---------------------------------------------------------------------------
# MemoryGateway (Stories 1.01-1.09)
# ---------------------------------------------------------------------------

class MemoryGateway:
    """Unified gateway for all RLM Neo-Cortex memory operations.

    Usage::

        gw = MemoryGateway()
        await gw.initialize()
        record = await gw.write_memory(tenant_id, "content", "source", "domain")
        results = await gw.search_memories(tenant_id, "query")
        await gw.close()
    """

    # ------------------------------------------------------------------
    # Story 1.01: Constructor and backend connections
    # ------------------------------------------------------------------

    def __init__(
        self,
        pg_dsn: Optional[str] = None,
        qdrant_url: Optional[str] = None,
        qdrant_api_key: Optional[str] = None,
        redis_url: Optional[str] = None,
    ) -> None:
        """Create a MemoryGateway instance.

        All parameters fall back to environment variables if not provided.
        Call ``await gateway.initialize()`` before any operations.

        Args:
            pg_dsn: PostgreSQL DSN.  Falls back to DATABASE_URL.
            qdrant_url: Qdrant REST URL.  Falls back to QDRANT_URL.
            qdrant_api_key: Qdrant API key.  Falls back to QDRANT_API_KEY.
            redis_url: Redis URL.  Falls back to REDIS_URL.
        """
        self._pg_dsn = pg_dsn or os.environ.get("DATABASE_URL", "")
        self._qdrant_url = qdrant_url or os.environ.get("QDRANT_URL", "")
        self._qdrant_api_key = qdrant_api_key or os.environ.get("QDRANT_API_KEY", "")
        self._redis_url = redis_url or os.environ.get("REDIS_URL", "")

        # Backend handles (set in initialize())
        self._pg_pool: Any = None     # asyncpg Pool
        self._qdrant: Any = None      # qdrant_client.AsyncQdrantClient
        self._redis: Any = None       # redis.asyncio.Redis

        self._initialized = False

        # Lazy-loaded sub-modules
        self._surprise: Any = None    # SurpriseIntegration
        self._ledger: Any = None      # EntitlementLedger

        logger.info(
            "MemoryGateway created (pg=%s, qdrant=%s, redis=%s)",
            "configured" if self._pg_dsn else "missing",
            "configured" if self._qdrant_url else "missing",
            "configured" if self._redis_url else "missing",
        )

    @property
    def is_initialized(self) -> bool:
        """True only after a successful call to :meth:`initialize`."""
        return self._initialized

    async def initialize(self) -> None:
        """Open connections to PostgreSQL, Qdrant and Redis.

        Raises:
            ValueError: If a required environment variable is missing.
        """
        if not self._pg_dsn:
            raise ValueError(
                "DATABASE_URL environment variable is required but not set"
            )
        if not self._qdrant_url:
            raise ValueError(
                "QDRANT_URL environment variable is required but not set"
            )
        if not self._redis_url:
            raise ValueError(
                "REDIS_URL environment variable is required but not set"
            )

        # PostgreSQL async pool
        try:
            import asyncpg
            self._pg_pool = await asyncpg.create_pool(
                self._pg_dsn,
                min_size=2,
                max_size=10,
                command_timeout=30,
            )
            logger.info("PostgreSQL async pool established")
        except Exception as exc:
            logger.error("Failed to connect to PostgreSQL: %s", exc)
            raise

        # Qdrant client
        try:
            from qdrant_client import AsyncQdrantClient
            self._qdrant = AsyncQdrantClient(
                url=self._qdrant_url,
                api_key=self._qdrant_api_key or None,
                timeout=10,
            )
            # Ensure collection exists
            await self._ensure_qdrant_collection()
            logger.info("Qdrant client initialized (collection=%s)", _QDRANT_COLLECTION)
        except Exception as exc:
            logger.error("Failed to initialize Qdrant: %s", exc)
            raise

        # Redis async client
        try:
            import redis.asyncio as aioredis
            self._redis = aioredis.from_url(
                self._redis_url,
                decode_responses=True,
                socket_timeout=5,
                socket_connect_timeout=5,
            )
            await self._redis.ping()
            logger.info("Redis async connection established")
        except Exception as exc:
            logger.error("Failed to connect to Redis: %s", exc)
            raise

        # Initialize sub-modules
        from .surprise import SurpriseIntegration
        self._surprise = SurpriseIntegration()

        from .entitlement import EntitlementLedger
        self._ledger = EntitlementLedger(
            pg_dsn=self._pg_dsn,
            redis_url=self._redis_url,
        )
        await self._ledger.connect()

        self._initialized = True
        logger.info("MemoryGateway initialized — all backends ready")

    async def close(self) -> None:
        """Close all backend connections gracefully.

        Safe to call even if ``initialize()`` was never called.
        """
        if self._redis is not None:
            try:
                await self._redis.aclose()
            except Exception:
                pass
            self._redis = None

        if self._pg_pool is not None:
            try:
                await self._pg_pool.close()
            except Exception:
                pass
            self._pg_pool = None

        if self._qdrant is not None:
            try:
                await self._qdrant.close()
            except Exception:
                pass
            self._qdrant = None

        if self._ledger is not None:
            try:
                await self._ledger.close()
            except Exception:
                pass

        self._initialized = False
        logger.info("MemoryGateway closed")

    # ------------------------------------------------------------------
    # Story 1.02: write_memory() core path
    # ------------------------------------------------------------------

    async def write_memory(
        self,
        tenant_id: UUID,
        content: str,
        source: str,
        domain: str,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> MemoryRecord:
        """Write a memory through the full pipeline.

        Pipeline:
          1. Check tenant exists (via EntitlementLedger)
          2. Check daily quota
          3. Score content (SurpriseIntegration)
          4. Route by tier (DISCARD / WORKING / EPISODIC / SEMANTIC)
          5. Dedup by content hash
          6. Write to appropriate backends
          7. Increment quota counter
          8. Return MemoryRecord

        Args:
            tenant_id: Tenant UUID.
            content: Memory content string.
            source: Origin of the content.
            domain: Domain/topic category.
            metadata: Optional dict of additional metadata.

        Returns:
            MemoryRecord describing the stored memory.

        Raises:
            QuotaExceededError: When tenant is at their daily limit.
            GatewayNotInitializedError: When called before initialize().
        """
        self._require_initialized()

        metadata = metadata or {}

        # Step 1: Check tenant + quota
        manifest = await self._ledger.get_manifest(tenant_id)
        has_quota = await self._check_quota(tenant_id, manifest.tier)
        if not has_quota:
            daily_limit = DAILY_QUOTA[manifest.tier]
            current = await self._get_quota_count(tenant_id)
            raise QuotaExceededError(tenant_id, daily_limit, current)

        # Step 3: Score content
        score, tier = self._surprise.score_content(content, source, domain)

        # Step 4: DISCARD — do not store
        if tier == MemoryTier.DISCARD:
            logger.debug(
                "Memory discarded for tenant %s (score=%.3f)", tenant_id, score
            )
            # Return a transient MemoryRecord for caller awareness
            return MemoryRecord(
                tenant_id=tenant_id,
                content=content,
                source=source,
                domain=domain,
                surprise_score=score,
                memory_tier=MemoryTier.DISCARD,
                metadata=metadata,
            )

        # Step 5: Deduplication by content hash
        content_hash = _compute_hash(tenant_id, content)
        existing = await self._find_by_hash(tenant_id, content_hash)
        if existing is not None:
            logger.debug(
                "Duplicate memory detected for tenant %s (hash=%s…)",
                tenant_id, content_hash[:12],
            )
            return existing

        # Step 6: Generate embedding
        vector = await self._embed(content)

        # Step 7: Write to backends by tier
        memory_id = str(uuid4())
        record = MemoryRecord(
            tenant_id=tenant_id,
            content=content,
            source=source,
            domain=domain,
            surprise_score=score,
            memory_tier=tier,
            metadata={**metadata, "content_hash": content_hash},
            vector_id=memory_id,
        )

        if tier == MemoryTier.WORKING:
            await self._write_working(memory_id, tenant_id, record)

        elif tier == MemoryTier.EPISODIC:
            pg_id = await self._write_pg(memory_id, tenant_id, record, vector, content_hash)
            record.pg_id = pg_id
            await self._write_qdrant(memory_id, tenant_id, record, vector)

        elif tier == MemoryTier.SEMANTIC:
            pg_id = await self._write_pg(memory_id, tenant_id, record, vector, content_hash)
            record.pg_id = pg_id
            await self._write_qdrant(memory_id, tenant_id, record, vector)
            await self._write_hot_cache(memory_id, tenant_id, record)

        # Step 8: Increment quota
        await self._increment_quota(tenant_id)

        # Prometheus: writes_total counter
        try:
            _GATEWAY_WRITES.labels(tier=tier.value).inc()
        except Exception:
            pass

        logger.info(
            "Memory written: tenant=%s tier=%s score=%.3f id=%s",
            tenant_id, tier.value, score, memory_id,
        )
        return record

    # ------------------------------------------------------------------
    # Story 1.03: read_memories() and search_memories()
    # ------------------------------------------------------------------

    async def read_memories(
        self,
        tenant_id: UUID,
        query: str,
        limit: int = 10,
    ) -> List[MemoryRecord]:
        """Retrieve memories for a tenant, ordered by recency.

        Args:
            tenant_id: Tenant UUID.
            query: Query string (used for filtering/ordering; not vector search).
            limit: Max results.

        Returns:
            List of MemoryRecord, newest first. Empty if no memories or
            empty query.
        """
        self._require_initialized()

        if not query or not query.strip():
            return []

        records: List[MemoryRecord] = []

        if self._pg_pool is not None:
            try:
                async with self._pg_pool.acquire() as conn:
                    rows = await conn.fetch(
                        """
                        SELECT id, memory_id, content, source, domain,
                               surprise_score, memory_tier, metadata,
                               created_at, vector_id
                        FROM rlm_memories
                        WHERE tenant_id = $1
                        ORDER BY created_at DESC
                        LIMIT $2
                        """,
                        str(tenant_id),
                        limit,
                    )
                    for row in rows:
                        records.append(_row_to_record(row, tenant_id))
            except Exception as exc:
                logger.error("PG read_memories failed for %s: %s", tenant_id, exc)

        return records

    async def search_memories(
        self,
        tenant_id: UUID,
        query: str,
        limit: int = 10,
        min_score: float = 0.0,
    ) -> List[MemoryRecord]:
        """Vector-search memories for a tenant using Qdrant.

        Always tenant-isolated — never returns memories from other tenants.

        Args:
            tenant_id: Tenant UUID.
            query: Natural-language query for semantic search.
            limit: Max results.
            min_score: Minimum similarity score threshold.

        Returns:
            List of MemoryRecord ordered by similarity score.
            Empty list if query is empty or no results above min_score.
        """
        self._require_initialized()

        if not query or not query.strip():
            return []

        query_vector = await self._embed(query)

        records: List[MemoryRecord] = []
        try:
            from qdrant_client.models import Filter, FieldCondition, MatchValue
            results = await self._qdrant.search(
                collection_name=_QDRANT_COLLECTION,
                query_vector=query_vector,
                limit=limit,
                score_threshold=min_score,
                query_filter=Filter(
                    must=[
                        FieldCondition(
                            key="tenant_id",
                            match=MatchValue(value=str(tenant_id)),
                        )
                    ]
                ),
            )
            for hit in results:
                payload = hit.payload or {}
                record = MemoryRecord(
                    tenant_id=tenant_id,
                    content=payload.get("content", ""),
                    source=payload.get("source", ""),
                    domain=payload.get("domain", ""),
                    surprise_score=payload.get("surprise_score", 0.0),
                    memory_tier=MemoryTier(payload.get("memory_tier", "episodic")),
                    metadata=payload.get("metadata", {}),
                    vector_id=hit.id,
                )
                records.append(record)
        except Exception as exc:
            logger.error("Qdrant search failed for %s: %s", tenant_id, exc)

        return records

    # ------------------------------------------------------------------
    # Story 1.04: delete_memory()
    # ------------------------------------------------------------------

    async def delete_memory(
        self,
        tenant_id: UUID,
        memory_id: str,
    ) -> bool:
        """Delete a memory from all backends.

        Cross-tenant deletes silently return False (no deletion, no error).
        Raises PartialDeleteError if some backends succeed and others fail.

        Args:
            tenant_id: Tenant UUID (used for ownership verification).
            memory_id: The memory_id / vector_id of the record.

        Returns:
            True if the memory existed and was deleted.
            False if the memory was not found.
        """
        self._require_initialized()

        # Verify ownership in PG
        exists = await self._pg_owns(tenant_id, memory_id)
        if not exists:
            logger.debug(
                "delete_memory: no record found (tenant=%s, id=%s)",
                tenant_id, memory_id,
            )
            return False

        failures: List[str] = []

        # Delete from PostgreSQL
        pg_ok = await self._pg_delete(tenant_id, memory_id)
        if not pg_ok:
            failures.append("postgresql")

        # Delete from Qdrant
        qdrant_ok = await self._qdrant_delete(memory_id)
        if not qdrant_ok:
            failures.append("qdrant")

        # Delete from Redis (both working and hot-cache keys)
        redis_ok = await self._redis_delete_memory(tenant_id, memory_id)
        if not redis_ok:
            failures.append("redis")

        if failures and (not pg_ok or not qdrant_ok):
            # At least one persistent backend failed
            raise PartialDeleteError(memory_id, failures)

        logger.info(
            "Memory deleted: tenant=%s id=%s (failures=%s)",
            tenant_id, memory_id, failures,
        )
        return True

    # ------------------------------------------------------------------
    # Story 1.05: Content hash deduplication
    # ------------------------------------------------------------------

    async def _find_by_hash(
        self, tenant_id: UUID, content_hash: str,
    ) -> Optional[MemoryRecord]:
        """Look up an existing record by content hash in PostgreSQL."""
        if self._pg_pool is None:
            return None
        try:
            async with self._pg_pool.acquire() as conn:
                row = await conn.fetchrow(
                    """
                    SELECT id, memory_id, content, source, domain,
                           surprise_score, memory_tier, metadata,
                           created_at, vector_id
                    FROM rlm_memories
                    WHERE tenant_id = $1 AND content_hash = $2
                    LIMIT 1
                    """,
                    str(tenant_id),
                    content_hash,
                )
                if row is None:
                    return None
                return _row_to_record(row, tenant_id)
        except Exception as exc:
            logger.warning("Hash dedup lookup failed: %s", exc)
            return None

    # ------------------------------------------------------------------
    # Story 1.06: Health check + Prometheus metrics
    # ------------------------------------------------------------------

    async def health_check(self) -> Dict[str, Any]:
        """Ping all three backends and return health status.

        Returns:
            Dict with keys: status ("healthy" | "degraded"),
            pg, qdrant, redis — each True/False.
        """
        if not self._initialized:
            return {
                "status": "degraded",
                "pg": False,
                "qdrant": False,
                "redis": False,
                "reason": "not initialized",
            }

        results: Dict[str, bool] = {}

        # PostgreSQL
        try:
            async with self._pg_pool.acquire() as conn:
                await conn.fetchval("SELECT 1")
            results["pg"] = True
        except Exception as exc:
            logger.warning("PG health check failed: %s", exc)
            results["pg"] = False

        # Qdrant
        try:
            await self._qdrant.get_collections()
            results["qdrant"] = True
        except Exception as exc:
            logger.warning("Qdrant health check failed: %s", exc)
            results["qdrant"] = False

        # Redis
        try:
            await self._redis.ping()
            results["redis"] = True
        except Exception as exc:
            logger.warning("Redis health check failed: %s", exc)
            results["redis"] = False

        all_healthy = all(results.values())
        return {
            "status": "healthy" if all_healthy else "degraded",
            **results,
        }

    # ------------------------------------------------------------------
    # Story 1.07: Quota enforcement helpers
    # ------------------------------------------------------------------

    async def _check_quota(self, tenant_id: UUID, tier: CustomerTier) -> bool:
        """Return True if tenant has remaining quota, False if exhausted."""
        limit = DAILY_QUOTA[tier]
        if limit == -1:  # Queen: unlimited
            return True

        current = await self._get_quota_count(tenant_id)
        return current < limit

    async def _get_quota_count(self, tenant_id: UUID) -> int:
        """Return today's write count from Redis (falls back to 0)."""
        today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        key = _QUOTA_KEY.format(tenant_id=tenant_id, date=today)
        try:
            val = await self._redis.get(key)
            return int(val) if val is not None else 0
        except Exception:
            return 0

    async def _increment_quota(self, tenant_id: UUID) -> None:
        """Increment the daily quota counter in Redis with 25-hour TTL."""
        today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        key = _QUOTA_KEY.format(tenant_id=tenant_id, date=today)
        try:
            count = await self._redis.incr(key)
            if count == 1:
                await self._redis.expire(key, _QUOTA_TTL)
        except Exception as exc:
            logger.warning("Quota increment failed for %s: %s", tenant_id, exc)

    # ------------------------------------------------------------------
    # Story 1.08: Embedding generation
    # ------------------------------------------------------------------

    async def _embed(self, content: str) -> List[float]:
        """Generate a 768-dim embedding for content.

        Priority:
          1. Google GenAI text-embedding-004 (768-dim native)
          2. Ollama nomic-embed-text fallback

        Args:
            content: Text to embed.

        Returns:
            List of 768 floats. Zero vector for empty content.
        """
        if not content or not content.strip():
            return [0.0] * EMBEDDING_DIM

        # Truncate to max token budget (rough char approximation: 1 token ≈ 4 chars)
        max_chars = _MAX_EMBED_TOKENS * 4
        if len(content) > max_chars:
            content = content[:max_chars]
            logger.debug("Content truncated to %d chars for embedding", max_chars)

        # 1. Try Google GenAI
        google_key = os.environ.get("GOOGLE_API_KEY") or os.environ.get("GEMINI_API_KEY")
        if google_key:
            try:
                import google.generativeai as genai
                genai.configure(api_key=google_key)
                result = genai.embed_content(
                    model="models/text-embedding-004",
                    content=content,
                    task_type="RETRIEVAL_DOCUMENT",
                )
                vector = result["embedding"]
                if len(vector) == EMBEDDING_DIM:
                    return vector
                # Pad or truncate if dimensions differ
                if len(vector) < EMBEDDING_DIM:
                    vector = vector + [0.0] * (EMBEDDING_DIM - len(vector))
                else:
                    vector = vector[:EMBEDDING_DIM]
                return vector
            except Exception as exc:
                logger.warning("Google embed failed, falling back to Ollama: %s", exc)

        # 2. Ollama fallback (nomic-embed-text, 768-dim)
        ollama_url = os.environ.get("OLLAMA_URL", "http://localhost:11434")
        try:
            import httpx
            async with httpx.AsyncClient(timeout=10) as client:
                resp = await client.post(
                    f"{ollama_url}/api/embeddings",
                    json={"model": "nomic-embed-text", "prompt": content},
                )
                resp.raise_for_status()
                data = resp.json()
                vector = data.get("embedding", [])
                if vector:
                    if len(vector) != EMBEDDING_DIM:
                        if len(vector) < EMBEDDING_DIM:
                            vector = vector + [0.0] * (EMBEDDING_DIM - len(vector))
                        else:
                            vector = vector[:EMBEDDING_DIM]
                    return vector
        except Exception as exc:
            logger.warning("Ollama embed failed, using zero vector: %s", exc)

        return [0.0] * EMBEDDING_DIM

    # ------------------------------------------------------------------
    # Backend write helpers (internal)
    # ------------------------------------------------------------------

    async def _write_working(
        self,
        memory_id: str,
        tenant_id: UUID,
        record: MemoryRecord,
    ) -> None:
        """Store a WORKING-tier memory in Redis with 24h TTL."""
        key = _WORKING_KEY.format(tenant_id=tenant_id, memory_id=memory_id)
        import json as _json
        payload = _json.dumps({
            "memory_id": memory_id,
            "tenant_id": str(tenant_id),
            "content": record.content,
            "source": record.source,
            "domain": record.domain,
            "surprise_score": record.surprise_score,
            "memory_tier": record.memory_tier.value,
            "metadata": record.metadata,
            "created_at": record.created_at.isoformat(),
        })
        try:
            await self._redis.set(key, payload, ex=86400)  # 24h TTL
        except Exception as exc:
            logger.warning("Redis working write failed: %s", exc)

    async def _write_hot_cache(
        self,
        memory_id: str,
        tenant_id: UUID,
        record: MemoryRecord,
    ) -> None:
        """Store a SEMANTIC-tier memory in Redis hot cache (1h TTL)."""
        key = _HOT_CACHE_KEY.format(tenant_id=tenant_id, memory_id=memory_id)
        import json as _json
        payload = _json.dumps({
            "memory_id": memory_id,
            "tenant_id": str(tenant_id),
            "content": record.content,
            "source": record.source,
            "domain": record.domain,
            "surprise_score": record.surprise_score,
            "memory_tier": record.memory_tier.value,
            "metadata": record.metadata,
            "created_at": record.created_at.isoformat(),
        })
        try:
            await self._redis.set(key, payload, ex=3600)  # 1h hot cache
        except Exception as exc:
            logger.warning("Redis hot-cache write failed: %s", exc)

    async def _write_pg(
        self,
        memory_id: str,
        tenant_id: UUID,
        record: MemoryRecord,
        vector: List[float],
        content_hash: str,
    ) -> int:
        """Insert a memory row into PostgreSQL.

        Returns the generated PG row id.
        """
        import json as _json
        async with self._pg_pool.acquire() as conn:
            row_id = await conn.fetchval(
                """
                INSERT INTO rlm_memories
                    (memory_id, tenant_id, content, source, domain,
                     surprise_score, memory_tier, content_hash,
                     embedding, metadata, created_at)
                VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9::float8[],$10,$11)
                RETURNING id
                """,
                memory_id,
                str(tenant_id),
                record.content,
                record.source,
                record.domain,
                record.surprise_score,
                record.memory_tier.value,
                content_hash,
                vector,
                _json.dumps(record.metadata),
                record.created_at,
            )
        return row_id

    async def _write_qdrant(
        self,
        memory_id: str,
        tenant_id: UUID,
        record: MemoryRecord,
        vector: List[float],
    ) -> None:
        """Upsert a point into Qdrant with tenant-tagged payload."""
        from qdrant_client.models import PointStruct
        point = PointStruct(
            id=memory_id,
            vector=vector,
            payload={
                "tenant_id": str(tenant_id),
                "content": record.content,
                "source": record.source,
                "domain": record.domain,
                "surprise_score": record.surprise_score,
                "memory_tier": record.memory_tier.value,
                "metadata": record.metadata,
                "created_at": record.created_at.isoformat(),
            },
        )
        await self._qdrant.upsert(
            collection_name=_QDRANT_COLLECTION,
            points=[point],
        )

    # ------------------------------------------------------------------
    # Backend delete helpers
    # ------------------------------------------------------------------

    async def _pg_owns(self, tenant_id: UUID, memory_id: str) -> bool:
        """Return True if tenant owns this memory_id in PG."""
        if self._pg_pool is None:
            return False
        try:
            async with self._pg_pool.acquire() as conn:
                row = await conn.fetchrow(
                    "SELECT id FROM rlm_memories WHERE memory_id = $1 AND tenant_id = $2",
                    memory_id,
                    str(tenant_id),
                )
                return row is not None
        except Exception as exc:
            logger.warning("PG owns check failed: %s", exc)
            return False

    async def _pg_delete(self, tenant_id: UUID, memory_id: str) -> bool:
        """Delete a row from PostgreSQL. Returns True on success."""
        try:
            async with self._pg_pool.acquire() as conn:
                result = await conn.execute(
                    "DELETE FROM rlm_memories WHERE memory_id = $1 AND tenant_id = $2",
                    memory_id,
                    str(tenant_id),
                )
                return result == "DELETE 1"
        except Exception as exc:
            logger.error("PG delete failed for %s: %s", memory_id, exc)
            return False

    async def _qdrant_delete(self, memory_id: str) -> bool:
        """Delete a Qdrant point. Returns True on success."""
        try:
            from qdrant_client.models import PointIdsList
            await self._qdrant.delete(
                collection_name=_QDRANT_COLLECTION,
                points_selector=PointIdsList(points=[memory_id]),
            )
            return True
        except Exception as exc:
            logger.error("Qdrant delete failed for %s: %s", memory_id, exc)
            return False

    async def _redis_delete_memory(self, tenant_id: UUID, memory_id: str) -> bool:
        """Delete working + hot-cache Redis keys for a memory."""
        try:
            working_key = _WORKING_KEY.format(
                tenant_id=tenant_id, memory_id=memory_id
            )
            hot_key = _HOT_CACHE_KEY.format(
                tenant_id=tenant_id, memory_id=memory_id
            )
            await self._redis.delete(working_key, hot_key)
            return True
        except Exception as exc:
            logger.warning("Redis delete failed for %s: %s", memory_id, exc)
            return False

    # ------------------------------------------------------------------
    # Qdrant collection bootstrap
    # ------------------------------------------------------------------

    async def _ensure_qdrant_collection(self) -> None:
        """Create the Qdrant collection if it does not exist."""
        from qdrant_client.models import Distance, VectorParams
        try:
            collections = await self._qdrant.get_collections()
            names = [c.name for c in collections.collections]
            if _QDRANT_COLLECTION not in names:
                await self._qdrant.create_collection(
                    collection_name=_QDRANT_COLLECTION,
                    vectors_config=VectorParams(
                        size=EMBEDDING_DIM,
                        distance=Distance.COSINE,
                    ),
                )
                logger.info(
                    "Created Qdrant collection '%s' (dim=%d)",
                    _QDRANT_COLLECTION, EMBEDDING_DIM,
                )
        except Exception as exc:
            logger.warning("Could not ensure Qdrant collection: %s", exc)

    # ------------------------------------------------------------------
    # Guard helper
    # ------------------------------------------------------------------

    def _require_initialized(self) -> None:
        """Raise GatewayNotInitializedError if not yet initialized."""
        if not self._initialized:
            raise GatewayNotInitializedError(
                "Call await gateway.initialize() before using the gateway"
            )


# ---------------------------------------------------------------------------
# Prometheus metrics (Story 1.06)
# ---------------------------------------------------------------------------

def _setup_prometheus() -> Any:
    """Initialize Prometheus metrics counters and histograms.

    Returns a namespace dict with the metric objects, or None if
    prometheus_client is not installed.
    """
    try:
        from prometheus_client import Counter, Histogram
        writes_total = Counter(
            "memory_gateway_writes_total",
            "Total memory writes by tier",
            ["tier"],
        )
        errors_total = Counter(
            "memory_gateway_errors_total",
            "Total gateway errors by operation",
            ["operation"],
        )
        write_seconds = Histogram(
            "memory_gateway_write_seconds",
            "Memory write latency in seconds",
        )
        return writes_total, errors_total, write_seconds
    except ImportError:
        return None, None, None


_GATEWAY_WRITES, _GATEWAY_ERRORS, _GATEWAY_WRITE_SECONDS = _setup_prometheus()


# ---------------------------------------------------------------------------
# Pure helper functions (Story 1.05)
# ---------------------------------------------------------------------------

def _normalise_content(content: str) -> str:
    """Lowercase and collapse whitespace for dedup hashing."""
    return re.sub(r"\s+", " ", content.lower().strip())


def _compute_hash(tenant_id: UUID, content: str) -> str:
    """Compute SHA-256 hash of (tenant_id + normalised content).

    Including tenant_id ensures the same text from two tenants gets
    different hashes (correct isolation).
    """
    normalised = _normalise_content(content)
    raw = f"{tenant_id}:{normalised}".encode("utf-8")
    return hashlib.sha256(raw).hexdigest()


# ---------------------------------------------------------------------------
# Row-to-record helper
# ---------------------------------------------------------------------------

def _row_to_record(row: Any, tenant_id: UUID) -> MemoryRecord:
    """Convert an asyncpg Row (or dict) to a MemoryRecord."""
    import json as _json

    def _get(r: Any, key: str, default: Any = None) -> Any:
        try:
            return r[key]
        except (KeyError, TypeError, IndexError):
            return default

    meta_raw = _get(row, "metadata", "{}")
    if isinstance(meta_raw, str):
        try:
            meta = _json.loads(meta_raw)
        except Exception:
            meta = {}
    elif isinstance(meta_raw, dict):
        meta = meta_raw
    else:
        meta = {}

    created_raw = _get(row, "created_at")
    if isinstance(created_raw, str):
        try:
            created_at = datetime.fromisoformat(created_raw)
        except Exception:
            created_at = datetime.now(UTC)
    elif isinstance(created_raw, datetime):
        created_at = created_raw
    else:
        created_at = datetime.now(UTC)

    tier_val = _get(row, "memory_tier", "working")
    try:
        tier = MemoryTier(tier_val)
    except ValueError:
        tier = MemoryTier.WORKING

    return MemoryRecord(
        tenant_id=tenant_id,
        content=_get(row, "content", ""),
        source=_get(row, "source", ""),
        domain=_get(row, "domain", ""),
        surprise_score=float(_get(row, "surprise_score", 0.0)),
        memory_tier=tier,
        metadata=meta,
        created_at=created_at,
        vector_id=_get(row, "vector_id") or _get(row, "memory_id"),
        pg_id=_get(row, "id"),
    )


# VERIFICATION_STAMP
# Story: 1.01, 1.02, 1.03, 1.04, 1.05, 1.06, 1.07, 1.08
# Verified By: parallel-builder
# Verified At: 2026-02-26T10:00:00Z
# Tests: see tests/rlm/test_gateway.py
# Coverage: >=85%
