#!/usr/bin/env python3
"""
RLM Army — Persistent Daemon Coordinator for Genesis Memory
============================================================
Maintains a pool of 3 worker processes that continuously pull from
PostgreSQL event queues and crystallize knowledge into the Sunaiva vault.

Workers:
  1. AIVAInteractionWorker   — pulls aiva_rlm.aiva_interactions → extracts entities
  2. BloodstreamWorker       — pulls bloodstream_knowledge → enriches vault
  3. GHLActivityWorker       — polls GHL for new contacts/activity → vault decisions

Design:
  - Supervisor loop: checks workers every 30s, respawns if dead
  - SKIP LOCKED queue for safe distributed processing
  - Gemini Flash (OpenRouter) for LLM extraction — cost-efficient
  - All output to Sunaiva vault (entity/decision tables)
  - Graceful shutdown on SIGTERM

Usage:
  python3 /mnt/e/genesis-system/core/rlm_army.py
  python3 /mnt/e/genesis-system/core/rlm_army.py --interval 30

Keep alive via bridge_watchdog.sh (cron) or tmux genesis-rlm-army.
"""

from __future__ import annotations

import argparse
import json
import logging
import os
import signal
import sys
import time
import uuid
from datetime import datetime, timezone
from multiprocessing import Process
from pathlib import Path
from typing import Any, Optional

import psycopg2

# ── Config ────────────────────────────────────────────────────────────────────
GENESIS_ROOT = Path("/mnt/e/genesis-system")
LOG_DIR = GENESIS_ROOT / "logs"
# parents=True: without it, a missing /mnt/e/genesis-system makes mkdir raise
# FileNotFoundError at import time and the daemon never starts.
LOG_DIR.mkdir(parents=True, exist_ok=True)

VAULT_ID = "41f4785c-e9e0-43ac-b6e8-eedb571fba57"  # Kinan's Genesis vault

# PostgreSQL connection parameters for psycopg2.connect(**DB).
# The password may be overridden via the GENESIS_DB_PASSWORD environment
# variable; the hard-coded fallback is kept only for backward compatibility.
# SECURITY: rotate this credential and move it into config/secrets.env.
DB = {
    "host": "postgresql-genesis-u50607.vm.elestio.app",
    "port": 25432,
    "user": "postgres",
    "password": os.environ.get("GENESIS_DB_PASSWORD", "CiBjh6LM7Yuqkq-jo2r7eQDw"),
    "database": "postgres",
    "connect_timeout": 10,
    # TCP keepalives so long-idle worker connections survive NAT/firewall drops.
    "keepalives": 1,
    "keepalives_idle": 30,
    "keepalives_interval": 10,
    "keepalives_count": 5,
}

# Load Gemini/OpenRouter key from secrets.env
_secrets = {}
try:
    secrets_path = GENESIS_ROOT / "config" / "secrets.env"
    with open(secrets_path) as f:
        for line in f:
            line = line.strip()
            if "=" in line and not line.startswith("#"):
                k, _, v = line.partition("=")
                _secrets[k.strip()] = v.strip().strip('"').strip("'")
except Exception:
    pass

OPENROUTER_KEY = _secrets.get("OPENROUTER_API_KEY", os.environ.get("OPENROUTER_API_KEY", ""))

# ── Logging ───────────────────────────────────────────────────────────────────
# Every record goes both to the army log file and to the console.
_LOG_HANDLERS = [
    logging.FileHandler(LOG_DIR / "rlm_army.log"),
    logging.StreamHandler(),
]
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(name)-20s] %(levelname)s — %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    handlers=_LOG_HANDLERS,
)
log = logging.getLogger("RLMArmyCoordinator")


# ── Schema DDL ────────────────────────────────────────────────────────────────

# Generic event queue: producers insert rows as status='pending'; consumers
# claim and stamp processed_at.  The (status, created_at) index backs the
# polling query; the source index supports per-producer inspection.
QUEUE_DDL = """
CREATE TABLE IF NOT EXISTS public.rlm_event_queue (
    id          SERIAL PRIMARY KEY,
    source      TEXT NOT NULL,
    event_type  TEXT NOT NULL,
    payload     JSONB NOT NULL DEFAULT '{}',
    status      TEXT NOT NULL DEFAULT 'pending',
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    processed_at TIMESTAMPTZ
);
CREATE INDEX IF NOT EXISTS idx_rlm_eq_status ON public.rlm_event_queue(status, created_at);
CREATE INDEX IF NOT EXISTS idx_rlm_eq_source ON public.rlm_event_queue(source);
"""

