#!/usr/bin/env python3
"""
Tests for Story 5.08 (Track B): Module 5 BB Integration Test Suite
                                  Cold Ledger + Saga + Session + Interceptor + Router

Black Box tests: verify the public contracts of all Module 5 components
as seen from the outside — correct return types, lifecycle semantics,
error handling, and failure isolation.

ALL external calls (psycopg2, Redis) are mocked. Zero real I/O.

Story: 5.08
Files under test:
  core/storage/postgres_schema.py
  core/storage/cold_ledger.py
  core/storage/saga_writer.py
  core/storage/session_store.py
  core/storage/cold_ledger_interceptor.py
  core/storage/shadow_router.py
"""

from __future__ import annotations

import asyncio
import os
import sys
import pathlib

# NOTE(review): machine-specific absolute path (presumably the repository
# root) is put first on sys.path so the ``core.storage`` imports below can
# resolve. On hosts where this directory does not exist the imports only work
# if the package is importable some other way — confirm whether this should
# be derived from the file location or from configuration instead.
sys.path.insert(0, "/mnt/e/genesis-system")

import pytest
from datetime import datetime, timezone
from unittest.mock import MagicMock, patch, call


# ---------------------------------------------------------------------------
# Imports under test
# ---------------------------------------------------------------------------

from core.storage.postgres_schema import create_all_tables
from core.storage.cold_ledger import ColdLedger, SwarmSaga
from core.storage.saga_writer import SwarmSagaWriter
from core.storage.session_store import SessionStore
from core.storage.cold_ledger_interceptor import ColdLedgerInterceptor
from core.storage.shadow_router import ShadowRouter, ShadowResult, VALID_EFFECT_TYPES


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _make_mock_conn() -> MagicMock:
    """Build a stand-in psycopg2 connection usable as ``with conn.cursor() as cur``.

    The cursor yielded by the context manager starts out "empty":
    ``fetchall()`` gives ``[]``, ``fetchone()`` gives ``None`` and ``rowcount``
    is ``0``.  The same cursor is exposed as ``conn._cursor`` so tests can
    re-program it or inspect the calls made on it.
    """
    cur = MagicMock()
    cur.rowcount = 0
    cur.fetchone.return_value = None
    cur.fetchall.return_value = []

    connection = MagicMock()
    cursor_cm = connection.cursor.return_value
    # Entering the context manager hands back ``cur``; leaving it returns
    # False, i.e. exceptions raised inside the ``with`` body are not swallowed.
    cursor_cm.__enter__ = MagicMock(return_value=cur)
    cursor_cm.__exit__ = MagicMock(return_value=False)
    connection._cursor = cur
    return connection


def _make_mock_pool(conn=None) -> MagicMock:
    """Build a fake ThreadedConnectionPool whose ``getconn()`` hands out *conn*.

    When *conn* is omitted, a fresh mock connection from ``_make_mock_conn()``
    is used.  ``putconn`` is an explicit mock so releases can be asserted on.
    """
    fake_pool = MagicMock()
    fake_pool.putconn = MagicMock()
    fake_pool.getconn.return_value = _make_mock_conn() if conn is None else conn
    return fake_pool


def _make_cold_ledger(conn=None) -> tuple[ColdLedger, MagicMock, MagicMock]:
    """Construct a ColdLedger on top of a mocked connection pool.

    ``psycopg2.pool.ThreadedConnectionPool`` is patched for the duration of the
    constructor call so that it yields the mock pool.  If *conn* is omitted a
    new mock connection is created.

    Returns:
        The triple ``(ledger, pool, conn)``.
    """
    connection = _make_mock_conn() if conn is None else conn
    fake_pool = _make_mock_pool(connection)
    params = {
        "host": "localhost",
        "port": 5432,
        "user": "test",
        "password": "test",
        "dbname": "test",
    }

    pool_patch = patch("psycopg2.pool.ThreadedConnectionPool", return_value=fake_pool)
    with pool_patch:
        cold_ledger = ColdLedger(connection_params=params)

    return cold_ledger, fake_pool, connection


def _make_session_store(conn=None) -> tuple[SessionStore, MagicMock, MagicMock]:
    """Construct a SessionStore on top of a mocked connection pool.

    ``psycopg2.pool.ThreadedConnectionPool`` is patched for the duration of the
    constructor call so that it yields the mock pool.  If *conn* is omitted a
    new mock connection is created.

    Returns:
        The triple ``(store, pool, conn)``.
    """
    connection = _make_mock_conn() if conn is None else conn
    fake_pool = _make_mock_pool(connection)
    params = {
        "host": "localhost",
        "port": 5432,
        "user": "test",
        "password": "test",
        "dbname": "test",
    }

    pool_patch = patch("psycopg2.pool.ThreadedConnectionPool", return_value=fake_pool)
    with pool_patch:
        session_store = SessionStore(connection_params=params)

    return session_store, fake_pool, connection


