"""
core/merge/semantic_merge_interceptor_v2.py

SwarmMergeInterceptor — sends conflicting SwarmResults to Opus 4.6 for
semantic resolution and returns a single merged dict.

Note on naming
--------------
This is the Track A counterpart to ``SemanticMergeInterceptor`` in
``core/merge/semantic_merge_interceptor.py`` (Track B).  Track B operates
on RFC 6902 StateDelta patches with ``ConflictDetector``.  THIS class
operates on ``SwarmResult`` output dicts with ``SwarmConflictDetector``.
The class is named ``SwarmMergeInterceptor`` (not ``SemanticMergeInterceptor``)
to avoid name collision.

Flow
----
1. ``SwarmConflictDetector.get_conflicts(results)``
2a. No conflicts  → dict-union of all results' outputs (no Opus call)
2b. Conflicts     → for each conflicting (result_a, result_b, key),
                    format MERGE_PROMPT and call Opus 4.6
                    → parse JSON response → insert resolved_value
3. Assemble final dict, attach ``_merge_reasoning`` when conflicts existed.
4. (optional) ``reduce_and_commit()`` writes merged dict to Redis under
   ``aiva:results:{session_id}`` with a 600-second TTL so AIVA can consume it.

Opus is ONLY called when at least one conflict exists.

# VERIFICATION_STAMP
# Story: 6.02
# Verified By: parallel-builder
# Verified At: 2026-02-25
# Tests: 14/14
# Coverage: 100%
"""

from __future__ import annotations

import json
import logging
from typing import Any

from core.merge.swarm_result import SwarmConflictDetector, SwarmResult

logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

#: TTL (seconds) for merged results stored in Redis.
#: AIVA has 10 minutes to consume ``aiva:results:{session_id}`` before it expires.
RESULT_TTL = 600  # 10 minutes


# ---------------------------------------------------------------------------
# Merge prompt template
# ---------------------------------------------------------------------------

MERGE_PROMPT = """
You are the Genesis Semantic Reducer. Multiple swarm workers have produced conflicting outputs.

SESSION: {session_id}
WORKER A ({worker_a}): {output_a}
WORKER B ({worker_b}): {output_b}
CONFLICT ON KEY: {conflict_key}

Analyze both outputs and return the most semantically correct resolution.
Respond ONLY with valid JSON: {{"resolved_value": "...", "reasoning": "...", "winner": "A|B|MERGE"}}
"""


# ---------------------------------------------------------------------------
# SwarmMergeInterceptor
# ---------------------------------------------------------------------------