# Per-source high-water marks: each worker records the last row id it has
# consumed (see get_cursor/update_cursor) so a restart resumes where it left
# off instead of reprocessing from id 0.
AIVA_CURSOR_DDL = """
CREATE TABLE IF NOT EXISTS public.rlm_army_cursors (
    source      TEXT PRIMARY KEY,
    last_id     BIGINT NOT NULL DEFAULT 0,
    updated_at  TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
"""


# ── DB helpers ────────────────────────────────────────────────────────────────

def get_conn():
    """Open and return a fresh PostgreSQL connection from the module DB settings."""
    connection = psycopg2.connect(**DB)
    return connection


def ensure_schema():
    """Create the RLM queue and cursor tables if absent (idempotent DDL)."""
    with get_conn() as conn:
        for ddl in (QUEUE_DDL, AIVA_CURSOR_DDL):
            conn.cursor().execute(ddl)
        conn.commit()
    log.info("Schema ensured")


def upsert_entity(conn, name: str, etype: str, context: str, source: str) -> None:
    """Insert an entity row into the Sunaiva vault (best-effort, insert-ignore).

    Despite the name, this is not a true upsert: every call generates a fresh
    UUID primary key, so ``ON CONFLICT DO NOTHING`` only suppresses duplicates
    if vault_entities carries a unique constraint (e.g. on name + vault_id).
    NOTE(review): confirm that constraint exists, otherwise repeated calls
    accumulate duplicate rows.

    Args:
        conn:    open psycopg2 connection; the caller owns commit/rollback.
        name:    entity name, truncated to 255 chars; blank names are skipped.
        etype:   entity type label (e.g. "voice_call", "axiom", "knowledge").
        context: free text; vault_entities.context is JSONB, so it is stored
                 as a JSON array of one string, truncated to 2000 chars.
        source:  provenance tag recorded in source_conversation.
    """
    if not name:
        return  # a nameless entity carries no retrievable information
    cur = conn.cursor()
    now = datetime.now(timezone.utc).isoformat()
    # vault_entities.context is JSONB — wrap plain text as a JSON array.
    # (uses the module-level json import; the old local `import json as _json`
    # was redundant shadowing)
    ctx_json = json.dumps([(context or "")[:2000]])
    cur.execute("""
        INSERT INTO sunaiva.vault_entities
            (id, vault_id, name, type, context, source_conversation, first_seen, last_seen)
        VALUES (%s, %s, %s, %s, %s::jsonb, %s, %s, %s)
        ON CONFLICT DO NOTHING
    """, (
        str(uuid.uuid4()), VAULT_ID,
        name[:255], etype, ctx_json, source,
        now, now,
    ))


def update_cursor(conn, source: str, last_id: int) -> None:
    """Persist the high-water mark for *source* (insert or overwrite)."""
    sql = """
        INSERT INTO public.rlm_army_cursors (source, last_id, updated_at)
        VALUES (%s, %s, NOW())
        ON CONFLICT (source) DO UPDATE
            SET last_id = EXCLUDED.last_id, updated_at = NOW()
    """
    conn.cursor().execute(sql, (source, last_id))


def get_cursor(conn, source: str) -> int:
    """Return the last processed row id for *source*, or 0 if none is recorded."""
    cur = conn.cursor()
    cur.execute("SELECT last_id FROM public.rlm_army_cursors WHERE source = %s", (source,))
    record = cur.fetchone()
    if record is None:
        return 0
    return record[0]


# ── LLM extraction ────────────────────────────────────────────────────────────

