
    i9                     j   d Z ddlZddlZddlZddlZddlZej                  j                  d       ddlm	Z	 ddl
Z
ddlZ
ddlZddlZddlmZmZ ddlmZmZmZ ddlmZmZ ddlmZmZ dd	lmZ dd
lmZmZmZm Z m!Z!m"Z"m#Z#m$Z$m%Z%m&Z&m'Z'm(Z( ddl)m*Z* ddl+m,Z, ddl-Z-ddl.Z.ddl/Z/ ej`                  e1      Z2e2jg                  ejh                          G d de      Z5 G d de      Z6 G d de      Z7e G d d             Z8ee8ge%d   f   Z9ee8gdf   Z: G d d      Z;e G d d             Z< G d d      Z= G d d      Z> G d d      Z? G d d       Z@ G d! d"      ZAd#e$eBe eB   f   fd$ZCd,d%eDd&eEfd'ZFd( ZGe1d)k(  r6 ej                  ej                  d*+        ej                   eG              yy)-a  
AIVA Queen Event Bus System - Asynchronous Communication Infrastructure

This module provides a complete event-driven communication system for AIVA Queen,
enabling decoupled, asynchronous messaging between components with full persistence,
filtering, dead letter handling, and replay capabilities.

Components:
    - Event: Core event data structure with metadata
    - EventBus: Central event dispatcher with async support
    - EventSubscriber: Subscribe to specific event types
    - EventPublisher: Publish events to the bus
    - EventFilter: Filter events by multiple criteria
    - EventPersistence: Persist events for replay and audit
    - DeadLetterQueue: Handle failed event deliveries

Author: AIVA Queen Orchestrator
Version: 1.0.0
    Nz)/mnt/e/genesis-system/data/genesis-memory)PostgresConfig)ABCabstractmethod)	dataclassfieldasdict)datetime	timedelta)Enumauto)Path)AnyCallableDictListOptionalSetTupleUnion	AwaitableTypeVarGenericProtocol)defaultdict)wrapsc                   $    e Zd ZdZdZdZdZdZdZy)EventPriorityz+Event priority levels for processing order.r               N)	__name__
__module____qualname____doc__CRITICALHIGHNORMALLOW
BACKGROUND     K/mnt/e/genesis-system/AIVA/queen_outputs/communication/comm_02_event_bus.pyr   r   5   s    5HDF
CJr,   r   c                   r    e Zd ZdZ e       Z e       Z e       Z e       Z e       Z	 e       Z
 e       Zy)EventStatusz$Status of an event in its lifecycle.N)r"   r#   r$   r%   r   PENDING
PROCESSING	DELIVEREDFAILEDDEAD_LETTEREDREPLAYEDEXPIREDr+   r,   r-   r/   r/   >   s6    .fGJIVFFMvHfGr,   r/   c                   :    e Zd ZdZ e       Z e       Z e       Zy)DeliveryPolicyz"Event delivery guarantee policies.N)r"   r#   r$   r%   r   AT_MOST_ONCEAT_LEAST_ONCEEXACTLY_ONCEr+   r,   r-   r8   r8   I   s    ,6LFM6Lr,   r8   c                      e Zd ZU dZeed<   eeef   ed<    ed       Z	eed<    ee
j                        Ze
ed<   dZeed	<   d
Zee   ed<   d
Zee   ed<   ej$                  Zeed<   dZeed<    ee      Zeeef   ed<   dZeed<   dZeed<   ej6                  Zeed<   d Zedefd       Z edefd       Z!deeef   fdZ"e#deeef   dd fd       Z$dedeeef   dd fdZ%y
)Eventa  
    Core event data structure with full metadata support.

    Attributes:
        event_type: Category/type identifier for routing
        payload: The actual event data
        event_id: Unique identifier (auto-generated)
        timestamp: When the event was created
        source: Origin component/service identifier
        correlation_id: For tracking related events
        causation_id: The event that caused this one
        priority: Processing priority level
        ttl_seconds: Time-to-live before expiration
        metadata: Additional key-value metadata
        version: Schema version for evolution
        retry_count: Number of delivery attempts
    
event_typepayloadc                  <    t        t        j                               S N)struuiduuid4r+   r,   r-   <lambda>zEvent.<lambda>e   s    #djjl2C r,   default_factoryevent_id	timestampunknownsourceNcorrelation_idcausation_idpriority  ttl_secondsmetadataz1.0versionr   retry_countstatusc                    | j                   st        d      t        | j                  t              st        d      t        | j
                  t              r$t        j                  | j
                        | _        t        | j                  t              rt        | j                        | _	        t        | j                  t              rt        | j                     | _        yy)z"Validate and normalize event data.zevent_type cannot be emptyzpayload must be a dictionaryN)r>   
ValueError
isinstancer?   dictrI   rB   r	   fromisoformatrN   intr   rT   r/   selfs    r-   __post_init__zEvent.__post_init__q   s    9::$,,-;<<dnnc*%33DNNCDNdmmS))$--8DMdkk3'%dkk2DK (r,   returnc                 v    | j                   t        | j                        z   }t        j                         |kD  S )z(Check if the event has exceeded its TTL.)seconds)rI   r
   rP   r	   utcnow)r\   expiry_times     r-   
is_expiredzEvent.is_expired~   s/     nny9I9I'JJ ;..r,   c                     | j                    d| j                   dt        j                  | j                  d       }t        j                  |j                               j                         dd S )z7Generate a content-based fingerprint for deduplication.:T)	sort_keysN   )	r>   rK   jsondumpsr?   hashlibsha256encode	hexdigest)r\   contents     r-   fingerprintzEvent.fingerprint   sZ     __%Qt{{m1TZZX\5]4^_~~gnn./99;CR@@r,   c                 j   | j                   | j                  | j                  | j                  j	                         | j
                  | j                  | j                  | j                  j                  | j                  | j                  | j                  | j                  | j                  j                  dS )zSerialize event to dictionary.)rH   r>   r?   rI   rK   rL   rM   rN   rP   rQ   rR   rS   rT   )rH   r>   r?   rI   	isoformatrK   rL   rM   rN   valuerP   rQ   rR   rS   rT   namer[   s    r-   to_dictzEvent.to_dict   s     //||113kk"11 --++++||++kk&&
 	
r,   datac                     |j                         }d|v rt        |d         |d<   d|v rt        |d      |d<   d|v r.t        |d   t              rt        j                  |d         |d<    | di |S )z"Deserialize event from dictionary.rN   rT   rI   r+   )copyr   r/   rW   rB   r	   rY   )clsru   s     r-   	from_dictzEvent.from_dict   s     yy{,T*-=>Dt(h8DN$:d;.?#E ( 6 6tK7H ID{T{r,   c                    t        ||| j                  xs | j                  | j                  |j                  d| j                        |j                  d| j
                        i | j                  |j                  di             S )z5Create a derived event maintaining correlation chain.rK   rN   rQ   )r>   r?   rL   rM   rK   rN   rQ   )r=   rL   rH   getrK   rN   rQ   )r\   r>   r?   kwargss       r-   derivezEvent.derive   sn    !..?$--::h4ZZ
DMM:DDJ)CD
 	
