#!/usr/bin/env python3
"""
RLM Bloodstream — Memory Digestion Cron
========================================
Reads Claude session transcripts from C:\\Users\\P3\\.claude\\projects\\
Extracts key learnings via pattern matching (errors, fixes, decisions, tools used)
Writes structured KG entities to KNOWLEDGE_GRAPH/entities/
Upserts vectors to Qdrant for semantic search
Deduplicates via PostgreSQL tracking table

Author: Genesis Systems Builder
Created: 2026-02-23
Usage:
    python E:\\genesis-system\\core\\memory_digestion.py
    python E:\\genesis-system\\core\\memory_digestion.py --dry-run
    python E:\\genesis-system\\core\\memory_digestion.py --source cron_nightly
"""

from __future__ import annotations

import argparse
import hashlib
import json
import logging
import os
import re
import sys
import time
import uuid
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

# ── Path setup ────────────────────────────────────────────────────────────────
GENESIS_ROOT = Path(r"E:\genesis-system")
KG_ENTITIES_DIR = GENESIS_ROOT / "KNOWLEDGE_GRAPH" / "entities"
DATA_DIR = GENESIS_ROOT / "data"
CONTEXT_STATE_DIR = DATA_DIR / "context_state"
LOG_FILE = DATA_DIR / "memory_digestion_last_run.log"

# Claude session transcripts location
CLAUDE_PROJECTS_DIR = Path(r"C:\Users\P3\.claude\projects")

# Elestio config
sys.path.insert(0, str(GENESIS_ROOT / "data" / "genesis-memory"))
sys.path.insert(0, str(GENESIS_ROOT))

# ── Logging ───────────────────────────────────────────────────────────────────
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s — %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("MemoryDigestion")

# ── Pattern matchers for learning extraction ──────────────────────────────────
LEARNING_PATTERNS = {
    "error_fix": [
        r"(?:error|exception|traceback|failed).*?(?:fix|resolved|solution|workaround)[:\s]+(.{20,200})",
        r"(?:the (?:fix|solution|workaround) (?:is|was))[:\s]+(.{20,200})",
    ],
    "decision": [
        r"(?:decided|chose|opted|selected|going with)[:\s]+(.{20,200})",
        r"(?:instead of|rather than|over)[:\s]+(.{20,200})",
    ],
    "tool_use": [
        r"(?:using|via|through|with)[:\s]+([A-Za-z_\-]+(?:MCP|API|SDK|tool|module)[^.]{0,100})",
        r"(?:mcp__|playwright|gemini|telnyx|qdrant|postgres|redis)[^\s.]{0,100}",
    ],
    "learning": [
        r"(?:learned|discovered|found out|realized|turns out)[:\s]+(.{20,200})",
        r"(?:key insight|important|critical|NOTE|CRITICAL)[:\s]+(.{20,200})",
    ],
    "titan_memory": [
        r"\*\*([a-z_]+)\*\*:\s+(.{10,200})\s+\(\d{4}-\d{2}-\d{2}\)",
    ],
    "axiom": [
        r"(?:axiom|rule|principle|protocol|mandate)[:\s]+(.{20,200})",
        r"(?:NEVER|ALWAYS|FORBIDDEN|MANDATORY)[:\s]+(.{20,200})",
    ],
}

