#!/usr/bin/env python3
"""
Alpha Evolve Daemon -- Autonomous Nightly Evolution System
===========================================================

Runs at 03:00 AEST daily. Applies Darwinian selection pressure to Genesis
revenue-generating assets (Phase 1: email templates via Instantly.ai).

6-Phase Cycle:
  HARVEST -> CULL -> MUTATE -> TEST -> ASCEND -> LOG

Usage:
  python scripts/alpha_evolve_daemon.py --cycle       # Run one evolution cycle
  python scripts/alpha_evolve_daemon.py --dry-run     # Simulate without changes
  python scripts/alpha_evolve_daemon.py --status      # Show current generation status
  python scripts/alpha_evolve_daemon.py --daemon      # Run as daemon (03:00 AEST daily)

Environment:
  INSTANTLY_API_KEY         Instantly.ai API key (fallback hardcoded for Phase 1)
  GEMINI_API_KEY            Gemini API key for mutation generation
  ALPHA_EVOLVE_PAUSE=true   Skip current cycle
  ALPHA_EVOLVE_DRY_RUN=true Log only, no changes
  ALPHA_EVOLVE_MUTATIONS=5  Mutations per elite (default 5)

Safety:
  - Minimum 3 survivors per asset type (never cull all)
  - Archive, never delete
  - Dry-run by default unless --cycle or --daemon is given
  - Pause flag checked at start of every cycle
  - NO SQLite -- PostgreSQL only (Elestio Bloodstream)
  - All paths on E: drive

CRITICAL: This script does NOT activate or unpause the Instantly.ai campaign.
          It only READS analytics from the API.

Version: 1.0.0
Phase: 1 (Email template evolution only)
"""

import argparse
import json
import logging
import os
import re
import sys
import time
import uuid
from dataclasses import dataclass, asdict, field
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

# ---------------------------------------------------------------------------
# Path setup: ensure genesis-system root and genesis-memory are importable
# ---------------------------------------------------------------------------

GENESIS_ROOT = Path(__file__).resolve().parent.parent
GENESIS_MEMORY_PATH = GENESIS_ROOT / "data" / "genesis-memory"

sys.path.insert(0, str(GENESIS_ROOT))
sys.path.insert(0, str(GENESIS_MEMORY_PATH))

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

# Fixed UTC+10 offset. NOTE(review): this does NOT track AEDT daylight
# saving -- "03:00 AEST" drifts to 02:00 local during DST in NSW/VIC.
AEST = timezone(timedelta(hours=10))
CYCLE_VERSION = "1.0.0"
MIN_SURVIVORS_PER_TYPE = 3           # Safety floor: never cull below this many
DEFAULT_MUTATIONS_PER_ELITE = 5      # Overridable via ALPHA_EVOLVE_MUTATIONS
MAJOR_IMPROVEMENT_THRESHOLD = 0.20   # 20% improvement triggers notification
SIMILARITY_THRESHOLD = 0.85          # Qdrant cosine sim -- reject near-duplicates
MAX_JULES_TASKS_PER_CYCLE = 30       # Reserve 70 of 100 daily quota for daytime

# All evolvable asset categories; only a subset is live in Phase 1.
ASSET_TYPES = ["email_template", "voice_prompt", "widget_config", "execution_script"]

# Phase 1 scope: email templates only
PHASE_1_ASSET_TYPES = ["email_template"]

# Minimum data before an asset is eligible for scoring / culling.
# Maps asset_type -> {metric_field: minimum_value}; the first (only) entry
# is read as the eligibility gate by the culler.
MIN_DATA_THRESHOLDS: Dict[str, Dict[str, int]] = {
    "email_template": {"sent": 50},
    "voice_prompt": {"total_calls": 15},
    "widget_config": {"impressions": 200},
    "execution_script": {"runs": 20},
}

# Phase 1 Instantly.ai campaign
INSTANTLY_CAMPAIGN_ID = "cf21ba92-4c8c-443c-b67e-4b6530024118"
# NOTE: API key hardcoded for Phase 1 (design doc specifies this)
# SECURITY(review): a credential committed to source is a leak risk -- move
# this into the environment / secrets store before Phase 2 or any sharing
# of this repository.
INSTANTLY_API_KEY_FALLBACK = "MjBjODUxNGYtNjA5MC00NjY4LWFhY2UtOWZmYTE3NDhhMmQ1OnRTYkpqRU94RHJveg=="

GEMINI_MUTATION_MODEL = "gemini-2.5-pro"   # Upgrade to gemini-3-pro when available

# Paths (all derived from the genesis-system root)
LOG_DIR = GENESIS_ROOT / "hive" / "logs"
PROGRESS_DIR = GENESIS_ROOT / "hive" / "progress"
JULES_TASKS_DIR = GENESIS_ROOT / "hive" / "jules_evolve_tasks"
KG_AXIOMS_FILE = GENESIS_ROOT / "KNOWLEDGE_GRAPH" / "axioms" / "alpha_evolve_daemon.jsonl"
MAJOR_IMPROVEMENTS_LOG = LOG_DIR / "major_improvements.log"

# Ensure dirs exist at import time
for _d in [LOG_DIR, PROGRESS_DIR, JULES_TASKS_DIR, KG_AXIOMS_FILE.parent]:
    _d.mkdir(parents=True, exist_ok=True)

# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s -- %(message)s",
    handlers=[
        logging.FileHandler(str(LOG_DIR / "alpha_evolve.log"), encoding="utf-8"),
        logging.StreamHandler(sys.stdout),
    ],
)
log = logging.getLogger("alpha_evolve")


# ---------------------------------------------------------------------------
# Data Classes
# ---------------------------------------------------------------------------

@dataclass
class EvolutionAsset:
    """A single evolvable asset (one row of genesis.evolution_assets)."""
    asset_id: str        # Primary key in genesis.evolution_assets
    asset_type: str      # Expected to be one of ASSET_TYPES
    content: Dict[str, Any]   # Asset payload (JSONB in DB)
    version: int         # Lineage version (DB default is 1)
    parent_id: Optional[str]  # Presumably the asset this was mutated from; None for originals
    fitness_score: float # Composite fitness from FITNESS_FUNCTIONS, 0.0-1.0
    status: str          # active | archived | superseded | mutation
    metrics: Dict[str, Any]   # Raw harvested performance metrics (JSONB)
    created_at: str      # Creation timestamp (TIMESTAMPTZ in DB)
    metadata: Dict[str, Any] = field(default_factory=dict)  # Extras, e.g. 'campaign_id' used for metric matching


@dataclass
class CycleReport:
    """Summary of one evolution cycle (mirrors genesis.evolution_cycles)."""
    cycle_id: str        # Cycle identifier (VARCHAR(20) PK in DB; format set by the runner)
    started_at: str      # Cycle start timestamp
    completed_at: str    # Cycle end timestamp
    assets_scored: int   # HARVEST: fitness scores recalculated
    assets_culled: int   # CULL: assets archived
    mutations_generated: int  # MUTATE: new variants created
    mutations_tested: int     # TEST: variants evaluated
    promotions: int      # ASCEND: mutations promoted over their parents
    major_improvements: List[Dict]  # Presumably gains >= MAJOR_IMPROVEMENT_THRESHOLD -- confirm with runner
    errors: List[str]    # Non-fatal errors collected during the cycle
    dry_run: bool        # True when the cycle wrote no persistent changes


# ---------------------------------------------------------------------------
# Fitness Functions
# ---------------------------------------------------------------------------

def safe_divide(numerator, denominator) -> float:
    """Return numerator/denominator rounded to 4 decimal places, or 0.0 when the denominator is falsy (0, 0.0, None)."""
    return round(float(numerator) / float(denominator), 4) if denominator else 0.0


def email_fitness(metrics: Dict[str, Any]) -> float:
    """
    Composite fitness score for email templates, in [0.0, 1.0] (higher = better).

    Weighted blend:
      open_rate        20% -- necessary but not sufficient
      reply_rate       40% -- primary engagement signal
      conversion_rate  30% -- ultimate business value
      bounce_rate       5% -- deliverability health (inverted)
      spam_rate         5% -- reputation protection (inverted)

    The raw score is scaled by a confidence factor ramping linearly to 1.0
    at 100 sends; fewer than 10 sends scores 0.0 outright.
    """
    sent = metrics.get("sent", 0)
    if sent < 10:
        # Too little data to score meaningfully.
        return 0.0

    def rate(key: str) -> float:
        return float(metrics.get(key, 0))

    # (value, weight) pairs, summed in a fixed order.
    weighted_terms = (
        (rate("open_rate"), 0.20),
        (rate("reply_rate"), 0.40),
        (rate("conversion_rate"), 0.30),
        (1.0 - rate("bounce_rate"), 0.05),
        (1.0 - rate("spam_rate"), 0.05),
    )
    raw = sum(value * weight for value, weight in weighted_terms)

    confidence = min(1.0, sent / 100.0)
    return round(raw * confidence, 4)


def voice_fitness(metrics: Dict[str, Any]) -> float:
    """
    Composite fitness score for voice agent prompts, in [0.0, 1.0].

    Blend: early-hangup avoidance 15%, call duration 20%, booking rate 40%,
    sentiment 25%. Scaled by a confidence factor that reaches 1.0 at 30
    calls; fewer than 5 calls scores 0.0 outright.
    """
    total_calls = metrics.get("total_calls", 0)
    if total_calls < 5:
        return 0.0

    duration = float(metrics.get("avg_duration_sec", 0))
    booking_rate = float(metrics.get("booking_rate", 0))
    sentiment = float(metrics.get("sentiment_avg", 0.5))
    hangup_rate = float(metrics.get("hangup_before_30s_rate", 1.0))

    # Duration bands: <30s poor, 30-180s fair, 180-600s strong, 600s+ max.
    if duration >= 600:
        duration_score = 1.0
    elif duration >= 180:
        duration_score = 0.7 + (duration - 180) / 420.0 * 0.3
    elif duration >= 30:
        duration_score = 0.3 + (duration - 30) / 150.0 * 0.4
    else:
        duration_score = 0.1

    raw = (
        (1.0 - hangup_rate) * 0.15
        + duration_score * 0.20
        + booking_rate * 0.40
        + sentiment * 0.25
    )

    confidence = min(1.0, total_calls / 30.0)
    return round(raw * confidence, 4)


def widget_fitness(metrics: Dict[str, Any]) -> float:
    """
    Composite fitness score for widget configurations, in [0.0, 1.0].

    Blend of interaction rate (40%) and lead-capture rate (60%), scaled by
    a confidence factor reaching 1.0 at 500 impressions. Fewer than 50
    impressions scores 0.0 (insufficient data).
    """
    impressions = metrics.get("impressions", 0)
    if impressions < 50:
        return 0.0

    engagement = float(metrics.get("interaction_rate", 0))
    capture = float(metrics.get("lead_capture_rate", 0))

    blended = engagement * 0.40 + capture * 0.60
    data_confidence = min(1.0, impressions / 500.0)
    return round(blended * data_confidence, 4)


# Dispatch table: asset_type -> fitness function.
# NOTE: 'execution_script' (listed in ASSET_TYPES) has no entry yet, so
# MetricsHarvester.update_fitness_scores() logs a warning and returns 0
# for that type.
FITNESS_FUNCTIONS = {
    "email_template": email_fitness,
    "voice_prompt": voice_fitness,
    "widget_config": widget_fitness,
}


# ---------------------------------------------------------------------------
# Database helpers
# ---------------------------------------------------------------------------

def get_pg_connection():
    """
    Connect to the Elestio PostgreSQL "Bloodstream" database.

    Tries two configuration sources in order:
      1. elestio_config.PostgresConfig (primary)
      2. core.secrets_loader.get_postgres_config (fallback)

    Returns:
        An open psycopg2 connection.

    Raises:
        RuntimeError: when both attempts fail (original error chained).
    """
    try:
        from elestio_config import PostgresConfig
        import psycopg2
        conn = psycopg2.connect(**PostgresConfig.get_connection_params())
        log.info("Connected to PostgreSQL via elestio_config")
        return conn
    except Exception as e:
        # Broad catch is deliberate: ImportError, bad config and network
        # errors all fall through to the secrets_loader path.
        log.warning(f"elestio_config connection failed: {e} -- trying secrets_loader")

    try:
        from core.secrets_loader import get_postgres_config as sl_pg
        import psycopg2
        cfg = sl_pg()
        conn = psycopg2.connect(
            host=cfg.host,
            port=cfg.port,
            user=cfg.user,
            password=cfg.password,
            dbname=cfg.dbname,
            sslmode=cfg.sslmode,
        )
        log.info("Connected to PostgreSQL via secrets_loader")
        return conn
    except Exception as e:
        raise RuntimeError(f"Cannot connect to PostgreSQL: {e}") from e


def ensure_schema(conn) -> None:
    """
    Create genesis schema and all required tables if they do not yet exist.
    Idempotent -- safe to call every cycle (DDL uses IF NOT EXISTS throughout).

    Prefers the on-disk DDL at scripts/alpha_evolve_create_tables.sql and
    falls back to the embedded copy (_EMBEDDED_DDL) when the file is absent.

    Args:
        conn: Open PostgreSQL connection; committed on success, rolled back
            on failure.
    """
    ddl_file = GENESIS_ROOT / "scripts" / "alpha_evolve_create_tables.sql"
    if not ddl_file.exists():
        log.warning(f"DDL file not found at {ddl_file} -- using embedded DDL")
        ddl = _EMBEDDED_DDL
    else:
        ddl = ddl_file.read_text(encoding="utf-8")

    try:
        cur = conn.cursor()
        cur.execute(ddl)
        conn.commit()
        log.info("Schema ensured (genesis tables created/verified)")
    except Exception as e:
        # Non-fatal by design: a failure here usually means the objects
        # already exist; the cycle proceeds either way.
        conn.rollback()
        log.warning(f"Schema DDL partially failed (tables may already exist): {e}")


