"""
AIVA Queen Event Bus System - Asynchronous Communication Infrastructure

This module provides a complete event-driven communication system for AIVA Queen,
enabling decoupled, asynchronous messaging between components with full persistence,
filtering, dead letter handling, and replay capabilities.

Components:
    - Event: Core event data structure with metadata
    - EventBus: Central event dispatcher with async support
    - EventSubscriber: Subscribe to specific event types
    - EventPublisher: Publish events to the bus
    - EventFilter: Filter events by multiple criteria
    - EventPersistence: Persist events for replay and audit
    - DeadLetterQueue: Handle failed event deliveries

Author: AIVA Queen Orchestrator
Version: 1.0.0
"""

import asyncio
import json
import uuid
import hashlib
import sys
sys.path.append('/mnt/e/genesis-system/data/genesis-memory')
from elestio_config import PostgresConfig
import psycopg2
import psycopg2.extras
import logging
import threading
from abc import ABC, abstractmethod
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta
from enum import Enum, auto
from pathlib import Path
from typing import (
    Any, Callable, Dict, List, Optional, Set, Tuple,
    Union, Awaitable, TypeVar, Generic, Protocol
)
from collections import defaultdict
from functools import wraps
import traceback
import pickle
import gzip


# Configure module logger
logger = logging.getLogger(__name__)
# NOTE(review): forcing DEBUG at import time overrides any application-level
# logging configuration for this module — confirm this is intentional.
logger.setLevel(logging.DEBUG)


class EventPriority(Enum):
    """Event priority levels for processing order.

    Lower numeric values are more urgent: CRITICAL (0) outranks
    BACKGROUND (4).  Code elsewhere compares ``.value`` directly, so the
    numeric ordering is part of the contract.
    """
    CRITICAL = 0    # most urgent, processed first
    HIGH = 1
    NORMAL = 2      # default for new events
    LOW = 3
    BACKGROUND = 4  # best-effort, least urgent


class EventStatus(Enum):
    """Status of an event in its lifecycle.

    Serialized by member *name* (``status.name`` / ``EventStatus[name]``),
    so member names must remain stable across versions.
    """
    PENDING = auto()        # created, not yet dispatched
    PROCESSING = auto()     # currently being handled
    DELIVERED = auto()      # handled successfully
    FAILED = auto()         # a delivery attempt raised
    DEAD_LETTERED = auto()  # parked in the dead letter queue
    REPLAYED = auto()       # successfully re-delivered from the DLQ
    EXPIRED = auto()        # TTL elapsed before delivery


class DeliveryPolicy(Enum):
    """Event delivery guarantee policies."""
    AT_MOST_ONCE = auto()    # Fire and forget — no retry on failure
    AT_LEAST_ONCE = auto()   # Retry until acknowledged; duplicates possible
    EXACTLY_ONCE = auto()    # Deduplicated delivery (presumably via Event.fingerprint — confirm at dispatch site)