# ── DDL ───────────────────────────────────────────────────────────────────────
CREATE_DEDUP_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS digestion_processed_sessions (
    id SERIAL PRIMARY KEY,
    session_id TEXT UNIQUE NOT NULL,
    file_path TEXT NOT NULL,
    file_hash TEXT NOT NULL,
    entities_extracted INT DEFAULT 0,
    processed_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_dps_session ON digestion_processed_sessions(session_id);
CREATE INDEX IF NOT EXISTS idx_dps_hash ON digestion_processed_sessions(file_hash);
"""

CREATE_ENTITIES_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS digestion_kg_entities (
    id SERIAL PRIMARY KEY,
    entity_id TEXT UNIQUE NOT NULL,
    session_id TEXT,
    pattern_type TEXT NOT NULL,
    content TEXT NOT NULL,
    content_hash TEXT NOT NULL,
    source_file TEXT,
    confidence FLOAT DEFAULT 0.7,
    embedding_id TEXT,
    created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_dke_content_hash ON digestion_kg_entities(content_hash);
"""


# ── Database helpers ───────────────────────────────────────────────────────────
def get_pg_connection():
    """Get PostgreSQL connection via Elestio config."""
    try:
        from elestio_config import PostgresConfig
        import psycopg2
        return psycopg2.connect(**PostgresConfig.get_connection_params())
    except ImportError:
        logger.warning("elestio_config not found — using env vars fallback")
        import psycopg2
        return psycopg2.connect(
            host=os.environ.get("PG_HOST", "localhost"),
            port=int(os.environ.get("PG_PORT", 5432)),
            dbname=os.environ.get("PG_DBNAME", "genesis"),
            user=os.environ.get("PG_USER", "genesis"),
            password=os.environ.get("PG_PASSWORD", ""),
        )


def ensure_tables(conn) -> None:
    """Create tables if not exist."""
    cur = conn.cursor()
    cur.execute(CREATE_DEDUP_TABLE_SQL)
    cur.execute(CREATE_ENTITIES_TABLE_SQL)
    conn.commit()
    cur.close()


def is_session_processed(conn, file_hash: str) -> bool:
    """Check if session file was already processed (by hash)."""
    cur = conn.cursor()
    cur.execute(
        "SELECT 1 FROM digestion_processed_sessions WHERE file_hash = %s",
        (file_hash,)
    )
    result = cur.fetchone()
    cur.close()
    return result is not None


def mark_session_processed(conn, session_id: str, file_path: str,
                           file_hash: str, entity_count: int) -> None:
    cur = conn.cursor()
    cur.execute(
        """
        INSERT INTO digestion_processed_sessions
            (session_id, file_path, file_hash, entities_extracted)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT (session_id) DO UPDATE
            SET file_hash = EXCLUDED.file_hash,
                entities_extracted = EXCLUDED.entities_extracted,
                processed_at = NOW()
        """,
        (session_id, file_path, file_hash, entity_count)
    )
    conn.commit()
    cur.close()


def is_entity_duplicate(conn, content_hash: str) -> bool:
    cur = conn.cursor()
    cur.execute(
        "SELECT 1 FROM digestion_kg_entities WHERE content_hash = %s",
        (content_hash,)
    )
    result = cur.fetchone()
    cur.close()
    return result is not None


def insert_entity(conn, entity: Dict[str, Any]) -> None:
    cur = conn.cursor()
    cur.execute(
        """
        INSERT INTO digestion_kg_entities
            (entity_id, session_id, pattern_type, content, content_hash,
             source_file, confidence)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (content_hash) DO NOTHING
        """,
        (
            entity["entity_id"],
            entity.get("session_id"),
            entity["pattern_type"],
            entity["content"],
            entity["content_hash"],
            entity.get("source_file"),
            entity.get("confidence", 0.7),
        )
    )
    conn.commit()
    cur.close()


# ── Qdrant helpers ─────────────────────────────────────────────────────────────
def upsert_to_qdrant(entities: List[Dict[str, Any]], dry_run: bool = False) -> int:
    """Upsert entity vectors to Qdrant genesis_memories collection."""
    if dry_run:
        logger.info(f"  [DRY RUN] Would upsert {len(entities)} vectors to Qdrant")
        return len(entities)

    try:
        from elestio_config import QdrantConfig
        from qdrant_client import QdrantClient
        from qdrant_client.models import PointStruct, VectorParams, Distance

        cfg = QdrantConfig.get_connection_params()
        client = QdrantClient(host=cfg["host"], port=cfg.get("port", 6333),
                              api_key=cfg.get("api_key"))

        COLLECTION = "genesis_memories"
        VECTOR_SIZE = 384  # sentence-transformers default

        # Ensure collection exists
        collections = [c.name for c in client.get_collections().collections]
        if COLLECTION not in collections:
            client.create_collection(
                COLLECTION,
                vectors_config=VectorParams(size=VECTOR_SIZE, distance=Distance.COSINE)
            )

        # Simple embedding via sentence-transformers if available
        try:
            from sentence_transformers import SentenceTransformer
            model = SentenceTransformer("all-MiniLM-L6-v2")
            texts = [e["content"][:512] for e in entities]
            vectors = model.encode(texts).tolist()
        except ImportError:
            logger.warning("sentence-transformers not installed — using zero vectors")
            vectors = [[0.0] * VECTOR_SIZE for _ in entities]

        points = [
            PointStruct(
                id=str(uuid.uuid4()),
                vector=vec,
                payload={
                    "entity_id": e["entity_id"],
                    "pattern_type": e["pattern_type"],
                    "content": e["content"][:1000],
                    "session_id": e.get("session_id", ""),
                    "source_file": e.get("source_file", ""),
                    "confidence": e.get("confidence", 0.7),
                    "created_at": datetime.utcnow().isoformat(),
                }
            )
            for e, vec in zip(entities, vectors)
        ]

        client.upsert(collection_name=COLLECTION, points=points)
        logger.info(f"  Upserted {len(points)} vectors to Qdrant:{COLLECTION}")
        return len(points)

    except Exception as e:
        logger.error(f"  Qdrant upsert failed: {e}")
        return 0


# ── Extraction engine ─────────────────────────────────────────────────────────
def extract_entities_from_text(text: str, session_id: str,
                                source_file: str) -> List[Dict[str, Any]]:
    """Run all pattern matchers against transcript text. Returns list of entities."""
    entities = []

    for pattern_type, patterns in LEARNING_PATTERNS.items():
        for pattern in patterns:
            try:
                matches = re.findall(pattern, text, re.IGNORECASE | re.DOTALL)
                for match in matches:
                    content = match if isinstance(match, str) else " | ".join(match)
                    content = content.strip()[:500]

                    if len(content) < 15:
                        continue

                    content_hash = hashlib.sha256(content.encode()).hexdigest()[:16]
                    entity = {
                        "entity_id": f"dig_{pattern_type}_{content_hash}",
                        "session_id": session_id,
                        "pattern_type": pattern_type,
                        "content": content,
                        "content_hash": content_hash,
                        "source_file": source_file,
                        "confidence": _confidence_for_type(pattern_type),
                    }
                    entities.append(entity)
            except re.error as e:
                logger.debug(f"Regex error in pattern '{pattern}': {e}")

    # Deduplicate within batch
    seen = set()
    unique = []
    for e in entities:
        if e["content_hash"] not in seen:
            seen.add(e["content_hash"])
            unique.append(e)

    return unique


def _confidence_for_type(pattern_type: str) -> float:
    return {
        "error_fix": 0.85,
        "decision": 0.75,
        "tool_use": 0.65,
        "learning": 0.80,
        "titan_memory": 0.90,
        "axiom": 0.90,
    }.get(pattern_type, 0.70)


# ── Session file discovery ─────────────────────────────────────────────────────
def discover_session_files(base_dir: Path) -> List[Path]:
    """Find all .jsonl, .json, .md, .txt files in Claude projects directory."""
    if not base_dir.exists():
        logger.warning(f"Claude projects dir not found: {base_dir}")
        return []

    files = []
    for ext in ["*.jsonl", "*.json", "*.md", "*.txt"]:
        files.extend(base_dir.rglob(ext))

    # Sort by modification time (newest first)
    files.sort(key=lambda f: f.stat().st_mtime, reverse=True)
    logger.info(f"Discovered {len(files)} session files in {base_dir}")
    return files


def read_session_text(file_path: Path) -> str:
    """Read file and extract text content."""
    try:
        text = file_path.read_text(encoding="utf-8", errors="replace")

        # For JSONL: extract message content fields
        if file_path.suffix == ".jsonl":
            lines = []
            for line in text.splitlines():
                try:
                    obj = json.loads(line)
                    # Extract common transcript fields
                    for field in ["content", "message", "text", "value", "output"]:
                        if isinstance(obj.get(field), str):
                            lines.append(obj[field])
                except json.JSONDecodeError:
                    lines.append(line)
            return "\n".join(lines)

        return text
    except Exception as e:
        logger.warning(f"Could not read {file_path}: {e}")
        return ""


def compute_file_hash(file_path: Path) -> str:
    try:
        content = file_path.read_bytes()
        return hashlib.md5(content).hexdigest()
    except Exception:
        return str(file_path.stat().st_mtime)


# ── KG entity writer ──────────────────────────────────────────────────────────
def write_kg_entity_file(entities: List[Dict[str, Any]], run_timestamp: str,
                          dry_run: bool = False) -> Optional[Path]:
    """Write entities as JSONL to KG entities directory."""
    if not entities:
        return None

    if dry_run:
        logger.info(f"  [DRY RUN] Would write {len(entities)} entities to KG")
        return None

    KG_ENTITIES_DIR.mkdir(parents=True, exist_ok=True)
    out_path = KG_ENTITIES_DIR / f"digestion_run_{run_timestamp}.jsonl"

    with open(out_path, "w", encoding="utf-8") as f:
        for entity in entities:
            record = {
                "id": entity["entity_id"],
                "type": entity["pattern_type"],
                "content": entity["content"],
                "session_id": entity.get("session_id", ""),
                "source_file": entity.get("source_file", ""),
                "confidence": entity.get("confidence", 0.7),
                "created_at": datetime.utcnow().isoformat(),
            }
            f.write(json.dumps(record) + "\n")

    logger.info(f"  Wrote {len(entities)} KG entities to {out_path}")
    return out_path


# ── Night-cycle digestion (existing functionality — enhanced) ─────────────────
def run_night_cycle_digestion(conn, dry_run: bool = False) -> None:
    """Prune stale memories and consolidate semantic entities."""
    logger.info("Running night-cycle memory digestion...")

    if dry_run:
        logger.info("  [DRY RUN] Skipping prune and consolidate")
        return

    # Prune old low-impact episodes
    cutoff = (datetime.now() - timedelta(days=7)).isoformat()
    try:
        cur = conn.cursor()
        cur.execute(
            "DELETE FROM em_episodic_memories WHERE score < %s AND timestamp < %s",
            (0.4, cutoff)
        )
        pruned = cur.rowcount
        conn.commit()
        cur.close()
        logger.info(f"  Pruned {pruned} stale episodic memories")
    except Exception as e:
        logger.warning(f"  Prune step failed (table may not exist): {e}")

    # Consolidate: move high-confidence digestion entities to main KG
    try:
        cur = conn.cursor()
        cur.execute(
            """
            SELECT entity_id, pattern_type, content
            FROM digestion_kg_entities
            WHERE confidence >= 0.85 AND embedding_id IS NULL
            ORDER BY created_at DESC LIMIT 50
            """
        )
        rows = cur.fetchall()
        cur.close()
        logger.info(f"  Found {len(rows)} high-confidence entities pending embedding")
    except Exception as e:
        logger.warning(f"  Consolidate step failed: {e}")


# ── Main pipeline ─────────────────────────────────────────────────────────────
def run(dry_run: bool = False, source: str = "manual",
        max_files: int = 100, max_age_hours: int = 72) -> Dict[str, Any]:
    """
    Main digestion pipeline.
    Returns summary dict with counts.
    """
    run_ts = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
    logger.info(f"=== Memory Digestion Run [{run_ts}] source={source} dry_run={dry_run} ===")

    stats = {
        "run_timestamp": run_ts,
        "source": source,
        "files_discovered": 0,
        "files_processed": 0,
        "files_skipped_dedup": 0,
        "entities_extracted": 0,
        "entities_written_pg": 0,
        "entities_upserted_qdrant": 0,
        "errors": [],
    }

    # Connect to PostgreSQL
    conn = None
    try:
        conn = get_pg_connection()
        ensure_tables(conn)
        logger.info("PostgreSQL connected")
    except Exception as e:
        logger.error(f"PostgreSQL connection failed: {e}")
        stats["errors"].append(f"pg_connect: {e}")
        conn = None

    # Discover session files
    session_files = discover_session_files(CLAUDE_PROJECTS_DIR)
    stats["files_discovered"] = len(session_files)

    # Filter by age
    cutoff_time = time.time() - (max_age_hours * 3600)
    recent_files = [
        f for f in session_files
        if f.stat().st_mtime >= cutoff_time
    ][:max_files]

    logger.info(f"Processing {len(recent_files)} files (max_age={max_age_hours}h, max={max_files})")

    all_entities: List[Dict[str, Any]] = []

    for file_path in recent_files:
        try:
            file_hash = compute_file_hash(file_path)
            session_id = f"session_{file_path.stem}_{file_hash[:8]}"

            # Dedup check
            if conn and is_session_processed(conn, file_hash):
                stats["files_skipped_dedup"] += 1
                continue

            text = read_session_text(file_path)
            if not text or len(text) < 50:
                continue

            entities = extract_entities_from_text(
                text, session_id, str(file_path)
            )

            if not entities:
                continue

            # Dedup entities against PostgreSQL
            new_entities = []
            for entity in entities:
                if conn and is_entity_duplicate(conn, entity["content_hash"]):
                    continue
                new_entities.append(entity)

            if not new_entities:
                if conn:
                    mark_session_processed(conn, session_id, str(file_path),
                                           file_hash, 0)
                continue

            all_entities.extend(new_entities)
            stats["files_processed"] += 1
            stats["entities_extracted"] += len(new_entities)

            # Write to PostgreSQL
            if conn and not dry_run:
                for entity in new_entities:
                    try:
                        insert_entity(conn, entity)
                        stats["entities_written_pg"] += 1
                    except Exception as e:
                        logger.debug(f"Entity insert error: {e}")

                mark_session_processed(conn, session_id, str(file_path),
                                        file_hash, len(new_entities))

        except Exception as e:
            logger.error(f"Error processing {file_path}: {e}")
            stats["errors"].append(f"{file_path.name}: {e}")

    # Write KG entity file
    if all_entities:
        write_kg_entity_file(all_entities, run_ts, dry_run)
        upserted = upsert_to_qdrant(all_entities, dry_run)
        stats["entities_upserted_qdrant"] = upserted

    # Night-cycle digestion
    if conn:
        run_night_cycle_digestion(conn, dry_run)

    if conn:
        conn.close()

    # Write run log
    LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
    with open(LOG_FILE, "w", encoding="utf-8") as f:
        json.dump(stats, f, indent=2, default=str)

    logger.info(
        f"=== Run complete: {stats['files_processed']} files, "
        f"{stats['entities_extracted']} entities, "
        f"{stats['entities_upserted_qdrant']} vectors ==="
    )
    return stats


# ── CLI ───────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="RLM Memory Digestion Cron")
    parser.add_argument("--dry-run", action="store_true",
                        help="Report stats without writing")
    parser.add_argument("--source", default="manual",
                        help="Source identifier for this run")
    parser.add_argument("--max-files", type=int, default=100,
                        help="Max session files to process")
    parser.add_argument("--max-age-hours", type=int, default=72,
                        help="Only process files modified within N hours")
    args = parser.parse_args()

    result = run(
        dry_run=args.dry_run,
        source=args.source,
        max_files=args.max_files,
        max_age_hours=args.max_age_hours,
    )
    print(json.dumps(result, indent=2, default=str))