# Embedded fallback DDL (mirrors scripts/alpha_evolve_create_tables.sql).
# Keep the two in sync by hand: ensure_schema() only uses this copy when
# the .sql file is missing. Defines the genesis schema plus the asset,
# cycle, mutation-result, call-insight and widget-event tables.
_EMBEDDED_DDL = """
CREATE SCHEMA IF NOT EXISTS genesis;

CREATE TABLE IF NOT EXISTS genesis.evolution_assets (
    asset_id        VARCHAR(255) PRIMARY KEY,
    asset_type      VARCHAR(50)  NOT NULL,
    content         JSONB        NOT NULL,
    version         INTEGER      NOT NULL DEFAULT 1,
    parent_id       VARCHAR(255),
    fitness_score   FLOAT        NOT NULL DEFAULT 0.0,
    status          VARCHAR(20)  NOT NULL DEFAULT 'active',
    metrics         JSONB        NOT NULL DEFAULT '{}',
    metadata        JSONB        NOT NULL DEFAULT '{}',
    created_at      TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    updated_at      TIMESTAMPTZ,
    archived_at     TIMESTAMPTZ,
    archived_reason VARCHAR(100),
    superseded_by   VARCHAR(255)
);

CREATE INDEX IF NOT EXISTS idx_evo_assets_type_status
    ON genesis.evolution_assets(asset_type, status);
CREATE INDEX IF NOT EXISTS idx_evo_assets_fitness
    ON genesis.evolution_assets(fitness_score DESC);
CREATE INDEX IF NOT EXISTS idx_evo_assets_parent
    ON genesis.evolution_assets(parent_id);

CREATE TABLE IF NOT EXISTS genesis.evolution_cycles (
    cycle_id            VARCHAR(20)  PRIMARY KEY,
    started_at          TIMESTAMPTZ  NOT NULL,
    completed_at        TIMESTAMPTZ  NOT NULL,
    assets_scored       INTEGER      NOT NULL DEFAULT 0,
    assets_culled       INTEGER      NOT NULL DEFAULT 0,
    mutations_generated INTEGER      NOT NULL DEFAULT 0,
    mutations_tested    INTEGER      NOT NULL DEFAULT 0,
    promotions          INTEGER      NOT NULL DEFAULT 0,
    major_improvements  JSONB        DEFAULT '[]',
    errors              JSONB        DEFAULT '[]',
    dry_run             BOOLEAN      NOT NULL DEFAULT FALSE,
    cycle_version       VARCHAR(20)  NOT NULL
);

CREATE TABLE IF NOT EXISTS genesis.mutation_test_results (
    mutation_id         VARCHAR(255) PRIMARY KEY,
    parent_id           VARCHAR(255) NOT NULL,
    asset_type          VARCHAR(50)  NOT NULL,
    mutation_content    JSONB        NOT NULL,
    mutation_fitness    FLOAT,
    parent_fitness      FLOAT        NOT NULL,
    test_verdict        VARCHAR(20),
    test_details        JSONB        DEFAULT '{}',
    promoted            BOOLEAN,
    promoted_asset_id   VARCHAR(255),
    cycle_id            VARCHAR(20)  NOT NULL,
    created_at          TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    evaluated_at        TIMESTAMPTZ,
    status              VARCHAR(20)  NOT NULL DEFAULT 'pending'
);

CREATE INDEX IF NOT EXISTS idx_mut_results_status
    ON genesis.mutation_test_results(status);
CREATE INDEX IF NOT EXISTS idx_mut_results_cycle
    ON genesis.mutation_test_results(cycle_id);

CREATE TABLE IF NOT EXISTS genesis.call_insights (
    id                  SERIAL       PRIMARY KEY,
    assistant_id        VARCHAR(255) NOT NULL,
    call_id             VARCHAR(255),
    duration_seconds    FLOAT,
    sentiment_score     FLOAT,
    booking_made        BOOLEAN      DEFAULT FALSE,
    transcript_summary  TEXT,
    caller_number       VARCHAR(30),
    created_at          TIMESTAMPTZ  NOT NULL DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_call_insights_assistant
    ON genesis.call_insights(assistant_id, created_at);

CREATE TABLE IF NOT EXISTS genesis.widget_events (
    id          SERIAL       PRIMARY KEY,
    widget_id   VARCHAR(255) NOT NULL,
    event_type  VARCHAR(30)  NOT NULL,
    event_data  JSONB        DEFAULT '{}',
    created_at  TIMESTAMPTZ  NOT NULL DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_widget_events_widget
    ON genesis.widget_events(widget_id, created_at);
"""


def jsonb_col(val) -> Any:
    """
    Coerce a JSONB column value to a Python container.

    psycopg2 may hand back JSONB as an already-decoded dict/list or as a
    raw JSON string depending on adapter configuration. NULLs, unparseable
    strings AND parseable scalars ('5', '"x"', 'null') all collapse to {}
    so callers can safely chain `.get(...)` on the result -- the original
    returned scalars as-is, which would crash the fitness/cull paths.

    Args:
        val: Raw column value (dict, list, str/bytes JSON text, or None).

    Returns:
        A dict or list; {} when the value is missing or not a container.
    """
    if isinstance(val, (dict, list)):
        return val
    if val is None:
        return {}
    try:
        parsed = json.loads(val)
    except (ValueError, TypeError):
        return {}
    # A JSONB column can legally hold a bare scalar; normalise to {}.
    return parsed if isinstance(parsed, (dict, list)) else {}


# ---------------------------------------------------------------------------
# Phase 1: HARVEST
# ---------------------------------------------------------------------------

class MetricsHarvester:
    """
    HARVEST phase: pull performance data from Instantly.ai and store it in
    the Bloodstream (PostgreSQL).

    Every Instantly.ai call made here is a GET -- this class never
    activates, unpauses or otherwise mutates campaigns.
    """

    # Instantly.ai API v2 root URL.
    INSTANTLY_BASE = "https://api.instantly.ai/api/v2"

    def __init__(self, conn, dry_run: bool = False):
        """
        Args:
            conn: Open PostgreSQL connection (Bloodstream).
            dry_run: When True, metrics are fetched and logged but no
                database rows are written.
        """
        self.conn = conn
        self.dry_run = dry_run
        # Environment variable wins; otherwise the Phase-1 hardcoded key.
        self._instantly_key = (
            os.environ.get("INSTANTLY_API_KEY") or INSTANTLY_API_KEY_FALLBACK
        )

    # ------------------------------------------------------------------
    # Public interface
    # ------------------------------------------------------------------

    def harvest_email_metrics(self) -> List[Dict]:
        """
        Pull campaign-level analytics from Instantly.ai API v2.
        Returns a list of metric dicts, one per campaign (empty when the
        'requests' package is unavailable).

        IMPORTANT: This is READ-ONLY. We never activate or unpause campaigns.
        """
        log.info("HARVEST: Pulling email metrics from Instantly.ai API...")

        try:
            import requests
        except ImportError:
            # 'requests' is a soft dependency -- degrade to an empty harvest.
            log.error("HARVEST: 'requests' package not available")
            return []

        headers = {
            "Authorization": f"Bearer {self._instantly_key}",
            "Content-Type": "application/json",
        }

        # Fetch all campaigns first so we can map IDs to names
        campaigns = self._fetch_campaigns(requests, headers)

        # Fetch analytics for all campaigns (Instantly v2 aggregates by campaign)
        metrics_list = self._fetch_campaign_analytics(requests, headers, campaigns)

        log.info(f"HARVEST: Pulled metrics for {len(metrics_list)} campaigns")
        return metrics_list

    def update_email_metrics_in_bloodstream(self, metrics_list: List[Dict]) -> None:
        """
        Upsert campaign-level metrics into genesis.evolution_assets
        for any email_template assets that belong to those campaigns.

        Matching is via metadata->>'campaign_id'; the JSONB '||' operator
        shallow-merges the new metric keys over the stored ones.
        """
        if self.dry_run:
            log.info("HARVEST [DRY RUN]: Would update metrics in Bloodstream")
            return

        cur = self.conn.cursor()
        for m in metrics_list:
            campaign_id = m.get("campaign_id", "")
            if not campaign_id:
                # Entry without a campaign id cannot be matched -- skip it.
                continue

            # Update metrics for all active email_template assets in this campaign
            cur.execute("""
                UPDATE genesis.evolution_assets
                SET metrics = metrics || %s::jsonb,
                    updated_at = NOW()
                WHERE asset_type = 'email_template'
                  AND status = 'active'
                  AND metadata->>'campaign_id' = %s
            """, (json.dumps(m), campaign_id))

        self.conn.commit()
        log.info("HARVEST: Email metrics upserted into Bloodstream")

    def update_fitness_scores(self, asset_type: str) -> int:
        """
        Recalculate fitness scores for all active assets of a given type.
        Returns count of updated assets.

        NOTE: in dry-run mode the counter still increments (it means
        "would have been updated"); no rows are written.
        """
        log.info(f"HARVEST: Recalculating fitness scores for {asset_type}...")
        fitness_fn = FITNESS_FUNCTIONS.get(asset_type)
        if not fitness_fn:
            # e.g. 'execution_script' has no fitness function yet.
            log.warning(f"HARVEST: No fitness function for {asset_type}")
            return 0

        cur = self.conn.cursor()
        cur.execute("""
            SELECT asset_id, metrics
            FROM genesis.evolution_assets
            WHERE asset_type = %s AND status = 'active'
        """, (asset_type,))
        rows = cur.fetchall()

        updated = 0
        for asset_id, raw_metrics in rows:
            metrics = jsonb_col(raw_metrics)
            score = fitness_fn(metrics)

            if not self.dry_run:
                cur.execute("""
                    UPDATE genesis.evolution_assets
                    SET fitness_score = %s, updated_at = NOW()
                    WHERE asset_id = %s
                """, (score, asset_id))
            updated += 1

        if not self.dry_run:
            self.conn.commit()

        log.info(f"HARVEST: Updated {updated} fitness scores for {asset_type}")
        return updated

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _fetch_campaigns(self, requests, headers: Dict) -> Dict[str, str]:
        """
        Fetch campaign list, return {campaign_id: campaign_name} mapping.

        On any failure, falls back to the single hardcoded Phase-1 campaign
        so the harvest can still proceed.
        """
        # Instantly v2 uses /campaigns (plural) not /campaign (singular)
        try:
            resp = requests.get(
                f"{self.INSTANTLY_BASE}/campaigns",
                headers=headers,
                params={"limit": 100},
                timeout=30,
            )
            resp.raise_for_status()
            data = resp.json()
            campaigns = {}
            # Response shape varies: list may sit under 'items' or 'data'.
            for c in data.get("items", []) + data.get("data", []):
                cid = c.get("id") or c.get("campaign_id", "")
                cname = c.get("name") or c.get("campaign_name", "")
                if cid:
                    campaigns[cid] = cname
            log.debug(f"HARVEST: Found {len(campaigns)} campaigns")
            return campaigns
        except Exception as e:
            log.warning(f"HARVEST: Could not fetch campaign list: {e}")
            return {INSTANTLY_CAMPAIGN_ID: "Sunaiva Partner Outreach - AU Agencies"}

    def _fetch_campaign_analytics(
        self, requests, headers: Dict, campaigns: Dict[str, str]
    ) -> List[Dict]:
        """
        Fetch analytics for each known campaign.

        Instantly v2 endpoint strategy:
          1. GET /api/v2/analytics/campaign/count?id=<campaign_id>
             Returns: total_count, open_count, reply_count, bounced_count, etc.
          2. Fallback: derive from emails list (GET /api/v2/emails?campaign_id=...)

        NOTE: Campaign must be active and have sent emails for analytics to
              contain non-zero data. Paused / empty campaigns return zeros --
              this is expected and handled gracefully.
        """
        campaign_ids = list(campaigns.keys()) or [INSTANTLY_CAMPAIGN_ID]
        metrics_list = []

        for campaign_id in campaign_ids:
            try:
                # Primary: v2 analytics/campaign/count endpoint
                resp = requests.get(
                    f"{self.INSTANTLY_BASE}/analytics/campaign/count",
                    headers=headers,
                    params={"id": campaign_id},
                    timeout=30,
                )

                campaign_data: Dict[str, Any] = {}

                if resp.status_code in (200, 201):
                    raw = resp.json()
                    # Response may be direct dict or nested under 'data'
                    campaign_data = raw.get("data", raw) if isinstance(raw, dict) else {}
                elif resp.status_code == 401:
                    log.warning(
                        f"HARVEST: Analytics API auth failed for {campaign_id[:8]}... "
                        "-- API key may lack analytics scope. Returning zero metrics."
                    )
                    campaign_data = {}
                elif resp.status_code == 404:
                    log.debug(
                        f"HARVEST: analytics/campaign/count not found (404) for "
                        f"{campaign_id[:8]}... -- trying email count fallback"
                    )
                    # Fallback: count emails by status from /api/v2/emails
                    campaign_data = self._fetch_analytics_from_emails(
                        requests, headers, campaign_id
                    )
                else:
                    log.warning(
                        f"HARVEST: Instantly analytics returned HTTP {resp.status_code} "
                        f"for {campaign_id[:8]}..."
                    )
                    campaign_data = {}

                # Normalise field names across API versions.
                # NOTE: the or-chains mean a legitimate 0 in an earlier alias
                # falls through to the next one -- harmless, since the final
                # result is still 0 when every alias is absent or zero.
                sent = int(
                    campaign_data.get("total_count")
                    or campaign_data.get("emails_sent")
                    or campaign_data.get("sent", 0)
                    or 0
                )
                opened = int(
                    campaign_data.get("open_count")
                    or campaign_data.get("unique_opens")
                    or campaign_data.get("opened", 0)
                    or 0
                )
                replied = int(
                    campaign_data.get("reply_count")
                    or campaign_data.get("replies")
                    or campaign_data.get("replied", 0)
                    or 0
                )
                clicked = int(
                    campaign_data.get("click_count")
                    or campaign_data.get("unique_clicks")
                    or campaign_data.get("clicked", 0)
                    or 0
                )
                bounced = int(
                    campaign_data.get("bounced_count")
                    or campaign_data.get("bounces")
                    or campaign_data.get("bounced", 0)
                    or 0
                )

                # conversion_rate / spam_rate are not exposed by this API --
                # they stay 0.0 here and are maintained elsewhere.
                metrics_list.append({
                    "campaign_id": campaign_id,
                    "campaign_name": campaigns.get(campaign_id, "unknown"),
                    "sent": sent,
                    "opened": opened,
                    "replied": replied,
                    "clicked": clicked,
                    "bounced": bounced,
                    "open_rate": safe_divide(opened, sent),
                    "reply_rate": safe_divide(replied, sent),
                    "click_rate": safe_divide(clicked, sent),
                    "bounce_rate": safe_divide(bounced, sent),
                    "conversion_rate": 0.0,   # Not provided by API -- set manually
                    "spam_rate": 0.0,
                    "harvested_at": datetime.now(AEST).isoformat(),
                })

                if sent > 0:
                    log.info(
                        f"HARVEST: {campaign_id[:8]}... "
                        f"sent={sent} "
                        f"open={safe_divide(opened, sent):.1%} "
                        f"reply={safe_divide(replied, sent):.1%}"
                    )
                else:
                    log.info(
                        f"HARVEST: {campaign_id[:8]}... "
                        f"sent=0 (campaign paused or not yet started -- "
                        f"zero metrics recorded, fitness unchanged)"
                    )

            except Exception as e:
                # Per-campaign failures are isolated; remaining campaigns
                # are still harvested.
                log.error(f"HARVEST: Error fetching analytics for {campaign_id}: {e}")

        return metrics_list

    def _fetch_analytics_from_emails(
        self, requests, headers: Dict, campaign_id: str
    ) -> Dict[str, int]:
        """
        Fallback: derive campaign analytics by counting emails by their status.
        Uses GET /api/v2/emails?campaign_id=<id> and counts status fields.

        Pagination is hard-capped at 10 pages x 100 emails; campaigns beyond
        1000 emails are undercounted by this fallback.
        """
        try:
            # We page through all emails for this campaign
            all_emails = []
            starting_after = None

            for _ in range(10):   # max 10 pages (1000 emails)
                params: Dict[str, Any] = {
                    "campaign_id": campaign_id,
                    "limit": 100,
                }
                if starting_after:
                    params["starting_after"] = starting_after

                resp = requests.get(
                    f"{self.INSTANTLY_BASE}/emails",
                    headers=headers,
                    params=params,
                    timeout=30,
                )
                if resp.status_code != 200:
                    break
                data = resp.json()
                items = data.get("items", [])
                all_emails.extend(items)
                if len(items) < 100:
                    # Short page means we've reached the end.
                    break
                # Cursor pagination: presumably keyed on the last item's id
                # -- confirm against the Instantly v2 docs.
                starting_after = items[-1].get("id")

            if not all_emails:
                return {}

            # Count by status fields (boolean flags vary by API version).
            sent = len(all_emails)
            opened = sum(1 for e in all_emails if e.get("opened") or e.get("is_opened"))
            replied = sum(1 for e in all_emails if e.get("replied") or e.get("is_replied"))
            bounced = sum(1 for e in all_emails if e.get("bounced") or e.get("is_bounced"))

            log.debug(
                f"HARVEST email fallback: sent={sent} opened={opened} "
                f"replied={replied} bounced={bounced}"
            )
            return {
                "total_count": sent,
                "open_count": opened,
                "reply_count": replied,
                "bounced_count": bounced,
            }
        except Exception as e:
            log.warning(f"HARVEST: Email fallback failed: {e}")
            return {}


