
    `ik*                        d Z ddlmZ ddlZddlmZmZ ddlmZm	Z	m
Z
  ej                  e      ZdZe G d d             Z G d	 d
      Zy)u,  
core/coherence/occ_commit.py

OCCCommitEngine — Barrier Sync + OCC Write.

Orchestrates the full commit cycle for a multi-agent swarm round:

    1. BARRIER: wait for all expected workers to submit their StateDelta
       proposals to the StagingArea (uses StagingArea.wait_for_all).
    2. MERGE: pass collected deltas through the SemanticMergeInterceptor to
       resolve contradictions and produce a single merged patch.
    3. OCC WRITE: commit the merged patch to RedisMasterState with OCC
       retry logic (max 3 attempts on version conflict).
    4. SAGA: record outcome via saga_writer if provided.
    5. LEDGER: emit a merge event to ColdLedger on success if provided.

OCC retry loop (MAX_RETRIES = 3):
    On each attempt, the engine re-reads the current state snapshot
    (version + data) from RedisMasterState and re-tries commit_patch.
    If all retries are exhausted the commit returns
    OccCommitResult(success=False, saga_status="conflict_exhausted").

Dependency injection:
    All collaborators are injected via the constructor.  None of them are
    required at import time — pass mocks in tests.

Usage::

    engine = OCCCommitEngine(
        staging_area=staging,
        merge_interceptor=merger,
        master_state=rms,
        saga_writer=saga,   # optional
        cold_ledger=ledger, # optional
    )
    result = await engine.execute_commit(session_id="sess-abc", expected_workers=3)
    assert result.success

# VERIFICATION_STAMP
# Story: 6.06
# Verified By: parallel-builder
# Verified At: 2026-02-25
# Tests: 14/14
# Coverage: 100%
    )annotationsN)	dataclassfield)AnyListOptional   c                  b    e Zd ZU dZded<    ee      Zded<   dZded	<   dZ	ded
<   dZ
ded<   y)OccCommitResultu  
    Result of an OCCCommitEngine.execute_commit() call.

    Attributes:
        success:      True when the patch was committed successfully.
        merged_patch: The merged patch list produced by the merge interceptor.
                      Empty list when success=False.
        version:      The new version number after a successful commit
                      (i.e. old_version + 1).  The last-known version on
                      failure.
        retries:      Number of OCC retry attempts made (0-based: 0 means the
                      first attempt succeeded or failed without a conflict retry).
        saga_status:  Saga outcome string:
                        "completed"          — committed successfully.
                        "merge_failed"       — merge interceptor returned failure.
                        "conflict_exhausted" — all OCC retries failed.
                        "unknown"            — initial/unset value (should not
                                               appear in a finished result).
    boolsuccess)default_factoryz	List[Any]merged_patchr   intversionretriesunknownstrsaga_statusN)__name__
__module____qualname____doc____annotations__r   listr   r   r   r        2/mnt/e/genesis-system/core/coherence/occ_commit.pyr   r   C   s;    ( M#D9L)9GSGS K r   r   c                  X    e Zd ZdZ	 	 d	 	 	 	 	 	 	 	 	 	 	 ddZ	 	 	 	 	 	 d	dZd
dZddZy)OCCCommitEngineu  
    Barrier-sync + OCC commit coordinator for multi-agent swarm rounds.

    Constructor args:
        staging_area:      StagingArea — collects worker deltas in Redis.
        merge_interceptor: Object with an async merge(deltas, current_state, version)
                           method that returns an object with .success and
                           .merged_patch attributes.
        master_state:      RedisMasterState — versioned OCC state store.
                           Must expose:
                             - async get_snapshot(session_id) -> (version, data)
                             - async commit_patch(session_id, version, patch)
                               -> CommitResult(success, new_version, conflict)
        saga_writer:       Optional object with async close_saga(session_id, status).
        cold_ledger:       Optional object with sync write_event(session_id, event, data).
    Nc                J    || _         || _        || _        || _        || _        y )N)stagingmergermastersaga_writerledger)selfstaging_areamerge_interceptormaster_stater%   cold_ledgers         r   __init__zOCCCommitEngine.__init__w   s(     $'"&!r   c                \  K   t         j                  d||       | j                  j                  ||d       d{   }t         j                  dt	        |      |       d}t        t              D ]\  }| j                  j                  |       d{   \  }}| j                  j                  |||       d{   }|j                  sCt         j                  d||       | j                  |d       d{    t        d	g ||d
      c S | j                  j                  |||j                          d{   }|j                  re|dz   }	t         j#                  d||	|       | j                  |d       d{    | j%                  |d       t        d|j                   |	|d
      c S t         j                  d||dz   t               _ t         j'                  dt        |       | j                  |d       d{    t        d	g |t        d
      S 7 7 7 e7 +7 7 7 (w)a  
        Run the full barrier-sync + OCC commit cycle.

        Steps:
            1. Wait for all expected_workers to submit deltas (barrier).
            2. Merge all deltas via merge_interceptor.
            3. Attempt OCC commit with up to MAX_RETRIES retries on conflict.
            4. Record outcome to saga_writer (if provided).
            5. Emit merge event to cold_ledger on success (if provided).

        Args:
            session_id:       Session identifier shared by all workers.
            expected_workers: How many worker delta submissions to wait for.

        Returns:
            OccCommitResult describing the outcome of the commit cycle.
        z5OCCCommitEngine: waiting for %d workers on session %si`  )
