
    iGJ                        d Z ddlmZ ddlZddlZddlZddlZddlmZ ddl	m	Z	m
Z
 ddlmZ ddlmZmZ ddlmZ  ej$                  e      Z ed	      Zd
Ze G d d             Z G d d      Zy)u  
core/coherence/coherence_orchestrator.py

CoherenceOrchestrator — 8-Step Coherence Execution Flow Coordinator.

Runs the complete coherence pipeline for a multi-agent swarm round:

    Step 1  MAP     — Push task DAG to Redis Stream via TaskDAGPusher.
    Step 2  CLAIM   — Workers auto-claim tasks via XREADGROUP (background).
    Step 3  PROPOSE — Workers run and submit StateDelta to StagingArea (background).
    Step 4  BARRIER — Wait for all worker deltas via StagingArea.wait_for_all.
    Step 5  REDUCE  — OCCCommitEngine collects deltas + SemanticMergeInterceptor merges.
    Step 6  COMMIT  — OCCCommitEngine issues OCC conditional write to RedisMasterState.
    Step 7  RELEASE — Execute merged side-effects (log to events.jsonl).
    Step 8  SCAR    — On any worker failure, write scar event to events.jsonl.

Steps 2 and 3 are implicit — workers are background consumers that run
independently once the DAG is pushed to the stream. The orchestrator's
role is to coordinate steps 1, 4-8.

Total orchestration timeout: 120 seconds (asyncio.wait_for on execute()).

Dependency injection:
    All collaborators are injected via the constructor. None are required
    at import time — pass mocks in tests.

Usage::

    orchestrator = CoherenceOrchestrator(
        dag_pusher=pusher,
        staging_area=staging,
        occ_engine=engine,
        bulkhead=guard,
    )
    result = await orchestrator.execute("sess-abc", tasks=[
        {"task_type": "research", "payload": {"query": "..."}},
        {"task_type": "synthesize", "payload": {}},
    ])
    assert result.success

# VERIFICATION_STAMP
# Story: 6.08
# Verified By: parallel-builder
# Verified At: 2026-02-25
# Tests: 22/22
# Coverage: 100%
    )annotationsN)	dataclass)datetimetimezone)Path)AnyOptional)uuid4z5/mnt/e/genesis-system/data/observability/events.jsonlx   c                  D    e Zd ZU dZded<   ded<   ded<   ded	<   ded
<   y)CoherenceResultaV  
    Result of a CoherenceOrchestrator.execute() call.

    Attributes:
        success:          True when the full pipeline completed with a
                          committed OCC write.
        committed_state:  The merged patch list from the OCC commit engine.
                          Empty dict when success=False.
        saga_id:          UUID4 identifier for this orchestration run.
        workers_succeeded: Number of worker tasks (from BulkheadResult)
                          that completed successfully.
        workers_failed:   Number of worker tasks that raised exceptions.
    boolsuccessdictcommitted_statestrsaga_idintworkers_succeededworkers_failedN)__name__
__module____qualname____doc____annotations__     >/mnt/e/genesis-system/core/coherence/coherence_orchestrator.pyr   r   Q   s%     MLr   r   c                      e Zd ZdZ	 	 	 	 	 d	 	 	 	 	 	 	 	 	 	 	 d	dZ	 	 	 	 	 	 d
dZ	 	 	 	 	 	 	 	 ddZ	 	 	 	 	 	 	 	 ddZddZy)CoherenceOrchestratoruL  
    Coordinates the complete 8-step coherence execution pipeline.

    Constructor args:
        dag_pusher:   TaskDAGPusher — pushes task DAGs to Redis Stream.
                      Must expose: async push_dag(session_id, tasks) -> list[str]
        staging_area: StagingArea — collects worker deltas in Redis Hash.
                      Must expose: async wait_for_all(session_id, expected_count, timeout_ms)
        occ_engine:   OCCCommitEngine — barrier sync + OCC write coordinator.
                      Must expose: async execute_commit(session_id, expected_workers) -> OccCommitResult
        bulkhead:     BulkheadGuard — asyncio.gather exception isolation.
                      Must expose: async run_with_bulkhead(tasks) -> list[BulkheadResult]
        qdrant_client: Optional Qdrant client (reserved for future use).
    Nc                J    || _         || _        || _        || _        || _        y )N)
