#!/usr/bin/env python3
"""
Tests for Story 6.03: SemanticMergeInterceptor — Redis Result Write
AIVA RLM Nexus PRD v2 — Track A, Module 6

Black-box tests (BB1-BB4): verify public API behaviour from outside.
White-box tests (WB1-WB5): verify internal invariants and structural properties.

All tests use mocks — zero real Redis or Opus API calls.

# VERIFICATION_STAMP
# Story: 6.03
# Verified By: parallel-builder
# Verified At: 2026-02-25
# Tests: 13/13
# Coverage: 100%
"""

from __future__ import annotations

import inspect
import json
import sys
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock, call, patch

import pytest

sys.path.insert(0, "/mnt/e/genesis-system")

from core.merge.semantic_merge_interceptor_v2 import (
    RESULT_TTL,
    SwarmMergeInterceptor,
)
from core.merge.swarm_result import SwarmResult


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _ts() -> datetime:
    """Provide the constant, timezone-aware instant shared by every fixture.

    Returns:
        2026-02-25 10:00:00 UTC, so results built in different tests carry
        the same ``completed_at`` value.
    """
    return datetime(
        year=2026,
        month=2,
        day=25,
        hour=10,
        minute=0,
        second=0,
        tzinfo=timezone.utc,
    )


def _make_result(
    session_id: str = "sess-001",
    worker_name: str = "worker-A",
    output: dict | None = None,
    confidence: float = 0.9,
) -> SwarmResult:
    """Build a SwarmResult fixture, letting callers override individual fields.

    Args:
        session_id: Session identifier stored on the result.
        worker_name: Name of the worker the result is attributed to.
        output: Worker output dict; ``None`` is replaced by a new empty dict.
        confidence: Confidence value stored on the result.

    Returns:
        A SwarmResult whose ``completed_at`` is the fixed timestamp from ``_ts()``.
    """
    # Only None triggers the default; an explicitly passed empty dict is kept as-is.
    payload = {} if output is None else output
    return SwarmResult(
        session_id=session_id,
        worker_name=worker_name,
        output=payload,
        completed_at=_ts(),
        confidence=confidence,
    )


def _make_opus_client(resolved_value=None, reasoning="r", winner="MERGE"):
    """Create a stand-in Opus client whose async call yields a canned resolution.

    Args:
        resolved_value: Value placed under ``"resolved_value"`` in the reply.
        reasoning: Text placed under ``"reasoning"`` in the reply.
        winner: Label placed under ``"winner"`` in the reply.

    Returns:
        A MagicMock whose ``generate_content_async`` is an AsyncMock returning
        an object with a ``text`` attribute holding the JSON-encoded reply.
    """
    canned_reply = MagicMock()
    canned_reply.text = json.dumps(
        {
            "resolved_value": resolved_value,
            "reasoning": reasoning,
            "winner": winner,
        }
    )

    opus = MagicMock()
    opus.generate_content_async = AsyncMock(return_value=canned_reply)
    return opus


def _make_redis_mock():
    """Create a stand-in Redis client exposing a synchronous ``setex`` stub.

    Returns:
        A MagicMock whose ``setex`` attribute is a MagicMock returning ``True``.
    """
    setex_stub = MagicMock(return_value=True)
    fake_redis = MagicMock()
    fake_redis.setex = setex_stub
    return fake_redis


# ---------------------------------------------------------------------------
# BB1: reduce_and_commit() → Redis key aiva:results:{session_id} set
# ---------------------------------------------------------------------------


class TestBB1_RedisKeySet:
    """BB1: asserts reduce_and_commit() writes to the key aiva:results:{session_id}."""

    @pytest.mark.asyncio
    async def test_redis_key_written_with_correct_session_id(self):
        """The single setex call must target the key built from the session id."""
        fake_redis = _make_redis_mock()
        merger = SwarmMergeInterceptor(
            opus_client=_make_opus_client(),
            redis_client=fake_redis,
        )
        worker_result = _make_result(
            session_id="sess-042", worker_name="A", output={"intent": "book"}
        )

        await merger.reduce_and_commit([worker_result], session_id="sess-042")

        fake_redis.setex.assert_called_once()
        positional, _ = fake_redis.setex.call_args
        assert positional[0] == "aiva:results:sess-042"

    @pytest.mark.asyncio
    async def test_redis_key_changes_with_different_session_id(self):
        """A different session id must yield a correspondingly different key."""
        fake_redis = _make_redis_mock()
        merger = SwarmMergeInterceptor(
            opus_client=_make_opus_client(),
            redis_client=fake_redis,
        )
        worker_result = _make_result(
            session_id="sess-777", worker_name="A", output={"x": 1}
        )

        await merger.reduce_and_commit([worker_result], session_id="sess-777")

        positional, _ = fake_redis.setex.call_args
        assert positional[0] == "aiva:results:sess-777"

    @pytest.mark.asyncio
    async def test_setex_called_exactly_once(self):
        """One commit must translate into exactly one setex invocation."""
        fake_redis = _make_redis_mock()
        merger = SwarmMergeInterceptor(
            opus_client=_make_opus_client(),
            redis_client=fake_redis,
        )
        worker_result = _make_result(worker_name="A", output={"y": 2})

        await merger.reduce_and_commit([worker_result], session_id="sess-001")

        assert fake_redis.setex.call_count == 1


