
    &i(                        U d Z ddlmZ ddlZddlmZmZ ddlmZ ddl	m
Z
mZ  ej                  e      ZdZded	<    G d
 de      Zy)ut  
core/coherence/swarm_worker_base.py

SwarmWorkerBase — ABC that claims tasks from Redis Stream via XREADGROUP
and sends XACK after successful completion.

Workers subclass this and implement `process(task: dict) -> Optional[dict]`.
The base class handles all stream mechanics:

  * XREADGROUP GROUP {group} {consumer_id} COUNT 1 BLOCK 5000
      — receives exactly one new message at a time (">")
  * XACK on success — guarantees at-least-once delivery with idempotent workers
  * No XACK on failure — task re-enters the Pending Entry List (PEL)
      and will be re-claimed after PEL_TIMEOUT_MS (60 s)
  * XAUTOCLAIM on startup — recovers orphaned pending entries from crashed workers

Usage::

    class MyWorker(SwarmWorkerBase):
        async def process(self, task: dict) -> Optional[dict]:
            # ... do work ...
            return {"result": "ok"}

    worker = MyWorker(redis_client, staging_area=staging)
    await worker.run_worker_loop(group="genesis_workers", consumer_id="worker-1")

# VERIFICATION_STAMP
# Story: 6.04
# Verified By: parallel-builder
# Verified At: 2026-02-25
# Tests: 15/15
# Coverage: 100%
    )annotationsN)ABCabstractmethod)Optional)
