#!/usr/bin/env python3
"""
session_transcript_ingestor.py
Extract high-signal knowledge from Claude Code session JSONL files
and inject into bloodstream PostgreSQL as KG entities.

Usage:
    python3 core/session_transcript_ingestor.py [--limit N] [--dry-run]
"""
import os, sys, json, hashlib, logging, argparse
from pathlib import Path
from datetime import datetime

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("session_ingestor")

GENESIS_ROOT = Path("/mnt/e/genesis-system")
SESSION_DIR = Path("/home/authentic88/.claude/projects/-mnt-e-genesis-system")
PROCESSED_LOG = GENESIS_ROOT / "data" / "session_ingest_processed.txt"

# PG connection
def get_pg():
    """Open a PostgreSQL connection to the bloodstream database.

    Connection parameters default to the historical hard-coded values but
    can be overridden via BLOODSTREAM_PG_* environment variables, so the
    secret does not have to live in source control.

    Returns:
        An open psycopg2 connection (sslmode=require).
    """
    # Local import keeps the module importable on machines without psycopg2
    # (e.g. when only --dry-run is used).
    import psycopg2
    # SECURITY NOTE(review): the fallback password below is committed to the
    # repo — rotate it and set BLOODSTREAM_PG_PASSWORD in the environment.
    return psycopg2.connect(
        host=os.environ.get("BLOODSTREAM_PG_HOST", "postgresql-genesis-u50607.vm.elestio.app"),
        port=int(os.environ.get("BLOODSTREAM_PG_PORT", "25432")),
        user=os.environ.get("BLOODSTREAM_PG_USER", "postgres"),
        password=os.environ.get("BLOODSTREAM_PG_PASSWORD", "CiBjh6LM7Yuqkq-jo2r7eQDw"),
        dbname=os.environ.get("BLOODSTREAM_PG_DBNAME", "postgres"),
        sslmode="require",
    )

def ensure_table(conn):
    """Idempotently create the bloodstream_knowledge table.

    Deliberately adds no UNIQUE constraint — duplicate suppression is
    handled at insert time instead (see insert_entries).

    Args:
        conn: open psycopg2 connection; the DDL is committed before return.
    """
    ddl = """
            CREATE TABLE IF NOT EXISTS bloodstream_knowledge (
                id SERIAL PRIMARY KEY,
                source TEXT NOT NULL,
                type TEXT NOT NULL,
                title TEXT NOT NULL,
                content TEXT,
                confidence REAL DEFAULT 0.7,
                embedding_id TEXT,
                embedding_text TEXT,
                created_at TIMESTAMPTZ DEFAULT NOW()
            )
        """
    cur = conn.cursor()
    try:
        cur.execute(ddl)
    finally:
        cur.close()
    conn.commit()

def extract_session_knowledge(filepath: Path) -> list:
    """Extract user intents and substantial assistant responses from one
    session JSONL file.

    Each line is expected to be a JSON object — either a message itself or
    a wrapper carrying the message under a 'message' key. Malformed or
    structurally unexpected lines are skipped (best-effort parsing).

    Args:
        filepath: path to a Claude Code session .jsonl file.

    Returns:
        A list of 0-2 entry dicts (keys: source, type, title, content,
        embedding_text, confidence): a 'session_intent' entry built from
        user messages and a 'session_decision' entry from assistant text.
        Returns [] when nothing substantial is found or the file cannot
        be parsed.
    """
    entries = []
    try:
        lines = filepath.read_text(errors='ignore').strip().splitlines()
        user_msgs = []
        assistant_summaries = []

        for line in lines:
            if not line.strip():
                continue
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                # Fixed: was a bare `except:` which also swallowed
                # KeyboardInterrupt/SystemExit.
                continue
            if not isinstance(obj, dict):
                # A bare JSON scalar/array has no message structure; previously
                # this raised AttributeError and aborted the whole file.
                continue

            # Some records wrap the message under a 'message' key.
            msg = obj.get('message', obj)
            if not isinstance(msg, dict):
                continue
            role = msg.get('role', '')
            content = msg.get('content', '')

            if isinstance(content, list):
                # Content-block form: keep only the text blocks.
                text_parts = [c.get('text', '') for c in content
                              if isinstance(c, dict) and c.get('type') == 'text']
                content = ' '.join(text_parts)

            if not isinstance(content, str):
                continue

            content = content.strip()
            if not content or len(content) < 20:
                continue

            if role == 'user' and len(content) > 30:
                user_msgs.append(content[:500])
            elif role == 'assistant' and len(content) > 100:
                # Only keep substantial assistant responses (decisions, completions)
                assistant_summaries.append(content[:300])

        # Compute once — was previously duplicated in both branches.
        session_id = filepath.stem[:16]

        if user_msgs:
            combined = '\n'.join(user_msgs[:10])  # Top 10 user intents
            entries.append({
                'source': f'session:{session_id}',
                'type': 'session_intent',
                'title': f'Session {session_id} — user intents',
                'content': combined,
                'embedding_text': combined[:1000],
                'confidence': 0.75,
            })

        if assistant_summaries:
            combined = '\n'.join(assistant_summaries[:5])
            entries.append({
                'source': f'session:{session_id}',
                'type': 'session_decision',
                'title': f'Session {session_id} — decisions',
                'content': combined,
                'embedding_text': combined[:1000],
                'confidence': 0.7,
            })
    except Exception as e:
        # Best-effort: an unreadable/broken file yields [] instead of
        # aborting the whole ingestion run.
        log.debug(f"Failed to parse {filepath.name}: {e}")

    return entries