# ---------------------------------------------------------------------------
# Phase 2: CULL
# ---------------------------------------------------------------------------

class AssetCuller:
    """CULL phase: identify and archive bottom-performing assets."""

    def __init__(self, conn, dry_run: bool = False):
        """
        Args:
            conn: Open PostgreSQL connection.
            dry_run: When True, report what would be culled without writing.
        """
        self.conn = conn
        self.dry_run = dry_run

    def cull(self, asset_type: str) -> List[str]:
        """
        Archive the bottom 10% of active assets of *asset_type*.

        Safety invariants:
          * never drops below MIN_SURVIVORS_PER_TYPE active assets;
          * only assets meeting MIN_DATA_THRESHOLDS with a non-zero
            fitness score are eligible;
          * archives (soft-delete) -- rows are never removed.

        Returns the archived asset_ids (empty when culling is skipped).
        """
        log.info(f"CULL: Evaluating {asset_type} assets...")
        cur = self.conn.cursor()

        # Data-sufficiency gate: first key/value of the per-type threshold
        # map, defaulting to sent >= 50 when the type has no entry.
        min_data = MIN_DATA_THRESHOLDS.get(asset_type, {})
        threshold_key = next(iter(min_data), "sent")
        threshold_val = next(iter(min_data.values()), 50)

        cur.execute("""
            SELECT asset_id, fitness_score, metrics
            FROM genesis.evolution_assets
            WHERE asset_type = %s AND status = 'active'
            ORDER BY fitness_score ASC
        """, (asset_type,))
        ranked = cur.fetchall()   # worst fitness first
        total_active = len(ranked)

        if total_active <= MIN_SURVIVORS_PER_TYPE:
            log.info(
                f"CULL: Only {total_active} active {asset_type} assets -- "
                f"skipping (minimum is {MIN_SURVIVORS_PER_TYPE})"
            )
            return []

        # Keep only assets with enough data and a real score; worst-first
        # ordering from the query is preserved.
        eligible = [
            {"asset_id": aid, "fitness_score": score}
            for aid, score, raw_metrics in ranked
            if jsonb_col(raw_metrics).get(threshold_key, 0) >= threshold_val
            and score > 0.0
        ]

        if len(eligible) <= MIN_SURVIVORS_PER_TYPE:
            log.info(
                f"CULL: Only {len(eligible)} eligible {asset_type} assets -- "
                "skipping cull (not enough data-sufficient assets)"
            )
            return []

        # Bottom 10% (at least one), capped so at least
        # MIN_SURVIVORS_PER_TYPE eligible assets stay active.
        n = len(eligible)
        cull_count = max(1, int(n * 0.10))
        if n - cull_count < MIN_SURVIVORS_PER_TYPE:
            cull_count = n - MIN_SURVIVORS_PER_TYPE
            if cull_count <= 0:
                log.info(
                    f"CULL: Cannot cull {asset_type} without going below "
                    f"{MIN_SURVIVORS_PER_TYPE} survivors -- skipping"
                )
                return []

        culled_ids = [entry["asset_id"] for entry in eligible[:cull_count]]

        if self.dry_run:
            log.info(
                f"CULL [DRY RUN]: Would archive {len(culled_ids)} "
                f"{asset_type} assets: {culled_ids}"
            )
            return culled_ids

        # Soft-delete: flip status to 'archived'; never DELETE.
        for asset_id in culled_ids:
            cur.execute("""
                UPDATE genesis.evolution_assets
                SET status = 'archived',
                    archived_at = NOW(),
                    archived_reason = 'alpha_evolve_cull'
                WHERE asset_id = %s
            """, (asset_id,))

        self.conn.commit()
        log.info(f"CULL: Archived {len(culled_ids)} {asset_type} assets: {culled_ids}")
        return culled_ids


# ---------------------------------------------------------------------------
# Phase 3: MUTATE
# ---------------------------------------------------------------------------

