
    5iW)                        d Z ddlmZ ddlZddlZddlmZ ddlmZ ddlm	Z	  ej                  e      ZdZe G d d	             Z G d
 d      Zy)u  EventSourcingStream — Immutable event append and replay for Genesis.

Provides an event sourcing layer on top of the ColdLedger (L4 Postgres).
Events are written append-only to ColdLedger and optionally published to a
Redis Stream (``genesis:events``) for real-time consumption by subscribers.

Usage::

    from core.storage.event_sourcing import EventSourcingStream, GenesisEvent
    import uuid
    from datetime import datetime

    stream = EventSourcingStream(cold_ledger=ledger, redis_client=redis)

    event = GenesisEvent(
        id=str(uuid.uuid4()),
        session_id="session-uuid",
        event_type="dispatch_start",
        payload={"agent": "forge"},
        version=1,
        created_at=datetime.utcnow(),
    )
    stream.append(event)

    all_events = stream.replay("session-uuid")
    state      = stream.get_current_state("session-uuid")

Design notes:
  - Version ordering is the authoritative ordering mechanism (not created_at).
    Callers are responsible for assigning monotonically increasing version numbers.
  - Redis XADD is best-effort: a Redis failure logs an error but does NOT raise
    so ColdLedger (the source of truth) is never rolled back.
  - get_current_state folds events sequentially in version order — each event's
    payload is shallow-merged into the state dict (later versions win on key collision).
  - NO SQLite anywhere in this file (Genesis Rule 7).
    )annotationsN)	dataclass)datetime)Optionalzgenesis:eventsc                  N    e Zd ZU dZded<   ded<   ded<   ded<   ded	<   d
ed<   y)GenesisEventu  Typed representation of one event in the immutable event log.

    Fields
    ------
    id : str
        UUID4 string — unique identifier for this event record.
    session_id : str
        UUID string of the owning Genesis session.
    event_type : str
        Short label describing the event, e.g. ``"dispatch_start"``.
    payload : dict
        Arbitrary JSON-serialisable dict containing event data.
    version : int
        Monotonically increasing integer assigned by the caller.
        Determines replay order — lower versions are applied first.
    created_at : datetime
        Wall-clock timestamp when the event was created (UTC).
    strid