r,   )&r"   r#   r$   r%   rB   __annotations__r   r   r   rH   r	   ra   rI   rK   rL   r   rM   r   r(   rN   rP   rZ   rX   rQ   rR   rS   r/   r0   rT   r]   propertyboolrc   ro   rt   classmethodry   r}   r+   r,   r-   r=   r=   P   sX   " O#s(^*CDHcD@Ix@FC$(NHSM("&L(3-&+22Hm2K$T:Hd38n:GSK%--FK-3 /D / /
 AS A A

c3h 
$ 	T#s(^ 	 	 	

 

tCH~ 

G 

r,   r=   c                      e Zd ZdZd Zdedd fdZdedd fdZdedd fdZd	edd fd
Z		 	 dde
e   de
e   dd fdZdeegef   dd fdZdededd fdZ	 	 dde
e   de
e   dd fdZdeegef   dd fdZdedefdZdedefdZddZy) EventFiltera  
    Flexible event filtering with multiple criteria support.

    Supports filtering by:
        - Event types (exact match or pattern)
        - Source patterns
        - Payload field conditions
        - Priority ranges
        - Time windows
        - Custom predicates
    c                     t               | _        t               | _        d | _        d | _        g | _        d | _        g | _        t               | _        y rA   )	set_type_patterns_source_patterns_min_priority_max_priority_payload_conditions_time_window_custom_predicates_exclude_typesr[   s    r-   __init__zEventFilter.__init__   sH    (+*-%6:6:AC AEAC(+r,   event_typesr^   c                 <    | j                   j                  |       | S )z#Filter by exact event type matches.)r   updater\   r   s     r-   by_typezEventFilter.by_type       "";/r,   prefixc                 B    | j                   j                  | d       | S )z$Filter by event type prefix pattern.*)r   add)r\   r   s     r-   by_type_prefixzEventFilter.by_type_prefix   s!    6(!-r,   c                 <    | j                   j                  |       | S )zExclude specific event types.)r   r   r   s     r-   exclude_typeszEventFilter.exclude_types   r   r,   sourcesc                 <    | j                   j                  |       | S )zFilter by source identifiers.)r   r   )r\   r   s     r-   	by_sourcezEventFilter.by_source   s    $$W-r,   Nmin_prioritymax_priorityc                 "    || _         || _        | S )z%Filter by priority range (inclusive).)r   r   )r\   r   r   s      r-   by_priority_rangezEventFilter.by_priority_range   s     *)r,   	conditionc                 <    | j                   j                  |       | S )z"Add a payload condition predicate.r   append)r\   r   s     r-   
by_payloadzEventFilter.by_payload   s      ''	2r,   r   rr   c                 H    | j                   j                  fd       | S )z$Filter by exact payload field value.c                 ,    | j                        k(  S rA   )r{   )pr   rr   s    r-   rE   z.EventFilter.by_payload_field.<locals>.<lambda>   s    !%%,%2G r,   r   )r\   r   rr   s    ``r-   by_payload_fieldzEventFilter.by_payload_field   s      ''(GHr,   startendc                 `    |xs t         j                  |xs t         j                  f| _        | S )z!Filter by event timestamp window.)r	   minmaxr   )r\   r   r   s      r-   by_time_windowzEventFilter.by_time_window   s-     !X\\8<<
 r,   	predicatec                 <    | j                   j                  |       | S )zAdd a custom filter predicate.)r   r   )r\   r   s     r-   with_predicatezEventFilter.with_predicate  s    &&y1r,   r>   c                     | j                   sy| j                   D ]1  }|j                  d      r|j                  |dd       s) y||k(  s1 y y)z%Check if event type matches patterns.Tr   NF)r   endswith
startswith)r\   r>   patterns      r-   _match_typezEventFilter._match_type
  sX    ""** 	G$(("6w&	 r,   eventc                    |j                   | j                  v ry| j                  |j                         sy| j                  r|j                  | j                  vry| j
                  .|j                  j                  | j
                  j                  kD  ry| j                  .|j                  j                  | j                  j                  k  ry| j                  r'| j                  \  }}||j                  cxk  r|k  sy y| j                  D ]  }	  ||j                        s y | j                  D ]  }	  ||      s y y# t        $ r Y  yw xY w# t        $ r Y  yw xY w)z.Check if an event matches all filter criteria.FT)r>   r   r   r   rK   r   rN   rr   r   r   rI   r   r?   	Exceptionr   )r\   r   r   r   r   r   s         r-   matcheszEventFilter.matches  sm    t222  0 01   U\\9N9N%N )~~##d&8&8&>&>>)~~##d&8&8&>&>> **JE3U__33 4 11 	I /  0	 00 	I '  (	     s$   E8E	EE	E"!E"c                 <   t               }| j                  |j                  z  |_        | j                  |j                  z  |_        | j                  |j                  z  |_        | j                  |j                  z   |_        | j
                  |j
                  z   |_        | j                  rP|j                  rDt        t        | j                  j                  |j                  j                              |_        |S | j                  xs |j                  |_        |S )zCombine filters with AND logic.)
r   r   r   r   r   r   r   r   r   rr   )r\   othercombineds      r-   __and__zEventFilter.__and__D  s    ="&"5"58L8L"L$($9$9E<R<R$R!"&"5"58L8L"L'+'?'?%B[B['[$&*&=&=@X@X&X# %"5"5%2D&&,,e.A.A.G.GH&H"  &*%7%7%N5;N;NH"r,   )NN)r   r   r^   r   )r"   r#   r$   r%   r   rB   r   r   r   r   r   r   r   r   r   r   r   r   r   r	   r   r=   r   r   r   r   r+   r,   r-   r   r      s@   
.C M 
S ] 
# - 
# -  1504}- }- 
	HdVT\$: } 
c # -  %)"&
!
 h
 
	
%$(? M 

c 
d 
,U ,t ,\r,   r   c                       e Zd ZU dZeed<   eeef   ed<   e	ed<   dZ
eed<   dZeed<   d	Zeed
<    eej$                        Zeed<   dZeed<   ej,                  Zeed<   y)Subscriptiona  
    Represents an active event subscription.

    Attributes:
        subscriber_id: Unique identifier for the subscription
        handler: The callback function to invoke
        event_filter: Filter criteria for matching events
        is_async: Whether the handler is async
        max_retries: Maximum delivery retry attempts
        timeout_seconds: Handler execution timeout
        created_at: When the subscription was created
        is_active: Whether the subscription is currently active
    subscriber_idhandlerevent_filterTis_asyncr    max_retries      >@timeout_secondsrF   
created_at	is_activedelivery_policyN)r"   r#   r$   r%   rB   r~   r   EventHandlerSyncEventHandlerr   r   r   r   rZ   r   floatr   r	   ra   r   r   r8   r:   r   r+   r,   r-   r   r   X  sq     <!1122HdK!OU! AJAIt&4&B&BO^Br,   r   c                   \   e Zd ZdZ	 	 	 d$deeef   dedefdZd Z	d Z
ded	eeef   fd
Zdeded	efdZded	efdZded	ee   fdZd	efdZ	 	 	 	 d%dee   dededed	ee   f
dZd&deded	ee   fdZded	ee   fdZdeded	efdZded	efdZd	efd Zd!ed	efd"Zd	eee f   fd#Z!y)'EventPersistencea  
    Persistent storage for events with PostgreSQL backend (Elestio).

    Provides:
        - Event archival and retrieval
        - Indexed queries by type, source, time
        - Compression for large payloads
        - Automatic cleanup of expired events
    db_pathcompress_thresholdauto_cleanup_daysc                 r    || _         || _        t        j                         | _        | j                          y rA   )r   r   	threadingLock_lock_init_database)r\   r   r   r   s       r-   r   zEventPersistence.__init__}  s.     #5!2^^%
r,   c                 P    t        j                  di t        j                         S )z)Get a PostgreSQL connection from Elestio.r+   )psycopg2connectr   get_connection_paramsr[   s    r-   	_get_connzEventPersistence._get_conn  s    I."F"F"HIIr,   c                     | j                         }	 |j                         5 }|j                  d       ddd       |j                          |j	                          y# 1 sw Y   *xY w# |j	                          w xY w)zCEnsure PostgreSQL tables exist (tables should already exist in PG).ao  
                    CREATE TABLE IF NOT EXISTS eb_events (
                        event_id TEXT PRIMARY KEY,
                        event_type TEXT NOT NULL,
                        source TEXT NOT NULL,
                        timestamp TEXT NOT NULL,
                        priority INTEGER NOT NULL,
                        status TEXT NOT NULL,
                        correlation_id TEXT,
                        causation_id TEXT,
                        payload_compressed INTEGER DEFAULT 0,
                        payload_data BYTEA NOT NULL,
                        metadata TEXT,
                        version TEXT,
                        retry_count INTEGER DEFAULT 0,
                        ttl_seconds INTEGER,
                        created_at TEXT DEFAULT CURRENT_TIMESTAMP,
                        fingerprint TEXT
                    )
                N)r   cursorexecutecommitclose)r\   conncurs      r-   r   zEventPersistence._init_database  sd    ~~	 #  * KKMJJL/ . JJLs!   A) AA) A&"A) )A;r?   r^   c                     t        j                  |      j                  d      }t        |      | j                  kD  rt        j                  |      dfS |dfS )z*Serialize and optionally compress payload.utf-8TF)rh   ri   rl   lenr   gzipcompress)r\   r?   ru   s      r-   _serialize_payloadz#EventPersistence._serialize_payload  sK    zz'"))'2t9t...==&,,U{r,   ru   
compressedc                 x    |rt        j                  |      }t        j                  |j	                  d            S )z-Deserialize and decompress payload if needed.r   )r   
decompressrh   loadsdecode)r\   ru   r   s      r-   _deserialize_payloadz%EventPersistence._deserialize_payload  s+    ??4(Dzz$++g.//r,   r   c                    	 | j                  |j                        \  }}| j                  5  | j                         }	 |j	                         5 }|j                  d|j                  |j                  |j                  |j                  j                         |j                  j                  |j                  j                  |j                  |j                   |rdndt#        j$                  |      t'        j(                  |j*                        |j,                  |j.                  |j0                  |j2                  f       ddd       |j5                          |j7                          	 ddd       y# 1 sw Y   3xY w# |j7                          w xY w# 1 sw Y   yxY w# t8        $ r/}t:        j=                  d|j                   d|        Y d}~yd}~ww xY w)	z
        Persist an event to storage.

        Args:
            event: The event to store

        Returns:
            True if stored successfully, False otherwise
        a  
                            INSERT INTO eb_events (
                                event_id, event_type, source, timestamp, priority,
                                status, correlation_id, causation_id, payload_compressed,
                                payload_data, metadata, version, retry_count, ttl_seconds,
                                fingerprint
                            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                            ON CONFLICT (event_id) DO UPDATE SET
                                event_type = EXCLUDED.event_type,
                                source = EXCLUDED.source,
                                timestamp = EXCLUDED.timestamp,
                                priority = EXCLUDED.priority,
                                status = EXCLUDED.status,
                                correlation_id = EXCLUDED.correlation_id,
                                causation_id = EXCLUDED.causation_id,
                                payload_compressed = EXCLUDED.payload_compressed,
                                payload_data = EXCLUDED.payload_data,
                                metadata = EXCLUDED.metadata,
                                version = EXCLUDED.version,
                                retry_count = EXCLUDED.retry_count,
                                ttl_seconds = EXCLUDED.ttl_seconds,
                                fingerprint = EXCLUDED.fingerprint
                        r   r   NTzFailed to store event : F)r   r?   r   r   r   r   rH   r>   rK   rI   rq   rN   rr   rT   rs   rL   rM   r   Binaryrh   ri   rQ   rR   rS   rP   ro   r   r   r   loggererror)r\   r   payload_datar   r   r   es          r-   storezEventPersistence.store  sz   4	'+'>'>u}}'M$L* -!~~'+! '# %. "NN!,,!LL!OO557!NN00!LL--!00!..!+A$OOL9 JJu~~6!MM!--!--!---&'P KKMJJL[-!\ W' 'T JJL[-!\  	LL1%..1AA3GH	sd   *F FE<C0E0>E<F'F 0E9	5E<<FFFF F 	G&%GGrH   c                    | j                         }	 |j                  t        j                  j                        5 }|j                  d|f       |j                         }|r*| j                  |      cddd       |j                          S 	 ddd       |j                          y# 1 sw Y   xY w# |j                          w xY w)zRetrieve a single event by ID.cursor_factoryz+SELECT * FROM eb_events WHERE event_id = %sN)	r   r   r   extrasRealDictCursorr   fetchone_row_to_eventr   r\   rH   r   r   rows        r-   retrievezEventPersistence.retrieve  s    ~~
	HOO,J,JK 3sAK lln--c23 3 JJL 3 JJL3 3 JJLs(   *B2 6B&2	B2 B2 &B/+B2 2Cc                 p   |d   }t        |t              rt        |      }| j                  |t	        |d               }t        |d   |d   |d   t        j                  |d         t        |d         t        |d      |d	   |d
   ||d   rt        j                  |d         ni |d   |d   |d         S )z*Convert a database row to an Event object.r   payload_compressedrH   r>   rK   rI   rN   rT   rL   rM   rQ   rR   rS   rP   )rH   r>   rK   rI   rN   rT   rL   rM   r?   rQ   rR   rS   rP   )rW   
memoryviewbytesr   r   r=   r	   rY   r   r/   rh   r   )r\   r  r   r?   s       r-   r  zEventPersistence._row_to_event  s    >*lJ/ .L++)*+
 _<(x=,,S-=>"3z?3s8}-/0^,47
OTZZJ0	NM*M*
 	
r,   Nr   limitoffset
order_descc                    |rdnd}| j                         }	 |j                  t        j                  j                        5 }|j                  d| d||f       |j                         }ddd       |j                          D 	cg c]  }	| j                  |	       }
}	|r!|
D cg c]  }|j                  |      s| }
}|
S # 1 sw Y   \xY w# |j                          w xY wc c}	w c c}w )a7  
        Query events with optional filtering.

        Args:
            event_filter: Optional filter criteria
            limit: Maximum events to return
            offset: Pagination offset
            order_desc: Order by timestamp descending

        Returns:
            List of matching events
        DESCASCr   z+SELECT * FROM eb_events ORDER BY timestamp z LIMIT %s OFFSET %sN)
r   r   r   r   r   r   fetchallr   r  r   )r\   r   r
  r  r  orderr   r   rowsr  eventsr   s               r-   queryzEventPersistence.query   s    & %%~~	HOO,J,JK &sA%H[\FO ||~& JJL59:c$$$S)::!'CA<+?+?+BaCFC& & JJL: Ds5   *C )C+C C''C,>C,CC C$r>   c                    | j                         }	 |j                  t        j                  j                        5 }|j                  d||f       |j                         D cg c]  }| j                  |       c}cddd       |j                          S c c}w # 1 sw Y   nxY w	 |j                          y# |j                          w xY w)zQuery events by type.r   zNSELECT * FROM eb_events WHERE event_type = %s ORDER BY timestamp DESC LIMIT %sN	r   r   r   r   r   r   r  r  r   )r\   r>   r
  r   r   r  s         r-   query_by_typezEventPersistence.query_by_typeG  s    ~~	HOO,J,JK Ksd' <?<<>JC**3/JK K JJL KK K K JJLDJJL4   *B: 'B#B;B=	B: BB%!B: :CrL   c                    | j                         }	 |j                  t        j                  j                        5 }|j                  d||f       |j                         D cg c]  }| j                  |       c}cddd       |j                          S c c}w # 1 sw Y   nxY w	 |j                          y# |j                          w xY w)z&Get all events in a correlation chain.r   zSELECT * FROM eb_events
                       WHERE correlation_id = %s OR event_id = %s
                       ORDER BY timestamp ASCNr  )r\   rL   r   r   r  s        r-   query_by_correlationz%EventPersistence.query_by_correlationT  s    ~~
	HOO,J,JK Ks1 $^4	 <?<<>JC**3/JK K JJL KK K K JJLDJJLr  rT   c                    	 | j                   5  | j                         }	 |j                         5 }|j                  d|j                  |f       ddd       |j                          |j                          	 ddd       y# 1 sw Y   3xY w# |j                          w xY w# 1 sw Y   yxY w# t        $ r%}t        j                  d| d|        Y d}~yd}~ww xY w)zUpdate an event's status.z4UPDATE eb_events SET status = %s WHERE event_id = %sNTzFailed to update status for r   F)
r   r   r   r   rs   r   r   r   r   r   )r\   rH   rT   r   r   r   s         r-   update_statuszEventPersistence.update_statusc  s    	 
!~~'! #R#[[(3
 KKMJJL
!   JJL
!  	LL7zA3GH	sb   B. B"BBB'B"8B. B
	BBB""B+'B. +B. .	C7CCc                    | j                   5  | j                         }	 |j                         5 }|j                  d|f       |j                  d|f       |j	                         }ddd       |j                          r|d   nd	 |j                          cddd       S # 1 sw Y   <xY w# |j                          w xY w# 1 sw Y   yxY w)z+Increment retry count and return new value.zFUPDATE eb_events SET retry_count = retry_count + 1 WHERE event_id = %sz5SELECT retry_count FROM eb_events WHERE event_id = %sNr   )r   r   r   r   r  r   r   r  s        r-   increment_retryz EventPersistence.increment_retryv  s    ZZ 	>>#D[[] 	)cKK`! KKO! ,,.C	) !$s1v!+