class AssetMutator:
    """Generate mutations of elite assets using Gemini (or OpenRouter fallback).

    Pipeline per elite asset:
      1. get_elites()          -- select top 10% of scored active assets
      2. generate_mutations()  -- ask an LLM for N mutated variants
      3. validation            -- hard-constraint checks (email templates only)
      4. dedup                 -- Qdrant semantic-similarity filter (best effort)
      5. store_mutations()     -- persist pending variants for the TEST phase
    """

    def __init__(self, conn, qdrant_client=None, dry_run: bool = False):
        # conn: live PostgreSQL connection (Bloodstream).
        # qdrant_client: optional; dedup is skipped gracefully when absent.
        self.conn = conn
        self.qdrant = qdrant_client
        self.dry_run = dry_run
        # Tolerate a malformed ALPHA_EVOLVE_MUTATIONS value rather than
        # crashing the whole daemon at construction time.
        try:
            self.mutations_per_elite = int(
                os.environ.get("ALPHA_EVOLVE_MUTATIONS", DEFAULT_MUTATIONS_PER_ELITE)
            )
        except (TypeError, ValueError):
            log.warning(
                "MUTATE: Invalid ALPHA_EVOLVE_MUTATIONS value -- using default"
            )
            self.mutations_per_elite = int(DEFAULT_MUTATIONS_PER_ELITE)
        # Key resolution order: explicit env vars first, then genesis loader.
        self._gemini_key = (
            os.environ.get("GEMINI_API_KEY")
            or os.environ.get("GOOGLE_API_KEY")
            or self._load_gemini_key_from_genesis()
        )

    def _load_gemini_key_from_genesis(self) -> Optional[str]:
        """Try to load Gemini key from genesis secrets_loader.

        Returns None on any failure (missing module, missing key) so the
        constructor's fallback chain can continue.
        """
        try:
            from core.secrets_loader import get_gemini_api_key
            return get_gemini_api_key()
        except Exception:
            return None

    def get_elites(self, asset_type: str) -> List[Dict]:
        """Fetch top 10% performing active assets (by fitness score).

        Returns a list of dicts with keys: asset_id, fitness_score, content,
        version, metadata, metrics. Empty list when nothing is scored yet.
        """
        cur = self.conn.cursor()
        cur.execute("""
            SELECT asset_id, fitness_score, content, version, metadata, metrics
            FROM genesis.evolution_assets
            WHERE asset_type = %s AND status = 'active' AND fitness_score > 0
            ORDER BY fitness_score DESC
        """, (asset_type,))
        rows = cur.fetchall()

        if not rows:
            log.info(f"MUTATE: No scored {asset_type} assets -- nothing to mutate")
            return []

        # At least one elite even for tiny populations (ceil-like behaviour).
        elite_count = max(1, int(len(rows) * 0.10))
        elites = []
        for row in rows[:elite_count]:
            elites.append({
                "asset_id": row[0],
                "fitness_score": float(row[1]),
                "content": jsonb_col(row[2]),
                "version": row[3],
                "metadata": jsonb_col(row[4]),
                "metrics": jsonb_col(row[5]),
            })
        log.info(
            f"MUTATE: {len(elites)} elite {asset_type} assets selected "
            f"(top 10% of {len(rows)})"
        )
        return elites

    def generate_mutations(self, elite: Dict, asset_type: str) -> List[Dict]:
        """
        Ask Gemini to produce N mutations of an elite asset.
        Each mutation is a dict with the new content + mutation metadata.
        Returns list of mutation dicts (may be empty on API failure).
        """
        if self.dry_run:
            log.info(
                f"MUTATE [DRY RUN]: Would generate {self.mutations_per_elite} "
                f"mutations of {elite['asset_id']}"
            )
            return self._make_dummy_mutations(elite, asset_type)

        log.info(
            f"MUTATE: Generating {self.mutations_per_elite} mutations "
            f"of {elite['asset_id']} (fitness={elite['fitness_score']:.4f})"
        )

        prompt_builders = {
            "email_template": self._email_mutation_prompt,
            "voice_prompt": self._voice_mutation_prompt,
            "widget_config": self._widget_mutation_prompt,
        }
        builder = prompt_builders.get(asset_type)
        if not builder:
            log.warning(f"MUTATE: No mutation prompt for {asset_type}")
            return []

        prompt = builder(elite)

        # Attempt Gemini first, fall back to OpenRouter
        mutations = self._call_gemini(prompt) or self._call_openrouter(prompt)
        if not mutations:
            log.error(
                f"MUTATE: Both Gemini and OpenRouter failed for "
                f"{elite['asset_id']} -- skipping mutations"
            )
            return []

        # Post-generation validation (only email templates have hard rules so far)
        if asset_type == "email_template":
            mutations = [m for m in mutations if self._validate_email_mutation(m)]

        # Deduplicate via Qdrant (skip gracefully if unavailable)
        unique = self._deduplicate_via_qdrant(mutations, asset_type)
        log.info(
            f"MUTATE: {len(mutations)} generated, {len(unique)} unique "
            f"after dedup, returning {min(len(unique), self.mutations_per_elite)}"
        )
        # Cap at the configured budget even if the LLM over-delivered.
        return unique[:self.mutations_per_elite]

    def store_mutations(
        self, mutations: List[Dict], parent: Dict, asset_type: str, cycle_id: str
    ) -> List[Dict]:
        """
        Write mutation records into genesis.mutation_test_results for testing.
        Returns list of stored records with mutation_id assigned.

        In dry-run mode nothing touches the database; synthetic DRY_* ids
        are returned instead.
        """
        if self.dry_run:
            return [
                {
                    "mutation_id": f"DRY_{cycle_id}_{i:03d}",
                    "parent_id": parent["asset_id"],
                    "mutation": m,
                }
                for i, m in enumerate(mutations)
            ]

        cur = self.conn.cursor()
        stored = []
        for i, mutation in enumerate(mutations):
            # uuid suffix makes ids unique across re-runs of the same cycle.
            mutation_id = f"MUT_{cycle_id}_{asset_type[:3].upper()}_{i:03d}_{uuid.uuid4().hex[:6]}"
            cur.execute("""
                INSERT INTO genesis.mutation_test_results
                (mutation_id, parent_id, asset_type, mutation_content,
                 parent_fitness, cycle_id, status, created_at)
                VALUES (%s, %s, %s, %s, %s, %s, 'pending', NOW())
                ON CONFLICT (mutation_id) DO NOTHING
            """, (
                mutation_id,
                parent["asset_id"],
                asset_type,
                json.dumps(mutation),
                parent["fitness_score"],
                cycle_id,
            ))
            stored.append({
                "mutation_id": mutation_id,
                "parent_id": parent["asset_id"],
                "asset_type": asset_type,
                "mutation": mutation,
                "parent_fitness": parent["fitness_score"],
            })

        self.conn.commit()
        log.info(
            f"MUTATE: Stored {len(stored)} mutations in "
            "genesis.mutation_test_results"
        )
        return stored

    # ------------------------------------------------------------------
    # Mutation prompts
    # ------------------------------------------------------------------

    def _email_mutation_prompt(self, elite: Dict) -> str:
        content = elite["content"]
        return f"""You are Alpha Evolve, a Darwinian optimisation engine for sales email templates.

ELITE PARENT (current top performer, fitness_score={elite['fitness_score']:.4f}):
Subject: {content.get('subject_line', 'N/A')}
Body:
{content.get('body', 'N/A')}

Campaign metrics: {json.dumps(elite.get('metrics', {}), indent=2)}

Available Instantly personalisation variables:
  {{{{email}}}}, {{{{firstName}}}}, {{{{companyName}}}}, {{{{website}}}},
  {{{{agencyType}}}}, {{{{location}}}}

YOUR TASK: Generate exactly {self.mutations_per_elite} MUTATED versions of this email.

MUTATION DIMENSIONS (vary one or more per mutation):
1. Subject line: different hooks, lengths, personalisation
2. Opening line: different rapport-building approaches
3. Value proposition framing: different angles on the same offer
4. Social proof: add/remove/change numbers or proof points
5. Call-to-action: different asks (reply, click, book)
6. Tone: shift along formal <-> casual spectrum
7. Length: shorter (under 80 words) or longer (up to 200 words) variants
8. Urgency: add/remove scarcity or time pressure

CONSTRAINTS (HARD RULES -- must not be violated):
- Keep ALL {{{{variable}}}} merge tags intact and spelled correctly
- Must comply with Australian Spam Act 2003 -- include [Unsubscribe] at the end
- Must be truthful (no fabricated statistics or fake testimonials)
- Maximum 200 words per email body
- EACH mutation must differ meaningfully from the parent (not just a word swap)
- Do NOT reproduce the parent verbatim in any mutation

OUTPUT FORMAT -- JSON array ONLY, no markdown, no prose:
[
  {{
    "subject_line": "...",
    "body": "...",
    "mutation_dimensions": ["tone", "cta"],
    "mutation_rationale": "Shifted to casual tone with reply-based CTA"
  }}
]"""

    def _voice_mutation_prompt(self, elite: Dict) -> str:
        content = elite["content"]
        return f"""You are Alpha Evolve, optimising AI voice agent system prompts for Telnyx.

ELITE PARENT (fitness_score={elite['fitness_score']:.4f}):
{content.get('system_prompt', 'N/A')}

Metrics: {json.dumps(elite.get('metrics', {}), indent=2)}

YOUR TASK: Generate exactly {self.mutations_per_elite} MUTATED versions.

MUTATION DIMENSIONS:
1. Greeting style: formal vs warm vs direct
2. Question ordering: change discovery sequence
3. Objection handling: different rebuttal strategies
4. Booking CTA timing: earlier vs later in conversation
5. Verbal padding phrases: different natural stall tactics
6. Personality warmth: adjust empathy markers
7. Information density: more/less detail per response
8. Closing technique: different ways to secure the booking

CONSTRAINTS:
- Maintain brand identity (Australian, professional, helpful)
- Include booking/scheduling capability
- Handle price, timing, "just looking" objections
- Maximum 2000 characters for system prompt

OUTPUT FORMAT -- JSON array ONLY:
[
  {{
    "system_prompt": "...",
    "mutation_dimensions": ["greeting", "booking_timing"],
    "mutation_rationale": "Earlier booking CTA with warmer greeting"
  }}
]"""

    def _widget_mutation_prompt(self, elite: Dict) -> str:
        content = elite["content"]
        # Older rows may store the config at top level rather than under "config".
        config = content.get("config", content)
        return f"""You are Alpha Evolve, optimising website widget configurations.

ELITE PARENT config (fitness_score={elite['fitness_score']:.4f}):
{json.dumps(config, indent=2)}

Metrics: {json.dumps(elite.get('metrics', {}), indent=2)}

YOUR TASK: Generate exactly {self.mutations_per_elite} config variants.

MUTATION DIMENSIONS:
1. Greeting text: different opening messages
2. CTA text: different button labels
3. Auto-open delay: timing variations 0-15000ms
4. Position: bottom-right vs bottom-left
5. Theme colour: variations within brand palette
6. Voice selection: different Telnyx NaturalHD voice names

CONSTRAINTS:
- Must include greeting, CTA text, and position
- Colours must be valid hex codes
- auto_open_delay_ms between 0 and 30000
- voice_id must be valid Telnyx NaturalHD voice name

OUTPUT FORMAT -- JSON array ONLY:
[
  {{
    "config": {{"greeting": "...", "theme_color": "...", ...}},
    "mutation_dimensions": ["greeting", "auto_open_delay"],
    "mutation_rationale": "Shorter delay with more casual greeting"
  }}
]"""

    # ------------------------------------------------------------------
    # LLM callers
    # ------------------------------------------------------------------

    def _call_gemini(self, prompt: str) -> Optional[List[Dict]]:
        """Call Gemini generative API to produce mutations.

        Returns parsed mutation list, or None on any failure so the caller
        can fall back to OpenRouter.
        """
        if not self._gemini_key:
            log.warning("MUTATE: No Gemini API key available")
            return None
        try:
            import google.generativeai as genai
            genai.configure(api_key=self._gemini_key)
            model = genai.GenerativeModel(GEMINI_MUTATION_MODEL)
            response = model.generate_content(
                prompt,
                generation_config=genai.types.GenerationConfig(
                    # High temperature on purpose: we WANT diverse mutations.
                    temperature=0.9,
                    max_output_tokens=8192,
                    # Ask for raw JSON so fence-stripping is rarely needed.
                    response_mime_type="application/json",
                ),
            )
            raw = response.text.strip()
            return self._parse_mutation_json(raw)
        except Exception as e:
            log.warning(f"MUTATE: Gemini call failed: {e}")
            return None

    def _call_openrouter(self, prompt: str) -> Optional[List[Dict]]:
        """Fall back to OpenRouter (Kimi K2.5 / MiniMax M2.5) for mutations."""
        api_key = os.environ.get("OPENROUTER_API_KEY")
        if not api_key:
            log.warning("MUTATE: OPENROUTER_API_KEY not set -- cannot fall back")
            return None
        try:
            import requests
            resp = requests.post(
                "https://openrouter.ai/api/v1/chat/completions",
                headers={
                    "Authorization": f"Bearer {api_key}",
                    "Content-Type": "application/json",
                    "HTTP-Referer": "https://sunaivadigital.com",
                    "X-Title": "Alpha Evolve Daemon",
                },
                json={
                    "model": "moonshotai/kimi-k2-5",
                    "messages": [
                        {"role": "system", "content": "You are Alpha Evolve."},
                        {"role": "user", "content": prompt},
                    ],
                    "temperature": 0.9,
                    "max_tokens": 8192,
                },
                timeout=60,
            )
            resp.raise_for_status()
            content = resp.json()["choices"][0]["message"]["content"]
            return self._parse_mutation_json(content)
        except Exception as e:
            log.warning(f"MUTATE: OpenRouter fallback failed: {e}")
            return None

    def _parse_mutation_json(self, raw: str) -> Optional[List[Dict]]:
        """Extract a JSON array from LLM output (handles markdown fences).

        Only the OUTERMOST fences are stripped. The previous MULTILINE-regex
        approach also deleted fence-like lines embedded inside the payload,
        which could corrupt otherwise valid JSON.
        """
        raw = raw.strip()
        if raw.startswith("```"):
            # Drop an opening fence such as ``` or ```json (language tag optional).
            raw = re.sub(r"^```[A-Za-z]*\s*", "", raw)
        if raw.endswith("```"):
            raw = raw[:-3].rstrip()
        try:
            parsed = json.loads(raw)
            if isinstance(parsed, list):
                return parsed
            if isinstance(parsed, dict):
                # Some models wrap the array: {"mutations": [...]}; a bare
                # single-object response is treated as a one-element list.
                return parsed.get("mutations", [parsed])
            return None
        except json.JSONDecodeError as e:
            log.warning(f"MUTATE: Could not parse LLM JSON response: {e}")
            log.debug(f"MUTATE: Raw response was: {raw[:300]}")
            return None

    # ------------------------------------------------------------------
    # Post-generation validation
    # ------------------------------------------------------------------

    def _validate_email_mutation(self, mutation: Dict) -> bool:
        """
        Validate that an email mutation meets hard constraints.
        Returns True if valid, False if it should be rejected.

        Side effect: appends "[Unsubscribe]" to mutation["body"] in place
        when the footer is missing (repair instead of reject).
        """
        subject = mutation.get("subject_line", "")
        body = mutation.get("body", "")

        # Must have both fields
        if not subject or not body:
            log.debug("MUTATE: Rejected mutation -- missing subject or body")
            return False

        # Body must not be too long (200 words max)
        word_count = len(body.split())
        if word_count > 250:  # slight tolerance
            log.debug(f"MUTATE: Rejected mutation -- body too long ({word_count} words)")
            return False

        # Must include unsubscribe
        if "[unsubscribe]" not in body.lower():
            # Attempt to add it automatically
            mutation["body"] = body.rstrip() + "\n\n[Unsubscribe]"

        # Merge tags must be syntactically valid (no broken {{ }} pairs).
        # NOTE: loop variable renamed from `field`, which shadowed the
        # module-level `dataclasses.field` import.
        for text in (subject, body):
            open_braces = text.count("{{")
            close_braces = text.count("}}")
            if open_braces != close_braces:
                log.debug("MUTATE: Rejected mutation -- mismatched merge tag braces")
                return False

        return True

    # ------------------------------------------------------------------
    # Deduplication
    # ------------------------------------------------------------------

    def _deduplicate_via_qdrant(
        self, mutations: List[Dict], asset_type: str
    ) -> List[Dict]:
        """
        Use Qdrant semantic similarity to reject near-duplicate mutations.
        Skips gracefully if Qdrant is unavailable.
        """
        if not self.qdrant:
            return mutations

        unique = []
        for mutation in mutations:
            text = json.dumps(mutation, sort_keys=True)
            try:
                # Imported inside the try so a missing qdrant_client package
                # degrades to "accept the mutation" via the except below.
                from qdrant_client.models import Filter, FieldCondition, MatchValue
                results = self.qdrant.search(
                    collection_name="alpha_evolve_assets",
                    query_vector=self._embed_text(text),
                    limit=1,
                    score_threshold=SIMILARITY_THRESHOLD,
                    query_filter=Filter(
                        must=[
                            FieldCondition(
                                key="asset_type",
                                match=MatchValue(value=asset_type),
                            )
                        ]
                    ),
                )
                if not results:
                    unique.append(mutation)
                else:
                    log.debug(
                        f"MUTATE: Rejected near-duplicate "
                        f"(similarity={results[0].score:.3f})"
                    )
            except Exception as e:
                # Dedup is best-effort: never lose a mutation to infra errors.
                log.warning(f"MUTATE: Qdrant dedup error: {e} -- accepting mutation")
                unique.append(mutation)

        return unique

    def _embed_text(self, text: str) -> List[float]:
        """Generate embedding vector for deduplication.

        Falls back to a 768-dim zero vector on any failure; a zero vector
        should score below the similarity threshold, so dedup effectively
        passes everything through in that case.
        """
        try:
            import google.generativeai as genai
            if self._gemini_key:
                genai.configure(api_key=self._gemini_key)
                result = genai.embed_content(
                    model="models/text-embedding-004",
                    content=text,
                )
                return result["embedding"]
        except Exception:
            pass
        return [0.0] * 768   # Zero vector -- dedup will be skipped

    # ------------------------------------------------------------------
    # Dry-run helpers
    # ------------------------------------------------------------------

    def _make_dummy_mutations(self, elite: Dict, asset_type: str) -> List[Dict]:
        """Return placeholder mutations for dry-run mode."""
        if asset_type == "email_template":
            return [
                {
                    "subject_line": f"[DRY RUN] Variant {i+1}: {elite['content'].get('subject_line', 'N/A')}",
                    "body": f"[DRY RUN mutation {i+1}] {elite['content'].get('body', '')[:50]}...\n\n[Unsubscribe]",
                    "mutation_dimensions": ["tone"],
                    "mutation_rationale": f"Dry-run dummy variant {i+1}",
                }
                for i in range(self.mutations_per_elite)
            ]
        return [{"dry_run": True, "variant": i} for i in range(self.mutations_per_elite)]