@dataclass
class Event:
    """
    Core event data structure with full metadata support.

    Attributes:
        event_type: Category/type identifier for routing (must be non-empty)
        payload: The actual event data (must be a dict)
        event_id: Unique identifier (auto-generated UUID4)
        timestamp: When the event was created (naive UTC)
        source: Origin component/service identifier
        correlation_id: For tracking related events
        causation_id: The event that caused this one
        priority: Processing priority level
        ttl_seconds: Time-to-live before expiration
        metadata: Additional key-value metadata
        version: Schema version for evolution
        retry_count: Number of delivery attempts
        status: Current lifecycle status
    """
    event_type: str
    payload: Dict[str, Any]
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    # NOTE(review): naive UTC via datetime.utcnow (deprecated in 3.12); a move
    # to aware datetimes needs a coordinated change in persistence/comparisons.
    timestamp: datetime = field(default_factory=datetime.utcnow)
    source: str = "unknown"
    correlation_id: Optional[str] = None
    causation_id: Optional[str] = None
    priority: EventPriority = EventPriority.NORMAL
    ttl_seconds: int = 3600
    metadata: Dict[str, Any] = field(default_factory=dict)
    version: str = "1.0"
    retry_count: int = 0
    status: EventStatus = EventStatus.PENDING

    def __post_init__(self):
        """Validate required fields and coerce serialized representations.

        Raises:
            ValueError: if event_type is empty or payload is not a dict.
        """
        if not self.event_type:
            raise ValueError("event_type cannot be empty")
        if not isinstance(self.payload, dict):
            raise ValueError("payload must be a dictionary")
        # Accept serialized forms so callers may pass an ISO string
        # timestamp, an int priority, or a status member name.
        if isinstance(self.timestamp, str):
            self.timestamp = datetime.fromisoformat(self.timestamp)
        if isinstance(self.priority, int):
            self.priority = EventPriority(self.priority)
        if isinstance(self.status, str):
            self.status = EventStatus[self.status]  # lookup by member name

    @property
    def is_expired(self) -> bool:
        """Check if the event has exceeded its TTL (naive-UTC comparison)."""
        expiry_time = self.timestamp + timedelta(seconds=self.ttl_seconds)
        return datetime.utcnow() > expiry_time

    @property
    def fingerprint(self) -> str:
        """Generate a content-based fingerprint for deduplication.

        Derived from type, source, and the canonical (sorted-key) JSON of
        the payload; event_id and timestamp are deliberately excluded so
        logical duplicates share a fingerprint.
        """
        content = f"{self.event_type}:{self.source}:{json.dumps(self.payload, sort_keys=True)}"
        return hashlib.sha256(content.encode()).hexdigest()[:16]

    def to_dict(self) -> Dict[str, Any]:
        """Serialize event to a JSON-friendly dictionary."""
        return {
            "event_id": self.event_id,
            "event_type": self.event_type,
            "payload": self.payload,
            "timestamp": self.timestamp.isoformat(),
            "source": self.source,
            "correlation_id": self.correlation_id,
            "causation_id": self.causation_id,
            "priority": self.priority.value,
            "ttl_seconds": self.ttl_seconds,
            "metadata": self.metadata,
            "version": self.version,
            "retry_count": self.retry_count,
            "status": self.status.name
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Event":
        """Deserialize an event produced by :meth:`to_dict`.

        The input dict is copied, so the caller's mapping is not mutated.
        """
        data = data.copy()
        if "priority" in data:
            data["priority"] = EventPriority(data["priority"])
        if "status" in data:
            data["status"] = EventStatus[data["status"]]
        if "timestamp" in data and isinstance(data["timestamp"], str):
            data["timestamp"] = datetime.fromisoformat(data["timestamp"])
        return cls(**data)

    def derive(self, event_type: str, payload: Dict[str, Any], **kwargs) -> "Event":
        """Create a derived event maintaining the correlation chain.

        The derived event inherits this event's correlation_id (or starts a
        chain with this event's id) and records this event as its cause.
        Metadata is merged (override keys win).

        Recognized keyword overrides: ``source``, ``priority``, ``metadata``,
        and ``ttl_seconds`` (previously silently ignored; still defaults to
        3600 when not given, preserving prior behavior).
        """
        return Event(
            event_type=event_type,
            payload=payload,
            correlation_id=self.correlation_id or self.event_id,
            causation_id=self.event_id,
            source=kwargs.get("source", self.source),
            priority=kwargs.get("priority", self.priority),
            ttl_seconds=kwargs.get("ttl_seconds", 3600),
            metadata={**self.metadata, **kwargs.get("metadata", {})}
        )


# Type aliases for subscriber callbacks: async handlers return an awaitable
# that the bus awaits; sync handlers return None directly.
EventHandler = Callable[[Event], Awaitable[None]]
SyncEventHandler = Callable[[Event], None]


class EventFilter:
    """
    Flexible event filtering with multiple criteria support.

    Supports filtering by:
        - Event types (exact match or pattern)
        - Source patterns
        - Payload field conditions
        - Priority ranges
        - Time windows
        - Custom predicates
    """

    def __init__(self):
        self._type_patterns: Set[str] = set()
        self._source_patterns: Set[str] = set()
        self._min_priority: Optional[EventPriority] = None
        self._max_priority: Optional[EventPriority] = None
        self._payload_conditions: List[Callable[[Dict], bool]] = []
        self._time_window: Optional[Tuple[datetime, datetime]] = None
        self._custom_predicates: List[Callable[[Event], bool]] = []
        self._exclude_types: Set[str] = set()

    def by_type(self, *event_types: str) -> "EventFilter":
        """Filter by exact event type matches."""
        self._type_patterns.update(event_types)
        return self

    def by_type_prefix(self, prefix: str) -> "EventFilter":
        """Filter by event type prefix pattern."""
        self._type_patterns.add(f"{prefix}*")
        return self

    def exclude_types(self, *event_types: str) -> "EventFilter":
        """Exclude specific event types."""
        self._exclude_types.update(event_types)
        return self

    def by_source(self, *sources: str) -> "EventFilter":
        """Filter by source identifiers."""
        self._source_patterns.update(sources)
        return self

    def by_priority_range(
        self,
        min_priority: Optional[EventPriority] = None,
        max_priority: Optional[EventPriority] = None
    ) -> "EventFilter":
        """Filter by priority range (inclusive)."""
        self._min_priority = min_priority
        self._max_priority = max_priority
        return self

    def by_payload(self, condition: Callable[[Dict], bool]) -> "EventFilter":
        """Add a payload condition predicate."""
        self._payload_conditions.append(condition)
        return self

    def by_payload_field(self, field: str, value: Any) -> "EventFilter":
        """Filter by exact payload field value."""
        self._payload_conditions.append(lambda p: p.get(field) == value)
        return self

    def by_time_window(
        self,
        start: Optional[datetime] = None,
        end: Optional[datetime] = None
    ) -> "EventFilter":
        """Filter by event timestamp window."""
        self._time_window = (
            start or datetime.min,
            end or datetime.max
        )
        return self

    def with_predicate(self, predicate: Callable[[Event], bool]) -> "EventFilter":
        """Add a custom filter predicate."""
        self._custom_predicates.append(predicate)
        return self

    def _match_type(self, event_type: str) -> bool:
        """Check if event type matches patterns."""
        if not self._type_patterns:
            return True
        for pattern in self._type_patterns:
            if pattern.endswith("*"):
                if event_type.startswith(pattern[:-1]):
                    return True
            elif event_type == pattern:
                return True
        return False

    def matches(self, event: Event) -> bool:
        """Check if an event matches all filter criteria."""
        # Check exclusions first
        if event.event_type in self._exclude_types:
            return False

        # Check type patterns
        if not self._match_type(event.event_type):
            return False

        # Check source patterns
        if self._source_patterns and event.source not in self._source_patterns:
            return False

        # Check priority range
        if self._min_priority is not None:
            if event.priority.value > self._min_priority.value:
                return False
        if self._max_priority is not None:
            if event.priority.value < self._max_priority.value:
                return False

        # Check time window
        if self._time_window:
            start, end = self._time_window
            if not (start <= event.timestamp <= end):
                return False

        # Check payload conditions
        for condition in self._payload_conditions:
            try:
                if not condition(event.payload):
                    return False
            except Exception:
                return False

        # Check custom predicates
        for predicate in self._custom_predicates:
            try:
                if not predicate(event):
                    return False
            except Exception:
                return False

        return True

    def __and__(self, other: "EventFilter") -> "EventFilter":
        """Combine filters with AND logic."""
        combined = EventFilter()
        combined._type_patterns = self._type_patterns | other._type_patterns
        combined._source_patterns = self._source_patterns | other._source_patterns
        combined._exclude_types = self._exclude_types | other._exclude_types
        combined._payload_conditions = self._payload_conditions + other._payload_conditions
        combined._custom_predicates = self._custom_predicates + other._custom_predicates

        # Handle priority ranges
        if self._min_priority and other._min_priority:
            combined._min_priority = EventPriority(
                min(self._min_priority.value, other._min_priority.value)
            )
        else:
            combined._min_priority = self._min_priority or other._min_priority

        return combined


@dataclass
class Subscription:
    """
    Represents an active event subscription.

    Attributes:
        subscriber_id: Unique identifier for the subscription
        handler: The callback function to invoke
        event_filter: Filter criteria for matching events
        is_async: Whether the handler is async (awaited) or called directly
        max_retries: Maximum delivery retry attempts
        timeout_seconds: Handler execution timeout
        created_at: When the subscription was created (naive UTC)
        is_active: Whether the subscription is currently active
        delivery_policy: Delivery guarantee applied to this subscriber
    """
    subscriber_id: str
    handler: Union[EventHandler, SyncEventHandler]
    event_filter: EventFilter
    is_async: bool = True
    max_retries: int = 3
    timeout_seconds: float = 30.0
    created_at: datetime = field(default_factory=datetime.utcnow)
    is_active: bool = True
    delivery_policy: DeliveryPolicy = DeliveryPolicy.AT_LEAST_ONCE


class EventPersistence:
    """
    Persistent storage for events with PostgreSQL backend (Elestio).

    Provides:
        - Event archival and retrieval
        - Indexed queries by type, source, time
        - Compression for large payloads
        - Automatic cleanup of expired events

    Implementation notes:
        - Timestamps are stored as ISO-8601 TEXT, so lexicographic ordering
          in SQL matches chronological ordering.
        - Writes are serialized by a process-local threading.Lock; it does
          not coordinate writers in other processes.
        - A fresh connection is opened per operation and closed afterwards.
    """

    def __init__(
        self,
        db_path: Union[str, Path] = "event_store.db",
        compress_threshold: int = 1024,
        auto_cleanup_days: int = 30
    ):
        # db_path is kept for API compatibility but ignored; uses Elestio PostgreSQL
        self.compress_threshold = compress_threshold  # bytes; larger payloads are gzip-compressed
        self.auto_cleanup_days = auto_cleanup_days    # row age (days) used by cleanup_expired()
        self._lock = threading.Lock()                 # serializes writes from this process
        self._init_database()

    def _get_conn(self):
        """Open a new PostgreSQL connection from the Elestio config.

        The caller is responsible for closing the returned connection.
        """
        return psycopg2.connect(**PostgresConfig.get_connection_params())

    def _init_database(self):
        """Ensure PostgreSQL tables exist (tables should already exist in PG)."""
        conn = self._get_conn()
        try:
            with conn.cursor() as cur:
                # Idempotent schema bootstrap; columns mirror Event fields.
                # timestamp/created_at are TEXT (ISO-8601), payload is BYTEA.
                cur.execute("""
                    CREATE TABLE IF NOT EXISTS eb_events (
                        event_id TEXT PRIMARY KEY,
                        event_type TEXT NOT NULL,
                        source TEXT NOT NULL,
                        timestamp TEXT NOT NULL,
                        priority INTEGER NOT NULL,
                        status TEXT NOT NULL,
                        correlation_id TEXT,
                        causation_id TEXT,
                        payload_compressed INTEGER DEFAULT 0,
                        payload_data BYTEA NOT NULL,
                        metadata TEXT,
                        version TEXT,
                        retry_count INTEGER DEFAULT 0,
                        ttl_seconds INTEGER,
                        created_at TEXT DEFAULT CURRENT_TIMESTAMP,
                        fingerprint TEXT
                    )
                """)
            conn.commit()
        finally:
            conn.close()

    def _serialize_payload(self, payload: Dict) -> Tuple[bytes, bool]:
        """Serialize payload to JSON bytes; gzip when above the threshold.

        Returns:
            (data, compressed) — compressed is True when gzip was applied.
        """
        data = json.dumps(payload).encode("utf-8")
        if len(data) > self.compress_threshold:
            return gzip.compress(data), True
        return data, False

    def _deserialize_payload(self, data: bytes, compressed: bool) -> Dict:
        """Inverse of _serialize_payload: optionally gunzip, then JSON-decode."""
        if compressed:
            data = gzip.decompress(data)
        return json.loads(data.decode("utf-8"))

    def store(self, event: Event) -> bool:
        """
        Persist an event to storage (upsert on event_id).

        An existing row with the same event_id is fully overwritten, which
        is how status/retry updates written via store() take effect.

        Args:
            event: The event to store

        Returns:
            True if stored successfully, False otherwise (error is logged,
            never raised).
        """
        try:
            payload_data, compressed = self._serialize_payload(event.payload)

            with self._lock:
                conn = self._get_conn()
                try:
                    with conn.cursor() as cur:
                        cur.execute("""
                            INSERT INTO eb_events (
                                event_id, event_type, source, timestamp, priority,
                                status, correlation_id, causation_id, payload_compressed,
                                payload_data, metadata, version, retry_count, ttl_seconds,
                                fingerprint
                            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                            ON CONFLICT (event_id) DO UPDATE SET
                                event_type = EXCLUDED.event_type,
                                source = EXCLUDED.source,
                                timestamp = EXCLUDED.timestamp,
                                priority = EXCLUDED.priority,
                                status = EXCLUDED.status,
                                correlation_id = EXCLUDED.correlation_id,
                                causation_id = EXCLUDED.causation_id,
                                payload_compressed = EXCLUDED.payload_compressed,
                                payload_data = EXCLUDED.payload_data,
                                metadata = EXCLUDED.metadata,
                                version = EXCLUDED.version,
                                retry_count = EXCLUDED.retry_count,
                                ttl_seconds = EXCLUDED.ttl_seconds,
                                fingerprint = EXCLUDED.fingerprint
                        """, (
                            event.event_id,
                            event.event_type,
                            event.source,
                            event.timestamp.isoformat(),
                            event.priority.value,
                            event.status.name,
                            event.correlation_id,
                            event.causation_id,
                            1 if compressed else 0,
                            psycopg2.Binary(payload_data),
                            json.dumps(event.metadata),
                            event.version,
                            event.retry_count,
                            event.ttl_seconds,
                            event.fingerprint
                        ))
                    conn.commit()
                finally:
                    conn.close()
            return True
        except Exception as e:
            logger.error(f"Failed to store event {event.event_id}: {e}")
            return False

    def retrieve(self, event_id: str) -> Optional[Event]:
        """Retrieve a single event by ID, or None if not found."""
        conn = self._get_conn()
        try:
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                cur.execute(
                    "SELECT * FROM eb_events WHERE event_id = %s",
                    (event_id,)
                )
                row = cur.fetchone()
                if row:
                    return self._row_to_event(row)
        finally:
            conn.close()
        return None

    def _row_to_event(self, row) -> Event:
        """Convert a RealDictCursor row back into an Event object."""
        payload_data = row["payload_data"]
        # psycopg2 returns BYTEA columns as memoryview; gzip/json need bytes.
        if isinstance(payload_data, memoryview):
            payload_data = bytes(payload_data)
        payload = self._deserialize_payload(
            payload_data,
            bool(row["payload_compressed"])
        )
        return Event(
            event_id=row["event_id"],
            event_type=row["event_type"],
            source=row["source"],
            timestamp=datetime.fromisoformat(row["timestamp"]),
            priority=EventPriority(row["priority"]),
            status=EventStatus[row["status"]],
            correlation_id=row["correlation_id"],
            causation_id=row["causation_id"],
            payload=payload,
            metadata=json.loads(row["metadata"]) if row["metadata"] else {},
            version=row["version"],
            retry_count=row["retry_count"],
            ttl_seconds=row["ttl_seconds"]
        )

    def query(
        self,
        event_filter: Optional[EventFilter] = None,
        limit: int = 100,
        offset: int = 0,
        order_desc: bool = True
    ) -> List[Event]:
        """
        Query events with optional filtering.

        NOTE(review): the filter is applied in Python AFTER the SQL
        LIMIT/OFFSET, so a page may contain fewer than `limit` matches and
        matching events beyond the page are not returned — confirm callers
        expect page-then-filter semantics.

        Args:
            event_filter: Optional filter criteria
            limit: Maximum events to return
            offset: Pagination offset
            order_desc: Order by timestamp descending

        Returns:
            List of matching events
        """
        order = "DESC" if order_desc else "ASC"

        conn = self._get_conn()
        try:
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                cur.execute(
                    f"SELECT * FROM eb_events ORDER BY timestamp {order} LIMIT %s OFFSET %s",
                    (limit, offset)
                )
                rows = cur.fetchall()
        finally:
            conn.close()

        events = [self._row_to_event(row) for row in rows]

        if event_filter:
            events = [e for e in events if event_filter.matches(e)]

        return events

    def query_by_type(self, event_type: str, limit: int = 100) -> List[Event]:
        """Query events by exact type, newest first."""
        conn = self._get_conn()
        try:
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                cur.execute(
                    "SELECT * FROM eb_events WHERE event_type = %s ORDER BY timestamp DESC LIMIT %s",
                    (event_type, limit)
                )
                return [self._row_to_event(row) for row in cur.fetchall()]
        finally:
            conn.close()

    def query_by_correlation(self, correlation_id: str) -> List[Event]:
        """Get all events in a correlation chain, oldest first.

        Matches on event_id as well, so passing a root event's own id
        returns the root plus its derived events.
        """
        conn = self._get_conn()
        try:
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                cur.execute(
                    """SELECT * FROM eb_events
                       WHERE correlation_id = %s OR event_id = %s
                       ORDER BY timestamp ASC""",
                    (correlation_id, correlation_id)
                )
                return [self._row_to_event(row) for row in cur.fetchall()]
        finally:
            conn.close()

    def update_status(self, event_id: str, status: EventStatus) -> bool:
        """Update an event's status; returns False (and logs) on any error."""
        try:
            with self._lock:
                conn = self._get_conn()
                try:
                    with conn.cursor() as cur:
                        cur.execute(
                            "UPDATE eb_events SET status = %s WHERE event_id = %s",
                            (status.name, event_id)
                        )
                    conn.commit()
                finally:
                    conn.close()
            return True
        except Exception as e:
            logger.error(f"Failed to update status for {event_id}: {e}")
            return False

    def increment_retry(self, event_id: str) -> int:
        """Increment retry count and return the new value (0 if row missing).

        The UPDATE and the SELECT run in the same transaction, so the SELECT
        observes the incremented value before the commit.
        """
        with self._lock:
            conn = self._get_conn()
            try:
                with conn.cursor() as cur:
                    cur.execute(
                        "UPDATE eb_events SET retry_count = retry_count + 1 WHERE event_id = %s",
                        (event_id,)
                    )
                    cur.execute(
                        "SELECT retry_count FROM eb_events WHERE event_id = %s",
                        (event_id,)
                    )
                    row = cur.fetchone()
                conn.commit()
                return row[0] if row else 0
            finally:
                conn.close()

    def cleanup_expired(self) -> int:
        """Remove old events and return the count deleted.

        NOTE(review): deletes rows older than auto_cleanup_days based on the
        event timestamp — per-event ttl_seconds is NOT consulted here;
        confirm that is the intended retention policy.
        """
        cutoff = datetime.utcnow() - timedelta(days=self.auto_cleanup_days)
        with self._lock:
            conn = self._get_conn()
            try:
                with conn.cursor() as cur:
                    cur.execute(
                        "DELETE FROM eb_events WHERE timestamp < %s",
                        (cutoff.isoformat(),)
                    )
                    count = cur.rowcount
                conn.commit()
                return count
            finally:
                conn.close()

    def check_duplicate(self, fingerprint: str) -> bool:
        """Check if any stored event has this content fingerprint."""
        conn = self._get_conn()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    "SELECT 1 FROM eb_events WHERE fingerprint = %s LIMIT 1",
                    (fingerprint,)
                )
                return cur.fetchone() is not None
        finally:
            conn.close()

    def get_statistics(self) -> Dict[str, Any]:
        """Get storage statistics: total rows plus counts by type and status."""
        conn = self._get_conn()
        try:
            with conn.cursor() as cur:
                cur.execute("SELECT COUNT(*) FROM eb_events")
                total = cur.fetchone()[0]
                cur.execute(
                    "SELECT event_type, COUNT(*) FROM eb_events GROUP BY event_type"
                )
                by_type = dict(cur.fetchall())
                cur.execute(
                    "SELECT status, COUNT(*) FROM eb_events GROUP BY status"
                )
                by_status = dict(cur.fetchall())

                return {
                    "total_events": total,
                    "by_type": by_type,
                    "by_status": by_status
                }
        finally:
            conn.close()