def insert_entries(conn, entries: list) -> int:
    """Insert entries, skipping (source, title) pairs that already exist.

    Uses a SELECT-then-INSERT check instead of ON CONFLICT so no UNIQUE
    constraint is required (see ensure_table). NOTE(review): the check is
    not race-safe if several ingestors run concurrently.

    Each entry is committed individually: previously one failed insert
    triggered conn.rollback(), discarding every *uncommitted* insert from
    earlier iterations of the batch while the returned count still included
    them — the count could exceed the rows actually persisted.

    Args:
        conn: open psycopg2 connection.
        entries: dicts with keys source, type, title, content, confidence,
            embedding_text.

    Returns:
        Number of rows actually committed.
    """
    inserted = 0
    for e in entries:
        try:
            with conn.cursor() as cur:
                # Check if (source, title) already exists
                cur.execute(
                    "SELECT 1 FROM bloodstream_knowledge WHERE source = %s AND title = %s LIMIT 1",
                    (e['source'], e['title'])
                )
                is_new = cur.fetchone() is None
                if is_new:
                    cur.execute("""
                        INSERT INTO bloodstream_knowledge
                            (source, type, title, content, confidence, embedding_text)
                        VALUES (%s, %s, %s, %s, %s, %s)
                    """, (e['source'], e['type'], e['title'], e['content'], e['confidence'], e['embedding_text']))
            # Per-row commit keeps the counter honest on later failures.
            conn.commit()
            if is_new:
                inserted += 1
        except Exception as ex:
            log.warning(f"Insert failed for {e['source']}: {ex}")
            conn.rollback()
            continue
    return inserted

def main():
    """Ingest session JSONL files into bloodstream_knowledge.

    Flags:
        --limit N   process at most N not-yet-processed files (0 = all).
        --dry-run   parse and report, but write neither to the DB nor to
                    the processed-files log.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--limit', type=int, default=0,
                        help='max number of new files to process (0 = all)')
    parser.add_argument('--dry-run', action='store_true',
                        help='report what would be ingested without writing')
    args = parser.parse_args()

    # Load processed files
    processed = set()
    if PROCESSED_LOG.exists():
        processed = set(PROCESSED_LOG.read_text().splitlines())

    # Find all session JSONL files — top-level sessions + subagent transcripts
    all_files = sorted(SESSION_DIR.rglob("*.jsonl"))
    log.info(f"Found {len(all_files)} session files, {len(processed)} already processed")

    # Filter BEFORE applying --limit: previously the limit was taken from the
    # full listing, so already-processed files consumed it and repeat runs
    # with --limit could end up doing no new work at all.
    files = [fp for fp in all_files if fp.name not in processed]
    if args.limit:
        files = files[:args.limit]

    conn = None if args.dry_run else get_pg()
    if conn:
        ensure_table(conn)

    total_inserted = 0
    total_processed = 0

    def _save_progress():
        # Persist the processed-file set; create data dir if missing so the
        # checkpoint write cannot fail on a fresh checkout.
        PROCESSED_LOG.parent.mkdir(parents=True, exist_ok=True)
        PROCESSED_LOG.write_text('\n'.join(processed))

    try:
        for fp in files:
            entries = extract_session_knowledge(fp)
            if not entries:
                processed.add(fp.name)
                continue

            if args.dry_run:
                log.info(f"[DRY] {fp.name}: {len(entries)} entries")
                for e in entries:
                    log.info(f"  type={e['type']} | {e['title'][:60]}")
            else:
                n = insert_entries(conn, entries)
                total_inserted += n
                log.debug(f"{fp.name}: {n}/{len(entries)} new entries inserted")

            processed.add(fp.name)
            total_processed += 1

            if total_processed % 50 == 0:
                log.info(f"Progress: {total_processed}/{len(files)} files, {total_inserted} inserted so far")
                # Save progress checkpoint
                if not args.dry_run:
                    _save_progress()
    finally:
        # Exception-safe teardown: previously a crash mid-run skipped both the
        # close and the final processed-log write, losing progress since the
        # last 50-file checkpoint.
        if conn:
            conn.close()
            _save_progress()

    log.info(f"Done. Processed: {total_processed}, Inserted: {total_inserted}")

if __name__ == '__main__':
    main()