!	 		) 	) 

!	 	s9   CB-7B!& B-C!B*	&B--B??CCc                    t        j                         t        | j                        z
  }| j                  5  | j                         }	 |j                         5 }|j                  d|j                         f       |j                  }ddd       |j                          |j                          cddd       S # 1 sw Y   4xY w# |j                          w xY w# 1 sw Y   yxY w)z/Remove expired events and return count deleted.)daysz*DELETE FROM eb_events WHERE timestamp < %sN)r	   ra   r
   r   r   r   r   r   rq   rowcountr   r   )r\   cutoffr   r   counts        r-   cleanup_expiredz EventPersistence.cleanup_expired  s    "YD4J4J%KKZZ 	>>#D
[[] )cKKD))+-  LLE) 

	 	) ) 

	 	s;   C
C.B;C!C;C	 CCCC%ro   c                 (   | j                         }	 |j                         5 }|j                  d|f       |j                         ducddd       |j	                          S # 1 sw Y   nxY w	 |j	                          y# |j	                          w xY w)z/Check if an event with this fingerprint exists.z6SELECT 1 FROM eb_events WHERE fingerprint = %s LIMIT 1N)r   r   r   r  r   )r\   ro   r   r   s       r-   check_duplicatez EventPersistence.check_duplicate  s    ~~	 2#L N ||~T12 2 JJL2 2 2 JJLDJJLs"   A? %A!	A? !A*&A? ?Bc                    | j                         }	 |j                         5 }|j                  d       |j                         d   }|j                  d       t	        |j                               }|j                  d       t	        |j                               }|||dcddd       |j                          S # 1 sw Y   nxY w	 |j                          y# |j                          w xY w)zGet storage statistics.zSELECT COUNT(*) FROM eb_eventsr   z>SELECT event_type, COUNT(*) FROM eb_events GROUP BY event_typez6SELECT status, COUNT(*) FROM eb_events GROUP BY status)total_eventsr   	by_statusN)r   r   r   r  rX   r  r   )r\   r   r   totalr   r)  s         r-   get_statisticszEventPersistence.get_statistics  s    ~~	 #<=q)T s||~.L !0	 %*&!* $ JJL%  $ JJLDJJLs#   C A>B: 	C :C?C C*)zevent_store.dbi      )Nd   r   T)r-  )"r"   r#   r$   r%   r   rB   r   rZ   r   r   r   r   r   r	  r   r   r   r=   r   r   r  r  r   r   r  r  r  r/   r  r  r$  r&  r   r+  r+   r,   r-   r   r   r  s    %5"&!#	
sDy!
  
 	
J:$ 53E 0 0D 0T 0>5 >T >@ %  
E 
6 /3%{+% % 	%
 % 
e%N C $u+ 3 4; c ; 4 &  ( "3 4 S#X r,   r   c            	           e Zd ZdZ	 	 	 ddedededefdZdede	d	e
d
dfdZded
efdZde
ded
efdZd
eee
ef      fdZde
d
eee
ef      fdZddee
   d
efdZd
ee
ef   fdZy)DeadLetterQueuea  
    Handle failed event deliveries with retry and diagnostic support.

    Provides:
        - Storage of failed events with error context
        - Retry mechanisms with backoff
        - Analysis of failure patterns
        - Manual replay capabilities
    persistencer   base_delay_secondsmax_delay_secondsc                     || _         || _        || _        || _        i | _        t        j                         | _        i | _        y rA   )	r0  r   