class DeadLetterQueue:
    """
    Handle failed event deliveries with retry and diagnostic support.

    Provides:
        - Storage of failed events with error context
        - Retry mechanisms with backoff
        - Analysis of failure patterns
        - Manual replay capabilities

    Failed events are tracked in an in-memory dict keyed by event_id and
    mirrored to persistence with status DEAD_LETTERED.  purge() clears only
    the in-memory view; persisted rows are untouched.
    """

    def __init__(
        self,
        persistence: EventPersistence,
        max_retries: int = 5,
        base_delay_seconds: float = 1.0,
        max_delay_seconds: float = 300.0
    ):
        self.persistence = persistence
        self.max_retries = max_retries        # cap on Event.retry_count before giving up
        self.base_delay = base_delay_seconds  # initial backoff delay (seconds)
        self.max_delay = max_delay_seconds    # backoff ceiling (seconds)
        # event_id -> {"event", "failures": [...], "first_failure", "last_failure"}
        self._failed_events: Dict[str, Dict[str, Any]] = {}
        self._lock = threading.Lock()         # guards _failed_events
        # NOTE(review): _retry_tasks is never read or written in this class —
        # possibly reserved for scheduled retries; confirm before removing.
        self._retry_tasks: Dict[str, asyncio.Task] = {}

    def enqueue(
        self,
        event: Event,
        error: Exception,
        subscriber_id: str
    ) -> None:
        """
        Add a failed event to the dead letter queue.

        Repeated failures for the same event_id accumulate in its failures
        list.  The event is also re-persisted with status DEAD_LETTERED
        (this mutates event.status as a side effect).

        Args:
            event: The event that failed
            error: The exception that occurred
            subscriber_id: Which subscriber failed to process it
        """
        with self._lock:
            if event.event_id not in self._failed_events:
                self._failed_events[event.event_id] = {
                    "event": event,
                    "failures": [],
                    "first_failure": datetime.utcnow(),
                    "last_failure": datetime.utcnow()
                }

            # NOTE: traceback.format_exc() captures the exception currently
            # being handled — meaningful only when called from an except block.
            self._failed_events[event.event_id]["failures"].append({
                "subscriber_id": subscriber_id,
                "error_type": type(error).__name__,
                "error_message": str(error),
                "traceback": traceback.format_exc(),
                "timestamp": datetime.utcnow().isoformat(),
                "retry_count": event.retry_count
            })
            self._failed_events[event.event_id]["last_failure"] = datetime.utcnow()

        # Update event status in persistence
        event.status = EventStatus.DEAD_LETTERED
        self.persistence.store(event)

        logger.warning(
            f"Event {event.event_id} added to DLQ after failure in {subscriber_id}: {error}"
        )

    def get_backoff_delay(self, retry_count: int) -> float:
        """Calculate exponential backoff delay: base * 2^retries, capped at max."""
        delay = self.base_delay * (2 ** retry_count)
        return min(delay, self.max_delay)

    async def retry_event(
        self,
        event_id: str,
        handler: EventHandler
    ) -> bool:
        """
        Attempt to retry a failed event.

        Sleeps for the backoff delay, then invokes the handler.  On success
        the event is removed from the DLQ and persisted as REPLAYED; on
        failure it is re-enqueued under subscriber id "retry_handler".
        Mutates event.retry_count/status as side effects.  The lock is
        never held across an await.

        Args:
            event_id: The event to retry
            handler: The handler to use for retry

        Returns:
            True if retry succeeded, False otherwise (unknown event_id,
            retries exhausted, or handler raised)
        """
        with self._lock:
            if event_id not in self._failed_events:
                logger.error(f"Event {event_id} not found in DLQ")
                return False

            entry = self._failed_events[event_id]
            event = entry["event"]

        if event.retry_count >= self.max_retries:
            logger.error(f"Event {event_id} exceeded max retries ({self.max_retries})")
            return False

        delay = self.get_backoff_delay(event.retry_count)
        logger.info(f"Retrying event {event_id} after {delay}s delay")

        await asyncio.sleep(delay)

        try:
            event.retry_count += 1
            event.status = EventStatus.PROCESSING
            await handler(event)

            # Success - remove from DLQ
            with self._lock:
                del self._failed_events[event_id]

            event.status = EventStatus.REPLAYED
            self.persistence.store(event)

            logger.info(f"Successfully retried event {event_id}")
            return True

        except Exception as e:
            logger.error(f"Retry failed for event {event_id}: {e}")
            self.enqueue(event, e, "retry_handler")
            return False

    def get_failed_events(self) -> List[Dict[str, Any]]:
        """Get a summary (not the events themselves) of all DLQ entries."""
        with self._lock:
            return [
                {
                    "event_id": eid,
                    "event_type": entry["event"].event_type,
                    "failure_count": len(entry["failures"]),
                    "first_failure": entry["first_failure"].isoformat(),
                    "last_failure": entry["last_failure"].isoformat(),
                    "last_error": entry["failures"][-1]["error_message"]
                }
                for eid, entry in self._failed_events.items()
            ]

    def get_failure_details(self, event_id: str) -> Optional[Dict[str, Any]]:
        """Get detailed failure information for an event, or None if absent."""
        with self._lock:
            if event_id in self._failed_events:
                entry = self._failed_events[event_id]
                return {
                    "event": entry["event"].to_dict(),
                    "failures": entry["failures"],
                    "first_failure": entry["first_failure"].isoformat(),
                    "last_failure": entry["last_failure"].isoformat()
                }
        return None

    def purge(self, event_id: Optional[str] = None) -> int:
        """
        Remove events from the (in-memory) DLQ; persisted rows are kept.

        Args:
            event_id: Specific event to remove, or None for all

        Returns:
            Number of events removed
        """
        with self._lock:
            if event_id:
                if event_id in self._failed_events:
                    del self._failed_events[event_id]
                    return 1
                return 0
            else:
                count = len(self._failed_events)
                self._failed_events.clear()
                return count

    def analyze_failures(self) -> Dict[str, Any]:
        """Analyze failure patterns for diagnostics.

        Returns counts of failed events and failure attempts, broken down
        by error type, failing subscriber, and event type.
        """
        with self._lock:
            if not self._failed_events:
                return {"total_failures": 0, "patterns": {}}

            error_types: Dict[str, int] = defaultdict(int)
            subscriber_failures: Dict[str, int] = defaultdict(int)
            event_type_failures: Dict[str, int] = defaultdict(int)

            for entry in self._failed_events.values():
                event_type_failures[entry["event"].event_type] += 1
                for failure in entry["failures"]:
                    error_types[failure["error_type"]] += 1
                    subscriber_failures[failure["subscriber_id"]] += 1

            return {
                "total_failed_events": len(self._failed_events),
                "total_failure_attempts": sum(
                    len(e["failures"]) for e in self._failed_events.values()
                ),
                "by_error_type": dict(error_types),
                "by_subscriber": dict(subscriber_failures),
                "by_event_type": dict(event_type_failures)
            }