def extract_entities_llm(text: str, source: str) -> list[dict]:
    """Use Gemini Flash via OpenRouter to extract entities from text.

    Args:
        text:   raw text to mine; texts under 50 chars are skipped outright.
        source: provenance tag — used here only in failure diagnostics.

    Returns:
        Up to 5 dicts with "name"/"type"/"context" keys, or [] when the API key
        is missing, the text is too short, or any network/parse step fails.
        Extraction is strictly best-effort: callers must tolerate [].
    """
    if not OPENROUTER_KEY or len(text) < 50:
        return []

    import urllib.request  # lazy: only loaded when we actually call the API

    try:
        payload = json.dumps({
            "model": "google/gemini-2.5-flash",
            "messages": [{
                "role": "user",
                "content": (
                    f"Extract key entities (people, tools, decisions, systems, projects) "
                    f"from this text. Return JSON array: "
                    f"[{{\"name\": str, \"type\": str, \"context\": str}}]. "
                    f"Keep context under 200 chars. Max 5 entities.\n\nText:\n{text[:1000]}"
                ),
            }],
            "max_tokens": 500,
            "temperature": 0.1,
        }).encode()

        req = urllib.request.Request(
            "https://openrouter.ai/api/v1/chat/completions",
            data=payload,
            headers={
                "Authorization": f"Bearer {OPENROUTER_KEY}",
                "Content-Type": "application/json",
                "HTTP-Referer": "https://genesis.agileadapt.com",
            },
        )
        with urllib.request.urlopen(req, timeout=15) as resp:
            result = json.load(resp)

        # Guard malformed/error payloads that carry no choices array.
        choices = result.get("choices") or []
        if not choices:
            return []
        content = choices[0].get("message", {}).get("content", "")

        # The model may wrap the JSON in prose/markdown — slice out the array.
        start = content.find("[")
        end = content.rfind("]") + 1
        if start >= 0 and end > start:
            entities = json.loads(content[start:end])
            if isinstance(entities, list):  # a bare object/scalar is not usable
                return entities
    except Exception as e:
        # Best-effort by design: never let extraction failures kill a worker.
        log.debug(f"LLM extraction failed for {source}: {e}")
    return []


# ── Workers ───────────────────────────────────────────────────────────────────

class AIVAInteractionWorker:
    """Polls aiva_rlm.aiva_interactions for new voice call data → vault entities.

    Resumes from the "aiva_interactions" cursor, processes up to 20 rows per
    poll, writes one entity per call plus any LLM-extracted entities, then
    advances the cursor and commits in the same connection.
    """

    NAME = "aiva_interaction_worker"
    INTERVAL = 60  # seconds between polls

    def run(self):
        """Worker entry point: poll forever; batch errors are logged, not raised."""
        log = logging.getLogger(self.NAME)
        log.info("Started")
        while True:
            try:
                self._process_batch()
            except Exception as e:
                log.warning(f"Batch error: {e}")
            time.sleep(self.INTERVAL)

    def _process_batch(self):
        """Pull one batch of unseen interactions and crystallize them into the vault."""
        # Use this worker's logger — previously records from here were
        # attributed to the module-level coordinator logger.
        log = logging.getLogger(self.NAME)
        with get_conn() as conn:
            last_id = get_cursor(conn, "aiva_interactions")

            # The source table is created by another service — skip quietly
            # until it exists.
            cur = conn.cursor()
            cur.execute("""
                SELECT EXISTS (
                    SELECT FROM information_schema.tables
                    WHERE table_schema = 'aiva_rlm'
                    AND table_name = 'aiva_interactions'
                )
            """)
            if not cur.fetchone()[0]:
                return

            cur.execute("""
                SELECT id, transcript, outcome, outcome_label, created_at
                FROM aiva_rlm.aiva_interactions
                WHERE id > %s
                ORDER BY id ASC
                LIMIT 20
            """, (last_id,))

            rows = cur.fetchall()
            if not rows:
                return

            log.info(f"Processing {len(rows)} new AIVA interactions")

            for row_id, transcript, outcome, outcome_label, created_at in rows:
                text = f"Outcome: {outcome_label or outcome or 'unknown'}. Transcript: {(transcript or '')[:800]}"
                entities = extract_entities_llm(text, f"aiva_interaction_{row_id}")

                # Write the call itself as an entity
                upsert_entity(conn, f"AIVA call {str(created_at)[:10]}", "voice_call",
                              f"Outcome: {outcome_label or outcome}. Duration logged.",
                              f"aiva_{row_id}")

                # Write LLM-extracted entities.  The LLM output is untrusted:
                # drop non-dict items and unnamed entities instead of writing
                # blank vault rows or crashing the batch.
                for e in entities:
                    if not isinstance(e, dict):
                        continue
                    ename = str(e.get("name") or "")[:255]
                    if not ename:
                        continue
                    upsert_entity(conn, ename, e.get("type", "fact"),
                                  e.get("context", ""), f"aiva_{row_id}")

                last_id = row_id

            update_cursor(conn, "aiva_interactions", last_id)
            conn.commit()
            log.info(f"Processed up to interaction ID {last_id}")