base_delay	max_delay_failed_eventsr   r   r   _retry_tasks)r\   r0  r   r1  r2  s        r-   r   zDeadLetterQueue.__init__  s@     '&,*9;^^%
57r,   r   r   r   r^   Nc           
         | j                   5  |j                  | j                  vrB|g t        j                         t        j                         d| j                  |j                  <   | j                  |j                     d   j                  |t        |      j                  t        |      t        j                         t        j                         j                         |j                  d       t        j                         | j                  |j                     d<   ddd       t        j                  |_        | j                   j#                  |       t$        j'                  d|j                   d| d|        y# 1 sw Y   bxY w)	z
        Add a failed event to the dead letter queue.

        Args:
            event: The event that failed
            error: The exception that occurred
            subscriber_id: Which subscriber failed to process it
        r   failuresfirst_failurelast_failurer:  )r   
error_typeerror_message	tracebackrI   rS   r<  NEvent z added to DLQ after failure in r   )r   rH   r6  r	   ra   r   typer"   rB   r?  
format_excrq   rS   r/   r4   rT   r0  r   r   warning)r\   r   r   r   s       r-   enqueuezDeadLetterQueue.enqueue  sA    ZZ 	T~~T%8%88" "%-__%6$,OO$5	7##ENN3 /
;BB!."5k22!$U&113%__.88:$00D  CK//BSD/?#	T( #00u%U^^$$CM?RTUZT[\	
/	T 	Ts   DFF
rS   c                 R    | j                   d|z  z  }t        || j                        S )z$Calculate exponential backoff delay.r   )r4  r   r5  )r\   rS   delays      r-   get_backoff_delayz!DeadLetterQueue.get_backoff_delay  s&    1#345$..))r,   rH   r   c                   K   | j                   5  || j                  vr#t        j                  d| d       	 ddd       y| j                  |   }|d   }ddd       j                  | j
                  k\  r't        j                  d| d| j
                   d       y| j                  |j                        }t        j                  d| d	| d
       t        j                  |       d{    	 |xj                  dz  c_        t        j                  |_         ||       d{    | j                   5  | j                  |= ddd       t        j                  |_        | j                  j                  |       t        j                  d|        y# 1 sw Y   DxY w7 7 ~# 1 sw Y   cxY w# t         $ r8}t        j                  d| d|        | j#                  ||d       Y d}~yd}~ww xY ww)z
        Attempt to retry a failed event.

        Args:
            event_id: The event to retry
            handler: The handler to use for retry

        Returns:
            True if retry succeeded, False otherwise
        r@  z not found in DLQNFr   z exceeded max retries ()zRetrying event z after zs delayr   zSuccessfully retried event TzRetry failed for event r   retry_handler)r   r6  r   r   rS   r   rG  infoasynciosleepr/   r1   rT   r5   r0  r   r   rD  )r\   rH   r   entryr   rF  r   s          r-   retry_eventzDeadLetterQueue.retry_event	  s     ZZ 	#t222vhZ/@AB	# 	#
 ''1E'NE	#  0 00LL6(+B4CSCSBTTUVW&&u'8'89ohZwugWEFmmE"""	"&11EL%.    2''12 '//EL""5)KK5hZ@A=	# 	# 	#
 !2 2  	LL28*BqcBCLL?3	s   G8)F	G8FBG8,F$-G825F4 'F&(F4 8F(AF4 G8F!	G8&F4 (F1-F4 4	G5=.G0+G80G55G8c                 B   | j                   5  | j                  j                         D cg c]R  \  }}||d   j                  t	        |d         |d   j                         |d   j                         |d   d   d   dT c}}cddd       S c c}}w # 1 sw Y   yxY w)	z(Get all events in the dead letter queue.r   r:  r;  r<  r   r>  )rH   r>   failure_countr;  r<  