class EventPublisher:
    """
    Publish events to the event bus with batching and validation.

    Features:
        - Single and batch publishing
        - Event validation
        - Automatic metadata enrichment
        - Publishing metrics
    """

    def __init__(self, source: str):
        """
        Args:
            source: Identifier stamped onto every event published here.
        """
        self.source = source
        self._bus: Optional["EventBus"] = None  # set via bind()
        self._published_count = 0
        self._failed_count = 0

    def bind(self, bus: "EventBus") -> "EventPublisher":
        """Bind this publisher to an event bus; returns self for chaining."""
        self._bus = bus
        return self

    async def publish(
        self,
        event_type: str,
        payload: Dict[str, Any],
        priority: EventPriority = EventPriority.NORMAL,
        correlation_id: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
        ttl_seconds: int = 3600
    ) -> Event:
        """
        Publish a single event.

        Args:
            event_type: Type identifier for routing
            payload: Event data
            priority: Processing priority
            correlation_id: For tracking related events
            metadata: Additional metadata
            ttl_seconds: Time-to-live

        Returns:
            The published Event object

        Raises:
            RuntimeError: If the publisher is not bound to a bus.
            Exception: Any error raised by the bus dispatch is re-raised.
        """
        if not self._bus:
            raise RuntimeError("Publisher not bound to an event bus")

        event = Event(
            event_type=event_type,
            payload=payload,
            source=self.source,
            priority=priority,
            correlation_id=correlation_id,
            metadata=metadata or {},
            ttl_seconds=ttl_seconds
        )

        try:
            await self._bus.dispatch(event)
        except Exception:
            # Record the failure but let the caller see the original error.
            self._failed_count += 1
            raise
        self._published_count += 1
        return event

    async def publish_event(self, event: Event) -> Event:
        """
        Publish a pre-constructed event.

        The event's source is overwritten with this publisher's source.
        """
        if not self._bus:
            raise RuntimeError("Publisher not bound to an event bus")

        event.source = self.source
        await self._bus.dispatch(event)
        self._published_count += 1
        return event

    async def publish_batch(self, events: List[Event]) -> List[Event]:
        """
        Publish multiple events efficiently.

        Failures are logged and counted but do not abort the batch.

        Args:
            events: List of events to publish

        Returns:
            List of successfully published events
        """
        if not self._bus:
            raise RuntimeError("Publisher not bound to an event bus")

        published = []
        for event in events:
            event.source = self.source
            try:
                await self._bus.dispatch(event)
                published.append(event)
                self._published_count += 1
            except Exception as e:
                self._failed_count += 1
                logger.error(f"Failed to publish event {event.event_id}: {e}")

        return published

    def get_statistics(self) -> Dict[str, Any]:
        """Get publishing statistics (counts and success rate)."""
        return {
            "source": self.source,
            "published_count": self._published_count,
            "failed_count": self._failed_count,
            # success_rate defaults to 1.0 before anything has been published
            "success_rate": (
                self._published_count / (self._published_count + self._failed_count)
                if (self._published_count + self._failed_count) > 0 else 1.0
            )
        }