# ---------------------------------------------------------------------------
# Phase 4: TEST
# ---------------------------------------------------------------------------

class MutationTester:
    """
    Phase 1: Generate Jules Pro task descriptions for manual submission.
    Phase 2+: Auto-submit via Jules API when available.
    """

    def __init__(self, conn, dry_run: bool = False):
        self.conn = conn
        self.dry_run = dry_run
        # Ensure the task drop directory exists before any cycle writes to it.
        JULES_TASKS_DIR.mkdir(parents=True, exist_ok=True)

    def generate_test_tasks(
        self,
        stored_mutations: List[Dict],
        parent: Dict,
        asset_type: str,
        cycle_id: str,
    ) -> List[Dict]:
        """
        Generate Jules Pro task descriptions for each mutation.
        Writes markdown file to hive/jules_evolve_tasks/.
        Returns list of task dicts (the full list; the written file itself
        is capped at MAX_JULES_TASKS_PER_CYCLE tasks).
        """
        if not stored_mutations:
            return []

        tasks = []
        task_builders = {
            "email_template": self._email_test_task,
            "voice_prompt": self._voice_test_task,
            "widget_config": self._widget_test_task,
        }
        builder = task_builders.get(asset_type)
        if not builder:
            # Unknown asset types are silently skipped -- no builder, no tasks.
            return []

        for record in stored_mutations:
            task = builder(
                record["mutation_id"],
                record["mutation"],
                parent,
            )
            tasks.append(task)

        # Write Jules task file
        timestamp = datetime.now(AEST).strftime("%Y%m%d_%H%M")
        output_file = (
            JULES_TASKS_DIR /
            f"jules_tasks_{cycle_id}_{asset_type}_{timestamp}.md"
        )
        self._write_tasks_file(tasks, output_file, cycle_id, asset_type)

        mode = "[DRY RUN] " if self.dry_run else ""
        log.info(f"TEST: {mode}Generated {len(tasks)} Jules tasks -> {output_file}")
        return tasks

    # ------------------------------------------------------------------
    # Task builders
    # ------------------------------------------------------------------

    def _email_test_task(
        self, task_id: str, mutation: Dict, parent: Dict
    ) -> Dict:
        """Build a Jules task dict for one email-template mutation."""
        parent_content = parent.get("content", {})
        return {
            "task_id": task_id,
            "title": f"Test email mutation: {mutation.get('mutation_rationale', 'variant')}",
            "priority": "medium",
            "estimated_minutes": 5,
            "description": f"""# Alpha Evolve Mutation Test -- Email Template

**Task ID**: {task_id}
**Parent ID**: {parent['asset_id']}
**Parent fitness**: {parent['fitness_score']:.4f}

## Parent (current winner)

**Subject**: {parent_content.get('subject_line', 'N/A')}

**Body**:
```
{parent_content.get('body', 'N/A')[:400]}
```

## Mutation

**Subject**: {mutation.get('subject_line', 'N/A')}
**Dimensions changed**: {mutation.get('mutation_dimensions', [])}
**Rationale**: {mutation.get('mutation_rationale', 'N/A')}

**Body**:
```
{mutation.get('body', 'N/A')[:400]}
```

## Test Instructions

1. Load 10 representative Australian agency lead profiles (job title, location, agency type)
2. Use Gemini Flash to role-play as each lead persona and predict:
   - Would they open this email? (Yes/No + confidence 0-1)
   - Would they reply? (Yes/No + confidence 0-1)
   - What objection would they raise?
3. Calculate predicted open_rate, reply_rate
4. Apply `email_fitness()` to compute predicted_fitness_score
5. Compare against parent fitness of {parent['fitness_score']:.4f}
6. Write result to genesis.mutation_test_results table:
   - Set `mutation_fitness` = predicted score
   - Set `test_verdict` = 'win' | 'lose' | 'inconclusive'
   - Set `status` = 'completed'

## Acceptance Criteria

- [ ] 10 persona simulations completed
- [ ] predicted_fitness_score calculated using email_fitness()
- [ ] Comparison against parent documented
- [ ] Result written to genesis.mutation_test_results (status='completed')
""",
        }

    def _voice_test_task(
        self, task_id: str, mutation: Dict, parent: Dict
    ) -> Dict:
        """Build a Jules task dict for one voice-prompt mutation."""
        parent_content = parent.get("content", {})
        return {
            "task_id": task_id,
            "title": f"Test voice prompt mutation: {mutation.get('mutation_rationale', 'variant')}",
            "priority": "medium",
            "estimated_minutes": 8,
            "description": f"""# Alpha Evolve Mutation Test -- Voice Prompt

**Task ID**: {task_id}
**Parent fitness**: {parent['fitness_score']:.4f}

## Parent system prompt (excerpt)
```
{parent_content.get('system_prompt', 'N/A')[:400]}...
```

## Mutation
**Dimensions**: {mutation.get('mutation_dimensions', [])}
**Rationale**: {mutation.get('mutation_rationale', 'N/A')}

```
{mutation.get('system_prompt', 'N/A')[:400]}...
```

## Test Instructions

1. Load 5 historical call transcripts from genesis.call_insights
2. Simulate 5 conversations using Gemini Flash as caller persona
3. Track: call duration, booking attempt, sentiment score
4. Apply voice_fitness() to compute predicted_fitness_score
5. Compare vs parent, write result to genesis.mutation_test_results

## Acceptance Criteria
- [ ] 5 simulations completed
- [ ] Predicted fitness score calculated
- [ ] Result written to genesis.mutation_test_results
""",
        }

    def _widget_test_task(
        self, task_id: str, mutation: Dict, parent: Dict
    ) -> Dict:
        """Build a Jules task dict for one widget-config mutation."""
        return {
            "task_id": task_id,
            "title": f"Test widget config mutation: {mutation.get('mutation_rationale', 'variant')}",
            "priority": "low",
            "estimated_minutes": 3,
            "description": f"""# Alpha Evolve Mutation Test -- Widget Config

**Task ID**: {task_id}
**Parent fitness**: {parent['fitness_score']:.4f}

## Parent config
```json
{json.dumps(parent.get('content', {}), indent=2)[:400]}
```

## Mutation config
```json
{json.dumps(mutation.get('config', mutation), indent=2)[:400]}
```
**Dimensions**: {mutation.get('mutation_dimensions', [])}

## Test Instructions

1. Evaluate UX principles (contrast ratio, CTA visibility, timing)
2. Score predicted interaction_rate and lead_capture_rate
3. Apply widget_fitness() for predicted_fitness_score
4. Compare vs parent, write result to genesis.mutation_test_results
""",
        }

    # ------------------------------------------------------------------
    # File writer
    # ------------------------------------------------------------------

    def _write_tasks_file(
        self, tasks: List[Dict], output_file: Path, cycle_id: str, asset_type: str
    ) -> None:
        """Render tasks (capped at the cycle budget) into a markdown file.

        FIX: the header now reports the number of tasks actually written;
        previously it claimed len(tasks) even when the budget truncated the
        list, and the truncation happened silently.
        """
        written = tasks[:MAX_JULES_TASKS_PER_CYCLE]
        if len(written) < len(tasks):
            log.warning(
                f"TEST: {len(tasks) - len(written)} tasks over cycle budget "
                f"({MAX_JULES_TASKS_PER_CYCLE}) -- not written to {output_file.name}"
            )
        lines = [
            f"# Jules Pro Tasks -- Alpha Evolve Cycle {cycle_id}",
            "",
            f"**Asset Type**: {asset_type}",
            f"**Generated**: {datetime.now(AEST).isoformat()}",
            f"**Tasks**: {len(written)}",
            f"**Submit at**: https://jules.google.com",
            f"**Cycle budget**: {MAX_JULES_TASKS_PER_CYCLE} tasks max this cycle",
            "",
            "---",
            "",
        ]
        for task in written:
            lines.extend([
                f"## {task['task_id']}: {task['title']}",
                f"**Priority**: {task['priority']}",
                f"**Estimated**: {task['estimated_minutes']} minutes",
                "",
                task["description"],
                "",
                "---",
                "",
            ])
        output_file.write_text("\n".join(lines), encoding="utf-8")


# ---------------------------------------------------------------------------
# Phase 5: ASCEND
# ---------------------------------------------------------------------------

class AssetPromoter:
    """
    Compare mutation test results against parents.
    Promote winners. Discard losers.

    Phase 1: Checks results populated by Jules from the PREVIOUS cycle.
    """

    def __init__(self, conn, redis_client=None, dry_run: bool = False):
        # conn: PostgreSQL connection (Bloodstream).
        # redis_client: optional hot cache of active assets; promotion still
        #   succeeds when it is absent or failing.
        # dry_run: evaluate and report, but write nothing to DB or Redis.
        self.conn = conn
        self.redis = redis_client
        self.dry_run = dry_run

    def evaluate_and_promote(self, cycle_id: str) -> List[Dict]:
        """
        Find completed mutation test results with no promotion decision yet.
        Promote mutations that beat their parent. Discard losers.
        Returns list of promotion records.

        NOTE(review): cycle_id is currently unused in this method -- the
        query deliberately spans results from ANY cycle (Jules completes
        tests for the previous cycle), so promotion decisions are global.
        """
        log.info("ASCEND: Checking for completed mutation test results...")
        cur = self.conn.cursor()
        # Largest fitness improvement first, so the strongest winners are
        # processed (and supersede their parents) before weaker siblings.
        cur.execute("""
            SELECT mutation_id, parent_id, asset_type,
                   mutation_fitness, parent_fitness, mutation_content, test_verdict
            FROM genesis.mutation_test_results
            WHERE status = 'completed' AND promoted IS NULL
            ORDER BY (COALESCE(mutation_fitness, 0) - parent_fitness) DESC
        """)
        rows = cur.fetchall()

        if not rows:
            log.info("ASCEND: No completed tests pending promotion decision")
            return []

        log.info(f"ASCEND: {len(rows)} completed tests to evaluate")
        promotions = []

        for row in rows:
            # test_verdict is selected but not consulted below: the decision
            # is made purely on the fitness comparison.
            (mutation_id, parent_id, asset_type,
             raw_mut_fitness, parent_fitness, raw_content, test_verdict) = row

            # NULL fitness values are treated as 0.0 (automatic loss).
            mutation_fitness = float(raw_mut_fitness or 0.0)
            parent_fitness = float(parent_fitness or 0.0)
            mutation_content = jsonb_col(raw_content)

            if mutation_fitness <= parent_fitness:
                log.info(
                    f"ASCEND: {mutation_id} lost "
                    f"({mutation_fitness:.4f} <= {parent_fitness:.4f}) -- discarding"
                )
                # Mark the decision so this row is never re-evaluated
                # (promoted IS NULL is the "pending" filter above).
                if not self.dry_run:
                    cur.execute("""
                        UPDATE genesis.mutation_test_results
                        SET promoted = FALSE, evaluated_at = NOW()
                        WHERE mutation_id = %s
                    """, (mutation_id,))
                continue

            # Relative gain over the parent; a zero-fitness parent is scored
            # as a +100% improvement to avoid division by zero.
            improvement = (
                (mutation_fitness - parent_fitness) / parent_fitness
                if parent_fitness > 0 else 1.0
            )
            log.info(
                f"ASCEND: {mutation_id} BEATS parent "
                f"({mutation_fitness:.4f} > {parent_fitness:.4f}, "
                f"+{improvement*100:.1f}%)"
            )

            # Dry run: record what WOULD be promoted, touch nothing.
            if self.dry_run:
                promotions.append({
                    "mutation_id": mutation_id,
                    "parent_id": parent_id,
                    "improvement": improvement,
                    "is_major": improvement >= MAJOR_IMPROVEMENT_THRESHOLD,
                })
                continue

            # Promote: insert new active asset
            new_version = self._get_next_version(parent_id, asset_type, cur)
            new_asset_id = f"{parent_id}_v{new_version}"

            # Fetch parent metadata for inheritance
            cur.execute("""
                SELECT metadata FROM genesis.evolution_assets WHERE asset_id = %s
            """, (parent_id,))
            parent_row = cur.fetchone()
            parent_metadata = jsonb_col(parent_row[0]) if parent_row else {}

            # ON CONFLICT DO NOTHING makes a re-run of the same promotion
            # idempotent; metrics start empty ('{}') for the new asset.
            cur.execute("""
                INSERT INTO genesis.evolution_assets
                (asset_id, asset_type, content, version, parent_id,
                 fitness_score, status, metrics, metadata, created_at)
                VALUES (%s, %s, %s, %s, %s, %s, 'active', '{}', %s, NOW())
                ON CONFLICT (asset_id) DO NOTHING
            """, (
                new_asset_id, asset_type,
                json.dumps(mutation_content),
                new_version, parent_id,
                mutation_fitness,
                json.dumps(parent_metadata),
            ))

            # Retire parent to 'superseded'
            cur.execute("""
                UPDATE genesis.evolution_assets
                SET status = 'superseded',
                    superseded_by = %s,
                    updated_at = NOW()
                WHERE asset_id = %s
            """, (new_asset_id, parent_id))

            # Mark test result as promoted
            cur.execute("""
                UPDATE genesis.mutation_test_results
                SET promoted = TRUE,
                    promoted_asset_id = %s,
                    evaluated_at = NOW()
                WHERE mutation_id = %s
            """, (new_asset_id, mutation_id))

            # Update Redis hot cache (best effort -- failure is non-fatal).
            # 24h TTL on the new entry; the parent's entry is evicted.
            if self.redis:
                try:
                    cache_key = f"active_asset:{asset_type}:{new_asset_id}"
                    self.redis.set(
                        cache_key, json.dumps(mutation_content), ex=86400
                    )
                    self.redis.delete(f"active_asset:{asset_type}:{parent_id}")
                    log.debug(f"ASCEND: Redis cache updated for {new_asset_id}")
                except Exception as e:
                    log.warning(f"ASCEND: Redis cache update failed: {e}")

            promotions.append({
                "mutation_id": mutation_id,
                "new_asset_id": new_asset_id,
                "parent_id": parent_id,
                "improvement": improvement,
                "is_major": improvement >= MAJOR_IMPROVEMENT_THRESHOLD,
            })

        # Single commit covers all promotion/discard writes in this pass.
        if not self.dry_run:
            self.conn.commit()

        log.info(f"ASCEND: {len(promotions)} promotions this cycle")
        return promotions

    def _get_next_version(self, parent_id: str, asset_type: str, cur) -> int:
        """Return the next version number in this asset's lineage.

        Scans the parent itself AND its direct children for the highest
        version; when no row matches, the base is treated as version 1,
        so the first child becomes version 2.
        """
        cur.execute("""
            SELECT MAX(version)
            FROM genesis.evolution_assets
            WHERE (asset_id = %s OR parent_id = %s) AND asset_type = %s
        """, (parent_id, parent_id, asset_type))
        row = cur.fetchone()
        return (row[0] or 1) + 1


