
    +i<                        d Z ddlmZ ddlmZ ddlmZ ddlmZ ddlm	Z	  G d de      Z
e G d	 d
             ZddlZddlZddlmZmZ  ej                  e      ZdZdZ G d d      Z G d d      Zy)u  
core/bridge/openclaw_bridge.py

OpenClaw<>Genesis Bridge — Inbound Message Schema (Story 8.01)
AIVA RLM Nexus PRD v2, Module 8

Defines the standard message format for bidirectional communication between
AIVA (running on Mac Mini via OpenClaw gateway) and the Genesis swarm running
on the E: drive WSL2 environment.

All messages conform to ``OpenClawMessage`` regardless of direction.
Priority routing and expiry enforcement are the responsibility of the routing
layer (BridgeWriter / BridgeReader — Stories 8.02 / 8.03), not the schema.

Usage::

    from core.bridge.openclaw_bridge import MessageDirection, OpenClawMessage
    from datetime import datetime, timezone
    import uuid

    msg = OpenClawMessage(
        message_id=str(uuid.uuid4()),
        session_id="session-abc123",
        direction=MessageDirection.AIVA_TO_GENESIS,
        payload={"intent": "task_request", "body": "Book George's Cairns call"},
        priority=2,
        created_at=datetime.now(timezone.utc),
    )
    )annotations)	dataclass)datetime)Enum)Optionalc                      e Zd ZdZdZdZy)MessageDirectiona  Direction of travel for an OpenClaw bridge message.

    Values
    ------
    AIVA_TO_GENESIS
        Message originates on the Mac Mini (AIVA / OpenClaw gateway) and
        travels to the Genesis WSL2 environment for swarm execution.
    GENESIS_TO_AIVA
        Message originates in the Genesis swarm and travels to AIVA on the
        Mac Mini for injection into her active session or cron jobs.
    aiva_to_genesisgenesis_to_aivaN)__name__
__module____qualname____doc__AIVA_TO_GENESISGENESIS_TO_AIVA     4/mnt/e/genesis-system/core/bridge/openclaw_bridge.pyr	   r	   ,   s    
 (O'Or   r	   c                  \    e Zd ZU dZded<   ded<   ded<   ded<   d	ed
<   ded<   dZded<   y)OpenClawMessageu*  Standard envelope for all OpenClaw<>Genesis bridge traffic.

    Fields
    ------
    message_id : str
        Unique identifier for this message (UUID v4 recommended).
    session_id : str
        Identifier for the AIVA or Genesis session that produced the message.
        Used for grouping related messages and injecting context.
    direction : MessageDirection
        Whether this message is AIVA→Genesis or Genesis→AIVA.
    payload : dict
        Arbitrary structured data.  Typical keys:

        * ``intent``   — classified intent signal from AIVA (AIVA_TO_GENESIS)
        * ``task_id``  — swarm task ID for result correlation
        * ``body``     — free-form text or structured result
        * ``injection`` — context to inject into AIVA's next session
    priority : int
        Routing priority for the message:

        * ``1`` — critical (SLA: route immediately, no batching)
        * ``2`` — high (SLA: route within 500 ms)
        * ``3`` — normal (SLA: route within 3 s)

        Validation of out-of-range values is intentionally left to the
        routing layer (BridgeWriter) so this schema remains a pure value
        object with no side effects.
    created_at : datetime
        UTC wall-clock time when the message was constructed.
        MUST be timezone-aware (``tzinfo`` set).
    expires_at : Optional[datetime]
        Optional absolute expiry time.  A ``None`` value means the message
        does not expire.  When set, BridgeReader must discard messages where
        ``expires_at < datetime.now(timezone.utc)``.
    str
message_id
session_idr	   	directiondictpayloadintpriorityr   
created_atNzOptional[datetime]
expires_at)r   r   r   r   __annotations__r    r   r   r   r   r   B   s5    #J OOMM%)J")r   r   N)r   timezonezbridge:queue:aiva_to_genesiszbridge:queue:genesis_to_aivac                      e Zd ZdZd ZddZy)BridgeWriteru  Writes OpenClawMessages to Redis bridge queues.

    Serialises each message to JSON and RPUSH-es it to the appropriate
    Redis list, based on the message's direction.

    Parameters
    ----------
    redis_client:
        Redis client exposing an ``rpush(key, value)`` method.
        Pass a :class:`unittest.mock.MagicMock` in tests — no live Redis
        connection is required.
    c                    || _         y Nredisselfredis_clients     r   __init__zBridgeWriter.__init__   	    !