class EventSubscriber:
    """
    Subscribe to events from the event bus.

    Features:
        - Type-safe subscriptions
        - Filter-based routing
        - Automatic handler wrapping
        - Subscription management
    """

    def __init__(self, subscriber_id: str):
        self.subscriber_id = subscriber_id
        self._bus: Optional["EventBus"] = None  # set via bind()
        self._subscriptions: Dict[str, Subscription] = {}
        self._received_count = 0
        self._error_count = 0

    def bind(self, bus: "EventBus") -> "EventSubscriber":
        """Attach this subscriber to a bus; returns self for chaining."""
        self._bus = bus
        return self

    def subscribe(
        self,
        event_filter: EventFilter,
        handler: Union[EventHandler, SyncEventHandler],
        max_retries: int = 3,
        timeout_seconds: float = 30.0,
        delivery_policy: DeliveryPolicy = DeliveryPolicy.AT_LEAST_ONCE
    ) -> str:
        """
        Subscribe to events matching the filter.

        Args:
            event_filter: Criteria for matching events
            handler: Callback function for matched events
            max_retries: Max delivery retry attempts
            timeout_seconds: Handler timeout
            delivery_policy: Delivery guarantee level

        Returns:
            Subscription ID

        Raises:
            RuntimeError: If the subscriber is not bound to a bus.
        """
        if not self._bus:
            raise RuntimeError("Subscriber not bound to an event bus")

        sub_id = f"{self.subscriber_id}:{uuid.uuid4().hex[:8]}"
        sub = Subscription(
            subscriber_id=sub_id,
            handler=handler,
            event_filter=event_filter,
            # Coroutine handlers are awaited; plain callables run in a thread pool.
            is_async=asyncio.iscoroutinefunction(handler),
            max_retries=max_retries,
            timeout_seconds=timeout_seconds,
            delivery_policy=delivery_policy
        )

        self._subscriptions[sub_id] = sub
        self._bus._register_subscription(sub)

        logger.info(f"Registered subscription {sub_id}")
        return sub_id

    def subscribe_to_type(
        self,
        event_type: str,
        handler: Union[EventHandler, SyncEventHandler],
        **kwargs
    ) -> str:
        """Convenience method to subscribe to a specific event type."""
        type_filter = EventFilter().by_type(event_type)
        return self.subscribe(type_filter, handler, **kwargs)

    def unsubscribe(self, subscription_id: str) -> bool:
        """
        Remove a subscription.

        Args:
            subscription_id: The subscription to remove

        Returns:
            True if removed, False if not found
        """
        if subscription_id not in self._subscriptions:
            return False

        del self._subscriptions[subscription_id]
        if self._bus:
            self._bus._unregister_subscription(subscription_id)
        logger.info(f"Unsubscribed {subscription_id}")
        return True

    def unsubscribe_all(self) -> int:
        """Remove all subscriptions and return how many were removed."""
        count = len(self._subscriptions)
        # Iterate over a snapshot since unsubscribe() mutates the dict.
        for sub_id in list(self._subscriptions):
            self.unsubscribe(sub_id)
        return count

    def pause(self, subscription_id: str) -> bool:
        """Temporarily pause a subscription. Returns False if unknown."""
        sub = self._subscriptions.get(subscription_id)
        if sub is None:
            return False
        sub.is_active = False
        return True

    def resume(self, subscription_id: str) -> bool:
        """Resume a paused subscription. Returns False if unknown."""
        sub = self._subscriptions.get(subscription_id)
        if sub is None:
            return False
        sub.is_active = True
        return True

    def get_subscriptions(self) -> List[Dict[str, Any]]:
        """Get information about active subscriptions."""
        infos = []
        for sub in self._subscriptions.values():
            infos.append({
                "subscription_id": sub.subscriber_id,
                "is_async": sub.is_async,
                "is_active": sub.is_active,
                "max_retries": sub.max_retries,
                "created_at": sub.created_at.isoformat()
            })
        return infos

    def get_statistics(self) -> Dict[str, Any]:
        """Get subscriber statistics."""
        active = sum(1 for s in self._subscriptions.values() if s.is_active)
        return {
            "subscriber_id": self.subscriber_id,
            "subscription_count": len(self._subscriptions),
            "active_subscriptions": active,
            "received_count": self._received_count,
            "error_count": self._error_count
        }