last_errorN)r   r6  itemsr>   r   rq   )r\   eidrN  s      r-   get_failed_eventsz!DeadLetterQueue.get_failed_events=  s    ZZ 	 #'"5"5";";"=
 C !$"'.";";%(z):%;%*?%;%E%E%G$).$9$C$C$E"'
"3B"7"H
	 	
	 	s   BABBBBc                    | j                   5  || j                  v rU| j                  |   }|d   j                         |d   |d   j                         |d   j                         dcddd       S 	 ddd       y# 1 sw Y   yxY w)z.Get detailed failure information for an event.r   r:  r;  r<  r9  N)r   r6  rt   rq   )r\   rH   rN  s      r-   get_failure_detailsz#DeadLetterQueue.get_failure_detailsL  s    ZZ 	4...++H5"7^335 %j 1%*?%;%E%E%G$).$9$C$C$E		 	.	 	 s   AA;;Bc                 
   | j                   5  |r/|| j                  v r| j                  |= 	 ddd       y	 ddd       yt        | j                        }| j                  j                          |cddd       S # 1 sw Y   yxY w)z
        Remove events from the DLQ.

        Args:
            event_id: Specific event to remove, or None for all

        Returns:
            Number of events removed
        Nr   r   )r   r6  r   clear)r\   rH   r#  s      r-   purgezDeadLetterQueue.purgeY  s     ZZ 		t222++H5			 		
 		 		 D//0##))+		 		 		s   A9A90A99Bc                 V   | j                   5  | j                  sdi dcddd       S t        t              }t        t              }t        t              }| j                  j	                         D ]F  }||d   j
                  xx   dz  cc<   |d   D ]"  }||d   xx   dz  cc<   ||d   xx   dz  cc<   $ H t        | j                        t        d	 | j                  j	                         D              t        |      t        |      t        |      d
cddd       S # 1 sw Y   yxY w)z)Analyze failure patterns for diagnostics.r   )total_failurespatternsNr   r   r:  r=  r   c              3   8   K   | ]  }t        |d            yw)r:  N)r   ).0r   s     r-   	<genexpr>z3DeadLetterQueue.analyze_failures.<locals>.<genexpr>  s      .+,C*&.s   )total_failed_eventstotal_failure_attemptsby_error_typeby_subscriberby_event_type)	r   r6  r   rZ   valuesr>   r   sumrX   )r\   error_typessubscriber_failuresevent_type_failuresrN  failures         r-   analyze_failuresz DeadLetterQueue.analyze_failuresn  s*   ZZ 	&&*+<	 	 +6c*:K2=c2B2=c2B,,335 G#E'N$=$=>!C>$Z0 GG 56!;6'(@AQFAGG (+4+>+>'?*- .040C0C0J0J0L. + "&k!2!%&9!:!%&9!:	 	 	s   DC-DD()         ?g     r@rA   )r"   r#   r$   r%   r   rZ   r   r   r=   r   rB   rD  rG  r   r   rO  r   r   r   rU  r   rW  rZ  rl  r+   r,   r-   r/  r/    s    $'#(8%8 8 "	8
 !8'
'
 '
 	'

 
'
R*S *U *
22 2 
	2h4S#X#7 C HT#s(^4L hsm s *$sCx. r,   r/  c                       e Zd ZdZdefdZddZej                  dddfded	e	ee
f   d
edee   dee	ee
f      dedefdZdedefdZdee   dee   fdZde	ee
f   fdZy)EventPublisherz
    Publish events to the event bus with batching and validation.

    Features:
        - Single and batch publishing
        - Event validation
        - Automatic metadata enrichment
        - Publishing metrics
    rK   c                 <    || _         d | _        d| _        d| _        y Nr   )rK   _bus_published_count_failed_countr\   rK   s     r-   r   zEventPublisher.__init__  s     *.	 !r,   r^   c                     || _         | S )z$Bind this publisher to an event bus.rs  r\   buss     r-   bindzEventPublisher.bind      	r,   NrO   r>   r?   rN   rL   rQ   rP   c           	      @  K   | j                   st        d      t        ||| j                  |||xs i |      }	 | j                   j	                  |       d{    | xj
                  dz  c_        |S 7 # t        $ r}| xj                  dz  c_         d}~ww xY ww)aw  
        Publish a single event.

        Args:
            event_type: Type identifier for routing
            payload: Event data
            priority: Processing priority
            correlation_id: For tracking related events
            metadata: Additional metadata
            ttl_seconds: Time-to-live

        Returns:
            The published Event object
        #Publisher not bound to an event bus)r>   r?   rK   rN   rL   rQ   rP   Nr   )rs  RuntimeErrorr=   rK   dispatchrt  r   ru  )	r\   r>   r?   rN   rL   rQ   rP   r   r   s	            r-   publishzEventPublisher.publish  s     . yyDEE!;;)^#
	))$$U+++!!Q&!L ,  	!#	s:   8BA7 A5A7 4B5A7 7	B BBBr   c                    K   | j                   st        d      | j                  |_        | j                   j                  |       d{    | xj                  dz  c_        |S 7 w)z Publish a pre-constructed event.r~  Nr   )rs  r  rK   r  rt  r\   r   s     r-   publish_eventzEventPublisher.publish_event  sW     yyDEE{{ii  '''" 	(s   AA'	A%