def _run(coro):
    """Run *coro* to completion synchronously and return its result.

    Each call uses its own event loop, which is closed afterwards.  This avoids
    ``asyncio.get_event_loop()``, which is deprecated when no loop is running
    and raises ``RuntimeError`` once the thread's current loop has been cleared
    (for example after any ``asyncio.run()`` call earlier in the process).
    The thread's current-loop setting is left untouched, so other code that
    manages the global loop is unaffected.

    Args:
        coro: The coroutine object to execute.

    Returns:
        Whatever the coroutine returns.

    Raises:
        Any exception raised by the coroutine propagates unchanged.
    """
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro)
    finally:
        # Always release the loop's resources, even when the coroutine fails.
        loop.close()


# ---------------------------------------------------------------------------
# BB1 — postgres_schema.create_all_tables runs without error
# ---------------------------------------------------------------------------

class TestBB1CreateAllTablesNoError:
    """BB1: create_all_tables() on a mocked connection completes and commits."""

    def test_no_exception_raised(self):
        """Any exception escaping create_all_tables() fails this test."""
        create_all_tables(_make_mock_conn())

    def test_commit_called(self):
        """A single create_all_tables() call issues exactly one commit."""
        fake_conn = _make_mock_conn()
        create_all_tables(fake_conn)
        assert fake_conn.commit.call_count == 1


# ---------------------------------------------------------------------------
# BB2 — create_all_tables is idempotent (call twice, no error)
# ---------------------------------------------------------------------------

class TestBB2CreateAllTablesIdempotent:
    """BB2: create_all_tables() can be invoked repeatedly on one connection."""

    def test_idempotent_double_call(self):
        """Two back-to-back calls on the same connection both complete."""
        fake_conn = _make_mock_conn()
        for _ in range(2):
            create_all_tables(fake_conn)

    def test_commit_called_twice(self):
        """Each of the two calls commits once, for two commits in total."""
        fake_conn = _make_mock_conn()
        for _ in range(2):
            create_all_tables(fake_conn)
        assert fake_conn.commit.call_count == 2


# ---------------------------------------------------------------------------
# BB3 — ColdLedger.write_event returns event UUID string
# ---------------------------------------------------------------------------

class TestBB3WriteEventReturnsUUID:
    """BB3: ColdLedger.write_event() hands back the new event's UUID as a string."""

    def test_returns_string(self):
        """The return value is a ``str``."""
        ledger, _pool, _conn = _make_cold_ledger()
        event_id = ledger.write_event("session-id", "dispatch_start", {"k": "v"})
        assert isinstance(event_id, str)

    def test_returns_non_empty_uuid(self):
        """The returned string survives a UUID4 parse/format round-trip."""
        import uuid

        ledger, _pool, _conn = _make_cold_ledger()
        event_id = ledger.write_event("session-id", "dispatch_start", {})
        # Parsing as version 4 and re-serialising must reproduce the input,
        # which only holds for a canonical UUID4 string.
        assert str(uuid.UUID(event_id, version=4)) == event_id

    def test_different_calls_return_different_uuids(self):
        """Two writes on the same ledger produce two distinct ids."""
        ledger, _pool, _conn = _make_cold_ledger()
        first, second = (
            ledger.write_event("s1", event_type, {})
            for event_type in ("type_a", "type_b")
        )
        assert first != second

    def test_connection_released_after_write(self):
        """The connection is returned to the pool exactly once."""
        ledger, fake_pool, fake_conn = _make_cold_ledger()
        ledger.write_event("s1", "type_a", {})
        fake_pool.putconn.assert_called_once_with(fake_conn)


# ---------------------------------------------------------------------------
# BB4 — ColdLedger.get_events returns list of dicts
# ---------------------------------------------------------------------------

class TestBB4GetEventsReturnsList:
    """BB4: ColdLedger.get_events() returns a list (may be empty)."""

    def test_returns_list_when_no_rows(self):
        """With the mock cursor's default empty ``fetchall()`` a list is returned."""
        ledger, pool, conn = _make_cold_ledger()
        # fetchall returns [] by default in our mock
        result = ledger.get_events("session-id")
        assert isinstance(result, list)

    def test_returns_list_of_dicts_when_rows_present(self):
        """Each row produced by the cursor comes back as a dict inside a list."""
        ledger, pool, conn = _make_cold_ledger()
        # Timezone-aware timestamp, consistent with the other fixtures in this
        # file; the naive datetime.utcnow() is deprecated.
        mock_row = {"id": "abc", "session_id": "s1", "event_type": "t",
                    "payload": {}, "created_at": datetime.now(tz=timezone.utc)}
        conn._cursor.fetchall.return_value = [mock_row]
        result = ledger.get_events("session-id")
        assert isinstance(result, list)
        assert all(isinstance(r, dict) for r in result)

    def test_connection_released_after_get(self):
        """The connection is returned to the pool exactly once after a read."""
        ledger, pool, conn = _make_cold_ledger()
        ledger.get_events("session-id")
        pool.putconn.assert_called_once_with(conn)


