#!/usr/bin/env python3
"""
Gemini Live Session - Revenue First Mode
========================================
Direct desktop voice bridge to Gemini 3 Flash.
Identity: Antigravity (Genesis System Core).

Features:
- Real-time Audio Streaming (In/Out)
- Barge-in (via model VAD)
- Tools: Google Search, Browser Navigation
- "System Ready" Chime
"""

import asyncio
import time
import os
import sys
import traceback
import webbrowser
import json
from pathlib import Path
import logging
import audioop
import subprocess

logger = logging.getLogger(__name__)
from skills.auto_browse_skill import AutoBrowseSkill
from core.skills.ghl_base import registry as ghl_registry
# Ensure skills are registered
import skills.ghl.extract_api_key 
import skills.ghl.create_subaccount

try:
    import pyaudio
except ImportError:
    print("Error: pyaudio not installed. Run: pip install pyaudio")
    sys.exit(1)

try:
    from google import genai
    from google.genai import types
except ImportError:
    print("Error: google-genai not installed. Run: pip install google-genai")
    sys.exit(1)

try:
    import pyttsx3
    TTS_AVAILABLE = True
except ImportError:
    TTS_AVAILABLE = False
    print("Warning: pyttsx3 not installed. System chime disabled.")

def _merge_env_file(env_path, log_loaded=False):
    """
    Merge KEY=VALUE lines from *env_path* into os.environ.

    Shared helper for both the deprecated-fallback and legacy loading paths
    (previously duplicated inline). Blank lines and '#' comments are skipped,
    and existing environment variables are never overwritten.

    Args:
        env_path: Path to the .env file (caller checks existence).
        log_loaded: When True, log each loaded key with the value masked for
            anything that looks like a credential (UVS-H05).
    """
    with open(env_path, "r") as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            if "=" not in line:
                continue
            key, value = line.split("=", 1)
            key = key.strip()
            # Don't overwrite existing env vars
            if key in os.environ:
                continue
            os.environ[key] = value.strip()
            if log_loaded:
                # UVS-H05: Do NOT print credential values
                if "KEY" in key.upper() or "SECRET" in key.upper():
                    logger.debug(f"Loaded credential: {key}=****")
                else:
                    logger.debug(f"Loaded config: {key}")


def load_env_file():
    """
    UVS-H05: Secure credential loading.

    Priority:
    1. System keyring (Windows Credential Manager, etc.)
    2. Environment variables
    3. .env file (fallback only, for backward compatibility)

    SECURITY: Does NOT print credential values in logs.
    """
    # Three levels up from this file: the project root .env
    env_path = Path(__file__).parent.parent.parent / ".env"
    try:
        # Try secure credential manager first
        from core.security.credential_manager import get_gemini_api_key, check_credential_status

        status = check_credential_status()
        if status.get("gemini_configured"):
            # Credential found via keyring or env var - no .env needed
            logger.debug("Credentials loaded securely (keyring/env)")
            return

        # Fallback: Load .env file (backward compatibility)
        # NOTE: This is deprecated and should be migrated to keyring
        if env_path.exists():
            logger.debug("Loading credentials from .env (consider migrating to keyring)")
            _merge_env_file(env_path, log_loaded=True)
        else:
            logger.warning("No credentials found in keyring, env, or .env file")

    except ImportError:
        # Credential manager not available, use legacy loading (no per-key logging)
        logger.warning("Secure credential manager not available, using legacy .env loading")
        if env_path.exists():
            _merge_env_file(env_path)
    except Exception as e:
        # Log only the exception type so a malformed .env cannot leak secrets
        logger.error(f"Error loading credentials: {type(e).__name__}")

# Configuration
# =============
MODEL_ID = "models/gemini-3-flash"
# Capture at native hardware rate 44.1k to avoid driver static
# Send at 16k for Gemini 
AUDIO_SAMPLE_RATE_CAPTURE = 44100 # Default fallback, overwritten below
AUDIO_SAMPLE_RATE_SEND = 16000
AUDIO_SAMPLE_RATE_RECEIVE = 24000
CHUNK_SIZE = 2048 # Reduced for faster interactivity (was 4096)

# Persona/behavior prompt passed verbatim as "system_instruction" to the Live
# API connection (see start()). The model's mentorship behavior depends on this
# exact wording — edit with care.
SYSTEM_INSTRUCTION = """
IDENTITY: I AM ANTIGRAVITY (Genesis System Core).
PROTOCOL: UNIFIED VOICE-VISION MENTORSHIP (Phase E: Mentorship Integration).

THINK-ACT-OBSERVE LOOP:
1. **THINK**: Analyze vision (16 FPS). Reasoning about business strategy.
2. **ACT**: 
   - **MENTOR NARRATION**: Explain *The Why* before *The How*. (e.g., "We are using a sub-account structure to separate your client data for better security...").
   - **EXECUTE**: Use skills or cursor tools.
3. **OBSERVE**: Verify outcome and update progress.

STRATEGIC MENTORSHIP RULES:
1. **PROACTIVE PAUSING**: If a configuration task is complex, use `pause_for_founder_question` to ask if they follow your logic.
2. **FOUNDER FIRST**: The founder owns the vision; you own the execution.
3. **SHARED CURSOR**: Use the Sparkle Cursor to guide attention.
4. **MUTUAL AUTONOMY**: Yield immediately if the user interacts.
5. **HANDOVER**: Sensitive actions require verbal "Yes".
"""

# Tools
# =====

CHROME_PATH = r"C:\Program Files (x86)\Google\Chrome\Application\chrome.exe"

# UVS-H04: URL allowlist for subprocess execution
ALLOWED_URL_HOSTS = [
    'www.google.com',
    'google.com',
    'app.gohighlevel.com',
    'gohighlevel.com',
    'docs.google.com',
    'drive.google.com',
]

BLOCKED_URL_SCHEMES = ['file', 'javascript', 'data', 'vbscript', 'about']


