#!/usr/bin/env python3
"""
RLM Bloodstream Pipeline
=========================
Autonomous pipeline that keeps Genesis memory alive and flowing after every
session. Runs nightly via cron (or manually) to:

  Step 1 — Load new KG entities/axioms into PostgreSQL (delta, no duplicates)
  Step 2 — Embed new entries into Qdrant genesis_memories collection
  Step 3 — Refresh Redis hot cache (top 50 by confidence, 24h TTL)
  Step 4 — Run MemoryDigestion night-cycle (prune stale episodic memories)
  Step 5 — Generate mentor preference pairs for unprocessed AIVA interactions
  Step 6 — Write structured run report to data/bloodstream_last_run.log

Design principles:
  - Never crash: every step is try/except guarded; log error and continue
  - Run flag prevents concurrent executions
  - Delta load: duplicate detection via (source, title) composite key
  - Dry-run mode: report stats without writing anything

Usage:
  python3 /mnt/e/genesis-system/core/rlm_bloodstream_pipeline.py
  python3 /mnt/e/genesis-system/core/rlm_bloodstream_pipeline.py --dry-run
  python3 /mnt/e/genesis-system/core/rlm_bloodstream_pipeline.py --source cron_nightly

Author: Genesis Orchestrator
Created: 2026-02-20
"""

from __future__ import annotations

import argparse
import json
import logging
import os
import sys
import time
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

# ── Path setup ────────────────────────────────────────────────────────────────
# All scanned knowledge sources live under GENESIS_ROOT. The KG_* dirs hold
# structured JSON/JSONL records; DEEP_THINK/PLANS/DOCS hold markdown docs.
GENESIS_ROOT = Path("/mnt/e/genesis-system")
KG_ENTITIES_DIR = GENESIS_ROOT / "KNOWLEDGE_GRAPH" / "entities"
KG_AXIOMS_DIR = GENESIS_ROOT / "KNOWLEDGE_GRAPH" / "axioms"
KG_CREATOR_MIND_DIR   = GENESIS_ROOT / "KNOWLEDGE_GRAPH" / "creator_mind"
KG_RESEARCH_DIR       = GENESIS_ROOT / "KNOWLEDGE_GRAPH" / "research"
KG_RELATIONSHIPS_DIR  = GENESIS_ROOT / "KNOWLEDGE_GRAPH" / "relationships"
KG_CONVERSATIONS_DIR  = GENESIS_ROOT / "KNOWLEDGE_GRAPH" / "claude_conversations"
DEEP_THINK_DIR        = GENESIS_ROOT / "deep_think_results"
PLANS_DIR             = GENESIS_ROOT / "plans"
DOCS_DIR              = GENESIS_ROOT / "docs"
DATA_DIR = GENESIS_ROOT / "data"
# RUN_FLAG is the concurrency lock file; LOG_FILE receives the Step 6 report.
RUN_FLAG = DATA_DIR / ".bloodstream_running"
LOG_FILE = DATA_DIR / "bloodstream_last_run.log"

# Elestio config: make elestio_config (and the core/ package) importable.
# NOTE(review): elestio_config presumably lives under data/genesis-memory —
# confirm against that directory's contents.
sys.path.insert(0, str(GENESIS_ROOT / "data" / "genesis-memory"))
sys.path.insert(0, str(GENESIS_ROOT))

# ── Logging ───────────────────────────────────────────────────────────────────
# Stream logging only: cron captures stdout/stderr, and Step 6 writes its own
# structured report file (see module docstring).
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s — %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("BloodstreamPipeline")