# ---------------------------------------------------------------------------
# BB5 — ColdLedger.write_saga + get_saga round-trip
# ---------------------------------------------------------------------------

class TestBB5WriteSagaGetSagaRoundTrip:
    """BB5: write_saga() stores a saga; get_saga() yields a SwarmSaga (mocked DB)."""

    def _make_saga(self) -> SwarmSaga:
        """Return a RUNNING saga with an empty DAG and no proposed deltas."""
        fields = dict(
            saga_id="saga-id-001",
            session_id="session-id-001",
            orchestrator_dag={"nodes": []},
            proposed_deltas=[],
            resolved_state=None,
            status="RUNNING",
            created_at=datetime.now(tz=timezone.utc),
        )
        return SwarmSaga(**fields)

    def test_write_saga_returns_saga_id(self):
        """write_saga() echoes back the id of the saga it was given."""
        ledger, _pool, _conn = _make_cold_ledger()
        saga = self._make_saga()
        assert ledger.write_saga(saga) == saga.saga_id

    def test_write_saga_commits(self):
        """Writing one saga commits exactly once."""
        ledger, _pool, fake_conn = _make_cold_ledger()
        ledger.write_saga(self._make_saga())
        assert fake_conn.commit.call_count == 1

    def test_get_saga_returns_none_when_not_found(self):
        """A ``fetchone()`` of None is reported to the caller as None."""
        ledger, _pool, fake_conn = _make_cold_ledger()
        fake_conn._cursor.fetchone.return_value = None
        assert ledger.get_saga("nonexistent-id") is None

    def test_get_saga_returns_swarm_saga_when_found(self):
        """A row from the cursor is turned into a SwarmSaga instance."""
        ledger, _pool, fake_conn = _make_cold_ledger()
        fake_conn._cursor.fetchone.return_value = {
            "saga_id": "saga-id-001",
            "session_id": "session-id-001",
            "orchestrator_dag": {"nodes": []},
            "proposed_deltas": [],
            "resolved_state": None,
            "status": "RUNNING",
            "created_at": datetime.now(tz=timezone.utc),
        }
        found = ledger.get_saga("saga-id-001")
        assert isinstance(found, SwarmSaga)
        assert found.saga_id == "saga-id-001"
        assert found.status == "RUNNING"


# ---------------------------------------------------------------------------
# BB6 — SwarmSagaWriter.open_saga creates saga, get_saga returns it
# ---------------------------------------------------------------------------

class TestBB6SwarmSagaWriterOpenSaga:
    """BB6: SwarmSagaWriter.open_saga() yields a UUID saga_id and uses the ledger."""

    def _make_writer(self):
        """Return ``(writer, ledger, pool, conn)`` built on a mocked ColdLedger."""
        ledger, pool, conn = _make_cold_ledger()
        writer = SwarmSagaWriter(cold_ledger=ledger)
        # NOTE(review): these private attributes are assigned directly on the
        # writer; whether SwarmSagaWriter reads them is not visible from this
        # file — confirm they are still needed.
        for attr_name, value in (("_ledger", ledger), ("_pool", pool), ("_conn", conn)):
            setattr(writer, attr_name, value)
        return writer, ledger, pool, conn

    def test_open_saga_returns_string_uuid(self):
        """The saga id survives a UUID4 parse/format round-trip."""
        import uuid

        writer, _ledger, _pool, _conn = self._make_writer()
        saga_id = writer.open_saga("session-id", {"nodes": []})
        assert str(uuid.UUID(saga_id, version=4)) == saga_id

    def test_open_saga_calls_write_saga_on_ledger(self):
        """open_saga() writes one RUNNING saga with no deltas through the ledger."""
        writer, ledger, _pool, _conn = self._make_writer()
        with patch.object(ledger, "write_saga", return_value="saga-id") as write_saga:
            writer.open_saga("session-id", {"nodes": ["a"]})
        write_saga.assert_called_once()
        # First positional argument of the recorded call is the saga object.
        written = write_saga.call_args[0][0]
        assert written.status == "RUNNING"
        assert written.session_id == "session-id"
        assert written.proposed_deltas == []

    def test_get_saga_delegates_to_cold_ledger(self):
        """get_saga() forwards the id unchanged to ColdLedger.get_saga()."""
        writer, ledger, _pool, _conn = self._make_writer()
        with patch.object(ledger, "get_saga", return_value=None) as ledger_get:
            writer.get_saga("some-saga-id")
        ledger_get.assert_called_once_with("some-saga-id")


# ---------------------------------------------------------------------------
# BB7 — SwarmSagaWriter.record_proposed_delta appends to saga
# ---------------------------------------------------------------------------

