
    ]i'                        d Z ddlmZ ddlZddlmZ ddlmZmZm	Z	m
Z
mZ ddlmZ e G d d             Z G d	 d
      Zy)uy  
core/coherence/redis_master_state.py

RedisMasterState — Versioned OCC (Optimistic Concurrency Control) state store.

Uses Redis WATCH + MULTI/EXEC (pipeline transaction) to ensure that concurrent
agents cannot corrupt shared state. The OCC pattern:

    1. WATCH the key (register a "dirty" detector on the key)
    2. GET current state, verify the caller's expected version matches
    3. MULTI (begin transaction)
    4. SET new state (version+1, patched data)
    5. EXEC (commit) — Redis returns None if the watched key was modified between
       WATCH and EXEC, triggering a WatchError in redis-py

If two agents concurrently read version 5 and both try to write version 6, only
the first EXEC succeeds; the second raises WatchError → CommitResult(conflict=True).

State structure stored in Redis:
    {"version": <int>, "data": <dict>}

Simple patch application (not full RFC 6902 — StateDelta handles that):
    {"op": "replace", "path": "/key", "value": v}  → data[key] = v
    {"op": "add",     "path": "/key", "value": v}  → data[key] = v
    {"op": "remove",  "path": "/key"}               → del data[key]

The "path" uses the key name without a leading "/" — i.e. "replace", path "/name"
strips the leading slash and uses "name" as the dict key.

# VERIFICATION_STAMP
# Story: 6.02
# Verified By: parallel-builder
# Verified At: 2026-02-25
# Tests: 9/9
# Coverage: 100%
    )annotationsN)	dataclass)AnyDictListOptionalTuple)
WatchErrorc                  0    e Zd ZU dZded<   ded<   ded<   y)CommitResulta  Result of a commit_patch or initialize_state operation.

    Attributes:
        success:     True if the write committed successfully.
        new_version: The version number after a successful commit (version+1).
                     0 when success=False.
        conflict:    True when a concurrent writer modified the key between our
                     WATCH and EXEC, causing the transaction to abort.
    boolsuccessintnew_versionconflictN)__name__