# ── Table DDL (idempotent) ────────────────────────────────────────────────────
# Executed on every run (CREATE ... IF NOT EXISTS throughout). The dedup key
# (source, title) is enforced in Step 1 code, not by a DB constraint.
CREATE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS bloodstream_knowledge (
    id SERIAL PRIMARY KEY,
    source TEXT NOT NULL,
    type TEXT NOT NULL,
    title TEXT NOT NULL,
    content TEXT NOT NULL,
    tags TEXT[] DEFAULT '{}',
    confidence FLOAT DEFAULT 0.8,
    embedding_id TEXT,
    embedding_text TEXT,
    metadata JSONB DEFAULT '{}',
    created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_bk_source     ON bloodstream_knowledge(source);
CREATE INDEX IF NOT EXISTS idx_bk_type       ON bloodstream_knowledge(type);
CREATE INDEX IF NOT EXISTS idx_bk_confidence ON bloodstream_knowledge(confidence DESC);
CREATE INDEX IF NOT EXISTS idx_bk_tags       ON bloodstream_knowledge USING GIN(tags);
CREATE INDEX IF NOT EXISTS idx_bk_content_fts
    ON bloodstream_knowledge USING GIN(to_tsvector('english', content));
CREATE INDEX IF NOT EXISTS idx_bk_title_fts
    ON bloodstream_knowledge USING GIN(to_tsvector('english', title));
"""


# ─────────────────────────────────────────────────────────────────────────────
# KG Parsing helpers
# ─────────────────────────────────────────────────────────────────────────────

def _parse_confidence(raw_value: Any, default: float = 0.85) -> float:
    """
    Parse a confidence value that may be a float, int, or string.

    String variants handled:
      - Numeric strings: "0.9", "90", "0.97"
      - Qualitative labels: "HIGH", "MEDIUM", "LOW", "VERY_HIGH", etc.
    """
    if raw_value is None:
        return default
    if isinstance(raw_value, (float, int)):
        return min(max(float(raw_value), 0.0), 1.0)
    # String
    s = str(raw_value).strip().upper()
    qualitative_map = {
        "VERY_HIGH": 0.95, "VERY HIGH": 0.95,
        "HIGH": 0.85,
        "MEDIUM_HIGH": 0.75, "MEDIUM HIGH": 0.75,
        "MEDIUM": 0.65,
        "MEDIUM_LOW": 0.55, "MEDIUM LOW": 0.55,
        "LOW": 0.45,
        "VERY_LOW": 0.3, "VERY LOW": 0.3,
    }
    if s in qualitative_map:
        return qualitative_map[s]
    try:
        val = float(s)
        # Handle percentage strings like "97" meaning 0.97
        if val > 1.0:
            val = val / 100.0
        return min(max(val, 0.0), 1.0)
    except ValueError:
        return default


def _extract_entity_fields(raw: Dict[str, Any], file_stem: str) -> Optional[Dict[str, Any]]:
    """
    Map a raw KG entity JSON object to the bloodstream schema.

    KG entities use varied field names; we normalise them here.

    Args:
        raw: One decoded JSON object from a KG entity file.
        file_stem: Stem of the originating file, recorded in 'source'.

    Returns:
        A bloodstream entry dict, or None if no usable title + content
        can be found.
    """
    # Title: prefer 'name', then 'title', then 'id'
    title = (
        raw.get("name")
        or raw.get("title")
        or raw.get("id")
        or ""
    )
    if not title:
        return None

    # Content: prefer 'description', then 'content', then 'properties' dump
    content = raw.get("description") or raw.get("content") or ""
    if not content:
        props = raw.get("properties")
        if isinstance(props, dict):
            content = json.dumps(props, ensure_ascii=False)

    if not content:
        return None

    entity_type = raw.get("type", "entity")
    confidence = _parse_confidence(raw.get("confidence"), default=0.85)

    # Build metadata: preserve original id + all other fields not in schema
    skip_keys = {"name", "title", "description", "content", "type",
                 "confidence", "tags", "id"}
    metadata: Dict[str, Any] = {
        k: v for k, v in raw.items() if k not in skip_keys
    }
    if raw.get("id"):
        metadata["kg_id"] = raw["id"]

    # Tags may arrive as a list, a comma-separated string, or junk (None,
    # dict, numbers). Always emit a list of non-empty strings so the TEXT[]
    # insert and downstream ", ".join calls cannot fail.
    raw_tags = raw.get("tags", [])
    if isinstance(raw_tags, str):
        tags = [t.strip() for t in raw_tags.split(",") if t.strip()]
    elif isinstance(raw_tags, list):
        tags = [cleaned for t in raw_tags if (cleaned := str(t).strip())]
    else:
        tags = []

    return {
        "source": f"kg_entity:{file_stem}",
        "type": entity_type,
        "title": str(title)[:500],          # Guard against oversized titles
        "content": str(content)[:10000],    # Guard against oversized content
        "tags": tags,
        "confidence": min(max(confidence, 0.0), 1.0),
        "metadata": metadata,
    }


def _extract_axiom_fields(raw: Dict[str, Any], file_stem: str) -> Optional[Dict[str, Any]]:
    """
    Map a raw KG axiom JSON object to the bloodstream schema.

    Axioms typically have: id, axiom/statement, source, confidence, category/type

    Args:
        raw: One decoded JSON object from a KG axiom file.
        file_stem: Stem of the originating file, recorded in 'source'.

    Returns:
        A bloodstream entry dict, or None if no usable title + content
        can be found.
    """
    # Title: prefer 'title', then 'id', then first 80 chars of axiom text
    axiom_text = raw.get("axiom") or raw.get("statement") or raw.get("content") or ""
    title = (
        raw.get("title")
        or raw.get("name")
        or raw.get("id")
        or axiom_text[:80]
    )
    if not title:
        return None

    content = axiom_text or raw.get("description") or ""
    if not content:
        return None

    entity_type = raw.get("type") or raw.get("category") or "axiom"
    confidence = _parse_confidence(raw.get("confidence"), default=0.85)

    # Metadata: everything not mapped to a dedicated column, plus the
    # original KG id preserved under 'kg_id'.
    skip_keys = {"axiom", "statement", "title", "name", "description", "content",
                 "type", "category", "confidence", "tags", "id"}
    metadata: Dict[str, Any] = {
        k: v for k, v in raw.items() if k not in skip_keys
    }
    if raw.get("id"):
        metadata["kg_id"] = raw["id"]

    # Tags may arrive as a list, a comma-separated string, or junk (None,
    # dict, numbers). Always emit a list of non-empty strings so the TEXT[]
    # insert and downstream ", ".join calls cannot fail. (Kept consistent
    # with _extract_entity_fields.)
    raw_tags = raw.get("tags", [])
    if isinstance(raw_tags, str):
        tags = [t.strip() for t in raw_tags.split(",") if t.strip()]
    elif isinstance(raw_tags, list):
        tags = [cleaned for t in raw_tags if (cleaned := str(t).strip())]
    else:
        tags = []

    return {
        "source": f"kg_axiom:{file_stem}",
        "type": entity_type,
        "title": str(title)[:500],          # Guard against oversized titles
        "content": str(content)[:10000],    # Guard against oversized content
        "tags": tags,
        "confidence": min(max(confidence, 0.0), 1.0),
        "metadata": metadata,
    }


def _build_embedding_text(entry: Dict[str, Any]) -> str:
    """Build embedding-optimised text string from a bloodstream entry."""
    parts = [f"[{entry['type']}]", entry["title"], entry["content"]]
    tags = entry.get("tags", [])
    if tags:
        parts.append("Tags: " + ", ".join(str(t) for t in tags))
    return " | ".join(parts)


def _scan_jsonl_dir(directory: Path, kind: str) -> List[Dict[str, Any]]:
    """
    Scan a directory of .jsonl files and parse each line.

    kind: 'entity' or 'axiom' — controls which extractor runs.
    Silently skips non-.jsonl files (e.g. .md report files).
    """
    results: List[Dict[str, Any]] = []
    if not directory.exists():
        logger.warning("KG directory not found: %s", directory)
        return results

    for jsonl_file in sorted(directory.glob("*.jsonl")):
        stem = jsonl_file.stem
        try:
            with open(jsonl_file, "r", encoding="utf-8") as fh:
                for line_no, line in enumerate(fh, 1):
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        raw = json.loads(line)
                    except json.JSONDecodeError as exc:
                        logger.debug("Bad JSON in %s:%d — %s", jsonl_file.name, line_no, exc)
                        continue

                    if kind == "entity":
                        entry = _extract_entity_fields(raw, stem)
                    else:
                        entry = _extract_axiom_fields(raw, stem)

                    if entry:
                        results.append(entry)
        except OSError as exc:
            logger.warning("Cannot read %s — %s", jsonl_file, exc)

    return results

# ─────────────────────────────────────────────────────────────────────────────
# Skip-list for bulk directory scans
# ─────────────────────────────────────────────────────────────────────────────

_SKIP_DIRS = frozenset({
    "node_modules", "__pycache__", ".venv", "generated", "lightrag_index",
})


def _is_skipped_path(p: Path) -> bool:
    """Return True if any component of the path is in the skip list."""
    return any(part in _SKIP_DIRS for part in p.parts)


# ─────────────────────────────────────────────────────────────────────────────
# Markdown document scanner
# ─────────────────────────────────────────────────────────────────────────────

def _scan_md_files(
    directory: Path,
    source_prefix: str,
    *,
    recursive: bool = True,
) -> List[Dict[str, Any]]:
    """
    Walk *directory* for .md files and build bloodstream entries.

    Entry shape:
      source:         f"{source_prefix}:{relative_path}"
      type:           "document"
      title:          first H1 heading, or filename stem
      content:        first 2000 chars of file
      embedding_text: first 1000 chars of file
      confidence:     0.8

    Skips:
      - Any path component in _SKIP_DIRS
      - Files with fewer than 200 characters (stubs)
    """
    results: List[Dict[str, Any]] = []
    if not directory.exists():
        logger.warning("Markdown directory not found: %s", directory)
        return results

    pattern = "**/*.md" if recursive else "*.md"
    for md_file in sorted(directory.glob(pattern)):
        if _is_skipped_path(md_file):
            continue
        try:
            text = md_file.read_text(encoding="utf-8", errors="replace")
        except OSError as exc:
            logger.debug("Cannot read %s — %s", md_file, exc)
            continue

        if len(text) < 200:
            continue  # Stub file — skip

        # Extract title: first H1 heading or filename stem
        title = ""
        for raw_line in text.splitlines():
            stripped = raw_line.strip()
            if stripped.startswith("# "):
                title = stripped[2:].strip()
                break
        if not title:
            title = md_file.stem.replace("_", " ").replace("-", " ")

        rel = str(md_file.relative_to(directory))
        content = text[:2000]
        embedding_text = text[:1000]

        results.append({
            "source": f"{source_prefix}:{rel}",
            "type": "document",
            "title": title[:500],
            "content": content,
            "tags": [],
            "confidence": 0.8,
            "embedding_text": embedding_text,
            "metadata": {"path": str(md_file)},
        })

    return results


# ─────────────────────────────────────────────────────────────────────────────
# Recursive JSONL / JSON scanner (for creator_mind, relationships, research)
# ─────────────────────────────────────────────────────────────────────────────

def _scan_jsonl_recursive(
    directory: Path,
    kind: str,
) -> List[Dict[str, Any]]:
    """
    Recursively scan *directory* for .jsonl and .json files and parse them
    as structured KG entities using the existing extract helpers.

    kind: 'entity' or 'axiom'

    Skips any path component in _SKIP_DIRS.
    For .json files that contain a top-level list, iterates the list.
    For .json files that contain a top-level dict, treats as a single record.
    .jsonl files are parsed line-by-line (existing behaviour).
    """
    results: List[Dict[str, Any]] = []
    if not directory.exists():
        logger.warning("KG directory not found: %s", directory)
        return results

    all_files: List[Path] = sorted(
        f for f in directory.rglob("*")
        if f.is_file() and f.suffix in {".jsonl", ".json"}
        and not _is_skipped_path(f)
    )

    for kfile in all_files:
        stem = kfile.stem
        try:
            with open(kfile, "r", encoding="utf-8") as fh:
                raw_text = fh.read()
        except OSError as exc:
            logger.warning("Cannot read %s — %s", kfile, exc)
            continue

        if kfile.suffix == ".jsonl":
            # Line-by-line
            for line_no, line in enumerate(raw_text.splitlines(), 1):
                line = line.strip()
                if not line:
                    continue
                try:
                    raw = json.loads(line)
                except json.JSONDecodeError as exc:
                    logger.debug("Bad JSON in %s:%d — %s", kfile.name, line_no, exc)
                    continue
                entry = (
                    _extract_entity_fields(raw, stem)
                    if kind == "entity"
                    else _extract_axiom_fields(raw, stem)
                )
                if entry:
                    results.append(entry)
        else:
            # .json — try to parse as list or single dict
            try:
                data = json.loads(raw_text)
            except json.JSONDecodeError as exc:
                logger.debug("Bad JSON in %s — %s", kfile.name, exc)
                continue

            records = data if isinstance(data, list) else [data]
            for raw in records:
                if not isinstance(raw, dict):
                    continue
                entry = (
                    _extract_entity_fields(raw, stem)
                    if kind == "entity"
                    else _extract_axiom_fields(raw, stem)
                )
                if entry:
                    results.append(entry)

    return results



# ─────────────────────────────────────────────────────────────────────────────
# Pipeline class
# ─────────────────────────────────────────────────────────────────────────────

class BloodstreamPipeline:
    """
    Autonomous Genesis memory synchronisation pipeline.

    Each step is self-contained and failure-tolerant. The pipeline always
    completes and always writes a run log.
    """

    def __init__(self, source: str = "manual", dry_run: bool = False):
        self.source = source
        self.dry_run = dry_run
        self.start_time = time.monotonic()
        self.start_ts = datetime.now()

        # Counters for the run report
        self.stats: Dict[str, Any] = {
            "files_scanned": 0,
            "entries_found": 0,
            "new_entries_loaded": 0,
            "qdrant_embedded": 0,
            "redis_cached": 0,
            "digestion_pruned": 0,
            "mentor_pairs": 0,
            "errors": [],
        }

    # ── Public entry point ────────────────────────────────────────────────────

    def run(self) -> None:
        """
        Execute all pipeline steps in order.

        The run flag guarantees a single concurrent execution; the finally
        block ensures the report is written and the flag released even when
        a step raises.
        """
        if not self._acquire_run_flag():
            return

        try:
            logger.info("=== Bloodstream Pipeline START (source=%s, dry_run=%s) ===",
                        self.source, self.dry_run)

            new_rows = self._step1_load_kg()
            self._step2_embed_qdrant(new_rows)
            self._step3_refresh_redis()
            self._step4_memory_digestion()
            self._step5_mentor_pairs()
        finally:
            self._step6_write_report()
            self._release_run_flag()
            logger.info("=== Bloodstream Pipeline COMPLETE ===")

    # ── Step 1: Load new KG entries into PostgreSQL ───────────────────────────

    def _step1_load_kg(self) -> List[Dict[str, Any]]:
        """
        Scan KG entity and axiom directories, delta-load new entries into
        PostgreSQL bloodstream_knowledge table.

        An entry counts as "new" when its (source, title) pair is not already
        present in the table.

        Returns:
            The list of newly inserted entries (for downstream embedding).
            In dry-run mode, returns everything scanned (no DB filtering);
            on any database error, logs it into stats["errors"] and
            returns [].
        """
        logger.info("[Step 1] Scanning KG directories...")

        entity_entries       = _scan_jsonl_dir(KG_ENTITIES_DIR, "entity")
        axiom_entries        = _scan_jsonl_dir(KG_AXIOMS_DIR, "axiom")

        # ── Expanded KB: structured KG sub-directories ────────────────────────────
        creator_mind_entries = _scan_jsonl_recursive(KG_CREATOR_MIND_DIR, "entity")
        research_entries     = _scan_jsonl_recursive(KG_RESEARCH_DIR, "entity")
        relationship_entries = _scan_jsonl_recursive(KG_RELATIONSHIPS_DIR, "entity")

        # ── Expanded KB: markdown documents ───────────────────────────────────────
        convo_md_entries     = _scan_md_files(KG_CONVERSATIONS_DIR, "md:claude_conversations")
        deep_think_entries   = _scan_md_files(DEEP_THINK_DIR, "md:deep_think")
        plans_entries        = _scan_md_files(PLANS_DIR, "md:plans")
        docs_entries         = _scan_md_files(DOCS_DIR, "md:docs")

        all_entries = (
            entity_entries
            + axiom_entries
            + creator_mind_entries
            + research_entries
            + relationship_entries
            + convo_md_entries
            + deep_think_entries
            + plans_entries
            + docs_entries
        )

        # NOTE(review): files_scanned counts only the two flat jsonl dirs; the
        # recursive and markdown scans are not included — confirm intended.
        entity_files = len(list(KG_ENTITIES_DIR.glob("*.jsonl"))) if KG_ENTITIES_DIR.exists() else 0
        axiom_files  = len(list(KG_AXIOMS_DIR.glob("*.jsonl"))) if KG_AXIOMS_DIR.exists() else 0
        self.stats["files_scanned"] = entity_files + axiom_files
        self.stats["entries_found"] = len(all_entries)

        logger.info("  Entities:      %d entries from %d files", len(entity_entries), entity_files)
        logger.info("  Axioms:        %d entries from %d files", len(axiom_entries), axiom_files)
        logger.info("  Creator mind:  %d entries", len(creator_mind_entries))
        logger.info("  Research:      %d entries", len(research_entries))
        logger.info("  Relationships: %d entries", len(relationship_entries))
        logger.info("  Conversations: %d entries", len(convo_md_entries))
        logger.info("  Deep think:    %d entries", len(deep_think_entries))
        logger.info("  Plans:         %d entries", len(plans_entries))
        logger.info("  Docs:          %d entries", len(docs_entries))

        if self.dry_run:
            logger.info("  [DRY RUN] Skipping PostgreSQL write.")
            return all_entries

        if not all_entries:
            logger.info("  No entries to process.")
            return []

        try:
            import psycopg2
            import psycopg2.extras
            from elestio_config import PostgresConfig

            conn = psycopg2.connect(**PostgresConfig.get_connection_params())
            conn.autocommit = False  # commit explicitly, once per batch below
            cur = conn.cursor()

            # Ensure table exists (DDL is IF NOT EXISTS throughout)
            cur.execute(CREATE_TABLE_SQL)
            conn.commit()

            # Load existing (source, title) pairs for duplicate detection
            cur.execute("SELECT source, title FROM bloodstream_knowledge")
            existing_keys = set()
            for row in cur.fetchall():
                existing_keys.add((row[0], row[1]))

            logger.info("  Existing rows in DB: %d", len(existing_keys))

            # Filter to only new entries
            new_entries = [
                e for e in all_entries
                if (e["source"], e["title"]) not in existing_keys
            ]
            logger.info("  New entries after dedup: %d", len(new_entries))

            if not new_entries:
                cur.close()
                conn.close()
                return []

            # Batch insert
            insert_sql = """
                INSERT INTO bloodstream_knowledge
                    (source, type, title, content, tags, confidence, embedding_text, metadata)
                VALUES
                    (%s, %s, %s, %s, %s, %s, %s, %s)
                RETURNING id, source, type, title, embedding_text
            """

            inserted_entries: List[Dict[str, Any]] = []
            batch_size = 500  # one commit per 500 rows

            for i in range(0, len(new_entries), batch_size):
                chunk = new_entries[i : i + batch_size]
                rows: List[Tuple] = []
                for entry in chunk:
                    emb_text = _build_embedding_text(entry)
                    rows.append((
                        entry["source"],
                        entry["type"],
                        entry["title"],
                        entry["content"],
                        entry["tags"],
                        entry["confidence"],
                        emb_text,
                        json.dumps(entry.get("metadata", {})),
                    ))

                # Row-by-row execute so each RETURNING result can be fetched
                # (psycopg2's executemany does not expose RETURNING rows).
                for row_tuple in rows:
                    cur.execute(insert_sql, row_tuple)
                    returned = cur.fetchone()
                    if returned:
                        inserted_entries.append({
                            "db_id": returned[0],
                            "source": returned[1],
                            "type": returned[2],
                            "title": returned[3],
                            "embedding_text": returned[4],
                        })

                conn.commit()
                logger.info("  Inserted batch %d/%d (%d so far)",
                            min(i + batch_size, len(new_entries)), len(new_entries),
                            len(inserted_entries))

            cur.close()
            conn.close()

            self.stats["new_entries_loaded"] = len(inserted_entries)
            logger.info("[Step 1] Done. %d new entries loaded.", len(inserted_entries))
            return inserted_entries

        except Exception as exc:
            # Never crash the pipeline: record the error and return nothing.
            msg = f"Step 1 PostgreSQL error: {exc}"
            logger.error(msg)
            self.stats["errors"].append(msg)
            return []

    # ── Step 2: Embed new entries into Qdrant ─────────────────────────────────

    def _step2_embed_qdrant(self, new_entries: List[Dict[str, Any]]) -> None:
        """
        Generate embeddings for newly inserted entries and upsert into the
        Qdrant 'genesis_memories' collection.

        Args:
            new_entries: Entries produced by _step1_load_kg. Only the
                'embedding_text' field is embedded; 'db_id' (when present)
                is used to write embedding_id back to PostgreSQL.

        Skips gracefully if Qdrant, the embedding provider, or PostgreSQL is
        unavailable; a failure on one entry is logged at DEBUG and does not
        abort the loop.
        """
        logger.info("[Step 2] Embedding %d new entries into Qdrant...", len(new_entries))

        if self.dry_run:
            logger.info("  [DRY RUN] Skipping Qdrant write.")
            return

        if not new_entries:
            logger.info("  Nothing to embed.")
            return

        # Attempt Qdrant connection
        try:
            from qdrant_client import QdrantClient
            from qdrant_client.http import models as qdrant_models
            from elestio_config import QdrantConfig
        except ImportError as exc:
            logger.warning("  Qdrant client not available (%s). Skipping Step 2.", exc)
            return

        COLLECTION = "genesis_memories"
        VECTOR_DIM = 3072  # gemini-embedding-001 dimensionality

        try:
            qcfg = QdrantConfig()
            qdrant = QdrantClient(url=qcfg.url, api_key=qcfg.api_key, timeout=15)

            # Ensure collection exists (get_collection raises when missing)
            try:
                qdrant.get_collection(COLLECTION)
            except Exception:
                qdrant.create_collection(
                    collection_name=COLLECTION,
                    vectors_config=qdrant_models.VectorParams(
                        size=VECTOR_DIM,
                        distance=qdrant_models.Distance.COSINE,
                    ),
                )
                logger.info("  Created Qdrant collection '%s'", COLLECTION)

        except Exception as exc:
            logger.warning("  Qdrant connection failed: %s. Skipping Step 2.", exc)
            return

        # Attempt to load embedding model (Gemini via API or local fallback)
        embed_fn = self._get_embed_function()
        if embed_fn is None:
            logger.warning("  No embedding function available. Skipping Qdrant upsert.")
            return

        # Separate autocommit PG connection just for embedding_id writes;
        # its failure downgrades the step (no back-reference), not aborts it.
        pg_conn = None
        try:
            import psycopg2
            from elestio_config import PostgresConfig
            pg_conn = psycopg2.connect(**PostgresConfig.get_connection_params())
            pg_conn.autocommit = True
            pg_cur = pg_conn.cursor()
        except Exception as exc:
            logger.warning("  PG connection for embedding_id update failed: %s", exc)
            pg_cur = None

        embedded_count = 0
        # Fixed namespace: uuid5(namespace, text) yields deterministic point
        # ids, so re-embedding identical text overwrites the same point.
        GENESIS_NS = uuid.UUID("e2a6d7f8-b3c4-4d5e-8f90-a1b2c3d4e5f6")

        for entry in new_entries:
            emb_text = entry.get("embedding_text", "")
            if not emb_text:
                continue
            try:
                vector = embed_fn(emb_text)
                if not vector:
                    continue

                point_id = str(uuid.uuid5(GENESIS_NS, emb_text))
                qdrant.upsert(
                    collection_name=COLLECTION,
                    points=[
                        qdrant_models.PointStruct(
                            id=point_id,
                            vector=vector,
                            payload={
                                "source": entry.get("source", ""),
                                "type": entry.get("type", ""),
                                "title": entry.get("title", ""),
                                "db_id": entry.get("db_id"),
                            },
                        )
                    ],
                )

                # Update embedding_id in PostgreSQL
                if pg_cur and entry.get("db_id"):
                    try:
                        pg_cur.execute(
                            "UPDATE bloodstream_knowledge SET embedding_id = %s WHERE id = %s",
                            (point_id, entry["db_id"]),
                        )
                    except Exception:
                        pass  # Non-fatal: the Qdrant point still exists

                embedded_count += 1

            except Exception as exc:
                logger.debug("  Embed failed for '%s': %s", entry.get("title", "?"), exc)

        if pg_conn:
            pg_conn.close()

        self.stats["qdrant_embedded"] = embedded_count
        logger.info("[Step 2] Done. %d entries embedded into Qdrant.", embedded_count)

    def _get_embed_function(self):
        """
        Return a callable that maps text -> List[float] (3072-dim Gemini
        embedding), or None if no embedding provider is available.

        API key resolution order:
          1. GEMINI_API_KEY env var, then GEMINI_API_KEY_NEW env var
          2. The first GEMINI_API_KEY_NEW= / GEMINI_API_KEY= line found in
             config/secrets.env (quotes stripped, placeholder value ignored)
        """
        # Try Gemini embedding via google.genai (new SDK)
        try:
            from google import genai as google_genai  # type: ignore

            api_key = os.environ.get("GEMINI_API_KEY") or os.environ.get("GEMINI_API_KEY_NEW", "")
            if not api_key:
                # Try reading from secrets.env — first matching line wins,
                # regardless of which of the two variable names it carries.
                secrets_path = GENESIS_ROOT / "config" / "secrets.env"
                if secrets_path.exists():
                    with open(secrets_path) as f:
                        for line in f:
                            line = line.strip()
                            if line.startswith("GEMINI_API_KEY_NEW=") or line.startswith("GEMINI_API_KEY="):
                                val = line.split("=", 1)[1].strip().strip('"').strip("'")
                                if val and val != "YOUR_GEMINI_API_KEY":
                                    api_key = val
                                    break

            if api_key and api_key != "YOUR_GEMINI_API_KEY":
                _genai_client = google_genai.Client(api_key=api_key)

                def gemini_embed(text: str) -> List[float]:
                    # Network call per text; Step 2 calls this once per entry.
                    result = _genai_client.models.embed_content(
                        model="gemini-embedding-001",
                        contents=text,
                    )
                    return list(result.embeddings[0].values)

                logger.info("  Embedding provider: gemini-embedding-001 (google.genai, 3072d)")
                return gemini_embed
        except Exception as exc:
            logger.warning("  Gemini embedding not available: %s", exc)

        # No fallback provider is implemented; caller skips the upsert.
        logger.warning("  No embedding provider configured. Qdrant upsert will be skipped.")
        return None

    # ── Step 3: Refresh Redis hot cache ──────────────────────────────────────

    def _step3_refresh_redis(self) -> None:
        """
        Push the 50 highest-confidence knowledge rows into the Redis hot
        cache, everything with a 24h TTL.

        Key scheme:
          bloodstream:{id}    — JSON blob of the entry
          bloodstream:hot_ids — Redis list of ids (ordered by confidence DESC)
        """
        logger.info("[Step 3] Refreshing Redis hot cache...")

        if self.dry_run:
            logger.info("  [DRY RUN] Skipping Redis write.")
            return

        # Fetch the hot set from PostgreSQL; without it there is nothing
        # to cache.
        try:
            import psycopg2
            from elestio_config import PostgresConfig
            conn = psycopg2.connect(**PostgresConfig.get_connection_params())
            cur = conn.cursor()
            cur.execute(
                """
                SELECT id, source, type, title, content, tags, confidence, metadata
                FROM bloodstream_knowledge
                ORDER BY confidence DESC
                LIMIT 50
                """
            )
            rows = cur.fetchall()
            cur.close()
            conn.close()
        except Exception as exc:
            msg = f"Step 3 PostgreSQL query error: {exc}"
            logger.error(msg)
            self.stats["errors"].append(msg)
            return

        # A missing Redis is a soft failure: warn and move on.
        try:
            import redis as redis_lib
            from elestio_config import RedisConfig
            r = redis_lib.Redis(**RedisConfig.get_connection_params())
            r.ping()
        except Exception as exc:
            logger.warning("  Redis unavailable (%s). Skipping Step 3.", exc)
            return

        try:
            pipe = r.pipeline()
            pipe.delete("bloodstream:hot_ids")  # rebuild the index from scratch

            for row_id, source, rtype, title, content, tags, confidence, metadata in rows:
                payload = json.dumps({
                    "id": row_id,
                    "source": source,
                    "type": rtype,
                    "title": title,
                    "content": content[:2000],  # Truncate for cache efficiency
                    "tags": tags or [],
                    "confidence": confidence,
                    "metadata": metadata,
                }, ensure_ascii=False, default=str)
                pipe.set(f"bloodstream:{row_id}", payload, ex=86400)  # 24h TTL
                pipe.rpush("bloodstream:hot_ids", str(row_id))

            pipe.expire("bloodstream:hot_ids", 86400)
            pipe.execute()

            self.stats["redis_cached"] = len(rows)
            logger.info("[Step 3] Done. %d entries cached in Redis.", len(rows))

        except Exception as exc:
            msg = f"Step 3 Redis write error: {exc}"
            logger.error(msg)
            self.stats["errors"].append(msg)

    # ── Step 4: Run memory digestion ─────────────────────────────────────────

    def _step4_memory_digestion(self) -> None:
        """
        Invoke the MemoryDigestion night-cycle, which prunes low-impact
        episodic memories older than 7 days.
        """
        logger.info("[Step 4] Running MemoryDigestion...")

        if self.dry_run:
            logger.info("  [DRY RUN] Skipping memory digestion.")
            return

        try:
            from core.memory_digestion import run as digestion_cycle
            digestion_cycle(dry_run=self.dry_run)
            self.stats["digestion_pruned"] = "completed"
            logger.info("[Step 4] Done. Memory digestion cycle complete.")
        except Exception as exc:
            # Digestion failure must not abort the pipeline; record and move on.
            msg = f"Step 4 MemoryDigestion error: {exc}"
            logger.error(msg)
            self.stats["errors"].append(msg)
            self.stats["digestion_pruned"] = "error"

    # ── Step 5: Generate mentor preference pairs ──────────────────────────────

    def _step5_mentor_pairs(self) -> None:
        """
        Query unprocessed AIVA interactions and run them through the mentor
        feedback engine to generate preference pairs for the RLM training loop.

        Skips gracefully if:
          - aiva_interactions table doesn't exist
          - pl_preference_pairs table doesn't exist
          - mentor_feedback_engine import fails
          - the engine exposes no known evaluation entry point
        """
        logger.info("[Step 5] Generating mentor preference pairs...")

        if self.dry_run:
            logger.info("  [DRY RUN] Skipping mentor pair generation.")
            return

        # Fetch unprocessed interactions. Connections are closed in finally
        # clauses throughout so error paths cannot leak them.
        interactions: List[Dict[str, Any]] = []
        conn = None
        try:
            import psycopg2
            import psycopg2.extras
            from elestio_config import PostgresConfig
            conn = psycopg2.connect(**PostgresConfig.get_connection_params())
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                cur.execute(
                    """
                    SELECT *
                    FROM aiva_interactions
                    WHERE mentor_evaluated = FALSE OR mentor_evaluated IS NULL
                    LIMIT 100
                    """
                )
                interactions = [dict(row) for row in cur.fetchall()]
            logger.info("  Found %d unprocessed AIVA interactions.", len(interactions))
        except Exception as exc:
            # Table may not exist yet — skip gracefully
            logger.warning("  aiva_interactions query failed (%s). Skipping Step 5.", exc)
            return
        finally:
            if conn is not None:
                conn.close()

        if not interactions:
            logger.info("[Step 5] No unprocessed interactions. Skipping.")
            return

        # Import mentor feedback engine. run_fn is pre-initialised so it can
        # never be an unbound name later (previously a NameError was possible
        # when the class imported but had neither known method).
        engine = None
        run_fn = None
        try:
            mentors_dir = str(GENESIS_ROOT / "AIVA" / "mentors")
            if mentors_dir not in sys.path:   # avoid growing sys.path每 run
                sys.path.insert(0, mentors_dir)
            from mentor_feedback_engine import MentorFeedbackEngine  # type: ignore
            engine = MentorFeedbackEngine()
        except (ImportError, AttributeError) as exc:
            # Try alternate module-level entry point
            try:
                from mentor_feedback_engine import run_evaluation  # type: ignore
                run_fn = run_evaluation
            except ImportError:
                logger.warning("  mentor_feedback_engine import failed (%s). Skipping Step 5.", exc)
                return

        # Bail out early if no usable evaluation entry point exists at all.
        if (
            run_fn is None
            and not (engine is not None
                     and (hasattr(engine, "evaluate")
                          or hasattr(engine, "generate_preference_pairs")))
        ):
            logger.warning("  mentor_feedback_engine has no usable entry point. Skipping Step 5.")
            return

        # Ensure pl_preference_pairs table exists
        conn = None
        try:
            import psycopg2
            from elestio_config import PostgresConfig
            conn = psycopg2.connect(**PostgresConfig.get_connection_params())
            with conn.cursor() as cur:
                cur.execute(
                    """
                    SELECT 1 FROM information_schema.tables
                    WHERE table_name = 'pl_preference_pairs'
                    LIMIT 1
                    """
                )
                table_exists = cur.fetchone() is not None
        except Exception as exc:
            logger.warning("  Table existence check failed (%s). Skipping Step 5.", exc)
            return
        finally:
            if conn is not None:
                conn.close()

        if not table_exists:
            logger.warning("  pl_preference_pairs table not found. Skipping Step 5.")
            return

        # Process each interaction inside one transaction; a per-interaction
        # failure is logged and skipped, a write-level failure aborts the
        # batch (connection close triggers an implicit rollback).
        pairs_generated = 0
        conn = None
        try:
            import psycopg2
            from elestio_config import PostgresConfig
            conn = psycopg2.connect(**PostgresConfig.get_connection_params())
            conn.autocommit = False
            cur = conn.cursor()

            for interaction in interactions:
                try:
                    if engine is not None and hasattr(engine, "evaluate"):
                        pairs = engine.evaluate(interaction)
                    elif engine is not None and hasattr(engine, "generate_preference_pairs"):
                        pairs = engine.generate_preference_pairs(interaction)
                    else:
                        pairs = run_fn(interaction)  # type: ignore

                    if not pairs:
                        continue

                    # Normalise: pairs may be a list of dicts or a single dict
                    if isinstance(pairs, dict):
                        pairs = [pairs]

                    for pair in pairs:
                        cur.execute(
                            """
                            INSERT INTO pl_preference_pairs
                                (interaction_id, chosen, rejected, context, mentor, score, created_at)
                            VALUES (%s, %s, %s, %s, %s, %s, NOW())
                            ON CONFLICT DO NOTHING
                            """,
                            (
                                interaction.get("id"),
                                json.dumps(pair.get("chosen", {})),
                                json.dumps(pair.get("rejected", {})),
                                json.dumps(pair.get("context", {})),
                                pair.get("mentor", "unknown"),
                                float(pair.get("score", 0.5)),
                            ),
                        )
                        pairs_generated += 1

                    # Mark interaction as evaluated
                    cur.execute(
                        "UPDATE aiva_interactions SET mentor_evaluated = TRUE WHERE id = %s",
                        (interaction.get("id"),),
                    )

                except Exception as exc:
                    logger.debug("  Pair generation failed for interaction %s: %s",
                                 interaction.get("id"), exc)

            conn.commit()
            cur.close()

        except Exception as exc:
            msg = f"Step 5 mentor pair write error: {exc}"
            logger.error(msg)
            self.stats["errors"].append(msg)
        finally:
            if conn is not None:
                conn.close()

        self.stats["mentor_pairs"] = pairs_generated
        logger.info("[Step 5] Done. %d preference pairs generated.", pairs_generated)

    # ── Step 6: Write run report ──────────────────────────────────────────────

    def _step6_write_report(self) -> None:
        """Append a structured run summary to bloodstream_last_run.log."""
        elapsed = time.monotonic() - self.start_time

        # "digestion_pruned" is either an int (count) or a status string.
        digestion = self.stats.get("digestion_pruned", 0)
        digestion_str = (
            f"pruned {digestion} memories" if isinstance(digestion, int) else digestion
        )

        lines = [
            f"[{self.start_ts.strftime('%Y-%m-%d %H:%M:%S')}] Pipeline run complete",
            f"  Source: {self.source}",
            f"  Dry run: {self.dry_run}",
            f"  KG files scanned: {self.stats['files_scanned']}",
            f"  Entries found: {self.stats['entries_found']}",
            f"  New entries loaded: {self.stats['new_entries_loaded']}",
            f"  Qdrant embeddings: {self.stats['qdrant_embedded']}",
            f"  Redis cache items: {self.stats['redis_cached']}",
            f"  Digestion: {digestion_str}",
            f"  Mentor pairs generated: {self.stats['mentor_pairs']}",
            f"  Duration: {elapsed:.1f}s",
        ]

        if self.stats["errors"]:
            lines.append(f"  Errors ({len(self.stats['errors'])}):")
            lines.extend(f"    - {err}" for err in self.stats["errors"])

        # Terminate the last line and add the blank separator line.
        report = "\n".join(lines) + "\n\n"

        if self.dry_run:
            logger.info("\n--- DRY RUN REPORT ---\n%s", report)
            return

        try:
            DATA_DIR.mkdir(parents=True, exist_ok=True)
            with open(LOG_FILE, "a", encoding="utf-8") as fh:
                fh.write(report)
            logger.info("[Step 6] Run report written to %s", LOG_FILE)
        except OSError as exc:
            logger.error("[Step 6] Failed to write run log: %s", exc)

        # Also print to stdout for cron log capture
        print(report, end="")

    # ── Run flag helpers ──────────────────────────────────────────────────────

    def _acquire_run_flag(self) -> bool:
        """
        Create the run-flag file so a second pipeline instance refuses to start.

        Returns False when a fresh flag already exists (another run is live).
        A flag older than two hours is treated as leftover from a crashed run
        and removed. Flag I/O failures never block the run.
        """
        try:
            DATA_DIR.mkdir(parents=True, exist_ok=True)
            if RUN_FLAG.exists():
                age_s = time.time() - RUN_FLAG.stat().st_mtime
                # Older than 2h => previous run crashed without cleanup.
                if age_s >= 7200:
                    logger.warning("Stale run flag found (%.0f hours old). Removing.", age_s / 3600)
                    RUN_FLAG.unlink()
                else:
                    logger.warning("Pipeline already running (flag age: %.0fs). Exiting.", age_s)
                    return False

            payload = {"pid": os.getpid(), "started": self.start_ts.isoformat()}
            RUN_FLAG.write_text(json.dumps(payload), encoding="utf-8")
            logger.info("Run flag acquired (PID %d).", os.getpid())
            return True
        except OSError as exc:
            logger.error("Cannot acquire run flag: %s", exc)
            return True  # Proceed anyway if flag write fails

    def _release_run_flag(self) -> None:
        """Remove the run flag on clean exit (best effort, never raises)."""
        try:
            if not RUN_FLAG.exists():
                return
            RUN_FLAG.unlink()
            logger.debug("Run flag released.")
        except OSError:
            # Deliberately ignored: a leftover flag only causes a stale-flag
            # warning on the next run.
            pass


# ─────────────────────────────────────────────────────────────────────────────
# Entry point
# ─────────────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    # CLI wrapper: parse trigger source and dry-run flag, then run once.
    cli = argparse.ArgumentParser(
        description="Genesis RLM Bloodstream Pipeline — autonomous memory sync"
    )
    cli.add_argument(
        "--source",
        default="manual",
        help="What triggered this run (e.g. manual, cron_nightly, session_end)",
    )
    cli.add_argument(
        "--dry-run",
        action="store_true",
        help="Report stats without writing to any database",
    )
    opts = cli.parse_args()

    BloodstreamPipeline(source=opts.source, dry_run=opts.dry_run).run()
