"""
core/coherence/task_dag_pusher.py

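TaskDAGPusher — Pushes task DAGs to a Redis Stream for delivery to Gemini
swarm workers via Consumer Groups.

Each task in the DAG is written to the stream key ``genesis:swarm:tasks`` via
Redis XADD, in the order supplied by the caller. Workers consume tasks from
the stream using a Consumer Group, which hands each entry to exactly one
consumer in the group. Note that this is at-least-once delivery, not
exactly-once: an entry that was read but never XACKed stays pending and can be
claimed and processed again, so workers should be idempotent (``task_id`` can
serve as a de-duplication key). Entries are stored in push order, but
processing order within a session is only preserved when a single consumer
handles that session's tasks.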

Stream entry fields (all strings, as required by Redis Streams):
    session_id  — session these tasks belong to
    task_id     — UUID4 assigned at push time
    task_type   — caller-supplied category string (defaults to "unknown")
    payload     — JSON-encoded dict of task-specific data
    tier        — execution tier string (defaults to "T1")
    priority    — priority string (defaults to "normal")

Usage::

    pusher = TaskDAGPusher(redis_client)
    await pusher.create_consumer_group()
    entry_ids = await pusher.push_dag("sess-123", [
        {"task_type": "research", "payload": {"query": "..."}, "tier": "T2", "priority": "high"},
        {"task_type": "synthesize", "payload": {"topic": "..."}, "tier": "T1", "priority": "normal"},
    ])

# VERIFICATION_STAMP
# Story: 6.03
# Verified By: parallel-builder
# Verified At: 2026-02-25
# Tests: 10/10
# Coverage: 100%
"""

from __future__ import annotations

import json
import logging
import uuid
from typing import Any, List

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

# Redis key of the stream that TaskDAGPusher writes to (XADD), creates
# consumer groups on (XGROUP CREATE) and measures (XLEN).
STREAM_KEY: str = "genesis:swarm:tasks"
# Consumer group name used by TaskDAGPusher.create_consumer_group() when the
# caller does not pass ``group_name``.
DEFAULT_GROUP: str = "genesis_workers"


# ---------------------------------------------------------------------------
# Main class
# ---------------------------------------------------------------------------


class TaskDAGPusher:
    """
    Pushes task DAGs to a Redis Stream for consumption by Gemini swarm
    workers via Consumer Groups.

    Each call to :meth:`push_dag` writes one stream entry per task via
    ``XADD genesis:swarm:tasks * field value ...``.  Tasks are pushed in the
    order provided — the caller is responsible for any topological sort
    required by upstream dependency analysis.

    Delivery semantics
    ------------------
    A Consumer Group hands each entry to exactly one consumer of the group,
    but delivery is *at-least-once*: an entry that was read and never
    ``XACK``-ed stays pending and can be claimed and processed again.
    Workers should therefore be idempotent; the per-task ``task_id`` field
    can serve as a de-duplication key.

    Consumer Group integration
    --------------------------
    Call :meth:`create_consumer_group` once (idempotent) before workers start
    consuming.  Workers use ``XREADGROUP`` to receive tasks; ``XACK`` after
    successful processing.

    Redis interface requirements (async client)
    -------------------------------------------
    The ``redis_client`` argument must expose:
    - ``xadd(key, fields)``       — async, returns stream entry ID (str/bytes)
    - ``xgroup_create(key, groupname, id, mkstream)`` — async, raises on BUSYGROUP
    - ``xlen(key)``               — async, returns int
    """

    def __init__(self, redis_client: Any) -> None:
        """
        Args:
            redis_client: Async Redis client supporting xadd, xgroup_create, xlen.
        """
        self.redis = redis_client

    @staticmethod
    def _value_or_default(task: dict, key: str, default: Any) -> Any:
        """Return ``task[key]``, or ``default`` when the key is missing or ``None``."""
        value = task.get(key)
        return default if value is None else value

    @classmethod
    def _build_fields(cls, session_id: str, task: dict) -> dict:
        """Build the all-string field mapping for one stream entry.

        A fresh UUID4 ``task_id`` is generated here.  Raises whatever
        ``json.dumps`` raises when the payload is not JSON-serialisable.
        """
        return {
            "session_id": session_id,
            "task_id": str(uuid.uuid4()),
            # Coerced to str like tier/priority: stream field values must be
            # strings, and an explicit None must not become the text "None".
            "task_type": str(cls._value_or_default(task, "task_type", "unknown")),
            "payload": json.dumps(cls._value_or_default(task, "payload", {})),
            "tier": str(cls._value_or_default(task, "tier", "T1")),
            "priority": str(cls._value_or_default(task, "priority", "normal")),
        }

    async def push_dag(self, session_id: str, tasks: List[dict]) -> List[str]:
        """
        Push each task to the Redis Stream ``genesis:swarm:tasks`` via XADD.

        Tasks are pushed in the order provided (caller handles topological sort).
        A fresh UUID4 ``task_id`` is generated for every task at push time so
        the caller does not need to pre-assign identifiers.

        Stream entry fields:
            - session_id  (str)
            - task_id     (str — UUID4)
            - task_type   (str — defaults to ``"unknown"``)
            - payload     (str — JSON-encoded dict, defaults to ``"{}"``)
            - tier        (str — defaults to ``"T1"``)
            - priority    (str — defaults to ``"normal"``)

        A key that is missing *or* explicitly ``None`` gets its default.

        All entries are built and validated before the first ``XADD`` is
        issued, so a malformed task never leaves a partially pushed DAG
        behind.  A Redis failure part-way through the loop can still do so;
        the entries written before the failure remain in the stream.

        Args:
            session_id: The session these tasks belong to.
            tasks:      List of task dicts.  Each may have any subset of the
                        keys ``task_type``, ``payload``, ``tier``, ``priority``.

        Returns:
            List of Redis stream entry IDs, one per task, in push order.
            These are the IDs returned by ``XADD`` (e.g. ``"1696000000000-0"``),
            *not* the generated ``task_id`` UUIDs.  IDs returned as bytes by
            the client are decoded to ``str``.  An empty ``tasks`` list
            yields an empty list.

        Raises:
            TypeError / ValueError: If a task payload cannot be JSON-encoded
                (raised before anything is written to the stream).
            Exception: Any error raised by the client's ``xadd``.
        """
        # Serialise everything up front: json.dumps is the step most likely
        # to fail on caller data, and it must fail before any write happens.
        prepared = [self._build_fields(session_id, task) for task in tasks]

        entry_ids: List[str] = []
        for fields in prepared:
            raw_id = await self.redis.xadd(STREAM_KEY, fields)
            # Clients without response decoding return bytes; normalise so the
            # declared List[str] return type actually holds.
            if isinstance(raw_id, (bytes, bytearray)):
                entry_id = raw_id.decode("utf-8")
            else:
                entry_id = raw_id
            entry_ids.append(entry_id)

            logger.debug(
                "TaskDAGPusher: pushed task %s (type=%s) to stream, entry_id=%s",
                fields["task_id"],
                fields["task_type"],
                entry_id,
            )

        return entry_ids

    async def create_consumer_group(self, group_name: str = DEFAULT_GROUP) -> bool:
        """
        Create a Consumer Group on the stream.  Idempotent — safe to call
        multiple times.

        Internally issues::

            XGROUP CREATE genesis:swarm:tasks <group_name> 0 MKSTREAM

        The ``id="0"`` means workers will receive all existing entries from
        the beginning of the stream (not just new entries added after group
        creation).  ``mkstream=True`` creates the stream key if it does not
        yet exist.

        Args:
            group_name: Name of the Consumer Group to create.
                        Defaults to ``"genesis_workers"``.

        Returns:
            ``True`` on success, or if the group already exists (BUSYGROUP).

        Raises:
            Exception: Re-raises any Redis error that is *not* a BUSYGROUP error.
        """
        try:
            await self.redis.xgroup_create(
                STREAM_KEY, group_name, id="0", mkstream=True
            )
        except Exception as exc:
            # The client's exception class is not importable here, so the
            # "group already exists" case is recognised by the BUSYGROUP
            # marker in the error message and treated as a successful no-op.
            if "BUSYGROUP" not in str(exc):
                logger.error(
                    "TaskDAGPusher: unexpected error creating consumer group '%s': %s",
                    group_name,
                    exc,
                )
                raise
            logger.debug(
                "TaskDAGPusher: consumer group '%s' already exists (BUSYGROUP — OK)",
                group_name,
            )
            return True

        logger.info("TaskDAGPusher: created consumer group '%s'", group_name)
        return True

    async def get_stream_length(self) -> int:
        """
        Return the current number of entries in the stream via ``XLEN``.

        Returns:
            Integer count of entries in ``genesis:swarm:tasks``.
        """
        return await self.redis.xlen(STREAM_KEY)