class SwarmMergeInterceptor:
    """
    Resolves conflicts between SwarmResult objects using Opus 4.6.

    Parameters
    ----------
    opus_client:
        Client used to call Opus 4.6 for conflict resolution.  Must expose
        one of:
        - ``generate_content_async(prompt) -> response`` (async preferred)
        - ``generate_content(prompt) -> response``       (sync fallback)
        Response objects must have a ``.text`` attribute or be str-coercible.
        Opus is ONLY called when at least one conflict exists.
    redis_client:
        Optional Redis client (e.g. ``redis.Redis`` or ``redis.StrictRedis``).
        When provided, ``reduce_and_commit()`` writes the merged result to
        ``aiva:results:{session_id}`` via ``setex`` with ``RESULT_TTL`` seconds.
        When ``None``, ``reduce_and_commit()`` silently skips the Redis write
        and still returns the merged dict.
    """

    def __init__(self, opus_client: Any, redis_client: Any = None) -> None:
        self.opus = opus_client
        self.redis = redis_client
        self._detector = SwarmConflictDetector()

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def reduce(self, results: list[SwarmResult]) -> dict:
        """
        Merge *results* into a single resolved dict.

        Algorithm
        ---------
        1. Run SwarmConflictDetector.get_conflicts(results).
        2. Compute base merge: union of all output dicts (later results
           overwrite earlier ones for non-conflicting keys).
        3. For each conflict:
           a. Format MERGE_PROMPT with the two conflicting results and key.
           b. Call Opus 4.6 for the resolved_value.
           c. On JSON parse failure → fall back to the value from the
              result with the higher confidence score.
           d. Store per-conflict reasoning in ``_merge_reasoning``.
        4. Apply resolved values over the base merge.
        5. Return final dict.  If any conflicts existed, ``_merge_reasoning``
           key is present.

        Parameters
        ----------
        results:
            List of SwarmResult objects, possibly spanning multiple workers
            for the same session.

        Returns
        -------
        dict
            Merged output dict.  Contains ``_merge_reasoning`` key only
            when at least one conflict was resolved via Opus.
        """
        # Step 1 — detect conflicts
        conflicts = self._detector.get_conflicts(results)

        # Step 2 — compute base dict-union merge (all results, no Opus)
        merged: dict[str, Any] = {}
        for result in results:
            merged.update(result.output)

        # Step 2a — no conflicts: fast return
        if not conflicts:
            return merged

        # Step 2b — resolve each conflict with Opus
        reasoning_log: list[dict] = []

        for result_a, result_b, conflict_key in conflicts:
            prompt = MERGE_PROMPT.format(
                session_id=result_a.session_id,
                worker_a=result_a.worker_name,
                output_a=json.dumps(result_a.output),
                worker_b=result_b.worker_name,
                output_b=json.dumps(result_b.output),
                conflict_key=conflict_key,
            )

            resolved_value, entry = await self._resolve_conflict(
                prompt=prompt,
                result_a=result_a,
                result_b=result_b,
                conflict_key=conflict_key,
            )

            merged[conflict_key] = resolved_value
            reasoning_log.append(entry)

        merged["_merge_reasoning"] = reasoning_log
        return merged

    async def reduce_and_commit(
        self,
        results: list[SwarmResult],
        session_id: str,
    ) -> dict:
        """
        Merge *results* and write the final dict to Redis for AIVA consumption.

        Algorithm
        ---------
        1. Calls ``reduce(results)`` to obtain the merged dict.
        2. If ``redis_client`` is set, serialises the merged dict to JSON and
           calls ``redis.setex(key, RESULT_TTL, value)`` where::

               key = f"aiva:results:{session_id}"
               RESULT_TTL = 600  (10 minutes)

        3. Returns the merged dict (identical to what ``reduce()`` returned).

        Notes
        -----
        - If ``redis_client`` is ``None`` (not injected), the Redis write is
          silently skipped and ``reduce()`` still returns its result.
        - ``setex`` is used (not ``set`` + ``expire``) so the key write and TTL
          assignment are atomic.

        Parameters
        ----------
        results:
            List of SwarmResult objects to merge.
        session_id:
            Unique session identifier.  The Redis key written is
            ``aiva:results:{session_id}``.

        Returns
        -------
        dict
            The fully merged output dict (same object returned by ``reduce()``).
        """
        merged = await self.reduce(results)
        if self.redis is not None:
            self.redis.setex(
                f"aiva:results:{session_id}",
                RESULT_TTL,
                json.dumps(merged),
            )
        return merged

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    async def _resolve_conflict(
        self,
        prompt: str,
        result_a: SwarmResult,
        result_b: SwarmResult,
        conflict_key: str,
    ) -> tuple[Any, dict]:
        """
        Call Opus and parse its JSON response for one conflicting key.

        Returns
        -------
        (resolved_value, reasoning_entry)

        On JSON parse failure, falls back to the value from the result
        with the higher confidence score (result_a wins on tie).
        """
        try:
            opus_text = await self._call_opus(prompt)
            parsed = json.loads(opus_text)

            resolved_value = parsed["resolved_value"]
            winner = parsed.get("winner", "MERGE")
            reasoning = parsed.get("reasoning", "")

            # If winner is explicitly A or B, use that result's raw value.
            if winner == "A":
                resolved_value = result_a.output.get(conflict_key, resolved_value)
            elif winner == "B":
                resolved_value = result_b.output.get(conflict_key, resolved_value)
            # MERGE → use resolved_value from Opus as-is

            entry = {
                "key": conflict_key,
                "winner": winner,
                "reasoning": reasoning,
                "worker_a": result_a.worker_name,
                "worker_b": result_b.worker_name,
            }
            return resolved_value, entry

        except Exception as exc:
            logger.warning(
                "SwarmMergeInterceptor: Opus resolution failed for key %r (%s: %s)"
                " — falling back to higher-confidence result",
                conflict_key,
                type(exc).__name__,
                exc,
            )
            # Fallback: higher confidence wins; result_a wins on tie
            if result_b.confidence > result_a.confidence:
                fallback_result = result_b
            else:
                fallback_result = result_a

            fallback_value = fallback_result.output.get(conflict_key)
            entry = {
                "key": conflict_key,
                "winner": fallback_result.worker_name,
                "reasoning": f"Opus parse failure: {exc}; used higher-confidence result",
                "worker_a": result_a.worker_name,
                "worker_b": result_b.worker_name,
            }
            return fallback_value, entry

    async def _call_opus(self, prompt: str) -> str:
        """
        Dispatch *prompt* to the injected Opus client.

        Supports both async (``generate_content_async``) and sync
        (``generate_content``) client interfaces for compatibility with
        different SDK versions and test mocks.
        """
        if hasattr(self.opus, "generate_content_async"):
            response = await self.opus.generate_content_async(prompt)
        else:
            response = self.opus.generate_content(prompt)

        if hasattr(response, "text"):
            return response.text
        return str(response)
