
    Oi                        U d Z ddlmZ ddlZddlZddlZddlmZmZ  ej                  e
      ZdZded<   dZded	<    G d
 d      Zy)u	  
core/coherence/task_dag_pusher.py

TaskDAGPusher — Pushes task DAGs to a Redis Stream for exactly-once delivery
to Gemini swarm workers via Consumer Groups.

Each task in the DAG is written to the stream key ``genesis:swarm:tasks`` via
Redis XADD. Workers consume tasks from the stream using a Consumer Group,
which guarantees exactly-once processing and preserves order within a session.

Stream entry fields (all strings, as required by Redis Streams):
    session_id  — session these tasks belong to
    task_id     — UUID4 assigned at push time
    task_type   — caller-supplied category string (defaults to "unknown")
    payload     — JSON-encoded dict of task-specific data
    tier        — execution tier string (defaults to "T1")
    priority    — priority string (defaults to "normal")

Usage::

    pusher = TaskDAGPusher(redis_client)
    await pusher.create_consumer_group()
    entry_ids = await pusher.push_dag("sess-123", [
        {"task_type": "research", "payload": {"query": "..."}, "tier": "T2", "priority": "high"},
        {"task_type": "synthesize", "payload": {"topic": "..."}, "tier": "T1", "priority": "normal"},
    ])

# VERIFICATION_STAMP
# Story: 6.03
# Verified By: parallel-builder
# Verified At: 2026-02-25
# Tests: 10/10
# Coverage: 100%
    )annotationsN)AnyListzgenesis:swarm:tasksstr
STREAM_KEYgenesis_workersDEFAULT_GROUPc                  4    e Zd ZdZddZddZefd	dZd
dZy)TaskDAGPusheru  
    Pushes task DAGs to Redis Streams for exactly-once delivery
    to Gemini swarm workers via Consumer Groups.

    Each call to :meth:`push_dag` writes one stream entry per task via
    ``XADD genesis:swarm:tasks * field value ...``.  Tasks are pushed in the
    order provided — the caller is responsible for any topological sort
    required by upstream dependency analysis.

    Consumer Group integration
    --------------------------
    Call :meth:`create_consumer_group` once (idempotent) before workers start
    consuming.  Workers use ``XREADGROUP`` to receive tasks; ``XACK`` after
    successful processing.

    Redis interface requirements (async client)
    -------------------------------------------
    The ``redis_client`` argument must expose:
    - ``xadd(key, fields)``       — async, returns stream entry ID (str/bytes)
    - ``xgroup_create(key, groupname, id, mkstream)`` — async, raises on BUSYGROUP
    - ``xlen(key)``               — async, returns int
    c                    || _         y)zj
        Args:
            redis_client: Async Redis client supporting xadd, xgroup_create, xlen.
        N)redis)selfredis_clients     7/mnt/e/genesis-system/core/coherence/task_dag_pusher.py__init__zTaskDAGPusher.__init__R   s    
 "
    c                  K   g }|D ]  }t        t        j                               }|||j                  dd      t	        j
                  |j                  di             t        |j                  dd            t        |j                  dd            d}| j                  j                  t        |       d	{   }|j                  |       t        j                  d
||d   |        |S 7 4w)ub  
        Push each task to the Redis Stream ``genesis:swarm:tasks`` via XADD.

        Tasks are pushed in the order provided (caller handles topological sort).
        A fresh UUID4 ``task_id`` is generated for every task at push time so
        the caller does not need to pre-assign identifiers.

        Stream entry fields:
            - session_id  (str)
            - task_id     (str — UUID4)
            - task_type   (str — defaults to ``"unknown"``)
            - payload     (str — JSON-encoded dict, defaults to ``"{}"``)
            - tier        (str — defaults to ``"T1"``)
            - priority    (str — defaults to ``"normal"``)

        Args:
            session_id: The session these tasks belong to.
            tasks:      List of task dicts.  Each may have any subset of the
                        keys ``task_type``, ``payload``, ``tier``, ``priority``.

        Returns:
            List of Redis stream entry IDs, one per task, in push order.
            These are the IDs returned by ``XADD`` (e.g. ``"1696000000000-0"``),
            *not* the generated ``task_id`` UUIDs.
        	task_typeunknownpayloadtierT1prioritynormal)
session_idtask_idr   r   r   r   Nz>TaskDAGPusher: pushed task %s (type=%s) to stream, entry_id=%s)r   uuiduuid4getjsondumpsr   xaddr   appendloggerdebug)r   r   tasks	entry_idstaskr   fieldsentry_ids           r   push_dagzTaskDAGPusher.push_dagY   s     4  "	 	D$**,'G("!XXk9=::dhhy"&=>DHHVT23X >?F "ZZ__Z@@HX&LLP{#		*  As   B6C/8C-95C/c                6  K   	 | j                   j                  t        |dd       d{    t        j	                  d|       y7 # t
        $ rE}dt        |      v rt        j                  d|       Y d}~yt        j                  d||        d}~ww xY ww)	u  
        Create a Consumer Group on the stream.  Idempotent — safe to call
        multiple times.

        Internally issues::

            XGROUP CREATE genesis:swarm:tasks <group_name> 0 MKSTREAM

        The ``id="0"`` means workers will receive all existing entries from
        the beginning of the stream (not just new entries added after group
        creation).  ``mkstream=True`` creates the stream key if it does not
        yet exist.

        Args:
            group_name: Name of the Consumer Group to create.
                        Defaults to ``"genesis_workers"``.

        Returns:
            ``True`` on success, or if the group already exists (BUSYGROUP).

        Raises:
            Exception: Re-raises any Redis error that is *not* a BUSYGROUP error.
        0T)idmkstreamNz*TaskDAGPusher: created consumer group '%s'	BUSYGROUPuD   TaskDAGPusher: consumer group '%s' already exists (BUSYGROUP — OK)z@TaskDAGPusher: unexpected error creating consumer group '%s': %s)	r   xgroup_creater   r$   info	Exceptionr   r%   error)r   
group_nameexcs      r   create_consumer_groupz#TaskDAGPusher.create_consumer_group   s     0	****J3 +    KKDjQ	
  	 c#h&Z LLR
 	sD   B&A AA BA 	B#B4B9BBBc                \   K   | j                   j                  t               d{   S 7 w)z
        Return the current number of entries in the stream via ``XLEN``.

        Returns:
            Integer count of entries in ``genesis:swarm:tasks``.
        N)r   xlenr   )r   s    r   get_stream_lengthzTaskDAGPusher.get_stream_length   s!      ZZ__Z0000s   #,*,N)r   r   returnNone)r   r   r&   z
List[dict]r;   z	List[str])r5   r   r;   bool)r;   int)	__name__
__module____qualname____doc__r   r+   r	   r7   r:    r   r   r   r   :   s#    ."1f =J ,\1r   r   )rB   
__future__r   r    loggingr   typingr   r   	getLoggerr?   r$   r   __annotations__r	   r   rC   r   r   <module>rI      sP   !F #    			8	$ (
C '&s &G1 G1r   