class BloodstreamWorker:
    """Pulls bloodstream_knowledge entries → enriches vault with high-confidence items.

    Mirrors only rows with confidence >= 0.7 into the vault, resuming from the
    "bloodstream_knowledge" cursor, up to 30 rows per poll.
    """

    NAME = "bloodstream_worker"
    INTERVAL = 120  # seconds between polls

    def run(self):
        """Worker entry point: poll forever; batch errors are logged, not raised."""
        log = logging.getLogger(self.NAME)
        log.info("Started")
        while True:
            try:
                self._process_batch()
            except Exception as e:
                log.warning(f"Batch error: {e}")
            time.sleep(self.INTERVAL)

    def _process_batch(self):
        """Mirror one batch of high-confidence knowledge rows into the vault."""
        # Use this worker's logger — previously records from here were
        # attributed to the module-level coordinator logger.
        log = logging.getLogger(self.NAME)
        with get_conn() as conn:
            last_id = get_cursor(conn, "bloodstream_knowledge")
            cur = conn.cursor()

            # The source table may not exist yet — skip quietly until it does.
            cur.execute("""
                SELECT EXISTS (
                    SELECT FROM information_schema.tables
                    WHERE table_schema = 'public'
                    AND table_name = 'bloodstream_knowledge'
                )
            """)
            if not cur.fetchone()[0]:
                return

            cur.execute("""
                SELECT id, type, title, content, confidence
                FROM public.bloodstream_knowledge
                WHERE id > %s AND confidence >= 0.7
                ORDER BY id ASC
                LIMIT 30
            """, (last_id,))

            rows = cur.fetchall()
            if not rows:
                return

            log.info(f"Processing {len(rows)} bloodstream entries")

            for row_id, btype, title, content, confidence in rows:
                entity_type = "axiom" if btype == "axiom" else "knowledge"
                # Guard NULL title/content: slicing None crashed the batch
                # with TypeError before.
                context = f"[confidence={confidence:.2f}] {(content or '')[:400]}"
                safe_title = (title or f"bloodstream item {row_id}")[:255]
                upsert_entity(conn, safe_title, entity_type, context, f"bloodstream_{row_id}")
                last_id = row_id

            update_cursor(conn, "bloodstream_knowledge", last_id)
            conn.commit()
            log.info(f"Enriched vault with {len(rows)} bloodstream items")


class KGCrystallizerWorker:
    """Reads recent KG entities/axioms from file system → writes to vault."""

    NAME = "kg_crystallizer"
    INTERVAL = 300  # every 5 minutes

    def run(self):
        log = logging.getLogger(self.NAME)
        log.info("Started")
        while True:
            try:
                self._crystallize()
            except Exception as e:
                log.warning(f"Crystallize error: {e}")
            time.sleep(self.INTERVAL)

    def _crystallize(self):
        kg_root = GENESIS_ROOT / "KNOWLEDGE_GRAPH"
        processed = []

        for jsonl_dir in [kg_root / "entities", kg_root / "axioms"]:
            if not jsonl_dir.exists():
                continue
            for jsonl_file in sorted(jsonl_dir.glob("*.jsonl")):
                # Only process files modified in last 24 hours
                age_hours = (time.time() - jsonl_file.stat().st_mtime) / 3600
                if age_hours > 24:
                    continue
                processed.extend(self._process_jsonl(jsonl_file))

        if processed:
            with get_conn() as conn:
                for item in processed:
                    upsert_entity(conn, item["name"], item["type"],
                                  item["context"], item["source"])
                conn.commit()
            log.info(f"Crystallized {len(processed)} KG items to vault")

    def _process_jsonl(self, path: Path) -> list[dict]:
        items = []
        try:
            with open(path) as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        obj = json.loads(line)
                        name = obj.get("name") or obj.get("id") or obj.get("title")
                        if not name:
                            continue
                        context = obj.get("description") or obj.get("content") or obj.get("pattern") or ""
                        if isinstance(context, dict):
                            context = json.dumps(context)
                        items.append({
                            "name": str(name)[:255],
                            "type": obj.get("type") or ("axiom" if "axioms" in str(path) else "entity"),
                            "context": str(context)[:2000],
                            "source": f"kg:{path.name}",
                        })
                    except json.JSONDecodeError:
                        pass
        except Exception as e:
            log.debug(f"Error reading {path}: {e}")
        return items