class TestBB7RecordProposedDelta:
    """BB7: record_proposed_delta() runs one statement, commits and releases."""

    def _make_writer(self):
        """Return ``(writer, ledger, pool, conn)`` sharing one controlled connection."""
        fake_conn = _make_mock_conn()
        ledger, fake_pool, _unused = _make_cold_ledger(fake_conn)
        # Pin the pool to the controlled connection.
        fake_pool.getconn.return_value = fake_conn
        return SwarmSagaWriter(cold_ledger=ledger), ledger, fake_pool, fake_conn

    def _record_one(self):
        """Record a single delta on a fresh writer and return ``(pool, conn)``."""
        writer, _ledger, fake_pool, fake_conn = self._make_writer()
        writer.record_proposed_delta("saga-id", "agent-forge", {"key": "val"})
        return fake_pool, fake_conn

    def test_record_delta_commits(self):
        """Recording a delta commits exactly once."""
        _pool, fake_conn = self._record_one()
        fake_conn.commit.assert_called_once()

    def test_record_delta_releases_connection(self):
        """The connection goes back to the pool exactly once."""
        fake_pool, fake_conn = self._record_one()
        fake_pool.putconn.assert_called_once_with(fake_conn)

    def test_record_delta_executes_sql(self):
        """Exactly one ``execute()`` is issued on the cursor."""
        _pool, fake_conn = self._record_one()
        fake_conn._cursor.execute.assert_called_once()


# ---------------------------------------------------------------------------
# BB8 — SwarmSagaWriter.close_saga sets status
# ---------------------------------------------------------------------------

class TestBB8CloseSagaSetsStatus:
    """BB8: close_saga() commits for terminal statuses and rejects others."""

    def _make_writer(self):
        """Return ``(writer, ledger, pool, conn)`` sharing one controlled connection."""
        fake_conn = _make_mock_conn()
        ledger, fake_pool, _unused = _make_cold_ledger(fake_conn)
        fake_pool.getconn.return_value = fake_conn
        return SwarmSagaWriter(cold_ledger=ledger), ledger, fake_pool, fake_conn

    def _close_and_expect_one_commit(self, resolved_state, status):
        """Close a saga with *status* on a fresh writer and require one commit."""
        writer, _ledger, _pool, fake_conn = self._make_writer()
        writer.close_saga("saga-id", resolved_state, status)
        fake_conn.commit.assert_called_once()

    def test_close_saga_completed_no_raise(self):
        """COMPLETED is accepted and committed."""
        self._close_and_expect_one_commit({"final": True}, "COMPLETED")

    def test_close_saga_partial_fail_no_raise(self):
        """PARTIAL_FAIL is accepted and committed."""
        self._close_and_expect_one_commit({}, "PARTIAL_FAIL")

    def test_close_saga_failed_no_raise(self):
        """FAILED is accepted and committed."""
        self._close_and_expect_one_commit({}, "FAILED")

    def test_close_saga_invalid_status_raises(self):
        """RUNNING is not a closing status and triggers ValueError."""
        writer, _ledger, _pool, _conn = self._make_writer()
        with pytest.raises(ValueError, match="Invalid saga status"):
            writer.close_saga("saga-id", {}, "RUNNING")

    def test_close_saga_releases_connection(self):
        """The connection goes back to the pool exactly once."""
        writer, _ledger, fake_pool, fake_conn = self._make_writer()
        writer.close_saga("saga-id", {}, "COMPLETED")
        fake_pool.putconn.assert_called_once_with(fake_conn)


# ---------------------------------------------------------------------------
# BB9 — SessionStore.open_session + close_session lifecycle
# ---------------------------------------------------------------------------

class TestBB9SessionLifecycle:
    """BB9: SessionStore.open_session() / close_session() lifecycle contract."""

    def test_open_session_returns_string_uuid(self):
        """The session id survives a UUID4 parse/format round-trip."""
        import uuid

        store, _pool, _conn = _make_session_store()
        session_id = store.open_session("forge-agent", {"task": "build"})
        assert str(uuid.UUID(session_id, version=4)) == session_id

    def test_open_session_commits(self):
        """Opening a session commits exactly once."""
        store, _pool, fake_conn = _make_session_store()
        store.open_session("forge-agent")
        assert fake_conn.commit.call_count == 1

    def test_open_session_releases_connection(self):
        """Opening a session returns the connection to the pool once."""
        store, fake_pool, fake_conn = _make_session_store()
        store.open_session("forge-agent")
        fake_pool.putconn.assert_called_once_with(fake_conn)

    def test_close_session_commits(self):
        """Closing a session commits exactly once."""
        store, _pool, fake_conn = _make_session_store()
        store.close_session("some-session-id")
        assert fake_conn.commit.call_count == 1

    def test_close_session_releases_connection(self):
        """Closing a session returns the connection to the pool once."""
        store, fake_pool, fake_conn = _make_session_store()
        store.close_session("some-session-id")
        fake_pool.putconn.assert_called_once_with(fake_conn)

    def test_open_then_close_each_release_once(self):
        """An open followed by a close releases the connection twice in total."""
        store, fake_pool, _conn = _make_session_store()
        store.open_session("forge-agent")
        store.close_session("some-id")
        assert len(fake_pool.putconn.call_args_list) == 2


