
    i;                     |    d Z ddlZddlZddlmZmZ ddlmZ ddlmZ  ed      Z	dZ
 G d d	      Z G d
 d      Zy)u  
Telnyx Webhook Interceptor — call lifecycle event handler.
Story 3.02 — AIVA RLM Nexus PRD v2 — Track A

Handles Telnyx call lifecycle events:
  - call.initiated  → open a royal_conversations row (started_at = now, ended_at = NULL)

Design principles (hardwired):
  - The phone call MUST NEVER break due to a DB or logging failure.
  - ALL exceptions from DB/IO operations are caught, logged, and swallowed.
  - session_id is extracted from the canonical Telnyx payload path; UUID4 is
    used as a fallback when the path is absent or malformed.
  - All timestamps are UTC.
  - The events log file (events.jsonl) uses the same path as ExecutionTelemetryInterceptor
    so a single file tracks all Genesis observability events.
    N)datetimetimezone)Path)Optionalz5/mnt/e/genesis-system/data/observability/events.jsonl)datapayloadcall_session_idc                       e Zd ZdZddZdedefdZdedefdZdedefdZdede	fd	Z
ded
e	dee	   fdZde	ddfdZde	ddfdZde	deddfdZy)TelnyxWebhookInterceptora  
    Handles Telnyx call lifecycle events.

    Responsibilities:
      - handle_call_initiated: records conversation start in royal_conversations
      - All DB failures are non-fatal (the call must never break)
      - All events are appended to the observability log

    Args:
        db_conn:      Optional psycopg2 connection. When None, DB operations are skipped.
        redis_client: Optional Redis client. Reserved for future use (e.g. active-call cache).
    Nc                      || _         || _        y )N)_db_redis)selfdb_connredis_clients      E/mnt/e/genesis-system/core/interceptors/telnyx_webhook_interceptor.py__init__z!TelnyxWebhookInterceptor.__init__3   s    "    r   returnc                    K   | j                  |      }	 | j                  |       | j                  dd|i       d|dS # t        $ r(}| j                  dt	        |      |d       Y d}~Ed}~ww xY ww)a5  
        Handle a Telnyx ``call.initiated`` webhook event.

        Processing steps:
          1. Extract call_session_id from payload["data"]["payload"]["call_session_id"].
             Falls back to a new UUID4 string if the key path is absent.
          2. Insert a new row into royal_conversations with:
               - conversation_id = session_id
               - started_at      = UTC now
               - ended_at        = NULL  (not yet finished)
               - participants    = {"kinan": false, "aiva": false}
             Uses ON CONFLICT DO NOTHING for idempotency (duplicate webhooks are safe).
          3. Append a ``telnyx_call_initiated`` event to the observability log.
          4. Return {"status": "ok", "session_id": session_id}.

        Any DB or IO failure is caught, logged to the observability file (best-effort),
        and the method still returns {"status": "ok"} so the call is never disrupted.

        Args:
            payload: Parsed Telnyx webhook body (dict).

        Returns:
            {"status": "ok", "session_id": str}
        db_errorerror