A'r  c                   K   | j                   st        d      g }|D ]]  }| j                  |_        	 | j                   j                  |       d{    |j	                  |       | xj
                  dz  c_        _ |S 7 .# t        $ rD}| xj                  dz  c_        t        j                  d|j                   d|        Y d}~d}~ww xY ww)z
        Publish multiple events efficiently.

        Args:
            events: List of events to publish

        Returns:
            List of successfully published events
        r~  Nr   zFailed to publish event r   )rs  r  rK   r  r   rt  r   ru  r   r   rH   )r\   r  	publishedr   r   s        r-   publish_batchzEventPublisher.publish_batch  s      yyDEE	 	OE;;ELOii((///  '%%*%	O  0  O""a'"77Gr!MNNOs@   0CBB *B<C B	C:C
C
CCc                     | j                   | j                  | j                  | j                  | j                  z   dkD  r(| j                  | j                  | j                  z   z  dS ddS )zGet publishing statistics.r   rn  )rK   published_countfailed_countsuccess_rate)rK   rt  ru  r[   s    r-   r+  zEventPublisher.get_statistics  st     kk#44 .. ))D,>,>>!C %%)>)>ASAS)ST
 	
 JM
 	
r,   )rz  EventBusr^   rp  )r"   r#   r$   r%   rB   r   r{  r   r(   r   r   r   rZ   r=   r  r  r   r  r+  r+   r,   r-   rp  rp    s    s  #0"6"6(,-1** c3h*  	*
 !* 4S>** * 
*X 5 $u+ $u+ 4

S#X 

r,   rp  c                       e Zd ZdZdefdZddZddej                  fde	d	e
eef   d
edededefdZded	e
eef   defdZdedefdZdefdZdedefdZdedefdZdeeeef      fdZdeeef   fdZy)EventSubscriberz
    Subscribe to events from the event bus.

    Features:
        - Type-safe subscriptions
        - Filter-based routing
        - Automatic handler wrapping
        - Subscription management
    r   c                 J    || _         d | _        i | _        d| _        d| _        y rr  )r   rs  _subscriptions_received_count_error_countr\   r   s     r-   r   zEventSubscriber.__init__  s)    **.	79 r,   r^   c                     || _         | S )z%Bind this subscriber to an event bus.rx  ry  s     r-   r{  zEventSubscriber.bind  r|  r,   r    r   r   r   r   r   r   c           	      f   | j                   st        d      | j                   dt        j                         j
                  dd  }t        j                  |      }t        |||||||      }|| j                  |<   | j                   j                  |       t        j                  d|        |S )a  
        Subscribe to events matching the filter.

        Args:
            event_filter: Criteria for matching events
            handler: Callback function for matched events
            max_retries: Max delivery retry attempts
            timeout_seconds: Handler timeout
            delivery_policy: Delivery guarantee level

        Returns:
            Subscription ID
        z$Subscriber not bound to an event busre   N   )r   r   r   r   r   r   r   zRegistered subscription )rs  r  r   rC   rD   hexrL  iscoroutinefunctionr   r  _register_subscriptionr   rK  )	r\   r   r   r   r   r   subscription_idr   subscriptions	            r-   	subscribezEventSubscriber.subscribe  s    * yyEFF!//0$**,2B2B2A2F1GH..w7#)%#++
 0<O,		((6..?@Ar,   r>   c                 X     | j                   t               j                  |      |fi |S )z9Convenience method to subscribe to a specific event type.)r  r   r   )r\   r>   r   r|   s       r-   subscribe_to_typez!EventSubscriber.subscribe_to_type>  s4     t~~M!!*-
 
 	
r,   r  c                     || j                   v rM| j                   |= | j                  r| j                  j                  |       t        j	                  d|        yy)z
        Remove a subscription.

        Args:
            subscription_id: The subscription to remove

        Returns:
            True if removed, False if not found
        zUnsubscribed TF)r  rs  _unregister_subscriptionr   rK  r\   r  s     r-   unsubscribezEventSubscriber.unsubscribeK  sS     d111##O4yy		22?CKK-'89:r,   c                     t        | j                        }t        | j                  j                               D ]  }| j	                  |        |S )z*Remove all subscriptions and return count.)r   r  listkeysr  )r\   r#  sub_ids      r-   unsubscribe_allzEventSubscriber.unsubscribe_all]  sH    D''(4..3356 	%FV$	%r,   c                 J    || j                   v rd| j                   |   _        yy)z!Temporarily pause a subscription.FTr  r   r  s     r-   pausezEventSubscriber.paused  s)    d111=BD0:r,   c                 J    || j                   v rd| j                   |   _        yy)zResume a paused subscription.TFr  r  s     r-   resumezEventSubscriber.resumek  s)    d111=AD0:r,   c           	          | j                   j                         D cg c]J  }|j                  |j                  |j                  |j
                  |j                  j                         dL c}S c c}w )z+Get information about active subscriptions.)r  r   r   r   r   )r  rf  r   r   r   r   r   rq   )r\   subs     r-   get_subscriptionsz!EventSubscriber.get_subscriptionsr  se     **113	
  $'#4#4LL ]]"!nn668	
 		
 	