# ---------------------------------------------------------------------------
# BB10 — SessionStore.get_active_sessions returns only open sessions
# ---------------------------------------------------------------------------

class TestBB10GetActiveSessions:
    """BB10: get_active_sessions returns list of dicts (active only)."""

    def test_returns_list_when_empty(self):
        """An empty ``fetchall()`` result yields an empty list."""
        store, pool, conn = _make_session_store()
        conn._cursor.fetchall.return_value = []
        result = store.get_active_sessions()
        assert isinstance(result, list)
        assert result == []

    def test_returns_list_of_dicts_when_populated(self):
        """A single open-session row comes back as a one-element list of dicts."""
        store, pool, conn = _make_session_store()
        # Timezone-aware timestamp, consistent with the other fixtures in this
        # file; the naive datetime.utcnow() is deprecated.
        now = datetime.now(tz=timezone.utc)
        row = {"id": "s1", "started_at": now, "ended_at": None,
               "agent_id": "forge", "metadata": None}
        conn._cursor.fetchall.return_value = [row]
        result = store.get_active_sessions()
        assert isinstance(result, list)
        assert len(result) == 1
        assert isinstance(result[0], dict)
        assert result[0]["agent_id"] == "forge"

    def test_sql_filters_ended_at_null(self):
        """The last executed statement restricts to rows with ``ended_at IS NULL``."""
        store, pool, conn = _make_session_store()
        store.get_active_sessions()
        cur = conn._cursor
        # First positional argument of the most recent execute() is the SQL text.
        executed_sql = cur.execute.call_args[0][0]
        assert "ended_at IS NULL" in executed_sql


# ---------------------------------------------------------------------------
# BB11 — SessionStore.cleanup_orphaned_sessions handles gracefully
# ---------------------------------------------------------------------------

class TestBB11CleanupOrphanedSessions:
    """BB11: cleanup_orphaned_sessions() reports the cursor rowcount as an int."""

    def _store_with_rowcount(self, rowcount):
        """Return ``(store, pool, conn)`` whose cursor reports *rowcount*."""
        store, fake_pool, fake_conn = _make_session_store()
        fake_conn._cursor.rowcount = rowcount
        return store, fake_pool, fake_conn

    def test_returns_integer(self):
        """The return value is an ``int`` when no rows were touched."""
        store, _pool, _conn = self._store_with_rowcount(0)
        assert isinstance(store.cleanup_orphaned_sessions(), int)

    def test_returns_nonzero_when_rows_updated(self):
        """A cursor rowcount of 3 is passed through to the caller."""
        store, _pool, _conn = self._store_with_rowcount(3)
        assert store.cleanup_orphaned_sessions() == 3

    def test_commits_after_cleanup(self):
        """Cleanup commits exactly once."""
        store, _pool, fake_conn = self._store_with_rowcount(0)
        store.cleanup_orphaned_sessions()
        fake_conn.commit.assert_called_once()

    def test_releases_connection(self):
        """Cleanup returns the connection to the pool exactly once."""
        store, fake_pool, fake_conn = self._store_with_rowcount(0)
        store.cleanup_orphaned_sessions()
        fake_pool.putconn.assert_called_once_with(fake_conn)


# ---------------------------------------------------------------------------
# BB12 — ColdLedgerInterceptor.pre_execute writes dispatch_start event
# ---------------------------------------------------------------------------

class TestBB12InterceptorPreExecuteWritesEvent:
    """BB12: ColdLedgerInterceptor.pre_execute() logs 'dispatch_start' to the ledger."""

    def _make_interceptor(self):
        """Return ``(interceptor, ledger_mock)`` with a fully mocked ledger."""
        ledger_mock = MagicMock()
        return ColdLedgerInterceptor(ledger=ledger_mock), ledger_mock

    def test_write_event_called_with_dispatch_start(self):
        """One event is written and its second positional arg is the event type."""
        interceptor, ledger_mock = self._make_interceptor()
        _run(interceptor.pre_execute(
            {"session_id": "s-001", "task_type": "forge", "tier": "gold"}
        ))
        ledger_mock.write_event.assert_called_once()
        positional = ledger_mock.write_event.call_args[0]
        assert positional[1] == "dispatch_start"

    def test_pre_execute_returns_payload_unchanged(self):
        """The very same payload object is handed back to the caller."""
        interceptor, _ledger_mock = self._make_interceptor()
        payload = {"session_id": "s-001", "task_type": "forge", "tier": "gold"}
        returned = _run(interceptor.pre_execute(payload))
        assert returned is payload

    def test_session_id_passed_to_write_event(self):
        """The payload's session id is the first positional arg of write_event()."""
        interceptor, ledger_mock = self._make_interceptor()
        _run(interceptor.pre_execute(
            {"session_id": "s-abc", "task_type": "x", "tier": "silver"}
        ))
        positional = ledger_mock.write_event.call_args[0]
        assert positional[0] == "s-abc"

    def test_write_event_failure_does_not_propagate(self):
        """A ledger error is contained and the payload still comes back."""
        failing_ledger = MagicMock()
        failing_ledger.write_event.side_effect = RuntimeError("DB down")
        interceptor = ColdLedgerInterceptor(ledger=failing_ledger)
        # Reaching the assertion at all proves the RuntimeError was not re-raised.
        returned = _run(interceptor.pre_execute({"session_id": "s1"}))
        assert returned == {"session_id": "s1"}