__module____qualname____doc____annotations__     :/mnt/e/genesis-system/core/coherence/redis_master_state.pyr   r   4   s     MNr   r   c                      e Zd ZdZdZddZddZedd       Zedd       Z	edd       Z
ddZ	 	 	 	 	 	 	 	 dd	Z	 	 	 	 	 	 dd
Zy)RedisMasterStatea  
    Versioned, OCC-protected Redis state store for multi-agent coherence.

    Each session's state is stored at:
        genesis:state:master:<session_id>

    Value format (JSON):
        {"version": <int>, "data": <dict>}

    Usage::

        rms = RedisMasterState(redis_client)
        version, data = await rms.get_snapshot("sess-abc")
        result = await rms.commit_patch("sess-abc", version, patch_ops)
        if result.conflict:
            # Retry: re-read snapshot and rebuild patch
            ...
    zgenesis:state:master:c                    || _         y)zm
        Args:
            redis_client: An async redis.asyncio.Redis instance (or compatible mock).
        N)redis)selfredis_clients     r   __init__zRedisMasterState.__init__`   s    
 "
r   c                "    | j                    | S )z(Build the Redis key for a given session.)
KEY_PREFIX)r   
session_ids     r   _keyzRedisMasterState._keyk   s    //":,//r   c                   ddl }|j                  |       }|D ]g  }|j                  d      }|j                  dd      }|j                  d      }|dv r|j                  d      ||<   P|d	k(  sV|j	                  |d       i |S )
u  
        Apply a simplified patch list to a data dict.

        Operations supported:
            "replace" — set data[key] = value (key must already exist is NOT
                        enforced here; simple replace also creates the key)
            "add"     — set data[key] = value
            "remove"  — delete data[key]

        The "path" field is an RFC 6902-style JSON Pointer (e.g. "/name").
        The leading "/" is stripped to derive the flat dict key.

        Returns a new dict — does NOT mutate the input.
        r   Noppath /)replaceaddvalueremove)copydeepcopygetlstrippop)datapatchr.   resultop_dictr&   r'   keys           r   _apply_patch_simplez$RedisMasterState._apply_patch_simpleo   s      	t$ 		&GT"B;;vr*D++c"C''%kk'2sx

3%		& r   c                6    t        j                  | |dd      S )zSerialize state to JSON string.versionr3   ),:)
separators)jsondumpsr:   s     r   _encodezRedisMasterState._encode   s     zzgt<TTr   c                @    t        j                  |       }|d   |d   fS )z+Deserialize JSON string to (version, data).r;   r3   )r?   loads)rawstates     r   _decodezRedisMasterState._decode   s%     

3i %-00r   c                   K   | j                  |      }| j                  j                  |       d{   }|sdi fS | j                  |      S 7 w)a  
        Return the current (version, data) for a session.

        For a fresh session that has never been initialized, returns (0, {}).

        Args:
            session_id: Unique identifier for the session.

        Returns:
            Tuple of (version: int, data: dict).
        Nr   )r$   r   r0   rF   )r   r#   r7   rD   s       r   get_snapshotzRedisMasterState.get_snapshot   sK      ii
#JJNN3''r7N||C   (s   0AAAc                0  K   | j                  |      }| j                  j                         4 d{   }	 |j                  |       d{    |j	                  |       d{   }|r| j                  |      \  }}ndi }}||k7  r7|j                          d{    t        ddd      cddd      d{    S | j                  ||      }	|dz   }
| j                  |
|	      }|j                          |j                  ||       |j                          d{    t        d|
d      cddd      d{    S 7 7 7 7 7 7 .7 # t        $ r# t        ddd      cY cddd      d{  7   S w xY w# 1 d{  7  sw Y   yxY ww)ud  
        Apply patch to state using Optimistic Concurrency Control.

        OCC flow:
            1. WATCH key — Redis monitors it for external changes.
            2. GET current state.
            3. Check that current version == expected version.
               If not → version mismatch → return conflict (not a WatchError,
               just a version check failure — still conflict=True).
            4. Apply patch to produce new data.
            5. MULTI (begin transaction).
            6. SET new state (version+1, new_data).
            7. EXEC — commits if key has not been modified since WATCH.
               WatchError from redis-py → conflict=True.

        Args:
            session_id: Unique identifier for the session.
            version:    The version the caller read from get_snapshot().
                        Must match the current version in Redis for the
                        commit to proceed.
            patch:      List of simplified patch operations (see _apply_patch_simple).

        Returns:
            CommitResult with success, new_version, conflict fields.
        Nr   FTr   r   r      )r$   r   pipelinewatchr0   rF   unwatchr   r8   rA   multisetexecuter
   )r   r#   r;   r4   r7   piperD   current_versioncurrent_datanew_datar   new_raws               r   commit_patchzRedisMasterState.commit_patch   s    > ii
#::&&( "	Q "	QD!Qjjo%% !HHSM)48LL4E1O\45r\O #g-,,.(('1tT!"	Q "	Q "	Q&  33L%H%k,,{H= 

g& lln$$#DkTYZ="	Q "	Q "	Q & * )"	Q8 %9"	Q@  Q#Eq4PPE"	Q "	Q "	Q@QA"	Q "	Q "	Qs   0FEFFEEE%E&7EE
E/F;E<FAEE E1F=E>FEE
EFEFE>)F*F6E97F=E>>FFF
FFc                   K   | j                  |      }| j                  d|      }| j                  j                  ||d       d{   }|rt	        ddd      S t	        ddd      S 7 "w)a  
        Set the initial state for a session (version=1).

        Uses Redis SET NX (set-if-not-exists) to ensure idempotency.
        If the key already exists, returns conflict=True.

        Args:
            session_id:   Unique identifier for the session.
            initial_data: The initial data dict to store.

        Returns:
            CommitResult(success=True, new_version=1, conflict=False) on success.
            CommitResult(success=False, new_version=0, conflict=True) if key exists.
        rK   T)nxNFrJ   r   )r$   rA   r   rP   r   )r   r#   initial_datar7   rV   r5   s         r   initialize_statez!RedisMasterState.initialize_state   si     & ii
#,,q,/zz~~c7t~<<!eLL  1tLL =s   AA,A*#A,N)r   r   returnNone)r#   strr\   r^   )r3   Dict[str, Any]r4   List[Dict[str, Any]]r\   r_   )r;   r   r3   r_   r\   r^   )rD   zbytes | strr\   Tuple[int, Dict[str, Any]])r#   r^   r\   ra   )r#   r^   r;   r   r4   r`   r\   r   )r#   r^   rZ   r_   r\   r   )r   r   r   r   r"   r    r$   staticmethodr8   rA   rF   rH   rW   r[   r   r   r   r   r   J   s    & )J"0  < U U 1 1!$CQCQ CQ $	CQ
 
CQJMM %M 
	Mr   r   )r   
__future__r   r?   dataclassesr   typingr   r   r   r   r	   redis.exceptionsr
   r   r   r   r   r   <module>rg      sF   #J #  ! 3 3 '   *EM EMr   