class EventBus:
    """
    Central event dispatcher with async support and delivery guarantees.

    Features:
        - Asynchronous event dispatch
        - Priority-based processing
        - Delivery guarantees (at-most-once, at-least-once, exactly-once)
        - Event persistence and replay
        - Dead letter queue for failed events
        - Metrics and monitoring
    """

    def __init__(
        self,
        persistence: Optional[EventPersistence] = None,
        enable_persistence: bool = True,
        worker_count: int = 4,
        max_queue_size: int = 10000
    ):
        """
        Args:
            persistence: Explicit persistence backend. If None and
                enable_persistence is True, a default one is created.
            enable_persistence: Create a default EventPersistence when no
                explicit backend is given.
            worker_count: Number of concurrent worker tasks.
            max_queue_size: Maximum queued events before dispatch blocks.
        """
        self._subscriptions: Dict[str, Subscription] = {}
        # event-type pattern -> subscription IDs, for fast candidate lookup
        self._type_index: Dict[str, Set[str]] = defaultdict(set)
        # Created lazily in start() so it binds to the running event loop.
        self._event_queue: Optional[asyncio.PriorityQueue] = None
        self._persistence = persistence or (
            EventPersistence() if enable_persistence else None
        )
        self._dlq = DeadLetterQueue(self._persistence) if self._persistence else None
        self._worker_count = worker_count
        self._max_queue_size = max_queue_size
        self._workers: List[asyncio.Task] = []
        self._is_running = False
        self._lock = asyncio.Lock()
        # Fingerprints of delivered events, used for EXACTLY_ONCE dedup.
        # NOTE(review): grows without bound; _fingerprint_ttl is declared
        # but never enforced -- consider periodic eviction.
        self._processed_fingerprints: Set[str] = set()
        self._fingerprint_ttl = 3600  # 1 hour (currently informational only)
        # Monotonic tie-breaker for priority-queue entries. Without it,
        # two events with equal priority would be compared directly and
        # Event defines no ordering, raising TypeError inside the queue.
        self._dispatch_seq = 0

        # Metrics
        self._dispatched_count = 0
        self._delivered_count = 0
        self._failed_count = 0
        self._start_time: Optional[datetime] = None

    async def start(self):
        """Start the event bus workers (idempotent if already running)."""
        if self._is_running:
            return

        self._event_queue = asyncio.PriorityQueue(maxsize=self._max_queue_size)
        self._is_running = True
        self._start_time = datetime.utcnow()

        # Start worker tasks
        for i in range(self._worker_count):
            worker = asyncio.create_task(self._worker_loop(f"worker-{i}"))
            self._workers.append(worker)

        logger.info(f"EventBus started with {self._worker_count} workers")

    async def stop(self, graceful: bool = True):
        """
        Stop the event bus.

        Args:
            graceful: If True, wait for pending events to be processed
                before shutting the workers down.
        """
        # No-op if never started (also avoids touching a None queue).
        if not self._is_running:
            return

        if graceful and self._event_queue is not None:
            # Drain the queue while the workers are still alive. Flipping
            # _is_running first would stop the workers and leave join()
            # waiting forever on any unprocessed items.
            await self._event_queue.join()

        self._is_running = False

        # Cancel workers
        for worker in self._workers:
            worker.cancel()

        await asyncio.gather(*self._workers, return_exceptions=True)
        self._workers.clear()

        logger.info("EventBus stopped")

    async def _worker_loop(self, worker_id: str):
        """Worker loop: pull events off the priority queue and process them."""
        logger.debug(f"Worker {worker_id} started")

        while self._is_running:
            try:
                # Short timeout so the loop can notice shutdown requests.
                _priority, _seq, event = await asyncio.wait_for(
                    self._event_queue.get(),
                    timeout=1.0
                )
            except asyncio.TimeoutError:
                continue
            except asyncio.CancelledError:
                break

            try:
                await self._process_event(event)
            except Exception as e:
                logger.error(f"Worker {worker_id} error processing event: {e}")
            finally:
                self._event_queue.task_done()

        logger.debug(f"Worker {worker_id} stopped")

    async def _process_event(self, event: Event):
        """Process a single event by dispatching to matching subscribers."""
        if event.is_expired:
            event.status = EventStatus.EXPIRED
            if self._persistence:
                self._persistence.store(event)
            logger.debug(f"Event {event.event_id} expired, skipping")
            return

        event.status = EventStatus.PROCESSING

        # Find matching subscriptions
        matching_subs = self._find_matching_subscriptions(event)

        if not matching_subs:
            event.status = EventStatus.DELIVERED  # No subscribers is not a failure
            if self._persistence:
                self._persistence.store(event)
            return

        # Dispatch to all matching subscribers concurrently
        delivery_tasks = []
        for subscription in matching_subs:
            if not subscription.is_active:
                continue

            # Check for exactly-once delivery
            if subscription.delivery_policy == DeliveryPolicy.EXACTLY_ONCE:
                if event.fingerprint in self._processed_fingerprints:
                    logger.debug(f"Skipping duplicate event {event.event_id}")
                    continue

            task = asyncio.create_task(
                self._deliver_to_subscriber(event, subscription)
            )
            delivery_tasks.append(task)

        # Wait for all deliveries
        results = await asyncio.gather(*delivery_tasks, return_exceptions=True)

        # An exception from a delivery task is a failed delivery. (The
        # previous check filtered exceptions out of the all(), so a task
        # that raised could still count as fully delivered.)
        all_success = all(r is True for r in results)

        if all_success:
            event.status = EventStatus.DELIVERED
            self._delivered_count += 1
            self._processed_fingerprints.add(event.fingerprint)
        else:
            event.status = EventStatus.FAILED
            self._failed_count += 1

        if self._persistence:
            self._persistence.store(event)

    async def _deliver_to_subscriber(
        self,
        event: Event,
        subscription: Subscription
    ) -> bool:
        """
        Deliver an event to a specific subscriber.

        Returns:
            True on successful delivery, False on timeout or handler error.
        """
        try:
            if subscription.is_async:
                await asyncio.wait_for(
                    subscription.handler(event),
                    timeout=subscription.timeout_seconds
                )
            else:
                # Run sync handler in the default thread pool so it cannot
                # block the event loop. (get_running_loop is the supported
                # call from inside a coroutine; get_event_loop is deprecated.)
                await asyncio.get_running_loop().run_in_executor(
                    None,
                    subscription.handler,
                    event
                )
            return True

        except asyncio.TimeoutError:
            logger.warning(
                f"Handler timeout for {subscription.subscriber_id} on event {event.event_id}"
            )
            if self._dlq:
                self._dlq.enqueue(
                    event,
                    TimeoutError(f"Handler timed out after {subscription.timeout_seconds}s"),
                    subscription.subscriber_id
                )
            return False

        except Exception as e:
            logger.error(
                f"Handler error for {subscription.subscriber_id} on event {event.event_id}: {e}"
            )
            # AT_MOST_ONCE deliveries are intentionally not retried via DLQ.
            if self._dlq and subscription.delivery_policy != DeliveryPolicy.AT_MOST_ONCE:
                self._dlq.enqueue(event, e, subscription.subscriber_id)
            return False

    def _find_matching_subscriptions(self, event: Event) -> List[Subscription]:
        """Find all subscriptions whose filter matches the event."""
        matching = []

        # First check type index for quick matching ("*" holds subscriptions
        # registered without explicit type patterns).
        potential_ids = self._type_index.get(event.event_type, set())
        potential_ids = potential_ids | self._type_index.get("*", set())

        # Check each potential subscription
        for sub_id in potential_ids:
            if sub_id in self._subscriptions:
                sub = self._subscriptions[sub_id]
                if sub.event_filter.matches(event):
                    matching.append(sub)

        # Also check subscriptions not in the type index; filters may match
        # on criteria (e.g. prefixes) that the exact-type index cannot cover.
        for sub_id, sub in self._subscriptions.items():
            if sub_id not in potential_ids and sub.event_filter.matches(event):
                matching.append(sub)

        return matching

    async def dispatch(self, event: Event) -> str:
        """
        Dispatch an event for processing.

        Args:
            event: The event to dispatch

        Returns:
            The event ID

        Raises:
            RuntimeError: If the bus has not been started.
        """
        if not self._is_running:
            raise RuntimeError("EventBus is not running")

        # Persist the event
        if self._persistence:
            self._persistence.store(event)

        # Add to priority queue. The middle sequence element breaks ties
        # between equal-priority events so Event objects are never compared.
        self._dispatch_seq += 1
        await self._event_queue.put(
            (event.priority.value, self._dispatch_seq, event)
        )
        self._dispatched_count += 1

        logger.debug(f"Dispatched event {event.event_id} of type {event.event_type}")
        return event.event_id

    def _register_subscription(self, subscription: Subscription):
        """Register a subscription with the bus and index its type patterns."""
        self._subscriptions[subscription.subscriber_id] = subscription

        # Update type index
        for pattern in subscription.event_filter._type_patterns:
            self._type_index[pattern].add(subscription.subscriber_id)

        # No explicit patterns -> candidate for every event type.
        if not subscription.event_filter._type_patterns:
            self._type_index["*"].add(subscription.subscriber_id)

    def _unregister_subscription(self, subscription_id: str):
        """Remove a subscription from the bus and the type index."""
        if subscription_id in self._subscriptions:
            del self._subscriptions[subscription_id]

        # Clean up type index
        for type_set in self._type_index.values():
            type_set.discard(subscription_id)

    async def replay_events(
        self,
        event_filter: Optional[EventFilter] = None,
        from_time: Optional[datetime] = None,
        to_time: Optional[datetime] = None,
        limit: int = 1000
    ) -> int:
        """
        Replay historical events from persistence.

        Args:
            event_filter: Optional filter for events to replay
            from_time: Start of time range
            to_time: End of time range
            limit: Maximum events to replay

        Returns:
            Number of events replayed

        Raises:
            RuntimeError: If persistence is not enabled.
        """
        if not self._persistence:
            raise RuntimeError("Persistence not enabled")

        # Build filter with time constraints
        replay_filter = event_filter or EventFilter()
        if from_time or to_time:
            replay_filter = replay_filter.by_time_window(from_time, to_time)

        events = self._persistence.query(
            event_filter=replay_filter,
            limit=limit,
            order_desc=False  # Replay in chronological order
        )

        replayed = 0
        for event in events:
            event.status = EventStatus.REPLAYED
            event.metadata["replayed_at"] = datetime.utcnow().isoformat()
            await self.dispatch(event)
            replayed += 1

        logger.info(f"Replayed {replayed} events")
        return replayed

    def create_publisher(self, source: str) -> EventPublisher:
        """Create a publisher already bound to this bus."""
        return EventPublisher(source).bind(self)

    def create_subscriber(self, subscriber_id: str) -> EventSubscriber:
        """Create a subscriber already bound to this bus."""
        return EventSubscriber(subscriber_id).bind(self)

    def get_dead_letter_queue(self) -> Optional[DeadLetterQueue]:
        """Get the dead letter queue instance (None if persistence disabled)."""
        return self._dlq

    def get_statistics(self) -> Dict[str, Any]:
        """Get comprehensive bus statistics."""
        uptime = (
            (datetime.utcnow() - self._start_time).total_seconds()
            if self._start_time else 0
        )

        stats = {
            "is_running": self._is_running,
            "uptime_seconds": uptime,
            "worker_count": self._worker_count,
            "subscription_count": len(self._subscriptions),
            "queue_size": self._event_queue.qsize() if self._event_queue else 0,
            "dispatched_count": self._dispatched_count,
            "delivered_count": self._delivered_count,
            "failed_count": self._failed_count,
            "events_per_second": (
                self._dispatched_count / uptime if uptime > 0 else 0
            )
        }

        if self._persistence:
            stats["persistence"] = self._persistence.get_statistics()

        if self._dlq:
            stats["dead_letter_queue"] = self._dlq.analyze_failures()

        return stats