dag_pusherstaging_area
occ_enginebulkheadqdrant_client)selfr"   r#   r$   r%   r&   s         r   __init__zCoherenceOrchestrator.__init__}   s)     %($ *r   c           	       K   t        t                     }t        j                  d||t	        |             	 t        j                  | j                  |||      t               d{   }|S 7 # t
        j                  $ r8 t        j                  d|t               | j                  d||dt        d        w xY ww)a  
        Run the complete 8-step coherence pipeline with a 120-second timeout.

        Wraps _execute_pipeline() in asyncio.wait_for so the caller always
        gets a result within ORCHESTRATION_TIMEOUT_SECONDS.

        Args:
            session_id: Session identifier shared by all workers.
            tasks:      List of task dicts, each describing one unit of work.
                        Each task may include keys: task_type, payload, tier, priority.

        Returns:
            CoherenceResult describing the final pipeline outcome.

        Raises:
            asyncio.TimeoutError: If the pipeline exceeds 120 seconds.
                                  The caller should treat this as a failure.
        zACoherenceOrchestrator: starting saga %s for session %s (%d tasks))timeoutNz2CoherenceOrchestrator: saga %s TIMED OUT after %dsscarorchestration_timeout)r   
session_idreasontimeout_seconds)r   r
   loggerinfolenasynciowait_for_execute_pipelineORCHESTRATION_TIMEOUT_SECONDSTimeoutErrorerror_write_event)r'   r-   tasksr   results        r   executezCoherenceOrchestrator.execute   s     . eg,OJ		
	"++&&z5'B5 F, - ## 	LLD- &",5'D	 !	s/   5B>/A0 'A.(A0 ,B>.A0 0AB;;B>c           
       K   t         j                  d|       | j                  E| j                  j                  ||       d{   }t         j                  dt	        |      |       nt         j                  d       t         j                  d|       t         j                  d|       d}d}| j
                  t        |      D cg c]-  \  }}|j                  dd	|       | j                  |||      f/ }	}}| j
                  j                  |	       d{   }
|	D ]"  \  }}t        |d
      s|j                          $ t        d |
D              }t        d |
D              }t         j                  d|||       nt	        |      }d}t         j                  d|       | j                  P| j                  j                  |t	        |      d       d{   }t         j                  d|t	        |             nt         j                  d       t         j                  d|       d}i }| j                  | j                  j!                  |t	        |             d{   }|j"                  r0d|j$                  i}t         j'                  d||j(                         n7t         j+                  d||j,                         nt         j                  d       t         j                  d|       ||j"                  nd}|r| j/                  d|||||d       t         j                  d|       |dkD  s|sN||||d}||j"                  s|j,                  |d<   | j/                  d |       t         j+                  d!|||       |xr |dk(  }t1        |||||"      }t         j'                  d#||||       |S 7 {c c}}w 7 7 7 fw)$z
        Internal implementation of the 8-step pipeline.

        Each step is annotated with its name so the execution order is
        visible at a glance in logs and in code review.
        z+CoherenceOrchestrator [Step 1: MAP] saga=%sNz1CoherenceOrchestrator [MAP] pushed %d entries: %szACoherenceOrchestrator [MAP] dag_pusher=None, skipping stream pushuN   CoherenceOrchestrator [Step 2: CLAIM] saga=%s (implicit — workers listening)uP   CoherenceOrchestrator [Step 3: PROPOSE] saga=%s (implicit — workers proposing)r   	task_typeztask-closec              3  :   K   | ]  }|j                   sd   yw   Nr   .0rs     r   	<genexpr>z:CoherenceOrchestrator._execute_pipeline.<locals>.<genexpr>  s     #M!199A#M   c              3  :   K   | ]  }|j                   rd   ywrA   rC   rD   s     r   rG   z:CoherenceOrchestrator._execute_pipeline.<locals>.<genexpr>  s      NqAII NrH   zACoherenceOrchestrator [PROPOSE] saga=%s workers: %d ok, %d failedz/CoherenceOrchestrator [Step 4: BARRIER] saga=%si`  )expected_count