r   c                  K   |j                   t        j                  t        j                        }|j                   }|j
                   |j                  t        j                        }||k  r0t        j                  d|j                  |j                                y|j                  j                  t        j                  j                  k(  rt        }nt         }|j                  |j"                  |j                  j                  |j$                  |j&                  |j(                  j                         |j                   |j                   j                         ndd}	 | j*                  j-                  |t/        j0                  |             t        j                  d|j                  |       y# t2        $ r$ t        j5                  d|j                  |       Y yw xY ww)	u  Serialise *message* to JSON and RPUSH to the appropriate bridge queue.

        Parameters
        ----------
        message:
            The :class:`OpenClawMessage` to enqueue.

        Returns
        -------
        bool
            ``True`` on success.
            ``False`` when:

            * the message has expired (``expires_at`` is in the past), or
            * the underlying Redis call raises an exception.

        Notes
        -----
        * Uses ``RPUSH`` (not ``LPUSH``) to preserve FIFO order.
        * Expired check normalises naive datetimes to UTC before comparison.
        * Serialises via ``json.dumps`` — never pickle.
        Ntzinfou5   BridgeWriter: message %s expired at %s — discardingFr   r   r   r   r   r   r    z%BridgeWriter: pushed message %s to %sTz-BridgeWriter: failed to push message %s to %s)r    r   nowr"   utcr0   replaceloggerdebugr   	isoformatr   valuer	   r   BRIDGE_QUEUE_AIVA_TO_GENESISBRIDGE_QUEUE_GENESIS_TO_AIVAr   r   r   r   r(   rpushjsondumps	Exception	exception)r*   messager2   expires	queue_keyr   s         r   sendzBridgeWriter.send   s    4 ),,x||,C((G~~%!///>}K&&%%'
  ""&6&F&F&L&LL4I4I ",,!,, **00((!,,668292D2D2P"",,.VZ

	JJY

7(;<LL7""
  	?""
 	s+   EG#"AF3 2G#3*G G#G  G#N)r@   r   returnbool)r   r   r   r   r,   rC   r   r   r   r$   r$      s    "Rr   r$   c                  ,    e Zd ZdZd Z	 d	 	 	 	 	 ddZy)BridgeReaderu  Reads OpenClawMessages from Redis bridge queues using blocking pop.

    Calls ``BLPOP`` with a configurable timeout so callers can process the
    next available message without spinning in a tight loop.

    Parameters
    ----------
    redis_client:
        Redis client exposing a ``blpop(keys, timeout)`` method that mirrors
        the redis-py ``blpop`` signature:

        * ``blpop(keys, timeout=0)`` — blocks up to *timeout* seconds.
        * Returns ``(key_bytes, value_bytes)`` on success.
        * Returns ``None`` on timeout (no message available).

        Pass a :class:`unittest.mock.MagicMock` in tests — no live Redis
        connection is required.
    c                    || _         y r&   r'   r)   s     r   r,   zBridgeReader.__init__  r-   r   c           	     >  K   |j                   t        j                  j                   k(  rt        }nt        }| j
                  j                  ||      }|y|\  }}	 t        j                  |      }	 t        |d         }	|d   }
t        j                  |
      }|j                   |j                  t         j"                        }d}|j%                  d      Dt        j                  |d         }|j                   |j                  t         j"                        }t'        |d	   |d
   |	|d   |d   ||      S # t        j                  t        f$ r}t        d|d|       |d}~ww xY w# t(        t        t        f$ r}t        d|d|       |d}~ww xY ww)u-  Pop the next message from the bridge queue for *direction*.

        Uses ``BLPOP`` (blocking left-pop) so the call parks the coroutine
        for up to *timeout_s* seconds rather than spinning.

        Parameters
        ----------
        direction : MessageDirection
            Which queue to read from.  Mapped as:

            * ``AIVA_TO_GENESIS`` → ``bridge:queue:aiva_to_genesis``
            * ``GENESIS_TO_AIVA`` → ``bridge:queue:genesis_to_aiva``
        timeout_s : int, optional
            Maximum seconds to block waiting for a message.  Defaults to 1.
            Pass ``0`` for an immediate non-blocking pop.

        Returns
        -------
        OpenClawMessage
            The next message if one was available within *timeout_s*.
        None
            If the queue was empty and the timeout elapsed (no exception).

        Raises
        ------
        ValueError
            If the raw bytes retrieved from Redis cannot be decoded as JSON
            or are missing required ``OpenClawMessage`` fields.

        Notes
        -----
        * Uses ``BLPOP``, never ``LPOP`` in a loop — a single blocking call.
        * ``timeout_s`` is passed through to Redis verbatim (not hardcoded).
        * Direction enum is resolved via ``.value`` to stay robust against
          module-reload edge cases in tests.
        * Datetime fields are parsed from ISO 8601 strings back to
          :class:`~datetime.datetime` objects with UTC timezone.
        * Direction string is parsed back to the :class:`MessageDirection`
          enum member.
        )timeoutNz(BridgeReader: malformed JSON from queue z: r   r   r/   r    r   r   r   r   r1   z1BridgeReader: failed to deserialise message from )r8   r	   r   r9   r:   r(   blpopr<   loadsJSONDecodeError	TypeError
ValueErrorr   fromisoformatr0   r4   r"   r3   getr   KeyError)r*   r   	timeout_srB   result_keyrawdataexcmsg_directioncreated_at_rawr   r    s                r   pollzBridgeReader.poll  s    ` ??.>>DDD4I4I
 !!)Y!?>
 	c
	::c?D	,T+->?M!,/N!//?J  ('//x||/D
-1Jxx%1%33D4FG
$$,!+!3!38<<!3!HJ"--'Yj)%% 1 $$i0 	:9-r#O	B *i0 	CI=PRSVRWX	sI   AFD< 3CE/ ;F<E,E''E,,F/FFFFN)   )r   z'MessageDirection'rS   r   rD   z'Optional[OpenClawMessage]')r   r   r   r   r,   r[   r   r   r   rG   rG      s6    &" k%k k 
%	kr   rG   )r   
__future__r   dataclassesr   r   enumr   typingr   r	   r   r<   loggingr"   	getLoggerr   r5   r9   r:   r$   rG   r   r   r   <module>rc      s   < # !   (t (, ,* ,* ,*z   '			8	$= = c cjB Br   