# Convenience decorators for event handlers
def event_handler(event_types: Union[str, List[str]]):
    """
    Decorator to mark a function as an event handler.

    Tags the function with ``_event_types`` (always a list) and
    ``_is_event_handler = True`` without otherwise changing it.

    Usage:
        @event_handler("user.created")
        async def handle_user_created(event: Event):
            pass
    """
    def decorator(func):
        func._event_types = (
            [event_types] if isinstance(event_types, str) else event_types
        )
        func._is_event_handler = True
        return func

    return decorator


def with_retry(
    max_retries: int = 3,
    delay: float = 1.0,
    retry_on: Tuple[type, ...] = (Exception,)
):
    """
    Decorator to add exponential-backoff retry logic to an async event handler.

    Args:
        max_retries: Extra attempts after the first failure (total calls
            is max_retries + 1).
        delay: Base delay in seconds; attempt n waits delay * 2**n.
        retry_on: Exception types that trigger a retry. Anything not listed
            propagates immediately. The default (Exception,) preserves the
            original retry-on-everything behavior.

    Usage:
        @with_retry(max_retries=5, delay=2.0)
        async def handle_event(event: Event):
            pass
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(event: "Event"):
            last_error: Optional[BaseException] = None
            for attempt in range(max_retries + 1):
                try:
                    return await func(event)
                except retry_on as e:
                    last_error = e
                    if attempt < max_retries:
                        # Exponential backoff between attempts.
                        await asyncio.sleep(delay * (2 ** attempt))
            # All attempts exhausted: surface the most recent failure.
            raise last_error
        return wrapper
    return decorator


# Example usage and testing
async def example_usage():
    """Demonstrate the event bus system end to end."""

    # Bus backed by the default persistence layer (Elestio PostgreSQL).
    store = EventPersistence()
    event_bus = EventBus(persistence=store, worker_count=4)
    await event_bus.start()

    # One producer and one consumer, both bound to the bus.
    pub = event_bus.create_publisher(source="user_service")
    sub = event_bus.create_subscriber(subscriber_id="notification_service")

    # Collect everything the handler sees.
    inbox = []

    async def handle_user_event(event: Event):
        inbox.append(event)
        print(f"Received event: {event.event_type} - {event.payload}")

    # Route every "user.*" event to the handler.
    sub.subscribe(
        EventFilter().by_type_prefix("user."),
        handle_user_event,
        delivery_policy=DeliveryPolicy.AT_LEAST_ONCE
    )

    # Emit a couple of sample events.
    await pub.publish(
        event_type="user.created",
        payload={"user_id": "12345", "email": "user@example.com"},
        priority=EventPriority.HIGH
    )
    await pub.publish(
        event_type="user.updated",
        payload={"user_id": "12345", "changes": {"name": "New Name"}},
        correlation_id="session-abc"
    )

    # Give the workers a moment to drain the queue.
    await asyncio.sleep(1)

    stats = event_bus.get_statistics()
    print(f"\nBus Statistics: {json.dumps(stats, indent=2, default=str)}")

    # Confirm the events were persisted.
    persisted = store.query_by_type("user.created")
    print(f"\nPersisted user.created events: {len(persisted)}")

    await event_bus.stop(graceful=True)

    print(f"\nTotal received events: {len(inbox)}")


if __name__ == "__main__":
    # Configure logging
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )

    # Run the example
    asyncio.run(example_usage())