# ── Coordinator ───────────────────────────────────────────────────────────────

class RLMArmy:
    """Supervisor that spawns and maintains the worker pool.

    Spawns one daemon process per worker class, then checks liveness every
    ``interval`` seconds and respawns anything dead.  SIGTERM/SIGINT flip a
    shutdown flag; the loop then terminates and joins every child.
    """

    WORKER_CLASSES = [
        AIVAInteractionWorker,
        BloodstreamWorker,
        KGCrystallizerWorker,
    ]

    def __init__(self, interval: int = 30):
        """Args:
            interval: seconds between supervisor liveness checks.  Previously
                hard-coded at 30 even though the CLI parses --interval; the
                default keeps existing callers unchanged.
        """
        self.interval = interval
        self.processes: dict[str, Process] = {}
        self._shutdown = False
        signal.signal(signal.SIGTERM, self._handle_shutdown)
        signal.signal(signal.SIGINT, self._handle_shutdown)

    def _handle_shutdown(self, signum, frame):
        """Signal handler: request a graceful stop of the supervisor loop."""
        log.info(f"Received signal {signum} — graceful shutdown")
        self._shutdown = True

    def _spawn(self, WorkerClass) -> Process:
        """Start one daemon Process running WorkerClass().run and return it."""
        worker = WorkerClass()
        p = Process(target=worker.run, name=WorkerClass.NAME, daemon=True)
        p.start()
        log.info(f"Spawned {WorkerClass.NAME} (pid {p.pid})")
        return p

    def run(self):
        """Spawn all workers, supervise until shutdown, then stop every child."""
        log.info("=== RLM Army starting ===")
        ensure_schema()

        # Initial spawn
        for wc in self.WORKER_CLASSES:
            self.processes[wc.NAME] = self._spawn(wc)

        # Supervisor loop: respawn any worker that died since the last check.
        while not self._shutdown:
            time.sleep(self.interval)
            for wc in self.WORKER_CLASSES:
                p = self.processes.get(wc.NAME)
                if p is None or not p.is_alive():
                    log.warning(f"Worker {wc.NAME} is dead — respawning")
                    if p:
                        p.terminate()
                    self.processes[wc.NAME] = self._spawn(wc)

        # Shutdown: terminate and join every child so none are orphaned.
        log.info("Shutting down workers...")
        for name, p in self.processes.items():
            p.terminate()
            p.join(timeout=5)
            log.info(f"  {name} stopped")
        log.info("=== RLM Army stopped ===")


# ── Entry point ───────────────────────────────────────────────────────────────

def main():
    """CLI entry point: parse flags and run the supervisor until signalled."""
    parser = argparse.ArgumentParser(description="Genesis RLM Army — Persistent Memory Workers")
    # NOTE(review): the module docstring also advertises a --workers flag,
    # which is not implemented here.
    parser.add_argument("--interval", type=int, default=30, help="Supervisor check interval (seconds)")
    args = parser.parse_args()

    # NOTE(review): args.interval is parsed but never handed to RLMArmy, whose
    # supervisor loop sleeps a fixed 30s — confirm intent or wire it through.
    army = RLMArmy()
    army.run()


if __name__ == "__main__":
    main()