s   AA/c                     | j                   t        | j                        t        d | j                  j	                         D              | j
                  | j                  dS )zGet subscriber statistics.c              3   :   K   | ]  }|j                   sd   yw)r   N)r   )r_  ss     r-   r`  z1EventSubscriber.get_statistics.<locals>.<genexpr>  s      (1;;(s   )r   subscription_countactive_subscriptionsreceived_counterror_count)r   r   r  rg  rf  r  r  r[   s    r-   r+  zEventSubscriber.get_statistics  s]     "//"%d&9&9":$' (..557( % #22,,
 	
r,   N)rz  r  r^   r  )r"   r#   r$   r%   rB   r   r{  r8   r:   r   r   r   r   rZ   r   r  r  r   r  r  r  r  r   r   r   r  r+  r+   r,   r-   r  r    s   c  !%*8*F*F)!) |%556) 	)
 ) () 
)V

 |%556

 

3 4 $ S T c d 
4S#X#7 


S#X 

r,   r  c                   6   e Zd ZdZ	 	 	 	 d"dee   dededefdZd Z	d#d	efd
Z
defdZdefdZdededefdZdedee   fdZdedefdZdefdZdefdZ	 	 	 	 d$dee   dee   dee   dedef
dZdedefdZdedefdZdee   fd Zdeee f   fd!Z!y)%r  ag  
    Central event dispatcher with async support and delivery guarantees.

    Features:
        - Asynchronous event dispatch
        - Priority-based processing
        - Delivery guarantees (at-most-once, at-least-once, exactly-once)
        - Event persistence and replay
        - Dead letter queue for failed events
        - Metrics and monitoring
    Nr0  enable_persistenceworker_countmax_queue_sizec                    i | _         t        t              | _        d | _        |xs |r
t               nd | _        | j                  rt        | j                        nd | _        || _	        || _
        g | _        d| _        t        j                         | _        t               | _        d| _        d| _        d| _        d| _        d | _        y )NFrO   r   )r  r   r   _type_index_event_queuer   _persistencer/  _dlq_worker_count_max_queue_size_workers_is_runningrL  r   r   _processed_fingerprints_fingerprint_ttl_dispatched_count_delivered_countru  _start_time)r\   r0  r  r  r  s        r-   r   zEventBus.__init__  s     8:0;C0@37' 
"4$ 	 ;?:K:KOD$5$56QU	)-,. \\^
14$ $ "# !/3r,   c                   K   | j                   ryt        j                  | j                        | _        d| _         t        j                         | _        t        | j                        D ]D  }t        j                  | j                  d|             }| j                  j                  |       F t        j                  d| j                   d       yw)zStart the event bus workers.N)maxsizeTzworker-zEventBus started with z workers)r  rL  PriorityQueuer  r  r	   ra   r  ranger  create_task_worker_loopr  r   r   rK  )r\   iworkers      r-   r   zEventBus.start  s     #11$:N:NO#??, t))* 	)A(():):WQC=)IJFMM  (	) 	,T-?-?,@IJs   CCgracefulc                 X  K   d| _         |r"| j                  j                          d{    | j                  D ]  }|j	                           t        j                  | j                  ddi d{    | j                  j                          t        j                  d       y7 |7 6w)z{
        Stop the event bus.

        Args:
            graceful: If True, wait for pending events to be processed
        FNreturn_exceptionsTzEventBus stopped)
r  r  joinr  cancelrL  gatherrY  r   rK  )r\   r  r  s      r-   stopzEventBus.stop  s      !##((*** mm 	FMMO	 nndmmDtDDD&' + 	Es"   'B*B&AB*1B(25B*(B*	worker_idc                   K   t         j                  d| d       | j                  r|	 t        j                  | j
                  j                         d       d{   \  }}	 | j                  |       d{    | j
                  j                          | j                  r|t         j                  d| d       y7 b# t        j                  $ r Y t        j                  $ r Y Ew xY w7 t# t        $ r%}t         j                  d| d|        Y d}~d}~ww xY w# | j
                  j                          w xY ww)z1Worker loop for processing events from the queue.zWorker z startedrn  timeoutNz error processing event: z stopped)r   debugr  rL  wait_forr  r{   TimeoutErrorCancelledError_process_eventr   r   	task_done)r\   r  rN   r   r   s        r-   r  zEventBus._worker_loop  s    wyk23	(/(8(8%%))+) #%.))%000 !!++-# & 	wyk23!# '' ))  1 Pwyk1J1#NOOP !!++-s   &D?2C  B>C  $C/ 8C-9C/ =&D?$D?>C   C*D?C*'D?)C**D?-C/ /	D8DD  DD   D<<D?r   c                   K   |j                   r`t        j                  |_        | j                  r| j                  j                  |       t        j                  d|j                   d       yt        j                  |_        | j                  |      }|s=t        j                  |_        | j                  r| j                  j                  |       yg }|D ]  }|j                  s|j                  t        j                  k(  r;|j                   | j"                  v r#t        j                  d|j                          ht%        j&                  | j)                  ||            }|j+                  |        t%        j,                  |ddi d{   }t/        d |D              }|rPt        j                  |_        | xj0                  dz  c_        | j"                  j3                  |j                          n*t        j4                  |_        | xj6                  dz  c_        | j                  r| j                  j                  |       yy7 w)	z>Process a single event by dispatching to matching subscribers.r@  z expired, skippingNzSkipping duplicate event r  Tc              3   F   K   | ]  }t        |t              r|d u   yw)TN)rW   r   )r_  rs     r-   r`  z*EventBus._process_event.<locals>.<genexpr>  s     UJq)<T!t)Us   !	!r   )rc   r/   r6   rT   r  r   r   r  rH   r1   _find_matching_subscriptionsr2   r   r   r8   r;   ro   r  rL  r  _deliver_to_subscriberr   r  allr  r   r3   ru  )r\   r   matching_subsdelivery_tasksr  taskresultsall_successs           r-   r  zEventBus._process_event  s    &..EL  !!''.LL6%..!11CDE"-- 99%@&00EL  !!''. ) 	(L)) ++~/J/JJ$$(D(DDLL#<U^^<L!MN&&++E<@D !!$'	(   O$OO UWUU&00EL!!Q&!((,,U->->?&--EL!###E*  Ps   FIIB<Ir  r^   c           	      X  K   	 |j                   r9t        j                  |j                  |      |j                         d{    yt        j
                         j                  d|j                  |       d{    y7 =7 # t        j                  $ r| t        j                  d|j                   d|j                          | j                  r>| j                  j                  |t        d|j                   d      |j                         Y yt        $ r}t        j                  d	|j                   d|j                   d
|        | j                  rD|j                   t"        j$                  k7  r'| j                  j                  |||j                         Y d}~yd}~ww xY ww)z*Deliver an event to a specific subscriber.r  NTzHandler timeout for z
 on event zHandler timed out after r  FzHandler error for r   )r   rL  r  r   r   get_event_looprun_in_executorr  r   rC  r   rH   r  rD  r   r   r   r8   r9   )r\   r   r  r   s       r-   r  zEventBus._deliver_to_subscriber*  s    !	$$&& ((/(88    ,,.>> ((  
  ## 
	NN&|'A'A&B*U^^L\] yy		!! #;L<X<X;YYZ![\ ..
  	LL$\%?%?$@
5>>JZZ\]^\_` yy\99^=X=XX		!!%L,F,FG	sl   F*?B BB F*	2B ;B<B  F*B B BF'F*F'BF"F*"F''F*c                     g }| j                   j                  |j                  t                     }|| j                   j                  dt                     z  }|D ]M  }|| j                  v s| j                  |   }|j
                  j                  |      s=|j                  |       O | j                  j                         D ]7  \  }}||vs|j
                  j                  |      s'|j                  |       9 |S )z+Find all subscriptions that match an event.r   )	r  r{   r>   r   r  r   r   r   rS  )r\   r   matchingpotential_idsr  r  s         r-   r  z%EventBus._find_matching_subscriptionsS  s     ((,,U-=-=suE%(8(8(<(<S#%(HH $ 	)F,,,))&1##++E2OOC(		)  ..446 	%KFC]*s/?/?/G/G/N$	% r,   c                   K   | j                   st        d      | j                  r| j                  j                  |       | j                  j                  |j                  j                  |f       d{    | xj                  dz  c_        t        j                  d|j                   d|j                          |j                  S 7 Tw)z
        Dispatch an event for processing.

        Args:
            event: The event to dispatch

        Returns:
            The event ID
        zEventBus is not runningNr   zDispatched event z	 of type )r  r  r  r   r  putrN   rr   r  r   r  rH   r>   r  s     r-   r  zEventBus.dispatchi  s      899 ##E* ##U^^%9%95$ABBB!#((8	%BRBRASTU~~	 	Cs   A3C5C