# ---------------------------------------------------------------------------
# Phase 6: LOG
# ---------------------------------------------------------------------------

class CycleLogger:
    """Record evolution cycle results to Bloodstream and KG.

    Per cycle this writes up to four artefacts:
      - a row in genesis.evolution_cycles (skipped when dry_run is set)
      - a KG axiom line (only when the cycle promoted at least one asset)
      - a human-readable markdown report under PROGRESS_DIR (always)
      - a major-improvements notification log entry (only for >20% wins)

    All writers swallow and log their own exceptions so a failure in one
    sink never blocks the others.
    """

    def __init__(self, conn, dry_run: bool = False):
        # conn: live PostgreSQL (Bloodstream) connection.
        # dry_run: when True, suppress the database write only; file-based
        #          outputs are still produced for inspection.
        self.conn = conn
        self.dry_run = dry_run

    def log_cycle(self, report: CycleReport) -> None:
        """Write cycle report to PostgreSQL, KG axiom file, and markdown file."""
        log.info(f"LOG: Recording cycle {report.cycle_id}...")

        if not self.dry_run:
            self._write_to_postgres(report)

        # Only cycles that actually promoted something earn a KG axiom,
        # keeping the knowledge graph free of no-op noise.
        if report.promotions > 0:
            self._write_kg_axiom(report)

        self._write_report_file(report)

        if report.major_improvements:
            self._notify_major_improvements(report)

        log.info(f"LOG: Cycle {report.cycle_id} recorded successfully")

    def _write_to_postgres(self, report: CycleReport) -> None:
        """Upsert the cycle row into genesis.evolution_cycles.

        Failures are logged (not raised) so a DB hiccup never blocks the
        file-based logging that follows. Fix vs original: the cursor is
        now closed in a ``finally`` block -- it was leaked on every path.
        """
        cur = self.conn.cursor()
        try:
            cur.execute("""
                INSERT INTO genesis.evolution_cycles
                (cycle_id, started_at, completed_at, assets_scored,
                 assets_culled, mutations_generated, mutations_tested,
                 promotions, major_improvements, errors, dry_run, cycle_version)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (cycle_id) DO UPDATE SET
                    completed_at = EXCLUDED.completed_at,
                    assets_scored = EXCLUDED.assets_scored,
                    assets_culled = EXCLUDED.assets_culled,
                    mutations_generated = EXCLUDED.mutations_generated,
                    mutations_tested = EXCLUDED.mutations_tested,
                    promotions = EXCLUDED.promotions,
                    major_improvements = EXCLUDED.major_improvements,
                    errors = EXCLUDED.errors
            """, (
                report.cycle_id,
                report.started_at,
                report.completed_at,
                report.assets_scored,
                report.assets_culled,
                report.mutations_generated,
                report.mutations_tested,
                report.promotions,
                json.dumps(report.major_improvements),
                json.dumps(report.errors),
                report.dry_run,
                CYCLE_VERSION,
            ))
            self.conn.commit()
            log.info("LOG: Cycle record written to genesis.evolution_cycles")
        except Exception as e:
            self.conn.rollback()
            log.error(f"LOG: Failed to write cycle to PostgreSQL: {e}")
        finally:
            cur.close()

    def _write_kg_axiom(self, report: CycleReport) -> None:
        """Append an axiom to KNOWLEDGE_GRAPH/axioms/alpha_evolve_daemon.jsonl."""
        # One JSON object per line (JSONL); the id embeds the cycle_id so
        # re-running the same cycle produces an identifiable duplicate
        # rather than silently overwriting history.
        axiom = {
            "id": f"AED-{report.cycle_id}",
            "axiom": (
                f"Alpha Evolve Daemon cycle {report.cycle_id}: "
                f"Scored {report.assets_scored} assets, culled {report.assets_culled}, "
                f"generated {report.mutations_generated} mutations, "
                f"promoted {report.promotions} winners. "
                + (
                    f"Major improvements: {len(report.major_improvements)}."
                    if report.major_improvements
                    else "No major improvements this cycle."
                )
            ),
            "source": "scripts/alpha_evolve_daemon.py",
            "confidence": 0.99,
            "category": "evolution",
            "domain": "alpha_evolve_daemon",
            "timestamp": report.completed_at,
        }
        try:
            with open(str(KG_AXIOMS_FILE), "a", encoding="utf-8") as f:
                f.write(json.dumps(axiom) + "\n")
            log.info(f"LOG: KG axiom written to {KG_AXIOMS_FILE}")
        except Exception as e:
            log.error(f"LOG: Failed to write KG axiom: {e}")

    def _write_report_file(self, report: CycleReport) -> None:
        """Write human-readable markdown cycle report under PROGRESS_DIR."""
        PROGRESS_DIR.mkdir(parents=True, exist_ok=True)
        report_file = PROGRESS_DIR / f"alpha_evolve_{report.cycle_id}.md"

        dry_tag = " (DRY RUN)" if report.dry_run else ""
        content = f"""# Alpha Evolve Cycle Report: {report.cycle_id}{dry_tag}

**Started**: {report.started_at}
**Completed**: {report.completed_at}
**Dry Run**: {report.dry_run}
**Version**: {CYCLE_VERSION}

## Results

| Metric | Value |
|--------|-------|
| Assets Scored | {report.assets_scored} |
| Assets Culled | {report.assets_culled} |
| Mutations Generated | {report.mutations_generated} |
| Mutations Tested (Jules tasks) | {report.mutations_tested} |
| Promotions | {report.promotions} |
| Major Improvements (>20%) | {len(report.major_improvements)} |
| Errors | {len(report.errors)} |

## Major Improvements

"""
        if report.major_improvements:
            for imp in report.major_improvements:
                # Promotion dicts carry new_asset_id; fall back to the raw
                # mutation_id if promotion did not complete.
                content += (
                    f"- **{imp.get('new_asset_id', imp.get('mutation_id', 'unknown'))}**: "
                    f"+{imp.get('improvement', 0)*100:.1f}% over parent "
                    f"{imp.get('parent_id', 'unknown')}\n"
                )
        else:
            content += "_None this cycle._\n"

        if report.errors:
            content += "\n## Errors\n\n"
            for err in report.errors:
                content += f"- {err}\n"

        content += f"\n---\n_Generated by alpha_evolve_daemon.py v{CYCLE_VERSION}_\n"

        try:
            report_file.write_text(content, encoding="utf-8")
            log.info(f"LOG: Cycle report written to {report_file}")
        except Exception as e:
            log.error(f"LOG: Failed to write report file: {e}")

    def _notify_major_improvements(self, report: CycleReport) -> None:
        """
        Phase 1: Write major improvements to notification log file.
        Phase 2: Send Telegram via AIVA bot.
        Phase 3: Email via Instantly.ai to kinan@protonmail.com.
        """
        log.info(f"LOG: {len(report.major_improvements)} MAJOR improvements detected!")
        try:
            with open(str(MAJOR_IMPROVEMENTS_LOG), "a", encoding="utf-8") as f:
                for imp in report.major_improvements:
                    asset_id = imp.get("new_asset_id", imp.get("mutation_id", "unknown"))
                    f.write(
                        f"[{report.completed_at}] MAJOR: {asset_id} "
                        f"beats {imp.get('parent_id', 'unknown')} "
                        f"by {imp.get('improvement', 0)*100:.1f}%\n"
                    )
        except Exception as e:
            log.error(f"LOG: Failed to write major improvements log: {e}")


# ---------------------------------------------------------------------------
# Status command
# ---------------------------------------------------------------------------

def show_status(conn) -> None:
    """Print current generation status to stdout.

    Read-only: runs three aggregate queries against the Bloodstream and
    prints formatted tables (asset counts per type/status, last 10 cycles,
    and the pending mutation pipeline).

    Fixes vs original: the cursor is now closed in a ``finally`` block
    (it was leaked), and a placeholder-free f-string was made a plain str.
    """
    cur = conn.cursor()
    try:
        print("\n" + "=" * 60)
        print("ALPHA EVOLVE DAEMON -- Generation Status")
        print("=" * 60)

        # Assets by type and status
        cur.execute("""
            SELECT asset_type, status, COUNT(*) as cnt,
                   AVG(fitness_score) as avg_fitness,
                   MAX(fitness_score) as max_fitness
            FROM genesis.evolution_assets
            GROUP BY asset_type, status
            ORDER BY asset_type, status
        """)
        rows = cur.fetchall()
        if rows:
            print("\nAssets by type/status:")
            print(f"  {'Asset Type':<20} {'Status':<12} {'Count':>6} {'Avg Fitness':>12} {'Max Fitness':>12}")
            print("  " + "-" * 64)
            for r in rows:
                # AVG/MAX may be NULL (e.g. all-zero fitness) -- coerce to 0.0.
                print(
                    f"  {r[0]:<20} {r[1]:<12} {r[2]:>6} "
                    f"{float(r[3] or 0):>12.4f} {float(r[4] or 0):>12.4f}"
                )
        else:
            print("\nNo assets in genesis.evolution_assets yet.")
            print("Run --cycle --seed to populate from INSTANTLY_SEQUENCE_PARTNER_PITCH.md")

        # Recent cycles
        cur.execute("""
            SELECT cycle_id, started_at, assets_scored, assets_culled,
                   mutations_generated, promotions, dry_run
            FROM genesis.evolution_cycles
            ORDER BY started_at DESC
            LIMIT 10
        """)
        cycles = cur.fetchall()
        if cycles:
            print(f"\nRecent cycles (last {len(cycles)}):")
            print(
                f"  {'Cycle ID':<12} {'Started':<25} "
                f"{'Scored':>6} {'Culled':>6} {'Muts':>6} {'Promoted':>8} {'DryRun':>8}"
            )
            print("  " + "-" * 75)
            for c in cycles:
                print(
                    f"  {c[0]:<12} {str(c[1])[:24]:<25} "
                    f"{c[2]:>6} {c[3]:>6} {c[4]:>6} {c[5]:>8} {str(c[6]):>8}"
                )
        else:
            print("\nNo evolution cycles run yet.")

        # Pending mutations: not yet tested vs tested-but-not-promoted.
        cur.execute("""
            SELECT COUNT(*) FROM genesis.mutation_test_results
            WHERE status = 'pending'
        """)
        pending = cur.fetchone()[0]
        cur.execute("""
            SELECT COUNT(*) FROM genesis.mutation_test_results
            WHERE status = 'completed' AND promoted IS NULL
        """)
        awaiting_ascend = cur.fetchone()[0]

        print("\nMutation pipeline:")
        print(f"  Pending Jules tests:   {pending}")
        print(f"  Awaiting ASCEND:       {awaiting_ascend}")
        print()
    finally:
        cur.close()


# ---------------------------------------------------------------------------
# Seed initial assets from INSTANTLY_SEQUENCE_PARTNER_PITCH.md
# ---------------------------------------------------------------------------