def validate_and_build_url(base_url: str, query: str = None) -> str:
    """
    UVS-H04: Validate and build safe URLs for subprocess execution.

    Args:
        base_url: Base URL (scheme must be https, or http for local dev)
        query: Optional query string, safely percent-encoded as ?q=...

    Returns:
        Safe URL string

    Raises:
        ValueError: If URL is unsafe (bad scheme, embedded dangerous scheme,
            or host not on the allowlist)
    """
    from urllib.parse import urlparse, quote_plus

    # Parse the base URL
    parsed = urlparse(base_url)

    # Validate scheme - must be https (or http for local dev)
    if parsed.scheme.lower() not in ('https', 'http'):
        logger.warning(f"[SECURITY] Blocked unsafe URL scheme: {parsed.scheme}")
        raise ValueError(f"URL scheme must be https, got: {parsed.scheme}")

    # Check for blocked schemes that might be embedded (e.g. a smuggled
    # "javascript:" payload). Match "<scheme>:" rather than the bare word so
    # legitimate URLs whose path merely contains e.g. "data" or "about" are
    # not false-positive rejected.
    lowered = base_url.lower()
    for blocked in BLOCKED_URL_SCHEMES:
        if f"{blocked}:" in lowered:
            logger.warning(f"[SECURITY] Blocked URL containing: {blocked}")
            raise ValueError(f"URL contains blocked pattern: {blocked}")

    # Validate host against allowlist
    if parsed.netloc.lower() not in ALLOWED_URL_HOSTS:
        logger.warning(f"[SECURITY] URL host not in allowlist: {parsed.netloc}")
        raise ValueError(f"URL host not allowed: {parsed.netloc}")

    # Build the final URL with properly encoded query
    if query:
        safe_query = quote_plus(query)
        return f"{base_url}?q={safe_query}"

    return base_url


def google_search(query: str):
    """
    Open a Google search for *query* in Chrome (non-blocking).

    UVS-H04: URL injection prevention is delegated to validate_and_build_url():
    - URL scheme validation (https only)
    - Host allowlist (google.com)
    - Query encoding (quote_plus)

    Returns:
        dict with a "result" message on success, or an "error" message when
        the URL is blocked or the browser cannot be launched.
    """
    print(f"[Tool] Searching Google: {query}")

    try:
        # Build safe URL with encoded query (raises ValueError if unsafe)
        url = validate_and_build_url("https://www.google.com/search", query)

        # Use subprocess.Popen for absolute non-blocking
        subprocess.Popen([CHROME_PATH, url])
        return {"result": f"Opened Google Search for: {query} in Chrome"}

    except ValueError as e:
        logger.warning(f"[SECURITY] Google search blocked: {e}")
        return {"error": f"Search blocked for security: {e}"}
    except FileNotFoundError as e:
        # Chrome is not installed at CHROME_PATH; report instead of crashing
        # the calling tool loop.
        logger.warning(f"Chrome executable not found: {e}")
        return {"error": f"Browser unavailable: {e}"}

# Static function-calling schema handed to the Gemini Live API.
# NOTE: get_dynamic_tools() extends the first group's "function_declarations"
# with registered GHL skills at session start.
TOOLS = [
    {
        "function_declarations": [
            # --- Web / search tools ---
            {
                "name": "google_search",
                "description": "Search Google for information.",
                "parameters": {
                    "type": "OBJECT",
                    "properties": {
                        "query": {"type": "STRING", "description": "The search query."}
                    },
                    "required": ["query"]
                }
            },
            {
                "name": "navigate_browser",
                "description": "Navigate the web browser to a specific URL.",
                "parameters": {
                    "type": "OBJECT",
                    "properties": {
                        "url": {"type": "STRING", "description": "The URL to visit."}
                    },
                    "required": ["url"]
                }
            },
            # --- Mentorship / autonomy tools ---
            {
                "name": "pause_for_founder_question",
                "description": "Stop current execution and wait for the founder to ask a question or provide feedback. Use this for complex strategic steps.",
                "parameters": {
                    "type": "OBJECT",
                    "properties": {
                        "reason": {"type": "STRING", "description": "The strategic reason for pausing (e.g., 'Do you have the DNS records ready?')."}
                    },
                    "required": ["reason"]
                }
            },
            {
                "name": "auto_browse",
                "description": "Execute a complex browser task using the native January 2026 Auto-Browse tool. Use this for navigating GHL, clicking buttons, and investigating the UI.",
                "parameters": {
                    "type": "OBJECT",
                    "properties": {
                        "instruction": {"type": "STRING", "description": "The high-level instruction (e.g., 'Go to Agency Settings and find the API key')."},
                        "confirmed": {"type": "BOOLEAN", "description": "Set to true ONLY if the founder has verbally confirmed this specific action."}
                    },
                    "required": ["instruction"]
                }
            },
            # --- Shared-cursor / viewport tools ---
            {
                "name": "move_cursor",
                "description": "Move the Shared Cursor (Sparkle Icon) to a specific coordinate on the screen to guide the user's attention.",
                "parameters": {
                    "type": "OBJECT",
                    "properties": {
                        "x": {"type": "INTEGER", "description": "X coordinate in pixels."},
                        "y": {"type": "INTEGER", "description": "Y coordinate in pixels."}
                    },
                    "required": ["x", "y"]
                }
            },
            {
                "name": "zoom_viewport",
                "description": "Zoom in on a specific area of the screen for detailed inspection of small elements (e.g., GHL menu text).",
                "parameters": {
                    "type": "OBJECT",
                    "properties": {
                        "x": {"type": "INTEGER", "description": "Top-left X coordinate of the zoom area."},
                        "y": {"type": "INTEGER", "description": "Top-left Y coordinate of the zoom area."},
                        "width": {"type": "INTEGER", "description": "Width of the zoom area (default 400)."},
                        "height": {"type": "INTEGER", "description": "Height of the zoom area (default 400)."}
                    },
                    "required": ["x", "y"]
                }
            },
            {
                "name": "anchor_element",
                "description": "Lock the Sparkle Cursor to a specific DOM element (e.g., a 'Save' button). The cursor will follow the element during scrolling.",
                "parameters": {
                    "type": "OBJECT",
                    "properties": {
                        "selector": {"type": "STRING", "description": "CSS selector of the element to anchor to."}
                    },
                    "required": ["selector"]
                }
            },
            {
                "name": "gesture_cursor",
                "description": "Perform a visual gesture with the Sparkle Cursor to highlight an element (e.g., 'circle' it).",
                "parameters": {
                    "type": "OBJECT",
                    "properties": {
                        "type": {"type": "STRING", "enum": ["circle", "underline", "point"], "description": "Type of gesture."},
                        "selector": {"type": "STRING", "description": "CSS selector of the element to highlight."}
                    },
                    "required": ["type", "selector"]
                }
            },
            {
                "name": "clear_zoom",
                "description": "Clear any active zoom and return to full viewport view.",
                "parameters": {"type": "OBJECT", "properties": {}}
            },
            {
                "name": "visual_reset",
                "description": "Clear ALL visual indicators (Sparkle, Progress, Zoom) and reset to a clean state. Use this when a strategic section is finished.",
                "parameters": {"type": "OBJECT", "properties": {}}
            }
        ]
    }
]