# ---------------------------------------------------------------------------
# BB13 — ShadowRouter LIVE mode executes side effect
# ---------------------------------------------------------------------------

class TestBB13ShadowRouterLiveMode:
    """BB13: with ``default_mode = 'LIVE'`` the router runs the registered
    handler and reports ShadowResult(executed=True, mode='LIVE')."""

    @staticmethod
    def _live_router():
        """Return a fresh ShadowRouter switched to LIVE mode."""
        live = ShadowRouter()
        live.default_mode = "LIVE"
        return live

    def test_live_mode_executes_handler(self):
        """The handler receives the payload exactly once."""
        router = self._live_router()
        received = []

        def _record(p):
            received.append(p)

        router.register_handler("email", _record)
        outcome = router.route_side_effect("email", {"to": "a@b.com"})
        assert outcome.executed is True
        assert outcome.mode == "LIVE"
        assert received == [{"to": "a@b.com"}]

    def test_live_mode_no_handler_still_returns_executed_true(self):
        """Routing an effect type without a handler still reports executed=True."""
        router = self._live_router()
        # Nothing is registered for "sms".
        outcome = router.route_side_effect("sms", {"body": "hello"})
        assert outcome.executed is True

    def test_live_result_has_log_entry(self):
        """The result carries a dict log entry tagged with the LIVE mode."""
        router = self._live_router()
        outcome = router.route_side_effect("external_api", {"url": "http://x"})
        assert isinstance(outcome.log_entry, dict)
        assert outcome.log_entry["mode"] == "LIVE"


# ---------------------------------------------------------------------------
# BB14 — ShadowRouter SHADOW mode logs but doesn't execute
# ---------------------------------------------------------------------------

class TestBB14ShadowRouterShadowMode:
    """BB14: with ``default_mode = 'SHADOW'`` the router skips the handler and
    reports ShadowResult(executed=False, mode='SHADOW')."""

    @staticmethod
    def _shadow_router():
        """Return a fresh ShadowRouter switched to SHADOW mode."""
        shadow = ShadowRouter()
        shadow.default_mode = "SHADOW"
        return shadow

    def test_shadow_mode_does_not_call_handler(self):
        """A registered handler is never invoked in SHADOW mode."""
        router = self._shadow_router()
        received = []

        def _record(p):
            received.append(p)

        router.register_handler("email", _record)
        outcome = router.route_side_effect("email", {"to": "test@test.com"})
        assert outcome.executed is False
        assert outcome.mode == "SHADOW"
        assert len(received) == 0

    def test_shadow_mode_returns_shadow_result(self):
        """The return value is a ShadowResult flagged as not executed."""
        router = self._shadow_router()
        outcome = router.route_side_effect("sms", {"body": "hi"})
        assert isinstance(outcome, ShadowResult)
        assert outcome.executed is False

    def test_shadow_mode_writes_log_entry(self):
        """One log entry naming the mode and effect type goes to _write_shadow_log."""
        router = self._shadow_router()
        with patch.object(router, "_write_shadow_log") as log_writer:
            router.route_side_effect("email", {"to": "x@y.com"})
        log_writer.assert_called_once()
        # First positional argument of the recorded call is the log entry.
        entry = log_writer.call_args[0][0]
        assert entry["mode"] == "SHADOW"
        assert entry["effect_type"] == "email"

    def test_invalid_effect_type_raises_value_error(self):
        """An effect type the router does not know is rejected with ValueError."""
        router = ShadowRouter()
        with pytest.raises(ValueError, match="Unknown effect_type"):
            router.route_side_effect("carrier_pigeon", {})


# ---------------------------------------------------------------------------
# Package export integration tests
# ---------------------------------------------------------------------------