def seed_initial_email_assets(conn, dry_run: bool = False) -> int:
    """
    Seed genesis.evolution_assets with the 5 email steps x 2 subject variants
    from data/LEADS/INSTANTLY_SEQUENCE_PARTNER_PITCH.md.

    This is a one-time bootstrap operation. Existing assets are not overwritten
    (ON CONFLICT DO NOTHING). Returns count of newly inserted assets.

    Args:
        conn: live PostgreSQL connection.
        dry_run: log what would be inserted without touching the database.

    Fixes vs original:
      - Each INSERT now runs inside a SAVEPOINT. Without one, a single
        failed statement aborts the whole PostgreSQL transaction, so every
        subsequent insert in the loop fails with "current transaction is
        aborted" even though the exception was caught.
      - The cursor is closed in a ``finally`` block (it was leaked).
    """
    sequence_file = (
        GENESIS_ROOT / "data" / "LEADS" / "INSTANTLY_SEQUENCE_PARTNER_PITCH.md"
    )
    if not sequence_file.exists():
        log.warning(f"SEED: Sequence file not found at {sequence_file}")
        return 0

    # Parse the known structure of the file
    content = sequence_file.read_text(encoding="utf-8")

    # Extract email blocks -- they follow "## EMAIL N:" pattern
    email_blocks = re.split(r"(?m)^## EMAIL \d+:", content)[1:]

    # Known subject line pairs and bodies (parsed from the known structure)
    assets_to_seed = _parse_email_sequence(email_blocks)

    if not assets_to_seed:
        log.warning("SEED: Could not parse email sequence -- using hardcoded seed")
        assets_to_seed = _hardcoded_seed_assets()

    cur = conn.cursor()
    inserted = 0
    try:
        for asset in assets_to_seed:
            if dry_run:
                log.info(f"SEED [DRY RUN]: Would insert {asset['asset_id']}")
                inserted += 1
                continue
            try:
                # SAVEPOINT isolates this row: a failure rolls back only
                # this insert, not the whole seeding transaction.
                cur.execute("SAVEPOINT seed_asset")
                cur.execute("""
                    INSERT INTO genesis.evolution_assets
                    (asset_id, asset_type, content, version, parent_id,
                     fitness_score, status, metrics, metadata, created_at)
                    VALUES (%s, 'email_template', %s, 1, NULL, 0.0, 'active', '{}', %s, NOW())
                    ON CONFLICT (asset_id) DO NOTHING
                """, (
                    asset["asset_id"],
                    json.dumps(asset["content"]),
                    json.dumps({
                        "campaign_id": INSTANTLY_CAMPAIGN_ID,
                        "step": asset.get("step", 1),
                        "variant": asset.get("variant", "A"),
                    }),
                ))
                if cur.rowcount > 0:
                    inserted += 1
                cur.execute("RELEASE SAVEPOINT seed_asset")
            except Exception as e:
                log.warning(f"SEED: Failed to insert {asset['asset_id']}: {e}")
                try:
                    # Restore the transaction so the remaining rows can insert.
                    cur.execute("ROLLBACK TO SAVEPOINT seed_asset")
                except Exception:
                    pass  # best-effort; connection may be fully broken

        if not dry_run:
            conn.commit()
    finally:
        cur.close()

    log.info(f"SEED: Inserted {inserted} initial email_template assets")
    return inserted


def _parse_email_sequence(email_blocks: List[str]) -> List[Dict]:
    """Parse email blocks from the INSTANTLY_SEQUENCE markdown format.

    Each block yields one asset per subject-line variant, all sharing the
    block's single body (the first fenced block after "### Body").
    """
    subject_re = re.compile(r"### Subject Line ([A-Z])\n+```\n(.*?)\n```", re.DOTALL)
    body_re = re.compile(r"### Body\n+```\n(.*?)\n```", re.DOTALL)

    parsed: List[Dict] = []
    for step_num, raw_block in enumerate(email_blocks, start=1):
        found_body = body_re.search(raw_block)
        body_text = found_body.group(1).strip() if found_body else ""

        for letter, subject_text in subject_re.findall(raw_block):
            parsed.append({
                "asset_id": (
                    f"email_partner_pitch_step{step_num}_v{letter.lower()}_seed"
                ),
                "step": step_num,
                "variant": letter,
                "content": {
                    "subject_line": subject_text.strip(),
                    "body": body_text,
                },
            })

    return parsed


def _hardcoded_seed_assets() -> List[Dict]:
    """Minimal hardcoded seeds if parsing fails."""
    body_a = (
        "Hey team,\n\nI'm reaching out because {{companyName}} is exactly the "
        "kind of agency that could add $50K-$150K in annual recurring revenue "
        "with almost no extra work.\n\nWe've built an AI-powered Talking Widget "
        "that agencies embed on their clients' websites. It answers visitor "
        "questions with a real human-sounding voice, captures leads 24/7, and "
        "books appointments.\n\nYour clients pay a monthly subscription. You "
        "earn 20-33% recurring commission for life.\n\nInterested in a 15-min "
        "demo? Reply \"interested\".\n\nCheers,\nKinan\nSunaiva Digital\n"
        "sunaivadigital.com\n\n[Unsubscribe]"
    )
    body_b = (
        "Hey team,\n\nQuick question -- how much of your agency's revenue is "
        "recurring vs project-based?\n\nWe've built an AI Talking Widget that "
        "agencies white-label for their clients. $197-$597/mo per client, "
        "20-33% goes to you.\n\nOne script tag install. We handle everything.\n\n"
        "Worth a 15-min look?\n\nCheers,\nKinan\nSunaiva Digital\n\n[Unsubscribe]"
    )
    variants = (
        ("A", "New revenue stream for {{companyName}} -- zero dev work", body_a),
        ("B", "{{companyName}} + AI voice widgets = recurring revenue", body_b),
    )
    return [
        {
            "asset_id": f"email_partner_pitch_step1_v{letter.lower()}_seed",
            "step": 1,
            "variant": letter,
            "content": {"subject_line": subject, "body": body},
        }
        for letter, subject, body in variants
    ]


# ---------------------------------------------------------------------------
# Daemon mode
# ---------------------------------------------------------------------------

def run_daemon_loop() -> None:
    """
    Run as a long-running daemon, executing a cycle at 03:00 AEST daily.
    Uses a simple sleep loop -- for production, prefer systemd timer instead.
    """
    log.info("DAEMON: Alpha Evolve Daemon starting in daemon mode")
    log.info("DAEMON: Will execute cycle daily at 03:00 AEST (17:00 UTC)")
    log.info("DAEMON: Press Ctrl+C to stop")

    # Minimal stand-in for the argparse namespace run_cycle expects.
    class _CycleArgs:
        dry_run = False
        force = False
        asset_type = None
        seed = False

    while True:
        current = datetime.now(AEST)
        # Next 03:00 AEST -- if today's slot has passed, aim for tomorrow.
        target = current.replace(hour=3, minute=0, second=0, microsecond=0)
        if current >= target:
            target = target + timedelta(days=1)

        wait_secs = (target - current).total_seconds()
        log.info(
            f"DAEMON: Sleeping {wait_secs/3600:.1f}h until "
            f"{target.isoformat()}"
        )
        try:
            time.sleep(wait_secs)
        except KeyboardInterrupt:
            log.info("DAEMON: Shutdown requested -- exiting")
            return

        # Run cycle; any failure is logged and the loop keeps going.
        log.info(f"DAEMON: Waking up at {datetime.now(AEST).isoformat()}")
        try:
            run_cycle(_CycleArgs())
        except Exception as e:
            log.error(f"DAEMON: Cycle failed: {e}", exc_info=True)


# ---------------------------------------------------------------------------
# Main cycle orchestrator
# ---------------------------------------------------------------------------

def run_cycle(args) -> Optional[CycleReport]:
    """Execute one complete Alpha Evolve cycle (all 6 phases).

    Args:
        args: argparse-style namespace. Attributes read (all via getattr,
              so a minimal stub object works): dry_run, force, asset_type,
              seed.

    Returns:
        A CycleReport summarising the cycle, or None when the cycle was
        skipped (ALPHA_EVOLVE_PAUSE set without --force) or the database
        was unreachable.
    """

    # ------------------------------------------------------------------
    # Pre-flight checks
    # ------------------------------------------------------------------
    # Pause flag is re-read every cycle so an operator can stop the daemon
    # without restarting it; --force overrides for manual runs.
    if os.environ.get("ALPHA_EVOLVE_PAUSE", "").lower() == "true" and not getattr(args, "force", False):
        log.info(
            "Alpha Evolve PAUSED (ALPHA_EVOLVE_PAUSE=true). "
            "Use --force to override."
        )
        return None

    # Dry-run can come from the CLI flag OR the environment variable.
    dry_run = getattr(args, "dry_run", False) or (
        os.environ.get("ALPHA_EVOLVE_DRY_RUN", "").lower() == "true"
    )
    if dry_run:
        log.info("=== DRY RUN MODE -- No changes will be made ===")

    # Cycle id is a timestamp (AEST) -- unique per second, sortable.
    cycle_id = datetime.now(AEST).strftime("%Y%m%d_%H%M%S")
    started_at = datetime.now(AEST).isoformat()

    log.info("=" * 60)
    log.info(f"ALPHA EVOLVE DAEMON v{CYCLE_VERSION} -- Cycle {cycle_id}")
    log.info(f"Started:  {started_at}")
    log.info(f"Dry Run:  {dry_run}")
    log.info(f"Phase:    1 (email templates only)")
    log.info("=" * 60)

    # Aggregated counters across all asset types processed this cycle.
    errors: List[str] = []
    total_scored = 0
    total_culled = 0
    total_mutations = 0
    total_tested = 0
    promotions_list: List[Dict] = []

    # ------------------------------------------------------------------
    # Connect to infrastructure
    # ------------------------------------------------------------------
    # PostgreSQL is mandatory; without it the cycle cannot run at all.
    try:
        pg_conn = get_pg_connection()
    except RuntimeError as e:
        log.error(f"FATAL: {e}")
        return None

    # Ensure tables exist
    ensure_schema(pg_conn)

    # Optional seed
    if getattr(args, "seed", False):
        seed_initial_email_assets(pg_conn, dry_run=dry_run)

    # Optional Redis (non-fatal if unavailable)
    redis_client = None
    try:
        from elestio_config import RedisConfig
        import redis as redis_lib
        redis_client = redis_lib.Redis(**RedisConfig.get_connection_params())
        redis_client.ping()
        log.info("Redis connected")
    except Exception as e:
        log.warning(f"Redis unavailable: {e} -- continuing without cache updates")

    # Optional Qdrant (non-fatal if unavailable)
    qdrant_client = None
    try:
        from qdrant_client import QdrantClient
        from elestio_config import QdrantConfig
        qdrant_client = QdrantClient(**QdrantConfig.get_client_params())
        log.info("Qdrant connected")
    except Exception as e:
        log.warning(f"Qdrant unavailable: {e} -- continuing without semantic dedup")

    # ------------------------------------------------------------------
    # Initialise phase objects (one handler per phase of the cycle)
    # ------------------------------------------------------------------
    harvester = MetricsHarvester(pg_conn, dry_run)
    culler = AssetCuller(pg_conn, dry_run)
    mutator = AssetMutator(pg_conn, qdrant_client, dry_run)
    tester = MutationTester(pg_conn, dry_run)
    promoter = AssetPromoter(pg_conn, redis_client, dry_run)
    logger = CycleLogger(pg_conn, dry_run)

    # ------------------------------------------------------------------
    # Determine which asset types to process (Phase 1 = email only)
    # ------------------------------------------------------------------
    requested_type = getattr(args, "asset_type", None)
    if requested_type:
        asset_types = [requested_type]
    else:
        asset_types = PHASE_1_ASSET_TYPES   # email_template only in Phase 1

    # ------------------------------------------------------------------
    # Main per-asset-type loop (phases HARVEST..TEST run per type; a
    # failure in one type is recorded and the next type still runs)
    # ------------------------------------------------------------------
    for asset_type in asset_types:
        log.info(f"\n{'='*40}")
        log.info(f"Processing: {asset_type}")
        log.info(f"{'='*40}")

        try:
            # Phase 1: HARVEST
            log.info(f"--- Phase 1: HARVEST ({asset_type}) ---")
            if asset_type == "email_template":
                metrics_list = harvester.harvest_email_metrics()
                if metrics_list:
                    harvester.update_email_metrics_in_bloodstream(metrics_list)
            harvester.update_fitness_scores(asset_type)

            # Count scored assets
            cur = pg_conn.cursor()
            cur.execute("""
                SELECT COUNT(*) FROM genesis.evolution_assets
                WHERE asset_type = %s AND status = 'active' AND fitness_score > 0
            """, (asset_type,))
            scored_count = cur.fetchone()[0]
            total_scored += scored_count
            log.info(f"HARVEST: {scored_count} {asset_type} assets with fitness > 0")

            # Phase 2: CULL
            log.info(f"--- Phase 2: CULL ({asset_type}) ---")
            culled_ids = culler.cull(asset_type)
            total_culled += len(culled_ids)

            # Phase 3: MUTATE -- generate variants for each surviving elite
            log.info(f"--- Phase 3: MUTATE ({asset_type}) ---")
            elites = mutator.get_elites(asset_type)
            all_stored_mutations: List[Dict] = []

            for elite in elites:
                raw_mutations = mutator.generate_mutations(elite, asset_type)
                if not raw_mutations:
                    continue

                stored = mutator.store_mutations(
                    raw_mutations, elite, asset_type, cycle_id
                )
                all_stored_mutations.extend(stored)
                total_mutations += len(stored)

            # Phase 4: TEST -- emit Jules task files; results are evaluated
            # by ASCEND on a LATER cycle, not this one.
            log.info(f"--- Phase 4: TEST ({asset_type}) ---")
            if all_stored_mutations and elites:
                # Group by parent for Jules task file organisation
                parent_map: Dict[str, Dict] = {e["asset_id"]: e for e in elites}
                by_parent: Dict[str, List[Dict]] = {}
                for m in all_stored_mutations:
                    pid = m["parent_id"]
                    by_parent.setdefault(pid, []).append(m)

                for parent_id, parent_mutations in by_parent.items():
                    # Fallback stub parent if the id is no longer in elites.
                    parent = parent_map.get(parent_id, {"asset_id": parent_id, "fitness_score": 0.0, "content": {}})
                    tasks = tester.generate_test_tasks(
                        parent_mutations, parent, asset_type, cycle_id
                    )
                    total_tested += len(tasks)

        except Exception as e:
            err_msg = f"Error processing {asset_type}: {e}"
            log.error(err_msg, exc_info=True)
            errors.append(err_msg)
            # Rollback so a poisoned transaction doesn't break later phases.
            try:
                pg_conn.rollback()
            except Exception:
                pass

    # ------------------------------------------------------------------
    # Phase 5: ASCEND (checks results from PREVIOUS cycle's Jules tests)
    # ------------------------------------------------------------------
    log.info("\n--- Phase 5: ASCEND ---")
    try:
        promotions = promoter.evaluate_and_promote(cycle_id)
        promotions_list.extend(promotions)
    except Exception as e:
        err_msg = f"ASCEND phase error: {e}"
        log.error(err_msg, exc_info=True)
        errors.append(err_msg)

    # ------------------------------------------------------------------
    # Phase 6: LOG
    # ------------------------------------------------------------------
    log.info("\n--- Phase 6: LOG ---")
    completed_at = datetime.now(AEST).isoformat()
    major_improvements = [p for p in promotions_list if p.get("is_major")]

    report = CycleReport(
        cycle_id=cycle_id,
        started_at=started_at,
        completed_at=completed_at,
        assets_scored=total_scored,
        assets_culled=total_culled,
        mutations_generated=total_mutations,
        mutations_tested=total_tested,
        promotions=len(promotions_list),
        major_improvements=major_improvements,
        errors=errors,
        dry_run=dry_run,
    )

    logger.log_cycle(report)

    # ------------------------------------------------------------------
    # Summary
    # ------------------------------------------------------------------
    log.info("\n" + "=" * 60)
    log.info(f"ALPHA EVOLVE CYCLE {cycle_id} COMPLETE")
    log.info(f"  Assets Scored:      {total_scored}")
    log.info(f"  Assets Culled:      {total_culled}")
    log.info(f"  Mutations Generated:{total_mutations}")
    log.info(f"  Jules Tasks Created:{total_tested}")
    log.info(f"  Promotions:         {len(promotions_list)}")
    log.info(f"  Major (>20%):       {len(major_improvements)}")
    log.info(f"  Errors:             {len(errors)}")
    log.info(f"  Dry Run:            {dry_run}")
    log.info(f"  Duration:           {started_at} -> {completed_at}")
    log.info("=" * 60)

    if dry_run:
        log.info("DRY RUN complete. No changes were made to the database.")
        log.info(f"Jules tasks (for review): {JULES_TASKS_DIR}")

    # Cleanup
    try:
        pg_conn.close()
    except Exception:
        pass

    return report


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------