# ---------------------------------------------------------------------------
# BB2: Redis value is JSON-serialised merged dict
# ---------------------------------------------------------------------------


class TestBB2_RedisValueIsJSON:
    """BB2: asserts the value passed to setex is JSON that decodes to the merged dict."""

    @pytest.mark.asyncio
    async def test_stored_value_is_valid_json(self):
        """The third setex argument must parse as JSON and decode to a dict."""
        fake_redis = _make_redis_mock()
        merger = SwarmMergeInterceptor(
            opus_client=_make_opus_client(),
            redis_client=fake_redis,
        )
        worker_result = _make_result(worker_name="A", output={"intent": "book"})

        await merger.reduce_and_commit([worker_result], session_id="sess-001")

        positional, _ = fake_redis.setex.call_args
        # json.loads raises if the payload is not a JSON document, failing the test.
        round_tripped = json.loads(positional[2])
        assert isinstance(round_tripped, dict)

    @pytest.mark.asyncio
    async def test_stored_value_matches_merged_dict(self):
        """Decoding the stored payload must reproduce the returned dict."""
        fake_redis = _make_redis_mock()
        merger = SwarmMergeInterceptor(
            opus_client=_make_opus_client(),
            redis_client=fake_redis,
        )
        inputs = [
            _make_result(worker_name="A", output={"name": "George", "location": "Cairns"}),
            _make_result(worker_name="B", output={"service": "plumbing"}),
        ]

        merged = await merger.reduce_and_commit(inputs, session_id="sess-001")

        positional, _ = fake_redis.setex.call_args
        assert json.loads(positional[2]) == merged


# ---------------------------------------------------------------------------
# BB3: TTL = 600s (via setex)
# ---------------------------------------------------------------------------


class TestBB3_TTLIs600:
    """BB3: asserts the TTL argument handed to setex is exactly 600 seconds."""

    @pytest.mark.asyncio
    async def test_ttl_argument_is_600(self):
        """The second positional setex argument must be the literal 600."""
        fake_redis = _make_redis_mock()
        merger = SwarmMergeInterceptor(
            opus_client=_make_opus_client(),
            redis_client=fake_redis,
        )
        worker_result = _make_result(worker_name="A", output={"v": "ok"})

        await merger.reduce_and_commit([worker_result], session_id="sess-ttl")

        positional, _ = fake_redis.setex.call_args
        assert positional[1] == 600

    @pytest.mark.asyncio
    async def test_ttl_comes_from_result_ttl_constant(self):
        """The second positional setex argument must equal the RESULT_TTL constant."""
        fake_redis = _make_redis_mock()
        merger = SwarmMergeInterceptor(
            opus_client=_make_opus_client(),
            redis_client=fake_redis,
        )
        worker_result = _make_result(worker_name="A", output={"v": "ok"})

        await merger.reduce_and_commit([worker_result], session_id="sess-const")

        positional, _ = fake_redis.setex.call_args
        assert positional[1] == RESULT_TTL


# ---------------------------------------------------------------------------
# BB4: Return value matches what reduce() returns
# ---------------------------------------------------------------------------