timeout_msNz5OCCCommitEngine: collected %d delta(s) for session %sr   z8OCCCommitEngine: merge failed on session %s (attempt %d)merge_failedF)r   r   r   r   r      z@OCCCommitEngine: committed session %s at version %d (attempt %d)	completedsaga_committedTz:OCCCommitEngine: OCC conflict on session %s, attempt %d/%dz<OCCCommitEngine: all %d OCC retries exhausted for session %sconflict_exhausted)loggerdebugr"   wait_for_alllenrangeMAX_RETRIESr$   get_snapshotr#   merger   warning_close_sagar   commit_patchr   info_emit_ledger_eventerror)
r'   
session_idexpected_workersdeltasr   attemptcurrent_statemerge_resultcommit_resultnew_versions
             r   execute_commitzOCCCommitEngine.execute_commit   sC    2 	C	

 ||00 1 
 

 	CK	
 [) 5	G+/;;+C+CJ+O%O"G] "&!2!26='!RRL''N
 &&z>BBB&!!### .  #'++":":G\%>%># M $$%k# &&z;???''
4DE& !-!:!:'# +  NNL!	a5	t 	J	

 z+?@@@,
 	
a
$ &P S C @4 	As   9H,HAH,H'H,;H <<H,8H#9A H,9H&:AH,<H(=BH,H*H,H, H,#H,&H,(H,*H,c                   K   | j                   &	 | j                   j                  ||       d{    yy7 # t        $ r }t        j	                  d|       Y d}~yd}~ww xY ww)z6Call saga_writer.close_saga if a writer is configured.Nz2OCCCommitEngine: saga_writer.close_saga raised: %s)r%   
close_saga	Exceptionr4   r<   )r'   rB   statusexcs       r   r=   zOCCCommitEngine._close_saga  s`     '&&11*fEEE (E H# s7   A$8 68 A$8 	A!AA$A!!A$c                    | j                   	 | j                   j                  ||i        yy# t        $ r }t        j	                  d|       Y d}~yd}~ww xY w)z7Call cold_ledger.write_event if a ledger is configured.Nz3OCCCommitEngine: cold_ledger.write_event raised: %s)r&   write_eventrM   r4   r<   )r'   rB   eventrO   s       r   r@   z"OCCCommitEngine._emit_ledger_event  sT    ;;"''
E2> #  I3 s   - 	AAA)NN)r(   r   r)   r   r*   r   r%   Optional[Any]r+   rS   returnNone)rB   r   rC   r   rT   r   )rB   r   rN   r   rT   rU   )rB   r   rR   r   rT   rU   )r   r   r   r   r,   rJ   r=   r@   r   r   r   r    r    e   s~    , &*%)"" " 	"
 #" #" 
"$t
t
 t
 
	t
tr   r    )r   
__future__r   loggingdataclassesr   r   typingr   r   r   	getLoggerr   r4   r9   r   r    r   r   r   <module>r[      s[   ,\ #  ( & &			8	$  ! ! !Bp pr   