timeout_msz=CoherenceOrchestrator [BARRIER] saga=%s collected %d delta(s)zCCoherenceOrchestrator [BARRIER] staging_area=None, skipping barrierz?CoherenceOrchestrator [Step 5: REDUCE / Step 6: COMMIT] saga=%s)expected_workersmerged_patchz>CoherenceOrchestrator [COMMIT] saga=%s committed at version %dz5CoherenceOrchestrator [COMMIT] saga=%s OCC failed: %szCCoherenceOrchestrator [COMMIT] occ_engine=None, skipping OCC commitz/CoherenceOrchestrator [Step 7: RELEASE] saga=%sTrelease)r   r-   r   r   r   z,CoherenceOrchestrator [Step 8: SCAR] saga=%s)r   r-   r   r   occ_failure_reasonr+   zLCoherenceOrchestrator [SCAR] saga=%s: %d worker(s) failed, commit_success=%s)r   r   r   r   r   u_   CoherenceOrchestrator: saga %s COMPLETE — success=%s, workers_succeeded=%d, workers_failed=%d)r0   debugr"   push_dagr2   r%   	enumerateget_run_workerrun_with_bulkheadhasattrr?   sumr#   wait_for_allr$   execute_commitr   rM   r1   versionwarningsaga_statusr9   r   )r'   r-   r:   r   	entry_idsr   r   itaskworker_corosbulkhead_results_corodeltas
occ_resultr   commit_successscar_payloadoverall_successr;   s                       r   r5   z'CoherenceOrchestrator._execute_pipeline   s      	BGL??&"oo66z5IIILLCI LLS 	\	
 	^	
 ==$  )/
 At HH[E!+6$$Zq9L  &*]]%D%D\%RR ( !44)JJL! !$#M/?#M M  N,< NNNLLS!	 !$E
N
 	FP(,,99"5z! :  F
 LLOF LLU 	Mw	
 
 "??&#==!$U  >   J !! $2:3J3J"KT&& K** LLU 	FP/9/E++4&",):&4'6	 	CWMA^"("0%6	"L %j.@.@5?5K5K12fl3NN$ )Bn.A #+/)
 	6	
 { JV  S28s^   AON?BO	2O;!OOO6B(OOB	O(O)EOOOOc                8   K   |||j                  dd      ddS w)a  
        Default worker coroutine.

        In production this would invoke the real Gemini swarm worker logic.
        In tests, the BulkheadGuard's task list is replaced with mock coroutines.

        Args:
            session_id:  The orchestration session identifier.
            task:        The task dict from the DAG.
            task_index:  Zero-based index of this task in the DAG.

        Returns:
            A dict representing the worker's output (placeholder for now).
        r>   unknown	completed)r-   
task_indexr>   status)rS   )r'   r-   r_   rl   s       r   rT   z!CoherenceOrchestrator._run_worker  s)     * %$+y9!	
 	
s   c                   	 t         j                  j                  dd       |t        j                  t
        j                        j                         d|}t         j                  dd      5 }|j                  t        j                  |      dz          d	d	d	       t        j                  d
|t                y	# 1 sw Y   %xY w# t        $ r!}t        j                  d||       Y d	}~y	d	}~ww xY w)u  
        Append a JSON-lines event to the observability log.

        The log file is created (including parent directories) on first write.
        Failures are logged but never propagated — event logging must never
        crash the orchestration pipeline.

        Args:
            event_type: Short label (e.g. "release", "scar").
            payload:    Arbitrary dict of event metadata.
        T)parentsexist_ok)tz)
event_type	timestampazutf-8)encoding
Nz+CoherenceOrchestrator: wrote %s event to %sz3CoherenceOrchestrator: failed to write %s event: %s)EVENTS_LOG_PATHparentmkdirr   nowr   utc	isoformatopenwritejsondumpsr0   rP   	Exceptionr8   )r'   rr   payloadentryfhexcs         r   r9   z"CoherenceOrchestrator._write_event  s    	""(((E(%\\X\\:DDF E
 !%%cG%< 3E*T123LL=3 3  	LLE 	s0   A/C	 1(B=#C	 =CC	 		C3C..C3)NNNNN)r"   Optional[Any]r#   r   r$   r   r%   r   r&   r   returnNone)r-   r   r:   
list[dict]r   r   )r-   r   r:   r   r   r   r   r   )r-   r   r_   r   rl   r   r   r   )rr   r   r   r   r   r   )	r   r   r   r   r(   r<   r5   rT   r9   r   r   r   r    r    m   s    " %)&*$("&'++!+ $+ "	+
  + %+ 
+$66 6 
	6xOO O 	O
 
Oj

 
 	

 

>r   r    )r   
__future__r   r3   r   loggingosdataclassesr   r   r   pathlibr   typingr   r	   uuidr
   	getLoggerr   r0   rw   r6   r   r    r   r   r   <module>r      sw   .` #    	 ! '    			8	$ NO !$    6q qr   