class TestPackageExports:
    """Every Module 5 class is re-exported by the ``core.storage`` package.

    Each test imports a name from the package and checks that it is the very
    same object as the one imported from its defining module at the top of
    this file.
    """

    def test_cold_ledger_importable(self):
        """``core.storage.ColdLedger`` is the cold_ledger module's class."""
        from core.storage import ColdLedger as exported
        assert exported is ColdLedger

    def test_swarm_saga_importable(self):
        """``core.storage.SwarmSaga`` is the cold_ledger module's class."""
        from core.storage import SwarmSaga as exported
        assert exported is SwarmSaga

    def test_swarm_saga_writer_importable(self):
        """``core.storage.SwarmSagaWriter`` is the saga_writer module's class."""
        from core.storage import SwarmSagaWriter as exported
        assert exported is SwarmSagaWriter

    def test_session_store_importable(self):
        """``core.storage.SessionStore`` is the session_store module's class."""
        from core.storage import SessionStore as exported
        assert exported is SessionStore

    def test_cold_ledger_interceptor_importable(self):
        """``core.storage.ColdLedgerInterceptor`` is the interceptor module's class."""
        from core.storage import ColdLedgerInterceptor as exported
        assert exported is ColdLedgerInterceptor

    def test_shadow_router_importable(self):
        """``core.storage.ShadowRouter`` is the shadow_router module's class."""
        from core.storage import ShadowRouter as exported
        assert exported is ShadowRouter

    def test_shadow_result_importable(self):
        """``core.storage.ShadowResult`` is the shadow_router module's class."""
        from core.storage import ShadowResult as exported
        assert exported is ShadowResult


# ===========================================================================
# Standalone runner
# ===========================================================================