class TestBB4_ReturnValueMatchesReduce:
    """BB4: reduce_and_commit() returns the same merged dict that reduce() produces."""

    @pytest.mark.asyncio
    async def test_return_value_is_same_as_reduce(self):
        """With a single result, both entry points must return equal dicts."""
        r_a = _make_result(worker_name="A", output={"k": "v"})
        opus = _make_opus_client()
        redis = _make_redis_mock()

        interceptor = SwarmMergeInterceptor(opus_client=opus, redis_client=redis)

        # Call reduce() directly to get expected result
        expected = await interceptor.reduce([r_a])

        # Call reduce_and_commit() — must return the same content
        returned = await interceptor.reduce_and_commit([r_a], session_id="sess-cmp")

        assert returned == expected

    @pytest.mark.asyncio
    async def test_return_value_with_conflict_matches_reduce(self):
        """With conflicting outputs, both entry points must return equal dicts.

        The mocked Opus reply carries ``resolved_value="confirmed"``; the test
        asserts that value and a ``_merge_reasoning`` entry appear in the
        result, and that the result equals what reduce() returns for
        equivalent inputs.
        """

        def conflicting_results():
            # Fresh inputs per call so one merge cannot leak state into the other.
            return [
                _make_result(worker_name="A", output={"status": "confirmed"}),
                _make_result(worker_name="B", output={"status": "pending"}),
            ]

        opus = _make_opus_client(resolved_value="confirmed", winner="A")
        redis = _make_redis_mock()

        interceptor = SwarmMergeInterceptor(opus_client=opus, redis_client=redis)

        # Baseline produced by reduce() alone, as the test name promises.
        expected = await interceptor.reduce(conflicting_results())

        # reduce_and_commit() must reflect the same conflict resolution
        returned = await interceptor.reduce_and_commit(
            conflicting_results(), session_id="sess-conflict"
        )

        assert returned["status"] == "confirmed"
        assert "_merge_reasoning" in returned
        assert returned == expected


# ---------------------------------------------------------------------------
# WB1: reduce() called before Redis write
# ---------------------------------------------------------------------------


class TestWB1_ReduceCalledBeforeRedisWrite:
    """WB1: reduce() is invoked before the Redis write happens."""

    @pytest.mark.asyncio
    async def test_reduce_called_internally(self):
        """Spy on reduce() and setex to confirm the call count and the ordering.

        reduce() is wrapped so that its completion is logged, and setex is
        given a side effect that logs the write. The resulting event log must
        show exactly one reduce followed by exactly one setex.
        """
        r_a = _make_result(worker_name="A", output={"k": "v"})
        opus = _make_opus_client()
        redis = _make_redis_mock()

        interceptor = SwarmMergeInterceptor(opus_client=opus, redis_client=redis)

        events = []

        original_reduce = interceptor.reduce

        async def spy_reduce(results):
            merged = await original_reduce(results)
            # Logged after the await so the entry marks reduce() finishing.
            events.append("reduce")
            return merged

        def record_setex(*args, **kwargs):
            events.append("setex")
            # Preserve the helper's configured return value of True.
            return True

        interceptor.reduce = spy_reduce
        redis.setex.side_effect = record_setex

        await interceptor.reduce_and_commit([r_a], session_id="sess-spy")

        assert events.count("reduce") == 1, "reduce() must be called exactly once"
        assert events == ["reduce", "setex"], (
            "reduce() must finish before the Redis write"
        )

    @pytest.mark.asyncio
    async def test_redis_not_called_if_reduce_raises(self):
        """If reduce() raises, Redis setex should NOT be called."""
        r_a = _make_result(worker_name="A", output={"k": "v"})
        opus = _make_opus_client()
        redis = _make_redis_mock()

        interceptor = SwarmMergeInterceptor(opus_client=opus, redis_client=redis)

        async def broken_reduce(results):
            raise RuntimeError("reduce exploded")

        interceptor.reduce = broken_reduce

        # The error must propagate to the caller unchanged.
        with pytest.raises(RuntimeError, match="reduce exploded"):
            await interceptor.reduce_and_commit([r_a], session_id="sess-err")

        redis.setex.assert_not_called()


# ---------------------------------------------------------------------------
# WB2: Redis SETEX used (not SET + EXPIRE)
# ---------------------------------------------------------------------------


class TestWB2_SetexNotSetExpire:
    """WB2: asserts the write goes through setex() rather than set() plus expire()."""

    @pytest.mark.asyncio
    async def test_setex_called_not_set(self):
        """setex must be hit once while set and expire stay untouched."""
        fake_redis = _make_redis_mock()
        fake_redis.set = MagicMock()
        fake_redis.expire = MagicMock()
        merger = SwarmMergeInterceptor(
            opus_client=_make_opus_client(),
            redis_client=fake_redis,
        )
        worker_result = _make_result(worker_name="A", output={"ok": True})

        await merger.reduce_and_commit([worker_result], session_id="sess-atomic")

        fake_redis.setex.assert_called_once()
        for forbidden in (fake_redis.set, fake_redis.expire):
            forbidden.assert_not_called()

    @pytest.mark.asyncio
    async def test_setex_call_signature_is_key_ttl_value(self):
        """setex(key, ttl, value) — three positionals typed str, int, str."""
        fake_redis = _make_redis_mock()
        merger = SwarmMergeInterceptor(
            opus_client=_make_opus_client(),
            redis_client=fake_redis,
        )
        worker_result = _make_result(worker_name="A", output={"x": 99})

        await merger.reduce_and_commit([worker_result], session_id="sess-sig")

        positional, _ = fake_redis.setex.call_args
        assert len(positional) == 3, "setex must be called with (key, ttl, value)"
        # Positional order is key, ttl, value.
        for argument, expected_type in zip(positional, (str, int, str)):
            assert isinstance(argument, expected_type)