def get_dynamic_tools():
    """Build the final toolset including registered GHL skills.

    Returns a copy with fresh nested "function_declarations" lists.
    BUGFIX: the previous list(TOOLS) shallow copy shared the nested
    declaration list with the module-level TOOLS constant, so every call
    appended the GHL tools into TOOLS again (duplicating declarations on
    each reconnect).
    """
    base_tools = [
        {**group, "function_declarations": list(group.get("function_declarations", []))}
        for group in TOOLS
    ]
    ghl_tools = ghl_registry.get_tool_definitions()
    if ghl_tools:
        # Add to the first tool group's function declarations (copy only)
        base_tools[0]["function_declarations"].extend(ghl_tools)
    return base_tools

class GeminiLiveSession:
    """
    Gemini Live Voice Session.

    Bridges desktop microphone/speaker audio to the Gemini Live API with
    local barge-in VAD, tool calling, and a Conductor context watcher thread.

    VERIFICATION_STAMP (UVS-H08, UVS-H09)
    - UVS-H08: Thread safety for async callbacks with RLock
    - UVS-H09: Single stream start with is_active check
    """

    def __init__(self):
        """
        Load credentials, create the Gemini client and PyAudio interface,
        and initialize thread-safety / lifecycle state.

        Exits the process (sys.exit(1)) if GEMINI_API_KEY is not set after
        load_env_file() runs.
        """
        import threading

        load_env_file()
        self.api_key = os.environ.get("GEMINI_API_KEY")
        if not self.api_key:
            print("Error: GEMINI_API_KEY environment variable not set.")
            sys.exit(1)

        self.client = genai.Client(api_key=self.api_key)
        self.audio = pyaudio.PyAudio()
        self.input_stream = None
        self.output_stream = None

        # UVS-H08: Thread-safe state management
        self._state_lock = threading.RLock()  # Reentrant lock for nested access
        self._is_running = False
        self._session = None  # Protected by _state_lock (see property accessors)
        # asyncio.Lock always exists; the previous hasattr() guard was dead code.
        self._session_lock = asyncio.Lock()

        self.last_audio_time = 0
        self.stop_playback = False  # Local VAD Flag
        self.auto_browse_skill = AutoBrowseSkill()
        # Mutual-autonomy hook: fires when the user takes over the mouse
        self.auto_browse_skill.controller.on_user_activity = self._handle_user_activity
        self.requires_handover = False
        self.pending_action = None
        self.loop = None  # Will be set in start()

        # UVS-H23: Conductor thread lifecycle management
        self._conductor_stop_flag = threading.Event()
        self._conductor_thread = None

        # UVS-H24: Context injection queue for conductor protocol
        self._context_queue = None  # Will be asyncio.Queue, created in start()

    # UVS-H08: Thread-safe property accessors
    @property
    def is_running(self):
        with self._state_lock:
            return self._is_running

    @is_running.setter
    def is_running(self, value):
        with self._state_lock:
            self._is_running = value

    @property
    def session(self):
        with self._state_lock:
            return self._session

    @session.setter
    def session(self, value):
        with self._state_lock:
            self._session = value

    # VAD Threshold (Adjust based on mic sensitivity)
    VAD_THRESHOLD = 500

    def _input_callback(self, in_data, frame_count, time_info, status):
        """
        PyAudio threaded callback: runs a cheap local VAD for barge-in and
        forwards captured audio to the asyncio queue.

        Args:
            in_data: Raw 16-bit mono PCM bytes from the capture stream.
            frame_count, time_info, status: Standard PyAudio callback args (unused).

        Returns:
            (None, pyaudio.paContinue) on every path so the stream keeps running.

        UVS-H08: Thread-safe access to shared state.
        """
        # NOTE: uses the module-level audioop import; the previous redundant
        # function-local import was removed.

        # UVS-H08: Thread-safe state check
        with self._state_lock:
            if not self._is_running:
                return (None, pyaudio.paContinue)

        try:
            # 1. Calculate RMS (Volume); width=2 matches paInt16 samples
            rms = audioop.rms(in_data, 2)

            # 2. Local Barge-In Trigger
            if rms > self.VAD_THRESHOLD:
                if not self.stop_playback:
                    # Only print once per interruption to avoid spam
                    print(f" [VAD] Interruption Detected (RMS: {rms}) -> Muting Output")
                self.stop_playback = True

            # Hand the chunk to the event-loop thread without blocking capture
            self.loop.call_soon_threadsafe(self.audio_queue.put_nowait, in_data)
        except AttributeError:
            # self.loop / self.audio_queue not yet initialized (start() not run)
            pass
        except Exception as e:
            logger.debug(f"Audio callback error: {type(e).__name__}: {e}")

        return (None, pyaudio.paContinue)

    async def _handle_user_activity(self):
        """
        Mutual-autonomy callback: the founder has taken over the mouse.

        Immediately mutes playback and halts browser automation, then (when a
        live session exists) notifies the model so it can verbally acknowledge
        the yield.

        UVS-H08: Thread-safe session access.
        """
        print("\n[MUTUAL AUTONOMY] User interference detected. Yielding...")
        self.stop_playback = True  # Stop speaking
        # Halt any in-flight clicking/navigation before notifying the model
        await self.auto_browse_skill.controller.stop_all_actions()

        # Snapshot the session under the lock; work with the local copy after
        with self._state_lock:
            live_session = self._session

        if live_session is None:
            return

        try:
            notice = "[SYSTEM] User has taken control of the mouse. Yield immediately and narrate your acknowledgment."
            await live_session.send_realtime_input(
                media=types.Blob(data=notice.encode(), mime_type="text/plain")
            )
        except Exception as e:
            logger.debug(f"Failed to send user activity notification: {e}")

    def _monitor_conductor_context(self):
        """
        Polls .tisktask/context.json for updates (Conductor Protocol).

        UVS-H23: Added stop flag check for proper thread lifecycle management.
        UVS-H24: Implements actual context injection via queue.
        """
        import time
        context_path = Path(__file__).parent.parent.parent / ".tisktask" / "context.json"
        last_mtime = 0

        # UVS-H23: Check both is_running and dedicated stop flag
        while not self._conductor_stop_flag.is_set() and self.is_running:
            try:
                if context_path.exists():
                    mtime = context_path.stat().st_mtime
                    if mtime > last_mtime:
                        with open(context_path, "r") as f:
                            data = json.load(f)

                        # UVS-H24: Build context update message
                        context_update = {
                            "type": "conductor_context",
                            "current_task": data.get("current_task"),
                            "active_pipeline": data.get("active_pipeline"),
                            "priority": data.get("priority", "normal"),
                            "timestamp": time.time()
                        }

                        # UVS-H24: Push to context queue for session injection
                        if self._context_queue and self.loop:
                            try:
                                # Thread-safe queue put from sync thread
                                self.loop.call_soon_threadsafe(
                                    self._context_queue.put_nowait,
                                    context_update
                                )
                                logger.debug(f"Conductor context queued: {context_update['current_task']}")
                            except Exception as queue_err:
                                logger.warning(f"Failed to queue context update: {queue_err}")

                        # Also print for visibility
                        update = f"\n[CONDUCTOR UPDATE]: Current Focus: {data.get('current_task')}. Pipeline: {data.get('active_pipeline')}"
                        print(f"Conductor Sync: {update.strip()}")
                        last_mtime = mtime
            except Exception as e:
                print(f"Conductor Sync Error: {e}")
            # UVS-H23: Use Event.wait() for responsive shutdown instead of sleep
            self._conductor_stop_flag.wait(2)

    async def start(self):
        """
        Start the audio stream pipeline and run the Gemini Live session.

        Sequence:
        1. Create loop refs, queues, and the Conductor watcher thread.
        2. Scan for an active headset (HFP-aware smart pairing) with timeout.
        3. Open input/output PyAudio streams with sample-rate fallbacks.
        4. Play a "System Ready" chime (beep + TTS where available).
        5. Connect to Gemini Live and run send/receive/context tasks in an
           auto-reconnect loop with exponential backoff (capped at 30s).
        """
        # Mutated below to record the rate the capture stream actually opened at
        global AUDIO_SAMPLE_RATE_CAPTURE, AUDIO_SAMPLE_RATE_SEND
        
        # NOTE(review): these two prints are hardcoded status messages; they do
        # not reflect what load_env_file() actually did — confirm or remove.
        print("\nLoading .env from: E:\\genesis-system\\.env")
        print("Both GOOGLE_API_KEY and GEMINI_API_KEY are set. Using GOOGLE_API_KEY.")

        # NOTE(review): re-creates PyAudio although __init__ already did; the
        # instance from __init__ is replaced without terminate() — confirm.
        self.audio = pyaudio.PyAudio()
        self.loop = asyncio.get_running_loop()
        self.audio_queue = asyncio.Queue()

        # UVS-H24: Create context queue for conductor protocol
        self._context_queue = asyncio.Queue()

        # UVS-H23: Start Conductor Watcher with proper lifecycle tracking
        import threading
        self.is_running = True
        self._conductor_stop_flag.clear()  # Reset stop flag for fresh start
        self._conductor_thread = threading.Thread(
            target=self._monitor_conductor_context,
            daemon=True,
            name="GeminiConductor"
        )
        self._conductor_thread.start()

        # Audio Stream Strategy: SMART HFP PAIRING (Scanning Loop)
        # UVS-H16: Add timeout to device enumeration to prevent infinite loop
        print("Scanning for Active Headset (Smart Pairing)...")
        input_index = None
        output_index = None

        import time
        DEVICE_WAIT_TIMEOUT = float(os.environ.get("GENESIS_DEVICE_TIMEOUT", "60"))
        device_start_time = time.time()

        while input_index is None:
            # UVS-H16: Timeout check
            elapsed = time.time() - device_start_time
            if elapsed > DEVICE_WAIT_TIMEOUT:
                raise TimeoutError(
                    f"Audio device enumeration timed out after {DEVICE_WAIT_TIMEOUT}s. "
                    f"Please check your audio device settings or set GENESIS_DEVICE_TIMEOUT env var."
                )
            try:
                # 1. Get Active Default Input (User's Selection)
                default_in = self.audio.get_default_input_device_info()
                default_name = default_in['name']
                
                # Filter for "Headset" or "Hands-Free" to ensure we are targeting the wireless mode
                # If the user selected the Laptop Mic, we might not want to force HFP logic, but for "KM" we do.
                print(f"Detected System Default: {default_name}")

                if "KM" in default_name or "Quantum" in default_name or "Hands-Free" in default_name:
                    input_index = default_in['index']
                    target_name_part = default_name.split('(')[0].strip() # "Headset"
                    if "Hands-Free" in default_name:
                        target_name_part = "Hands-Free"
                    
                    print(f"  > Valid Headset Detected. Pairing Output...")

                    # 2. Find Matching HFP Output
                    for i in range(self.audio.get_device_count()):
                        try:
                            info = self.audio.get_device_info_by_index(i)
                            if info['maxOutputChannels'] > 0:
                                if target_name_part in info['name'] and "Stereo" not in info['name']:
                                     if default_name[:10] in info['name'][:10]:
                                        output_index = i
                                        print(f"  Found Paired HFP OUT: {info['name']} (Index {i})")
                                        break
                        except (OSError, IOError) as e:
                            # UVS-H11: Typed exception for device enumeration errors
                            logger.debug(f"Device enumeration error at index {i}: {e}")
                else:
                    # Fallback for Wired/Laptop Mic (Non-HFP) - Just accept it
                    print("  > Standard Mic Detected (Non-HFP). Using OS Default.")
                    input_index = default_in['index']
                    output_index = None # Use OS Default Output

            except Exception as e:
                print(f"Device Scan Error: {e}")
            
            if input_index is None:
                print("Waiting for Headset... (Retrying in 3s)")
                # NOTE(review): blocking sleep inside an async method stalls the
                # event loop during the retry wait — consider asyncio.sleep.
                time.sleep(3)
                # Re-init PyAudio to refresh device list? Some backends need it.
                self.audio.terminate()
                self.audio = pyaudio.PyAudio()

        if input_index is not None:
             print(f"Targeting Input Index: {input_index}")
        
        if output_index is not None:
             print(f"Targeting Output Index: {output_index} (HFP Speaker)")
        else:
             print("Warning: Matching HFP Speaker not found. Using System Default (Stereo Risk).")

        try:
            # Try 1: 16kHz (Ideal for HFP)
            print("Attempting 16kHz Capture (HFP Mode)...")
            AUDIO_SAMPLE_RATE_CAPTURE = 16000
            self.input_stream = self.audio.open(
                format=pyaudio.paInt16,
                channels=1,
                rate=16000,
                input=True,
                input_device_index=input_index, 
                frames_per_buffer=CHUNK_SIZE,
                stream_callback=self._input_callback
            )
            print("  [SUCCESS] 16kHz Capture Active.")
        except Exception as e:
            print(f"  [FAIL] 16kHz: {e}")
            try:
                # Try 2: 44.1kHz (A2DP Mode)
                print("Attempting 44.1kHz Capture (A2DP Mode)...")
                AUDIO_SAMPLE_RATE_CAPTURE = 44100
                self.input_stream = self.audio.open(
                    format=pyaudio.paInt16,
                    channels=1,
                    rate=44100,
                    input=True,
                    input_device_index=None, # Default
                    frames_per_buffer=CHUNK_SIZE,
                    stream_callback=self._input_callback
                )
                print("  [SUCCESS] 44.1kHz Capture Active.")
            except Exception as e2:
                 print(f"  [FAIL] 44.1kHz: {e2}")
                 raise e2

        # Output Stream (with Fallback)
        try:
            print(f"Opening Output Stream (Index {output_index})...")
            self.output_stream = self.audio.open(
                format=pyaudio.paInt16,
                channels=1,
                rate=16000,
                output=True,
                output_device_index=output_index
            )
            print("  [SUCCESS] 16kHz Output Active.")
        except Exception as e:
             print(f"  [FAIL] 16kHz Output: {e}")
             try:
                print("Attempting 44.1kHz Output Fallback...")
                self.AUDIO_SAMPLE_RATE_PLAYBACK = 44100 # Adjust internal state for resampler
                self.output_stream = self.audio.open(
                    format=pyaudio.paInt16,
                    channels=1,
                    rate=44100,
                    output=True,
                    output_device_index=output_index
                )
                print("  [SUCCESS] 44.1kHz Output Active.")
             except Exception as e2:
                 print(f"Output Stream Failed: {e2}")
                 raise e2

        # UVS-H09: Single stream start with is_active() check
        if not self.input_stream.is_active():
            self.input_stream.start_stream()

        self.is_running = True
        # UVS-H09: REMOVED duplicate start_stream() call that was here

        # System Ready Chime (Hardware Beep + TTS)
        print("="*50)
        print(" GEMINI LIVE VOICE BRIDGE: ANTIGRAVITY SYNCED")
        print(" Identity: Antigravity (Genesis Core)")
        print(" Status: LISTENING (Say 'Antigravity' to wake)")
        print("="*50 + "      ")

        # UVS-H11: Typed exceptions for audio notifications
        try:
            import winsound
            winsound.Beep(600, 200)  # 600Hz for 200ms
            winsound.Beep(800, 200)
        except (ImportError, RuntimeError) as e:
            logger.debug(f"System beep unavailable: {e}")

        if TTS_AVAILABLE:
            try:
                engine = pyttsx3.init()
                engine.say("System Ready")
                engine.runAndWait()
            except (RuntimeError, OSError) as e:
                logger.debug(f"TTS unavailable: {e}")

        config = {
            "response_modalities": ["AUDIO"],
            "system_instruction": SYSTEM_INSTRUCTION,
            "tools": get_dynamic_tools()
        }
        
        # Auto-Reconnect Loop
        retry_count = 0
        while True:
            try:
                print(f"Connecting to Gemini Live (Attempt {retry_count + 1})...")
                async with self.client.aio.live.connect(
                    model=MODEL_ID,
                    config=config
                ) as session:
                    print(" [HEARTBEAT] Connection established.")
                    self.session = session
                    self.is_running = True
                    retry_count = 0 # Reset on success
                    
                    # Configure Vision Worker
                    self.auto_browse_skill.controller.vision.on_frame = self._on_vision_frame
                    await self.auto_browse_skill.controller.vision.start()

                    # UVS-H18: Use wait() with FIRST_EXCEPTION to handle task cancellation properly
                    # UVS-H24: Include context processor for conductor protocol
                    send_task = asyncio.create_task(self._send_audio())
                    receive_task = asyncio.create_task(self._receive_responses())
                    context_task = asyncio.create_task(self._process_context_updates())
                    pending_tasks = {send_task, receive_task, context_task}

                    try:
                        done, pending = await asyncio.wait(
                            pending_tasks,
                            return_when=asyncio.FIRST_EXCEPTION
                        )
                        # Check for exceptions in completed tasks
                        for task in done:
                            if task.exception():
                                raise task.exception()
                    finally:
                        # UVS-H18: Cancel all pending tasks on any exit
                        # NOTE(review): 'pending' is unbound here if asyncio.wait()
                        # itself raised or was cancelled — would NameError; confirm.
                        for task in pending:
                            task.cancel()
                            try:
                                await task
                            except asyncio.CancelledError:
                                pass

            except Exception as e:
                print(f"\nSession Error: {e}")
                # Ensure vision stops on error
                if self.auto_browse_skill.controller._initialized:
                    await self.auto_browse_skill.controller.vision.stop()
                retry_count += 1
                delay = min(30, 2 ** retry_count)
                print(f"Reconnecting in {delay} seconds...")
                await asyncio.sleep(delay)
            finally:
                # NOTE(review): this finally runs after EVERY loop iteration and
                # clears is_running; the conductor thread's loop condition checks
                # is_running, so that thread exits permanently after the first
                # disconnect and is never restarted — confirm intent.
                self.is_running = False
                if self.auto_browse_skill.controller._initialized:
                    await self.auto_browse_skill.controller.vision.stop()

    async def _cleanup(self):
        """
        Tear down the conductor thread, vision worker, audio streams, and PyAudio.

        UVS-H11: Typed exceptions with logging, UVS-H23: Conductor thread cleanup,
        UVS-H30: Cleanup error aggregation (errors are collected, not raised, so
        every resource gets a teardown attempt).
        """
        errors = []
        self.is_running = False

        # UVS-H23: Ask the conductor thread to stop, then join with a bounded wait.
        if hasattr(self, '_conductor_stop_flag'):
            self._conductor_stop_flag.set()
        conductor = getattr(self, '_conductor_thread', None)
        if conductor:
            try:
                conductor.join(timeout=3.0)  # Wait up to 3s
                if conductor.is_alive():
                    logger.warning("Conductor thread did not stop in time")
                    errors.append("conductor: timeout waiting for thread")
            except Exception as e:
                errors.append(f"conductor: {e}")
                logger.warning(f"Conductor thread cleanup error: {e}")

        # Stop the vision worker only if the browser controller was ever initialized.
        if self.auto_browse_skill.controller._initialized:
            try:
                await self.auto_browse_skill.controller.vision.stop()
            except Exception as e:
                errors.append(f"vision: {e}")
                logger.warning(f"Vision cleanup error: {e}")

        # Close both PyAudio streams; OSError/IOError are the expected failure modes.
        for label, log_prefix, stream in (
            ("input_stream", "Input stream cleanup error", self.input_stream),
            ("output_stream", "Output stream cleanup error", self.output_stream),
        ):
            if not stream:
                continue
            try:
                stream.stop_stream()
                stream.close()
            except (OSError, IOError) as e:
                errors.append(f"{label}: {e}")
                logger.warning(f"{log_prefix}: {e}")

        if self.audio:
            try:
                self.audio.terminate()
            except (OSError, RuntimeError) as e:
                errors.append(f"audio: {e}")
                logger.warning(f"Audio cleanup error: {e}")

        if errors:
            logger.info(f"Session closed with {len(errors)} cleanup errors")
        else:
            logger.info("Session closed cleanly")

    async def _send_audio(self):
        """
        Pump captured microphone audio from `self.audio_queue` to the session.

        Per-packet pipeline: software AEC drop (skip while output is playing),
        resample capture-rate -> send-rate, x2.0 digital gain, optional debug
        capture to debug_input.wav, VU meter print, then send_realtime_input.

        Fixes (review):
        - debug_input.wav is now closed via try/finally; previously it was only
          closed on a clean loop exit and leaked whenever an exception escaped.
        - Fatal connection errors (1011/deadline/closed) are re-raised with a
          bare `raise` (preserving the traceback) and now propagate out of this
          coroutine, so start()'s asyncio.wait(FIRST_EXCEPTION) actually sees
          them and reconnects; the old outer `except` swallowed them after
          printing, contradicting the "Trigger Reconnect in Start()" intent.
        """
        try:
            # Debug Recording (best effort; disabled when the file can't be opened)
            import wave
            debug_file = None
            try:
                debug_file = wave.open("debug_input.wav", "wb")
                debug_file.setnchannels(1)
                debug_file.setsampwidth(2)
                debug_file.setframerate(AUDIO_SAMPLE_RATE_SEND)
            except (OSError, IOError, wave.Error) as e:
                # UVS-H11: Typed exception for debug file creation
                logger.debug(f"Debug audio file unavailable: {e}")
                debug_file = None

            try:
                resample_state = None
                # UVS-H26: Track previous capture rate to detect rate changes
                last_capture_rate = AUDIO_SAMPLE_RATE_CAPTURE

                print("Audio Input Loop Started (Callback Mode)...")

                while self.is_running:
                    try:
                        # Get from Queue (Async - Decoupled from Capture)
                        data = await self.audio_queue.get()

                        # Software AEC: If we played audio recently (< 0.2s), DROP PACKET
                        if time.time() - self.last_audio_time < 0.2:
                            continue

                        # UVS-H26: Reset resample state if capture rate changed (prevents audio artifacts)
                        if AUDIO_SAMPLE_RATE_CAPTURE != last_capture_rate:
                            logger.debug(f"Audio rate changed: {last_capture_rate} -> {AUDIO_SAMPLE_RATE_CAPTURE}, resetting resample state")
                            resample_state = None
                            last_capture_rate = AUDIO_SAMPLE_RATE_CAPTURE

                        # 1. Resample: Only if rates differ
                        if AUDIO_SAMPLE_RATE_CAPTURE != AUDIO_SAMPLE_RATE_SEND:
                            data, resample_state = audioop.ratecv(data, 2, 1, AUDIO_SAMPLE_RATE_CAPTURE, AUDIO_SAMPLE_RATE_SEND, resample_state)

                        # 2. Digital Gain Boost (x2.0) - Tweaked for JBL Quantum
                        data = audioop.mul(data, 2, 2.0)

                        # Save to debug file (pre-send)
                        if debug_file:
                            debug_file.writeframes(data)

                        # Visual VU Meter
                        rms = audioop.rms(data, 2)
                        if rms > 100:
                            bars = "#" * int((rms / 500))
                            print(f"\rMic Level: {bars:<20}", end="", flush=True)

                        await self.session.send_realtime_input(
                            media=types.Blob(data=data, mime_type="audio/pcm")
                        )
                    except Exception as e:
                        # CRITICAL: Detect Fatal 1011/Disconnect Errors and Restart
                        err_str = str(e).lower()
                        if "1011" in err_str or "deadline" in err_str or "closed" in err_str:
                            print(f"Fatal Connection Error in Send Loop: {e}")
                            self.is_running = False  # Stop other loops
                            raise  # Bare raise keeps traceback; triggers reconnect in start()

                        # Transient error: log and keep pumping audio
                        print(f"Error reading/sending audio: {e}")
                        await asyncio.sleep(0.1)
            finally:
                # Always close the debug capture, even when the loop dies.
                if debug_file:
                    debug_file.close()

        except Exception as e:
            print(f"Send audio loop error: {e}")
            raise  # Propagate so start()'s FIRST_EXCEPTION wait observes the failure

    async def _process_context_updates(self):
        """
        Drain conductor context updates and forward them into the live session.

        UVS-H24: Implements actual context injection via session.send_realtime_input.
        """
        while self.is_running:
            try:
                # Poll with a 1s timeout so a stopped session exits promptly.
                try:
                    context = await asyncio.wait_for(self._context_queue.get(), timeout=1.0)
                except asyncio.TimeoutError:
                    continue

                # UVS-H24: Snapshot session state atomically before injecting.
                with self._state_lock:
                    session, running = self._session, self._is_running

                if not (session and running):
                    continue

                # Build context injection message
                message = (
                    f"[SYSTEM CONTEXT UPDATE] "
                    f"Current Task: {context.get('current_task', 'None')}. "
                    f"Pipeline: {context.get('active_pipeline', 'None')}. "
                    f"Priority: {context.get('priority', 'normal')}."
                )
                try:
                    await session.send_realtime_input(
                        media=types.Blob(
                            data=message.encode('utf-8'),
                            mime_type="text/plain"
                        )
                    )
                    logger.debug(f"Context injected: {context.get('current_task')}")
                except Exception as send_err:
                    logger.warning(f"Failed to inject context: {send_err}")
            except Exception as e:
                logger.debug(f"Context processor error: {e}")
                await asyncio.sleep(0.5)

    async def _on_vision_frame(self, frame_bytes: bytes):
        """
        Forward one VisionWorker JPEG frame into the live session, if connected.

        UVS-H17: Atomic session access pattern to handle race conditions.
        """
        # UVS-H17: Snapshot session state under the lock before using it.
        with self._state_lock:
            session, running = self._session, self._is_running

        if not (session and running):
            return

        try:
            await session.send_realtime_input(
                media=types.Blob(data=frame_bytes, mime_type="image/jpeg")
            )
        except Exception as e:
            # UVS-H17: On a dead connection, drop the session ref so later
            # frames are skipped instead of retried.
            lowered = str(e).lower()
            if any(token in lowered for token in ("1011", "closed", "disconnected")):
                with self._state_lock:
                    self._session = None
                logger.warning(f"Session disconnected during vision frame: {e}")
            # Otherwise: skip this frame gracefully.

    async def _receive_responses(self):
        """Receive audio and tool calls from the session; play audio, dispatch tools."""
        resample_state = None
        while self.is_running:
            try:
                async for response in self.session.receive():
                    # Handle Tool Calls
                    if response.tool_call:
                        print(f"Tool Call Received: {response.tool_call}")
                        # Not yet implemented in V2 SDK Receive Loop nicely?
                        # Actually server_content contains model_turn

                    content = response.server_content
                    if not content:
                        continue

                    # Handle Model Output (Audio/Text)
                    if content.model_turn:
                        # New Turn Detected -> Unmute (Reset VAD Flag)
                        if self.stop_playback:
                            self.stop_playback = False
                            print(" [VAD] New Turn - Unmuted Output")

                        for part in content.model_turn.parts:
                            if part.inline_data:
                                resample_state = self._play_inline_audio(
                                    part.inline_data.data, resample_state
                                )
                            if part.text:
                                print(f"[Gemini] {part.text}")
                            # Handle Function Calls (Standard)
                            if part.function_call:
                                await self._handle_function_call(part.function_call)

                    # content.turn_complete is currently a no-op.

            except Exception as e:
                print(f"Receive Error: {e}")
                traceback.print_exc()
                break

    def _play_inline_audio(self, data, resample_state):
        """Resample one audio part to 16 kHz and stream it in ~20 ms chunks.

        Returns the updated resample state (unchanged if resampling failed).
        Playback honors self.stop_playback for responsive barge-in halting.
        """
        try:
            # Resample 24000 -> 16000
            data, resample_state = audioop.ratecv(
                data, 2, 1, AUDIO_SAMPLE_RATE_RECEIVE, 16000, resample_state
            )

            # SUB-BUFFERING for responsive halting (S17):
            # 16000Hz * 2 bytes * 0.02s = 640 bytes per 20ms
            chunk_size = 640
            for offset in range(0, len(data), chunk_size):
                if self.stop_playback:
                    break  # Halt playback mid-chunk on barge-in
                self.last_audio_time = time.time()
                self.output_stream.write(data[offset:offset + chunk_size])

            self.last_audio_time = time.time()
        except Exception as e:
            print(f"Resample/Playback Error: {e}")
        return resample_state

    async def _handle_function_call(self, call):
        """
        Execute local tools (Non-Blocking) and return their results to the model.

        Accepts either a container exposing `.function_calls` or a single call
        object. Each call is routed through `_dispatch_tool`; all results are
        returned to the session in a single LiveClientToolResponse.
        """
        calls = call.function_calls if hasattr(call, 'function_calls') else [call]
        function_responses = []

        for fc in calls:
            result = await self._dispatch_tool(fc.name, fc.args)
            function_responses.append(
                types.FunctionResponse(
                    name=fc.name,
                    id=fc.id,  # Important for mapping back
                    response=result,
                )
            )

        # Send results back to the session
        print("[Tool] Sending response...")
        await self.session.send(
            input=types.LiveClientToolResponse(
                function_responses=function_responses
            )
        )

    async def _dispatch_tool(self, name, args):
        """Route a single tool invocation to its implementation.

        Returns a JSON-serializable dict describing the outcome.
        """
        if name == "google_search":
            # Offload to thread to prevent loop block
            return await self.loop.run_in_executor(None, google_search, args.get("query"))

        if name == "navigate_browser":
            # Offload to thread
            return await self.loop.run_in_executor(None, navigate_browser, args.get("url"))

        if name == "auto_browse":
            return await self._run_auto_browse(name, args)

        if name == "move_cursor":
            await self.auto_browse_skill.initialize()
            await self.auto_browse_skill.move_cursor(args.get("x"), args.get("y"))
            return {"status": "success", "cursor_moved": True}

        if name == "zoom_viewport":
            await self.auto_browse_skill.initialize()
            await self.auto_browse_skill.set_zoom(
                args.get("x"), args.get("y"), args.get("width", 400), args.get("height", 400)
            )
            return {"status": "success", "zoom_active": True}

        if name == "anchor_element":
            await self.auto_browse_skill.initialize()
            # UVS-H02: Safe parameterization - use json.dumps to prevent JS injection
            from core.security.selector_sanitizer import sanitize_selector, SelectorValidationError
            try:
                selector = sanitize_selector(args.get('selector', ''))
                safe_selector = json.dumps(selector)  # JSON escapes quotes/backslashes
                await self.auto_browse_skill.controller.evaluate(f"window.anchorGenesisCursor({safe_selector})")
                return {"status": "success", "anchored": True}
            except SelectorValidationError as e:
                logger.warning(f"[SECURITY] Blocked unsafe anchor selector: {e}")
                return {"status": "error", "message": f"Invalid selector: {e}"}

        if name == "gesture_cursor":
            await self.auto_browse_skill.initialize()
            # UVS-H02: Safe parameterization - validate and escape all inputs
            from core.security.selector_sanitizer import sanitize_selector, SelectorValidationError
            from core.security.input_validator import validate_enum, ValidationError
            try:
                gesture_type = args.get('type', '')
                validate_enum(gesture_type, ['circle', 'underline', 'point'], 'gesture_type')
                selector = sanitize_selector(args.get('selector', ''))
                safe_type = json.dumps(gesture_type)
                safe_selector = json.dumps(selector)
                await self.auto_browse_skill.controller.evaluate(f"window.gestureGenesisCursor({safe_type}, {safe_selector})")
                return {"status": "success", "gesture_active": True}
            except (SelectorValidationError, ValidationError) as e:
                logger.warning(f"[SECURITY] Blocked unsafe gesture args: {e}")
                return {"status": "error", "message": f"Invalid input: {e}"}

        if name == "clear_zoom":
            await self.auto_browse_skill.clear_zoom()
            return {"status": "success", "zoom_cleared": True}

        if name == "visual_reset":
            await self.auto_browse_skill.visual_reset()
            return {"status": "success", "visual_reset_complete": True}

        if name == "pause_for_founder_question":
            reason = args.get("reason", "Strategic checkpoint.")
            print(f"[MENTORSHIP] Pausing: {reason}")
            self.stop_playback = True  # Mute output while the founder speaks
            return {"status": "paused", "reason": reason, "instruction": "The agent is now listening for your questions. Say 'Continue' when ready."}

        if name.startswith("ghl_"):
            return await self._execute_ghl_skill(name, args)

        return {"error": f"Unknown tool: {name}"}

    async def _run_auto_browse(self, name, args):
        """auto_browse with HANDOVER PROTOCOL CHECK (S19).

        Sensitive, unconfirmed instructions pause and ask for verbal
        confirmation instead of executing.
        """
        instruction = args.get("instruction", "")
        confirmed = args.get("confirmed", False)
        sensitive_keywords = ["save", "delete", "commit", "buy", "billing", "publish", "create"]

        if not confirmed and any(kw in instruction.lower() for kw in sensitive_keywords):
            print(f"[HANDOVER] Sensitive action detected: {instruction}")
            print(" [PROTOCOL] Pausing for verbal verification...")
            self.requires_handover = True
            self.pending_action = {"name": name, "args": args}
            return {"status": "paused", "reason": "This action involves sensitive modifications. Please verbally say 'YES' or 'CONFIRM' to proceed."}

        # Execute normally
        self.requires_handover = False
        await self.auto_browse_skill.initialize()
        return await self.auto_browse_skill.auto_execute(instruction)

    async def _execute_ghl_skill(self, name, args):
        """Dynamic GHL Skill Dispatch (S19 Integrated).

        UVS-H27: validates the skill interface (execute/description) before
        execution and coerces non-dict results; sensitive skills pause for
        founder confirmation unless `confirmed` is set.
        """
        skill_name = name.replace("ghl_", "")
        skill = ghl_registry.get_skill(skill_name)
        confirmed = args.get("confirmed", False)

        if not skill:
            return {"error": f"GHL Skill not found: {skill_name}"}

        # UVS-H27: Validate skill has required interface
        if not hasattr(skill, 'execute') or not callable(getattr(skill, 'execute', None)):
            logger.warning(f"[SECURITY] GHL skill '{skill_name}' missing execute() method")
            return {"error": f"Invalid skill interface: {skill_name} has no execute() method"}
        if not hasattr(skill, 'description'):
            logger.warning(f"[SECURITY] GHL skill '{skill_name}' missing description attribute")
            return {"error": f"Invalid skill interface: {skill_name} has no description"}

        # Check if skill description suggests sensitivity
        sensitive_keywords = ["create", "delete", "update", "modify", "save", "billing"]
        is_sensitive = any(kw in skill.description.lower() for kw in sensitive_keywords)

        if not confirmed and is_sensitive:
            print(f"[HANDOVER] Sensitive GHL skill detected: {skill_name}")
            self.requires_handover = True
            self.pending_action = {"name": name, "args": args}
            return {"status": "paused", "reason": f"Executing '{skill_name}' requires founder confirmation. Please confirm."}

        self.requires_handover = False
        await self.auto_browse_skill.initialize()
        try:
            result = await skill.execute(self.auto_browse_skill.controller, **args)
        except TypeError as te:
            # UVS-H27: Handle invalid skill signature
            logger.error(f"GHL skill '{skill_name}' signature error: {te}")
            return {"error": f"Skill interface error: {te}"}
        except Exception as exec_err:
            logger.error(f"GHL skill '{skill_name}' execution error: {exec_err}")
            return {"error": f"Skill execution failed: {exec_err}"}

        # UVS-H27: Validate result is proper Dict
        if not isinstance(result, dict):
            logger.warning(f"GHL skill '{skill_name}' returned non-dict: {type(result)}")
            result = {"status": "success", "data": result}
        return result


if __name__ == "__main__":
    try:
        if sys.platform == 'win32':
             # Windows asyncio fix for some loops
            asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
            
        session = GeminiLiveSession()
        asyncio.run(session.start())
    except KeyboardInterrupt:
        print("\nExiting...")