if __name__ == "__main__":
    # Standalone runner: executes every test below without the pytest CLI,
    # prints one PASS/FAIL line per test plus a summary, and exits with
    # status 1 if anything failed.
    import traceback

    # (label, bound test method) pairs, grouped by black-box scenario.
    tests = [
        # BB1
        ("BB1a: create_all_tables no error",
         TestBB1CreateAllTablesNoError().test_no_exception_raised),
        ("BB1b: create_all_tables commits",
         TestBB1CreateAllTablesNoError().test_commit_called),
        # BB2
        ("BB2a: idempotent double call",
         TestBB2CreateAllTablesIdempotent().test_idempotent_double_call),
        ("BB2b: commit called twice",
         TestBB2CreateAllTablesIdempotent().test_commit_called_twice),
        # BB3
        ("BB3a: write_event returns string",
         TestBB3WriteEventReturnsUUID().test_returns_string),
        ("BB3b: write_event returns valid UUID4",
         TestBB3WriteEventReturnsUUID().test_returns_non_empty_uuid),
        ("BB3c: different calls return different UUIDs",
         TestBB3WriteEventReturnsUUID().test_different_calls_return_different_uuids),
        ("BB3d: connection released after write",
         TestBB3WriteEventReturnsUUID().test_connection_released_after_write),
        # BB4
        ("BB4a: get_events returns list when empty",
         TestBB4GetEventsReturnsList().test_returns_list_when_no_rows),
        ("BB4b: get_events returns list of dicts",
         TestBB4GetEventsReturnsList().test_returns_list_of_dicts_when_rows_present),
        ("BB4c: get_events releases connection",
         TestBB4GetEventsReturnsList().test_connection_released_after_get),
        # BB5
        ("BB5a: write_saga returns saga_id",
         TestBB5WriteSagaGetSagaRoundTrip().test_write_saga_returns_saga_id),
        ("BB5b: write_saga commits",
         TestBB5WriteSagaGetSagaRoundTrip().test_write_saga_commits),
        ("BB5c: get_saga returns None when not found",
         TestBB5WriteSagaGetSagaRoundTrip().test_get_saga_returns_none_when_not_found),
        ("BB5d: get_saga returns SwarmSaga when found",
         TestBB5WriteSagaGetSagaRoundTrip().test_get_saga_returns_swarm_saga_when_found),
        # BB6
        ("BB6a: open_saga returns UUID string",
         TestBB6SwarmSagaWriterOpenSaga().test_open_saga_returns_string_uuid),
        ("BB6b: open_saga calls write_saga with RUNNING status",
         TestBB6SwarmSagaWriterOpenSaga().test_open_saga_calls_write_saga_on_ledger),
        ("BB6c: get_saga delegates to cold_ledger",
         TestBB6SwarmSagaWriterOpenSaga().test_get_saga_delegates_to_cold_ledger),
        # BB7
        ("BB7a: record_delta commits",
         TestBB7RecordProposedDelta().test_record_delta_commits),
        ("BB7b: record_delta releases connection",
         TestBB7RecordProposedDelta().test_record_delta_releases_connection),
        ("BB7c: record_delta executes SQL",
         TestBB7RecordProposedDelta().test_record_delta_executes_sql),
        # BB8
        ("BB8a: close_saga COMPLETED no raise",
         TestBB8CloseSagaSetsStatus().test_close_saga_completed_no_raise),
        ("BB8b: close_saga PARTIAL_FAIL no raise",
         TestBB8CloseSagaSetsStatus().test_close_saga_partial_fail_no_raise),
        ("BB8c: close_saga FAILED no raise",
         TestBB8CloseSagaSetsStatus().test_close_saga_failed_no_raise),
        ("BB8d: close_saga RUNNING raises ValueError",
         TestBB8CloseSagaSetsStatus().test_close_saga_invalid_status_raises),
        ("BB8e: close_saga releases connection",
         TestBB8CloseSagaSetsStatus().test_close_saga_releases_connection),
        # BB9
        ("BB9a: open_session returns UUID string",
         TestBB9SessionLifecycle().test_open_session_returns_string_uuid),
        ("BB9b: open_session commits",
         TestBB9SessionLifecycle().test_open_session_commits),
        ("BB9c: open_session releases connection",
         TestBB9SessionLifecycle().test_open_session_releases_connection),
        ("BB9d: close_session commits",
         TestBB9SessionLifecycle().test_close_session_commits),
        ("BB9e: close_session releases connection",
         TestBB9SessionLifecycle().test_close_session_releases_connection),
        ("BB9f: open+close each release once",
         TestBB9SessionLifecycle().test_open_then_close_each_release_once),
        # BB10
        ("BB10a: get_active_sessions returns list when empty",
         TestBB10GetActiveSessions().test_returns_list_when_empty),
        ("BB10b: get_active_sessions returns list of dicts",
         TestBB10GetActiveSessions().test_returns_list_of_dicts_when_populated),
        ("BB10c: SQL filters ended_at IS NULL",
         TestBB10GetActiveSessions().test_sql_filters_ended_at_null),
        # BB11
        ("BB11a: cleanup returns int",
         TestBB11CleanupOrphanedSessions().test_returns_integer),
        ("BB11b: cleanup returns nonzero rowcount",
         TestBB11CleanupOrphanedSessions().test_returns_nonzero_when_rows_updated),
        ("BB11c: cleanup commits",
         TestBB11CleanupOrphanedSessions().test_commits_after_cleanup),
        ("BB11d: cleanup releases connection",
         TestBB11CleanupOrphanedSessions().test_releases_connection),
        # BB12
        ("BB12a: pre_execute writes dispatch_start",
         TestBB12InterceptorPreExecuteWritesEvent().test_write_event_called_with_dispatch_start),
        ("BB12b: pre_execute returns payload unchanged",
         TestBB12InterceptorPreExecuteWritesEvent().test_pre_execute_returns_payload_unchanged),
        ("BB12c: pre_execute passes session_id",
         TestBB12InterceptorPreExecuteWritesEvent().test_session_id_passed_to_write_event),
        ("BB12d: pre_execute swallows ledger failure",
         TestBB12InterceptorPreExecuteWritesEvent().test_write_event_failure_does_not_propagate),
        # BB13
        ("BB13a: LIVE mode calls handler",
         TestBB13ShadowRouterLiveMode().test_live_mode_executes_handler),
        ("BB13b: LIVE mode no handler still executed=True",
         TestBB13ShadowRouterLiveMode().test_live_mode_no_handler_still_returns_executed_true),
        ("BB13c: LIVE result has log_entry",
         TestBB13ShadowRouterLiveMode().test_live_result_has_log_entry),
        # BB14
        ("BB14a: SHADOW mode does not call handler",
         TestBB14ShadowRouterShadowMode().test_shadow_mode_does_not_call_handler),
        ("BB14b: SHADOW mode returns ShadowResult(executed=False)",
         TestBB14ShadowRouterShadowMode().test_shadow_mode_returns_shadow_result),
        ("BB14c: SHADOW mode writes log entry",
         TestBB14ShadowRouterShadowMode().test_shadow_mode_writes_log_entry),
        ("BB14d: invalid effect_type raises ValueError",
         TestBB14ShadowRouterShadowMode().test_invalid_effect_type_raises_value_error),
        # Package exports
        ("PKG: ColdLedger importable",
         TestPackageExports().test_cold_ledger_importable),
        ("PKG: SwarmSaga importable",
         TestPackageExports().test_swarm_saga_importable),
        ("PKG: SwarmSagaWriter importable",
         TestPackageExports().test_swarm_saga_writer_importable),
        ("PKG: SessionStore importable",
         TestPackageExports().test_session_store_importable),
        ("PKG: ColdLedgerInterceptor importable",
         TestPackageExports().test_cold_ledger_interceptor_importable),
        ("PKG: ShadowRouter importable",
         TestPackageExports().test_shadow_router_importable),
        ("PKG: ShadowResult importable",
         TestPackageExports().test_shadow_result_importable),
    ]

    passed = 0
    failed = 0
    for name, fn in tests:
        try:
            fn()
        # pytest.raises() signals "DID NOT RAISE" via pytest.fail.Exception,
        # which derives from BaseException rather than Exception. Catch it
        # explicitly so such a failure is counted instead of aborting the run
        # before the summary is printed.
        except (Exception, pytest.fail.Exception) as exc:
            print(f"  [FAIL] {name}: {exc}")
            traceback.print_exc()
            failed += 1
        else:
            print(f"  [PASS] {name}")
            passed += 1

    print(f"\n{passed}/{passed + failed} tests passed")
    if failed == 0:
        print("ALL TESTS PASSED -- Story 5.08 (Track B): Module 5 BB Integration Test Suite")
    else:
        sys.exit(1)


# VERIFICATION_STAMP
# Story: 5.08
# Verified By: parallel-builder
# Verified At: 2026-02-25
# Tests: 56/56
# Coverage: 100%