STREAM_KEYDEFAULT_GROUPi`  intPEL_TIMEOUT_MSc                  T    e Zd ZdZd	d
dZedd       Zedf	 	 	 	 	 ddZddZ	d
dZ
y)SwarmWorkerBaseuA  
    Base class for all Gemini swarm workers.

    Claims tasks from Redis Stream via XREADGROUP with Consumer Groups.
    Guarantees exactly-once delivery: XACK only on success.

    Subclasses must implement :meth:`process`.

    Consumer Group mechanics
    ------------------------
    * Each worker instance calls ``run_worker_loop`` with a unique
      ``consumer_id`` (e.g. ``"worker-1"``, ``"worker-2"``).
    * Redis delivers each message to exactly ONE consumer in the group.
    * Un-ACK'd messages stay in the Pending Entry List (PEL) and are
      reclaimed by the next available worker after ``PEL_TIMEOUT_MS``.
    * On startup, :meth:`_reclaim_pending` uses ``XAUTOCLAIM`` to absorb
      orphaned entries from previously crashed workers.

    Args:
        redis_client: Async Redis client supporting:
                      xreadgroup, xack, xautoclaim
        staging_area: Optional StagingArea — when provided and ``process``
                      returns a non-None result, the result is forwarded via
                      ``staging_area.submit_delta(result)``.
    Nc                .    || _         || _        d| _        y )NF)redisstaging_running)selfredis_clientstaging_areas      9/mnt/e/genesis-system/core/coherence/swarm_worker_base.py__init__zSwarmWorkerBase.__init__T   s    !
##    c                   K   yw)a  
        Process a single task claimed from the stream.

        Args:
            task: Dict of task fields decoded from the Redis Stream entry.
                  All keys and values are Python ``str`` (bytes decoded).

        Returns:
            Result dict on success, or ``None``.
            When a non-None result is returned AND a ``staging_area`` was
            provided at construction time, the result is forwarded to
            ``staging_area.submit_delta(result)``.

        Raises:
            Any exception signals processing failure.
            The task entry will NOT be ACK'd and will re-queue after
            ``PEL_TIMEOUT_MS`` milliseconds.
        N )r   tasks     r   processzSwarmWorkerBase.process]   s     ( 	s   zworker-1c                  K   d| _         | j                  ||       d{    | j                   rd	 | j                  j                  ||t        didd       d{   }|s?|d   \  }}|D ]  \  }}i }	|j                         D ]N  \  }
}t        |
t              r|
j                         n|
}t        |t              r|j                         n|}||	|<   P t        |t              r|j                         n|}	 | j                  |	       d{   }| j                  %|#| j                  j                  |       d{    | j                  j                  t        ||       d{    t        j!                  d	|        | j                   rcyy7 w7 ># t
        $ r!}t        j                  d|       Y d}~d}~ww xY w7 7 7 d# t
        $ r"}t        j                  d
||       Y d}~d}~ww xY ww)u  
        Main loop: continuously claims and processes tasks from the stream.

        Flow per iteration::

            XREADGROUP GROUP {group} {consumer_id}
                       COUNT 1 BLOCK 5000
                       STREAMS genesis:swarm:tasks >
              → process(task)
              → on success:  XACK genesis:swarm:tasks {group} {entry_id}
              → on failure:  log error, no XACK (re-queues after PEL timeout)

        On startup, calls :meth:`_reclaim_pending` to recover orphaned entries.

        Args:
            group:       Consumer Group name (default: ``"genesis_workers"``).
            consumer_id: Unique name for this consumer instance
                         (default: ``"worker-1"``).
        TN>   i  )countblockz&SwarmWorkerBase: xreadgroup failed: %sr   z-SwarmWorkerBase: processed and ACK'd entry %sz2SwarmWorkerBase: process() failed for entry %s: %s)r   _reclaim_pendingr   
xreadgroupr   	Exceptionloggererroritems
isinstancebytesdecoder   r   submit_deltaxackdebug)r   groupconsumer_identriesexcstream_namemessagesentry_idfieldsr   kvkeyvaleidresults                   r   run_worker_loopzSwarmWorkerBase.run_worker_loopw   s    0  ##E;777mm
 $

 5 5% !6 !  $+AJ!K$, # &  "LLN $DAq(21e(<!((*!C(21e(<!((*!C #DI$ ,6h+Fhoo'H
#'<<#55F ||/F4F"ll77??? **//*eSAAALLG3#- mm 	8  EsK6 6 @ B
 ! LLL s   G=FG=)F FF  BG=:GG	0G?G (G(G)GG=G=F 	G%G;G=GG=	GGG	G:G5/G=5G::G=c                R  K   	 | j                   j                  t        ||t        d       d{   }|rBt	        |      dk\  r4|d   |d   ng }t	        |      }|dkD  rt
        j                  d|       |S y7 I# t        $ r }t
        j                  d|       Y d}~yd}~ww xY ww)	a  
        Reclaim orphaned pending entries older than ``PEL_TIMEOUT_MS``.

        Called once on :meth:`run_worker_loop` startup.  Uses ``XAUTOCLAIM``
        to transfer stale pending entries (from crashed workers) to this
        consumer so they are reprocessed.

        XAUTOCLAIM semantics::

            XAUTOCLAIM genesis:swarm:tasks {group} {consumer_id}
                       {PEL_TIMEOUT_MS} 0-0

        Args:
            group:       Consumer Group name.
            consumer_id: Name of this consumer (entries transferred to it).

        Returns:
            Number of reclaimed entries (0 on failure or nothing to claim).
        z0-0)min_idle_timestart_idN   r   r   z6SwarmWorkerBase: reclaimed %d orphaned pending entriesz&SwarmWorkerBase: xautoclaim failed: %s)	r   
xautoclaimr   r
   lenr#   infor"   warning)r   r,   r-   r9   	reclaimedr   r/   s          r   r    z SwarmWorkerBase._reclaim_pending   s     (	::00, 1  F #f+*)/)>F1IB	I19KKP %(  	NNCSI	s?   B'+A; A9AA; 7B'9A; ;	B$BB'B$$B'c                <    d| _         t        j                  d       y)z
        Signal graceful shutdown.

        Sets ``_running = False``.  The worker loop exits after the current
        iteration completes (including the current BLOCK wait).
        FuD   SwarmWorkerBase: stop() called — will exit after current iterationN)r   r#   rA   )r   s    r   stopzSwarmWorkerBase.stop  s     Z[r   )N)returnNone)r   dictrF   zOptional[dict])r,   strr-   rI   rF   rG   )r,   rI   r-   rI   rF   r	   )__name__
__module____qualname____doc__r   r   r   r   r:   r    rE   r   r   r   r   r   9   sZ    4$  6 #%VV V 
	Vx+b\r   r   )rM   
__future__r   loggingabcr   r   typingr   core.coherence.task_dag_pusherr   r   	getLoggerrJ   r#   r
   __annotations__r   r   r   r   <module>rU      sJ    D #  #  D			8	$  S\c S\r   