session_id
event_typedictpayloadintversionr   
created_atN)__name__
__module____qualname____doc____annotations__     4/mnt/e/genesis-system/core/storage/event_sourcing.pyr   r   9   s(    & 	GOOMLr   r   c                  `    e Zd ZdZd
ddZddZddZ	 	 	 	 	 	 ddZddZddZ	e
dd	       Zy)EventSourcingStreama  Append-only event stream backed by ColdLedger (L4) with optional Redis publish.

    Args:
        cold_ledger:  A ``ColdLedger`` instance exposing ``write_event()`` and
                      ``get_events()``.
        redis_client: Optional Redis client.  When provided, each appended event
                      is also published to the ``genesis:events`` Redis Stream via
                      ``XADD``.  Redis failures are logged but never raised.
    Nc                     || _         || _        y N)ledgerredis)selfcold_ledgerredis_clients      r   __init__zEventSourcingStream.__init__f   s    !!
r   c                "   |j                   |j                  |j                  j                         d|j                  }| j
                  j                  |j                  |j                  |       | j                  | j                  ||       yy)uf  Write an event to ColdLedger and publish to Redis Stream.

        The ColdLedger write is mandatory and will raise on failure.
        The Redis XADD is best-effort — any exception is caught, logged,
        and does NOT prevent the ColdLedger record from persisting.

        Args:
            event: A fully populated ``GenesisEvent`` instance.
        event_idr   r   )r   r   r   N)r
   r   r   	isoformatr   r   write_eventr   r   r   _publish_to_redis)r    eventenriched_payloads      r   appendzEventSourcingStream.appendn   s     }}**446
 mm	
 	''''$ 	  	
 ::!""5*:; "r   c                    | j                   j                  |      }|D cg c]  }| j                  |       }}|j                  d        |S c c}w )at  Return all events for a session, ordered by version ascending.

        Reads from ColdLedger (L4).  Events are sorted by their embedded
        ``version`` field extracted from the stored payload.

        Args:
            session_id: UUID string of the session to replay.

        Returns:
            Ordered list of ``GenesisEvent`` instances (may be empty).
        c                    | j                   S r   )r   )es    r   <lambda>z,EventSourcingStream.replay.<locals>.<lambda>   s
    !)) r   )key)r   
get_events_row_to_eventsort)r    r   rowsroweventss        r   replayzEventSourcingStream.replay   sO     {{%%j159:c$$$S)::+, ;s   Ac                n    | j                  |      }|D cg c]  }|j                  |k\  s| c}S c c}w )u%  Return events with version >= from_version, ordered by version ascending.

        Useful for incremental state reconstruction — e.g. when a snapshot
        exists at version N and only events after N need to be replayed.

        Args:
            session_id:   UUID string of the session to replay.
            from_version: Lower bound (inclusive) — events with version
                          strictly less than this are excluded.

        Returns:
            Filtered, ordered list of ``GenesisEvent`` instances (may be empty).
        )r8   r   )r    r   from_version
all_eventsr/   s        r   replay_from_versionz'EventSourcingStream.replay_from_version   s1      [[,
%Cal)BCCCs   22c                    | j                  |      }i }|D ]D  }|j                  j                         D ci c]  \  }}|dvr|| }}}|j                  |       F |S c c}}w )u  Fold all events into a single state dict.

        Events are applied sequentially in version order.  Each event's
        ``payload`` (excluding the internal bookkeeping keys ``event_id``,
        ``version``, ``created_at``) is shallow-merged into the accumulating
        state dict — later versions override earlier ones on key collision.

        Args:
            session_id: UUID string of the session whose state to compute.

        Returns:
            Merged state dict.  Returns ``{}`` for an unknown or empty session.
        r%   )r8   r   itemsupdate)r    r   r7   stater*   kvuser_payloads           r   get_current_statez%EventSourcingStream.get_current_state   s}     Z( 	'E "MM//1AqAA 1L 
 LL&	' s   Ac           
     t   	 | j                   j                  t        |j                  |j                  |j
                  t        |j                        t        j                  |      d       y# t        $ rA}t        j                  d|j                  |j                  |j                  |       Y d}~yd}~ww xY w)zIPublish event to Redis Stream via XADD.  Failures are logged, not raised.)r&   r   r   r   r   ur   EventSourcingStream: Redis XADD failed for event %s (session=%s, version=%d) — event is in ColdLedger. Error: %sN)r   xaddREDIS_STREAM_KEYr
   r   r   r	   r   jsondumps	Exceptionloggererror)r    r*   r+   excs       r   r)   z%EventSourcingStream._publish_to_redis   s    	JJOO  %"'"2"2"'"2"2"5==1#zz*:;	  	LLQ   	s   A*A- -	B767B22B7c                6   | j                  di       }t        |t              rt        j                  |      }t        |j                  dd            }|j                  d      xs | j                  d      }t        |t              rt        j                  |      }n't        |t              r|}nt        j                         }|j                  d      xs t        | j                  dd            }t        |t        | d         t        | d	         |||
      S )a  Convert a raw ColdLedger row dict into a ``GenesisEvent``.

        ColdLedger rows have keys: ``id``, ``session_id``, ``event_type``,
        ``payload``, ``created_at``.  The ``payload`` dict contains the
        enriched data we stored in ``append()``, including the original
        ``version`` and ``event_id``.

        Handles the case where psycopg2 has already parsed the JSONB column
        into a Python dict (normal production path) as well as the string
        fallback (some test/mock paths).
        r   r   r   r   r&   r
    r   r   )r
   r   r   r   r   r   )
get
isinstancer	   rH   loadsr   r   fromisoformatutcnowr   )r6   r   r   created_at_rawr   r&   s         r   r3   z!EventSourcingStream._row_to_event   s     '')R(gs#jj)G gkk)Q/0 !\2Kcggl6Knc*!//?J1'J!*J ;;z*Dc#''$2C.D3|,-3|,-!
 	
r   r   )returnNone)r*   r   rV   rW   )r   r	   rV   list[GenesisEvent])r   r	   r:   r   rV   rX   )r   r	   rV   r   )r*   r   r+   r   rV   rW   )r6   r   rV   r   )r   r   r   r   r#   r,   r8   r<   rD   r)   staticmethodr3   r   r   r   r   r   [   sU    "<>"DD-0D	D&<. (
 (
r   r   )r   
__future__r   rH   loggingdataclassesr   r   typingr   	getLoggerr   rK   rG   r   r   r   r   r   <module>r_      s_   #J #   !  			8	$ $    Bt
 t
r   