session_idNcall_initiatedr   ok)statusr   )_extract_session_id_insert_conversation	Exception
_log_eventstr)r   r   r   excs       r   handle_call_initiatedz.TelnyxWebhookInterceptor.handle_call_initiated;   sz     2 --g6
	W%%j1
 	(<*DEj99  	WOOJ#c(*(UVV	Ws+   A3? A3	A0A+&A3+A00A3c                 `  K   ddl }| j                  |      }	 | j                  rft        j                  dt        j                  t        j                        j                         d      }| j                  j                  d| |       	 | j                  |       	 t               }|j!                  |j#                  |      d	
       d{    | j                  dd|i       dddS # t        $ r(}| j                  dt        |      |d       Y d}~d}~ww xY w# t        $ r(}| j                  dt        |      |d       Y d}~d}~ww xY w7 # |j$                  $ r | j                  d|dd       Y t        $ r(}| j                  d|t        |      d       Y d}~d}~ww xY ww)u  
        Handle a Telnyx ``call.hangup`` webhook event.

        Processing steps:
          1. Extracts session_id from payload["data"]["payload"]["call_session_id"].
             Falls back to a UUID4 string if the path is absent or malformed.
          2. Updates Redis key ``aiva:state:{session_id}`` to a JSON object:
               {"status": "ended", "ended_at": <ISO-8601 UTC timestamp>}
             The Redis write is NON-FATAL — if it fails the event is logged and
             execution continues.
          3. Updates Postgres ``royal_conversations.ended_at`` to UTC now for the
             row whose conversation_id matches session_id.
             The Postgres write is NON-FATAL — if it fails the event is logged and
             execution continues.
          4. Spawns PostCallEnricher.enrich(session_id) with a hard asyncio deadline
             of 3.0 seconds.
               - If it finishes within 3 s  → enrichment_started = True (already done)
               - If it raises within 3 s    → error logged, no crash
               - If it times out at 3 s     → TimeoutError logged, no crash
          5. Returns {"status": "ok", "enrichment_started": True} in all cases.

        ANY failure is logged and swallowed — the call must NEVER break.

        Args:
            payload: Parsed Telnyx webhook body (dict).

        Returns:
            {"status": "ok", "enrichment_started": True}
        r   Nended)r   ended_ataiva:state:redis_errorr   r   g      @)timeoutenricher_timeoutz#PostCallEnricher timed out after 3s)r   r   enricher_errorcall_hangupr   r   T)r   enrichment_started)asyncior   r   jsondumpsr   nowr   utc	isoformatsetr    r!   r"   _update_conversation_endedPostCallEnricherwait_forenrichTimeoutError)r   r   r/   r   stater#   enrichers          r   handle_call_hangupz+TelnyxWebhookInterceptor.handle_call_hangupa   s    < 	--g6
	Z{{

% (X\\ : D D F$  +j\ :EB
	W++J7
	')H""8??:#>"LLL 	j'ABd;;7  	ZOOMSXZ+XYY	Z  	WOOJ#c(*(UVV	W M## 	OO")4YZ  	OO )CH= 	s   F.A2C+ D /E EE F.+	D4DF.DF.	E(EF.EF.E $F+9F.;F+F&!F.&F++F.c                   K   | j                  |      }| j                  |d      }	 | j                  rft        j                  dt        j                  t        j                        j                         d      }| j                  j                  d| |       | j                  d||xs d	d
       dd|dS # t        $ r(}| j                  dt        |      |d       Y d}~Kd}~ww xY ww)u#  
        Handle a Telnyx ``call.answered`` webhook event.

        Processing steps:
          1. Extracts session_id from payload["data"]["payload"]["call_session_id"].
             Falls back to a UUID4 string if the path is absent or malformed.
          2. Extracts call_control_id from payload["data"]["payload"]["call_control_id"].
             None is acceptable (non-fatal).
          3. Sets Redis key ``aiva:state:{session_id}`` to a JSON object:
               {"status": "active", "answered_at": <ISO-8601 UTC timestamp>}
             The Redis write is NON-FATAL — if it fails the event is logged and
             execution continues.
          4. Appends a ``call_answered`` event to the observability log.
          5. Returns {"status": "ok", "capture_started": True, "session_id": session_id}.

        ANY failure is logged and swallowed — the call must NEVER break.

        Args:
            payload: Parsed Telnyx webhook body (dict).

        Returns:
            {"status": "ok", "capture_started": True, "session_id": str}
        call_control_idactive)r   answered_atr(   r)   r   Ncall_answeredunknown)r   r?   r   T)r   capture_startedr   )r   _extract_fieldr   r0   r1   r   r2   r   r3   r4   r5   r    r!   r"   )r   r   r   r?   r;   r#   s         r   handle_call_answeredz-TelnyxWebhookInterceptor.handle_call_answered   s     0 --g6
--g7HI	Z{{

&#+<<#=#G#G#I$  +j\ :EB
 	$.;)*
 	
 4zRR  	ZOOMSXZ+XYY	Zs/   $C,A2B8 C,8	C)C$C,$C))C,c                     	 |}t         D ]  }||   }	 |rt        |t              r|S t        t        j                               S # t        t        f$ r Y .w xY w)a;  
        Extract call_session_id from the canonical Telnyx payload path.

        Canonical path: payload["data"]["payload"]["call_session_id"]

        Returns:
            The session ID string from the payload, or a new UUID4 string if
            the path is absent, None, or the payload is not a dict.
        )_SESSION_PATH