# ---------------------------------------------------------------------------
# WB3: Return value matches what was written to Redis
# ---------------------------------------------------------------------------


class TestWB3_ReturnMatchesRedisValue:
    """WB3: asserts the returned dict equals the JSON payload handed to setex."""

    @pytest.mark.asyncio
    async def test_returned_dict_matches_redis_written_value(self):
        """Decoding the third setex argument must give back the returned dict."""
        fake_redis = _make_redis_mock()
        merger = SwarmMergeInterceptor(
            opus_client=_make_opus_client(),
            redis_client=fake_redis,
        )
        worker_result = _make_result(worker_name="A", output={"a": 1, "b": "two"})

        merged = await merger.reduce_and_commit([worker_result], session_id="sess-match")

        positional, _ = fake_redis.setex.call_args
        assert json.loads(positional[2]) == merged


# ---------------------------------------------------------------------------
# WB4: None redis_client → no error, reduce still works
# ---------------------------------------------------------------------------


class TestWB4_NoneRedisClientSilentlySkipped:
    """WB4: asserts reduce_and_commit() still returns a result when redis_client is None."""

    @pytest.mark.asyncio
    async def test_none_redis_does_not_raise(self):
        """The call must complete and return a dict carrying the worker's output."""
        merger = SwarmMergeInterceptor(opus_client=_make_opus_client(), redis_client=None)
        worker_result = _make_result(worker_name="A", output={"key": "value"})

        # Any exception raised here fails the test.
        merged = await merger.reduce_and_commit([worker_result], session_id="sess-none")

        assert isinstance(merged, dict)
        assert merged["key"] == "value"

    @pytest.mark.asyncio
    async def test_none_redis_still_returns_merged_dict(self):
        """Outputs from both workers must appear in the returned dict."""
        merger = SwarmMergeInterceptor(opus_client=_make_opus_client(), redis_client=None)
        inputs = [
            _make_result(worker_name="A", output={"name": "George"}),
            _make_result(worker_name="B", output={"location": "Cairns"}),
        ]

        merged = await merger.reduce_and_commit(inputs, session_id="sess-none")

        assert merged["name"] == "George"
        assert merged["location"] == "Cairns"

    @pytest.mark.asyncio
    async def test_default_constructor_has_none_redis(self):
        """Omitting redis_client must leave the ``redis`` attribute as None."""
        merger = SwarmMergeInterceptor(opus_client=_make_opus_client())
        assert merger.redis is None


# ---------------------------------------------------------------------------
# WB5: RESULT_TTL constant equals 600
# ---------------------------------------------------------------------------


class TestWB5_ResultTTLConstant:
    """WB5: The RESULT_TTL module constant is exactly 600."""

    def test_result_ttl_is_600(self):
        """RESULT_TTL must equal 600."""
        assert RESULT_TTL == 600

    def test_result_ttl_is_int(self):
        """RESULT_TTL must be an int."""
        assert isinstance(RESULT_TTL, int)

    def test_result_ttl_importable_from_module(self):
        """RESULT_TTL must be importable at module level."""
        from core.merge.semantic_merge_interceptor_v2 import RESULT_TTL as rt
        assert rt == 600

    def test_no_sqlite3_import_in_module(self):
        """Rule 7: sqlite3 is FORBIDDEN.

        Two checks run against the module source: the literal substring
        ``import sqlite3``, and a scan of the parsed import statements, which
        also catches ``from sqlite3 import ...`` and ``import os, sqlite3``.
        """
        import ast

        import core.merge.semantic_merge_interceptor_v2 as mod

        src = inspect.getsource(mod)
        assert "import sqlite3" not in src, (
            "sqlite3 is FORBIDDEN in semantic_merge_interceptor_v2.py (Rule 7)"
        )

        # Collect the top-level package of every absolute import in the module.
        imported_roots = set()
        for node in ast.walk(ast.parse(src)):
            if isinstance(node, ast.Import):
                imported_roots.update(alias.name.split(".")[0] for alias in node.names)
            elif isinstance(node, ast.ImportFrom) and node.level == 0 and node.module:
                imported_roots.add(node.module.split(".")[0])

        assert "sqlite3" not in imported_roots, (
            "sqlite3 is FORBIDDEN in semantic_merge_interceptor_v2.py (Rule 7)"
        )


# ---------------------------------------------------------------------------
# Run summary
# ---------------------------------------------------------------------------

if __name__ == "__main__":
    # Direct execution: run this file through pytest and exit with its status code.
    sys.exit(pytest.main([__file__, "-v", "--tb=short"]))