def main() -> None:
    """CLI entry point: parse arguments and dispatch to the selected mode."""
    cli = argparse.ArgumentParser(
        description="Alpha Evolve Daemon -- Autonomous nightly evolution for Genesis assets",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python scripts/alpha_evolve_daemon.py --dry-run        # Simulate without changes
  python scripts/alpha_evolve_daemon.py --cycle          # Run one evolution cycle
  python scripts/alpha_evolve_daemon.py --cycle --seed   # Seed + run cycle
  python scripts/alpha_evolve_daemon.py --status         # Show generation status
  python scripts/alpha_evolve_daemon.py --daemon         # Run as daemon (03:00 AEST)

Environment vars:
  INSTANTLY_API_KEY         Override hardcoded Instantly key
  GEMINI_API_KEY            Gemini key for LLM mutations
  OPENROUTER_API_KEY        Fallback LLM if Gemini unavailable
  ALPHA_EVOLVE_PAUSE=true   Pause the daemon
  ALPHA_EVOLVE_DRY_RUN=true Force dry-run mode
  ALPHA_EVOLVE_MUTATIONS=N  Mutations per elite (default 5)
        """,
    )

    # Exactly one run mode must be chosen.
    modes = cli.add_mutually_exclusive_group(required=True)
    modes.add_argument(
        "--cycle", action="store_true",
        help="Run one complete evolution cycle",
    )
    modes.add_argument(
        "--dry-run", action="store_true",
        help="Simulate a cycle without making any changes (safe to run anytime)",
    )
    modes.add_argument(
        "--status", action="store_true",
        help="Show current generation status (read-only)",
    )
    modes.add_argument(
        "--daemon", action="store_true",
        help="Run as long-running daemon (fires at 03:00 AEST daily)",
    )

    # Modifiers that combine with any mode.
    cli.add_argument(
        "--force", action="store_true",
        help="Override ALPHA_EVOLVE_PAUSE env var",
    )
    cli.add_argument(
        "--asset-type", choices=ASSET_TYPES, dest="asset_type",
        help="Process only this asset type (default: all Phase 1 types)",
    )
    cli.add_argument(
        "--seed", action="store_true",
        help="Seed initial email assets from INSTANTLY_SEQUENCE_PARTNER_PITCH.md before cycling",
    )

    opts = cli.parse_args()

    # ------------------------------------------------------------------
    # Route to correct mode
    # ------------------------------------------------------------------
    if opts.status:
        # Read-only status report, then exit.
        try:
            conn = get_pg_connection()
            ensure_schema(conn)
            show_status(conn)
            conn.close()
        except Exception as e:
            log.error(f"STATUS: Cannot connect to database: {e}")
            sys.exit(1)
        return

    if opts.daemon:
        # Blocks until interrupted.
        run_daemon_loop()
        return

    # --cycle or --dry-run: a single pass through the 6 phases.
    outcome = run_cycle(opts)
    if outcome and outcome.errors:
        # Non-zero exit if there were errors (useful for cron monitoring)
        sys.exit(1)


# ---------------------------------------------------------------------------
# Session Intelligence: Transcript Scanning + TITAN Memory Auto-Update
# ---------------------------------------------------------------------------
# Added: 2026-02-23 Alpha Evolve Upgrade
# These functions run on the Stop hook event (session end) to:
#   1. Scan recent KG entity files for new learnings
#   2. Extract structured learnings and synthesize new axioms
#   3. Auto-update the TITAN MEMORY section in all CLAUDE.md targets
# ---------------------------------------------------------------------------

def run_session_intelligence(dry_run: bool = False) -> Dict[str, Any]:
    """
    Master orchestrator for end-of-session intelligence pipeline.
    Calls titan_memory_updater and axiom_synthesizer as subprocess modules.

    Args:
        dry_run: when True, forward --dry-run to both sub-scripts.

    Returns:
        Dict summary: titan_updated (bool), axioms_synthesized (bool),
        errors (list of str).

    Fix: the original command lines included "-NoProfile", which is a
    PowerShell switch, not a CPython option. CPython rejects it with
    "Unknown option: -N" and exits before the script runs, so neither
    step could ever succeed. The flag is removed.
    """
    import subprocess
    results: Dict[str, Any] = {
        "titan_updated": False,
        "axioms_synthesized": False,
        "errors": [],
    }

    python_exe = sys.executable
    genesis_root = str(GENESIS_ROOT)

    # Step 1: TITAN Memory Update
    titan_script = str(GENESIS_ROOT / "core" / "titan_memory_updater.py")
    if Path(titan_script).exists():
        try:
            cmd = [python_exe, titan_script, "--top", "20"]
            if dry_run:
                cmd.append("--dry-run")
            proc = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=60,  # generous bound; script should finish in seconds
                cwd=genesis_root,
            )
            if proc.returncode == 0:
                results["titan_updated"] = True
                log.info("SESSION INTELLIGENCE: TITAN Memory updated successfully")
            else:
                # Truncate stderr so a noisy traceback doesn't flood the log.
                results["errors"].append(f"titan_memory_updater exited {proc.returncode}: {proc.stderr[:200]}")
                log.warning(f"SESSION INTELLIGENCE: TITAN Memory update failed: {proc.stderr[:200]}")
        except Exception as e:
            results["errors"].append(f"titan_memory_updater exception: {e}")
            log.error(f"SESSION INTELLIGENCE: TITAN Memory updater error: {e}")
    else:
        log.warning(f"SESSION INTELLIGENCE: titan_memory_updater.py not found at {titan_script}")

    # Step 2: Axiom Synthesis
    axiom_script = str(GENESIS_ROOT / "core" / "axiom_synthesizer.py")
    if Path(axiom_script).exists():
        try:
            cmd = [python_exe, axiom_script, "--min-freq", "1"]
            if dry_run:
                cmd.append("--dry-run")
            proc = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=60,
                cwd=genesis_root,
            )
            if proc.returncode == 0:
                results["axioms_synthesized"] = True
                log.info("SESSION INTELLIGENCE: Axiom synthesis complete")
            else:
                results["errors"].append(f"axiom_synthesizer exited {proc.returncode}: {proc.stderr[:200]}")
                log.warning(f"SESSION INTELLIGENCE: Axiom synthesis failed: {proc.stderr[:200]}")
        except Exception as e:
            results["errors"].append(f"axiom_synthesizer exception: {e}")
            log.error(f"SESSION INTELLIGENCE: Axiom synthesizer error: {e}")
    else:
        log.warning(f"SESSION INTELLIGENCE: axiom_synthesizer.py not found at {axiom_script}")

    # Step 3: Log the session intelligence run to KG (best-effort JSONL append)
    entity = {
        "id": f"session_intel_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}",
        "type": "session_intelligence_run",
        "date": datetime.now(timezone.utc).isoformat(),
        "titan_updated": results["titan_updated"],
        "axioms_synthesized": results["axioms_synthesized"],
        "errors": results["errors"],
        "dry_run": dry_run,
    }
    kg_path = GENESIS_ROOT / "KNOWLEDGE_GRAPH" / "entities" / "session_intelligence_runs.jsonl"
    try:
        kg_path.parent.mkdir(parents=True, exist_ok=True)
        with open(str(kg_path), "a", encoding="utf-8") as fh:
            fh.write(json.dumps(entity) + "\n")
    except Exception as e:
        log.warning(f"SESSION INTELLIGENCE: Could not write KG entity: {e}")

    log.info(
        f"SESSION INTELLIGENCE complete: "
        f"titan={results['titan_updated']} "
        f"axioms={results['axioms_synthesized']} "
        f"errors={len(results['errors'])}"
    )
    return results


def scan_session_ctm_files(since_date: Optional[str] = None) -> List[Dict]:
    """
    Scan all session CTM files in KNOWLEDGE_GRAPH/entities/ for recent learnings.

    Each CTM file is JSONL; a record may carry a ``key_insight`` string, a
    ``p0`` list of next actions, and/or a ``blocker`` string. Each such field
    is extracted into one structured learning dict with keys
    ``source`` / ``type`` / ``date`` / ``insight``.

    Args:
        since_date: ISO date string (YYYY-MM-DD). Only learnings dated on or
            after this date are returned. Records without a ``date`` field are
            treated as current and kept. Defaults to today.

    Returns:
        List of learning dicts (possibly empty, e.g. when the KG directory
        does not exist).
    """
    from datetime import date as dt_date
    if since_date is None:
        since_date = dt_date.today().isoformat()

    kg_dir = GENESIS_ROOT / "KNOWLEDGE_GRAPH" / "entities"
    if not kg_dir.exists():
        return []

    learnings: List[Dict] = []
    # Glob narrows the candidate set cheaply; the regex enforces the exact
    # session_<digits>_ctm_*.jsonl naming convention.
    ctm_pattern = re.compile(r"session_\d+_ctm_.*\.jsonl$")

    for fpath in sorted(kg_dir.glob("session_*ctm*.jsonl")):
        if not ctm_pattern.search(fpath.name):
            continue

        try:
            lines = fpath.read_text(encoding="utf-8", errors="replace").splitlines()
            for line in lines:
                line = line.strip()
                if not line:
                    continue
                try:
                    rec = json.loads(line)
                except json.JSONDecodeError:
                    # Tolerate partially-written or corrupt JSONL lines.
                    continue

                # Missing date defaults to since_date so undated records pass.
                rec_date = rec.get("date", since_date)
                # BUG FIX: since_date was documented but previously never
                # enforced. ISO dates compare correctly as strings; slice to
                # the date part in case the record stores a full datetime.
                if str(rec_date)[:10] < since_date:
                    continue

                # Extract key_insight fields
                if rec.get("key_insight"):
                    learnings.append({
                        "source": fpath.name,
                        "type": rec.get("type", "session_ctm"),
                        "date": rec_date,
                        "insight": rec["key_insight"],
                    })

                # Extract next_actions p0 items as learnings
                if rec.get("p0"):
                    for action in rec["p0"]:
                        learnings.append({
                            "source": fpath.name,
                            "type": "next_action_p0",
                            "date": rec_date,
                            "insight": action,
                        })

                # Extract blockers as known issues learnings
                if rec.get("blocker"):
                    learnings.append({
                        "source": fpath.name,
                        "type": "known_blocker",
                        "date": rec_date,
                        "insight": f"BLOCKER: {rec['blocker']}",
                    })
        except Exception as e:
            # Best-effort scan: a single unreadable file must not abort the run.
            log.debug(f"Could not scan {fpath.name}: {e}")

    log.info(f"SESSION SCAN: Extracted {len(learnings)} learnings from CTM files")
    return learnings


if __name__ == "__main__":
    # Dispatch: the first CLI argument selects the session-intelligence
    # runner; anything else falls through to the normal daemon entry point.
    argv = sys.argv
    wants_session_intel = len(argv) > 1 and argv[1] == "--session-intelligence"
    if wants_session_intel:
        run_session_intelligence(dry_run="--dry-run" in argv)
    else:
        main()


# VERIFICATION_STAMP
# Story: Alpha Evolve Daemon Phase 1
# Verified By: Claude Opus 4.6
# Verified At: 2026-02-17T20:00:00+10:00
# Features: 6-phase cycle (HARVEST/CULL/MUTATE/TEST/ASCEND/LOG),
#   --dry-run, --cycle, --status, --daemon, --seed, --force flags,
#   Elestio PostgreSQL + Redis + Qdrant integration, Gemini + OpenRouter
#   mutation generation, Instantly.ai API v2 harvest, Qdrant semantic dedup,
#   email validation (Spam Act compliance, merge tags, word count),
#   min 3 survivors safety, archive-not-delete, pause env var,
#   KG axiom logging, major improvement notifications, Jules task generation
# Phase: 1 (email_template evolution only)
# Lines: ~2656
# Status: COMPLETE -- ready for dry-run testing