isinstancer"   KeyError	TypeErroruuiduuid4)r   r   nodekeys       r   r   z,TelnyxWebhookInterceptor._extract_session_id   sa    	D$ !Cy!
4- 4::<   )$ 		s   %A AAfieldc                 P    	 |d   d   |   }||S dS # t         t        f$ r Y yw xY w)a  
        Extract an arbitrary field from the canonical Telnyx payload path
        ``payload["data"]["payload"][field]``.

        Args:
            payload: Parsed Telnyx webhook body.
            field:   Field name to look up inside ``data.payload``.

        Returns:
            The field value as a string, or None if the path is absent,
            None-valued, or the payload structure is unexpected.
        r   r   N)rJ   rK   )r   r   rP   values       r   rE   z'TelnyxWebhookInterceptor._extract_field   sA    	FOI.u5E!-5747)$ 		s     %%r   c                 v   | j                   syd}t        j                  t        j                        }t        j                  ddd      }| j                   j                         }	 |j                  ||||f       | j                   j                          |j                          y# |j                          w xY w)a  
        Insert a new row into royal_conversations for this call session.

        Schema (from data/migrations/001_royal_conversations.sql):
          - conversation_id UUID PRIMARY KEY
          - started_at      TIMESTAMP NOT NULL
          - ended_at        TIMESTAMP               (NULL until hangup)
          - participants    JSONB NOT NULL DEFAULT '{"kinan": false, "aiva": false}'

        Uses ON CONFLICT DO NOTHING so duplicate call.initiated events are safe.

        Args:
            session_id: UUID string for this call session.

        Raises:
            Any psycopg2 exception if the DB operation fails.
            Callers are responsible for catching and swallowing.
        Nz
            INSERT INTO royal_conversations
                (conversation_id, started_at, participants)
            VALUES
                (%s, %s, %s)
            ON CONFLICT (conversation_id) DO NOTHING
        F)kinanaiva)r   r   r2   r   r3   r0   r1   cursorexecutecommitclose)r   r   queryr2   participantsrV   s         r   r   z-TelnyxWebhookInterceptor._insert_conversation  s    & xx ll8<<(zzE5"AB"	NN5:sL"ABHHOOLLNFLLNs   &/B& &B8c                 D   | j                   syd}t        j                  t        j                        }| j                   j                         }	 |j                  |||f       | j                   j                          |j                          y# |j                          w xY w)a3  
        Set ended_at = UTC now for royal_conversations row matching session_id.

        Args:
            session_id: UUID string for this call session.

        Raises:
            Any psycopg2 exception if the DB operation fails.
            Callers are responsible for catching and swallowing.
        Nzu
            UPDATE royal_conversations
            SET ended_at = %s
            WHERE conversation_id = %s
        )	r   r   r2   r   r3   rV   rW   rX   rY   )r   r   rZ   r2   rV   s        r   r6   z3TelnyxWebhookInterceptor._update_conversation_ended)  sr     xx
 ll8<<("	NN53
"34HHOOLLNFLLNs   .B B
event_typer   c                 p   	 t         j                  j                  dd       t        j                  t
        j                        j                         d| d|}t        t         d      5 }|j                  t        j                  |      dz          ddd       y# 1 sw Y   yxY w# t        $ r Y yw xY w)a  
        Append an event record to the observability log (best-effort, never raises).

        Format: one JSON object per line, compatible with events.jsonl used by
        ExecutionTelemetryInterceptor.

        Args:
            event_type: Short string label (e.g. "call_initiated", "db_error").
            data:       Additional key/value pairs to include in the event record.
        T)parentsexist_oktelnyx_)	timestampr]   a
N)EVENTS_LOG_PATHparentmkdirr   r2   r   r3   r4   openwriter0   r1   r    )r   r]   r   eventfhs        r   r!   z#TelnyxWebhookInterceptor._log_eventE  s    
	""(((E%\\(,,7AAC '
|4 E
 os+ 3rE*T123 3 3 		s0   A*B) ,(BB) B&"B) &B) )	B54B5)NN)__name__
__module____qualname____doc__r   dictr$   r=   rF   r"   r   r   rE   r   r6   r!    r   r   r   r   %   s    #$:4 $:D $:LE< E< E<N,S$ ,S4 ,Sd!4 !C !*d 3 8C= &%s %t %NS T 8S   r   r   c                        e Zd ZdZdeddfdZy)r7   u  
    Stub implementation of post-call enrichment logic.

    In production this will:
      - Pull transcript / recording from Telnyx
      - Run sentiment analysis
      - Update the royal_conversations row with enrichment data
      - Fire RLM ingestion for the conversation

    For Story 3.04 this is a stub — it simply returns immediately so the
    asyncio.wait_for deadline path can be exercised by tests and real code
    without blocking.
    r   r   Nc                    K   yw)z
        Enrich a completed call session.

        Args:
            session_id: The call session identifier from Telnyx.
        Nrq   )r   r   s     r   r9   zPostCallEnricher.enrichp  s      	s   )rl   rm   rn   ro   r"   r9   rq   r   r   r7   r7   a  s    s t r   r7   )ro   r0   rL   r   r   pathlibr   typingr   re   rH   r   r7   rq   r   r   <module>rv      sF       '   NO 7u ux	 r   