6ACc                 :   || j                   |j                  <   |j                  j                  D ]*  }| j                  |   j                  |j                         , |j                  j                  s)| j                  d   j                  |j                         yy)z%Register a subscription with the bus.r   N)r  r   r   r   r  r   )r\   r  r   s      r-   r  zEventBus._register_subscription  s    :FL667 $00?? 	FGW%)),*D*DE	F ((77S!%%l&@&@A 8r,   r  c                     || j                   v r| j                   |= | j                  j                         D ]  }|j                  |        y)z#Remove a subscription from the bus.N)r  r  rf  discard)r\   r  type_sets      r-   r  z!EventBus._unregister_subscription  sL    d111##O4 ((//1 	.H_-	.r,   r   	from_timeto_timer
  c                   K   | j                   st        d      |xs
 t               }|s|r|j                  ||      }| j                   j	                  ||d      }d}|D ]d  }t
        j                  |_        t        j                         j                         |j                  d<   | j                  |       d{    |dz  }f t        j                  d| d	       |S 7 &w)
a@  
        Replay historical events from persistence.

        Args:
            event_filter: Optional filter for events to replay
            from_time: Start of time range
            to_time: End of time range
            limit: Maximum events to replay

        Returns:
            Number of events replayed
        zPersistence not enabledF)r   r
  r  r   replayed_atNr   z	Replayed z events)r  r  r   r   r  r/   r5   rT   r	   ra   rq   rQ   r  r   rK  )	r\   r   r  r  r
  replay_filterr  replayedr   s	            r-   replay_eventszEventBus.replay_events  s     &   899 %5)88GLM""((& ) 
  	E&//EL,4OO,=,G,G,IENN=)--&&&MH		 	iz12	 's   B9C$;C"<'C$rK   c                 6    t        |      j                  |       S )z&Create a bound publisher for this bus.)rp  r{  rv  s     r-   create_publisherzEventBus.create_publisher  s    f%**400r,   r   c                 6    t        |      j                  |       S )z'Create a bound subscriber for this bus.)r  r{  r  s     r-   create_subscriberzEventBus.create_subscriber  s    }-22488r,   c                     | j                   S )z#Get the dead letter queue instance.)r  r[   s    r-   get_dead_letter_queuezEventBus.get_dead_letter_queue  s    yyr,   c           
      :   | j                   r/t        j                         | j                   z
  j                         nd}| j                  || j
                  t        | j                        | j                  r| j                  j                         nd| j                  | j                  | j                  |dkD  r| j                  |z  ndd	}| j                  r| j                  j                         |d<   | j                  r| j                  j!                         |d<   |S )z!Get comprehensive bus statistics.r   )	
is_runninguptime_secondsr  r  
queue_sizedispatched_countdelivered_countr  events_per_secondr0  dead_letter_queue)r  r	   ra   total_secondsr  r  r   r  r  qsizer  r  ru  r  r+  r  rl  )r\   uptimestatss      r-   r+  zEventBus.get_statistics  s      __!1!11@@B%& 	 **$ .."%d&9&9":7;7H7H$++113a $ 6 6#44 ..39A:&&/1
 #'#4#4#C#C#EE- 99)-)C)C)EE%&r,   )NTr!   i'  )T)NNNi  )"r"   r#   r$   r%   r   r   r   rZ   r   r   r  rB   r  r=   r  r   r  r   r  r  r  r  r   r	   r  rp  r  r  r  r/  r  r   r   r+  r+   r,   r-   r  r    sv   
 37#'#4./4 !4 	4
 48K (4 (,4C 424+% 4+l'' #' 
	'R% D<N ,E c 0	B< 	B. . /3(,&*){+) H%) (#	)
 ) 
)V1s 1~ 19s 9 9x'@ S#X r,   r  r   c                       fd}|S )z
    Decorator to mark a function as an event handler.

    Usage:
        @event_handler("user.created")
        async def handle_user_created(event: Event):
            pass
    c                 V    t        t              r	g| _        n| _        d| _        | S )NT)rW   rB   _event_types_is_event_handler)funcr   s    r-   	decoratorz event_handler.<locals>.decorator  s,    k3'!,D +D!%r,   r+   )r   r  s   ` r-   event_handlerr    s     r,   r   rF  c                       fd}|S )z
    Decorator to add retry logic to an event handler.

    Usage:
        @with_retry(max_retries=5, delay=2.0)
        async def handle_event(event: Event):
            pass
    c                 B     t               dt        f fd       }|S )Nr   c                    K   d }t        dz         D ]  }	  |        d {   c S  |7 	# t        $ r5}|}|k  r$t        j                  d|z  z         d {  7   Y d }~Td }~ww xY ww)Nr   r   )r  r   rL  rM  )r   rR  attemptr   rF  r  r   s       r-   wrapperz.with_retry.<locals>.decorator.<locals>.wrapper  s     J q1 DD!%e,,D  -  D!"J,%mmEQ'\,BCCCDsB   A0/-/A0/	A-%A(A A(#A0(A--A0)r   r=   )r  r  rF  r   s   ` r-   r  zwith_retry.<locals>.decorator  s'    	t		 		 
		 r,   r+   )r   rF  r  s   `` r-   
with_retryr    s     r,   c                  P  K   t               } t        | d      }|j                          d{    |j                  d      }|j	                  d      }g dt
        ffd	}|j                  t               j                  d
      |t        j                         |j                  ddddt        j                         d{    |j                  ddddidd       d{    t        j                  d       d{    |j!                         }t#        dt%        j&                  |dt(                      | j+                  d      }t#        dt-        |              |j/                  d       d{    t#        dt-                      y7 m7 7 7 7 %w)z!Demonstrate the event bus system.r!   )r0  r  Nuser_service)rK   notification_service)r   r   c                 z   K   j                  |        t        d| j                   d| j                          y w)NzReceived event: z - )r   printr>   r?   )r   received_eventss    r-   handle_user_eventz(example_usage.<locals>.handle_user_event+  s6     u% !1!1 2#emm_EFs   8;zuser.)r   zuser.created12345zuser@example.com)user_idemail)r>   r?   rN   zuser.updatedrs   zNew Name)r'  changeszsession-abc)r>   r?   rL   r   z
Bus Statistics: r   )indentdefaultz 
Persisted user.created events: T)r  z
Total received events: )r   r  r   r  r  r=   r  r   r   r8   r:   r  r   r'   rL  rM  r+  r#  rh   ri   rB   r  r   r  )r0  rz  	publisher
subscriberr%  r  r  r$  s          @r-   example_usager.    s     #$K
{
;C ))+ $$N$;I &&5K&LJ OGu G
 $$W-&44   

!#.@A##     

!#0DE$     --
  E	tzz%3GH
IJ &&~6F	-c&k]
;< ((D(
!!!	%c/&:%;
<=c .  "sY   +F&FBF&F F&'F (F&F"A9F&>F$?F&F& F&"F&$F&__main__z4%(asctime)s - %(name)s - %(levelname)s - %(message)s)levelformat)r    rn  )Kr%   rL  rh   rC   rj   syspathr   elestio_configr   r   psycopg2.extrasloggingr   abcr   r   dataclassesr   r   r   r	   r
   enumr   r   pathlibr   typingr   r   r   r   r   r   r   r   r   r   r   r   collectionsr   	functoolsr   r?  pickler   	getLoggerr"   r   setLevelDEBUGr   r/   r8   r=   r   r   r   r   r   r/  rp  r  r  rB   r  rZ   r   r  r.  basicConfigINFOrunr+   r,   r-   <module>rE     s  (     
 ; < )     # 0 0 (      $     
		8	$  D $ T  a
 a
 a
J 401UGTM* [ [| C C C2L L^
E EPp
 p
fM
 M
`[ [~
uS$s)^4 &C E 49>x zGllE GKK  r,   