
    "iY-                         d Z ddlZddlZddlZddlZddlZddlmZmZ ddlm	Z	 ddl
mZ ddlZ ej                  e      ZdZ e	d      ZdZd	Z G d
 d      Zy)u   RLMCaptureAgent — async agent loop for capturing call transcripts.

Story 3.05 — AIVA RLM Nexus PRD v2 — Track A
Story 3.06 — Transcript Fetch + Speaker Labeling (extends 3.05)
    N)datetimetimezone)Path)Optionalz'https://api.telnyx.com/v2/ai/assistantsz(/mnt/e/genesis-system/data/observabilityi     c            	           e Zd ZdZ	 	 ddedededdfdZddZdefd	Zdd
Z	de
e   fdZddZddede
e   ddfdZy)RLMCaptureAgentu  
    Async capture loop that polls for call-ended signal.
    Designed to run as a background asyncio task during active calls.

    The agent polls Redis every POLL_INTERVAL_SECONDS seconds looking for
    the key ``aiva:state:{session_id}`` to contain ``{"status": "ended"}``.
    When detected — or when MAX_RUNTIME_SECONDS elapses — the loop exits
    cleanly and logs a stop event.  Any Redis failure is treated as
    fail-safe: the agent keeps running (returns False from _is_call_ended).
    N
session_idcall_control_idcall_directionreturnc                     || _         || _        || _        || _        d| _        d | _        d| _        d| _        d| _        y )NTr    )	r
   r   _redisr   _running_started_at_consecutive_failures_chunk_index_last_transcript_text)selfr
   r   redis_clientr   s        6/mnt/e/genesis-system/core/agents/rlm_capture_agent.py__init__zRLMCaptureAgent.__init__&   sI     %.",",0*+"!"*,"    c                 V  K   t        j                         | _        | j                  d       | j                  rt        j                         | j                  z
  }|t
        k\  r| j                  dd|i       np| j                          d{   r| j                  d       nF| j                          d{    t        j                  t               d{    | j                  rd| _        | j                  ddt        j                         | j                  z
  i       y7 7 o7 Pw)	u_  
        Main loop.  Runs until call ended or max runtime exceeded.

        - Polls every POLL_INTERVAL_SECONDS (30 s)
        - Checks Redis ``aiva:state:{session_id}`` for ``{"status": "ended"}``
          to stop
        - Max runtime: MAX_RUNTIME_SECONDS (3600 s) — zombie guard
        - Returns normally (no exception) after stopping
        agent_startedzombie_guard_triggeredelapsed_secondsNcall_ended_detectedFagent_stoppedtotal_seconds)time	monotonicr   
_log_eventr   MAX_RUNTIME_SECONDS_is_call_ended_capture_cycleasynciosleepPOLL_INTERVAL_SECONDS)r   elapseds     r   runzRLMCaptureAgent.run8   s       >>+(mmnn&)9)99G-- 8;Lg:VW ((*** 56 %%''' -- 5666! mm$ dnn.1A1AAB	
 +
 ( 7s<   B	D)D#)D)5D%6 D)D'D)(<D)%D)'D)c                   K   | j                   sy	 d| j                   }| j                   j                  |       d{   }|syt        j                  |      }|j                  d      dk(  S 7 0# t
        $ r Y yw xY ww)z
        Returns True if Redis state shows 'ended'.
        Returns False if key is missing (call still active) or on any error.
        Fzaiva:state:Nstatusended)r   r
   getjsonloads	Exception)r   keyrawstates       r   r&   zRLMCaptureAgent._is_call_ended]   s|     
 {{		01C,,CJJsOE99X&'11	 -
  		sD   B -A1 A/A1 B (A1 .B /A1 1	A=:B <A==B c                    K   d| _         yw)z7Graceful shutdown.  Sets _running = False.  Idempotent.FN)r   )r   s    r   stopzRLMCaptureAgent.stopo   s     s   	c                 F  K   t         j                  j                  dd      }t         d| j                   d| j
                   }d| dd}	 t        j                  d	      4 d
{   }|j                  ||       d
{   }|j                          |j                         }d
d
d
      d
{    	 j                  d|      }t!        |t"              r|j                  d      nd
}	|	rHt!        |	t$              r8t'        |	      dkD  r*|	d   }
|
j                  dd      }|
j                  dd      }n+t!        |t"              r|j                  dd      nd}|sy
d}|}|sy
|| j(                  k(  ry
|| _        |dk(  rd}n| j*                  dk(  rd}nd}t-        j,                         ||| j.                  d}| xj.                  dz  c_        |S 7 k7 S7 &# 1 d
{  7  sw Y   7xY w# t        j                  $ r | j                  dd|i       Y y
t        j                  $ r3}| j                  d|j                  j                  |d       Y d
}~y
d
}~wt        $ r | j                  dd|i       Y y
w xY w# t        $ r | j                  d        Y y
w xY ww)!ub  
        Fetches the latest transcript from the Telnyx AI Assistant transcript API
        and labels the speaker based on the call direction.

        Speaker labels:
            "AIVA"     — AI assistant turns (role == "assistant")
            "KINAN"    — human turns when call_direction == "outbound"
            "CUSTOMER" — human turns when call_direction != "outbound" (inbound)

        Returns:
            {
                "t": float,           # unix timestamp of fetch
                "speaker": str,       # "AIVA" | "KINAN" | "CUSTOMER"
                "text": str,          # transcript text
                "chunk_index": int,   # monotonically increasing counter
            }
            Or None if no new content was available since the last fetch.

        This method NEVER raises — all errors are caught and logged internally.
        TELNYX_API_KEYr   /z/transcript?call_control_id=zBearer zapplication/json)AuthorizationzContent-Typeg      $@)timeoutN)headersfetch_timeouturlfetch_http_error)status_coder@   fetch_errordatamessagesr   roleusercontent
transcript	assistantAIVAoutboundKINANCUSTOMER)tspeakertextchunk_index   fetch_parse_error)osenvironr0   TELNYX_API_BASEr
   r   httpxAsyncClientraise_for_statusr1   TimeoutExceptionr$   HTTPStatusErrorresponserB   r3   
isinstancedictlistlenr   r   r"   r   )r   api_keyr@   r>   clientr^   payloadexcrD   rE   latestrG   rR   transcript_textrQ   chunks                   r   _fetch_and_label_chunkz&RLMCaptureAgent._fetch_and_label_chunks   s    * **..!126q 1*4+?+?*@B 	
  'wi0.

	((6 * *&!'C!AA))+"--/* *(1	;;vw/D 0:$/Etxx
+4HJx63x=1;L!"zz&&1zz)R0 3=T42HDHH\2.b   '& t111)-D& {" $$
2!$ YY["#00	E "LC*A* * * * %% 	OOOeS\:$$ 	OO" # 8 8E  	OOME3<8	l  	OO/0	s   AJ!G2 (G)G2 ,GG$G(G2 3G4G2 9B%J J!J %J!&J 5J!6AJ J!G2 GG2 G/"G%#G/*G2 2'I>J!I>-)IJ! I>;J!=I>>J!JJ!JJ!c                 8  K   	 | j                          d{   }|yd| j
                   }	 | j                  j                  |t        j                  |             d{    d| _        y7 V# t        $ r}| xj                  dz  c_        t        j	                  d| j
                  | j                  |       | j                  dk\  r+t        j                  d| j
                  | j                         Y d}~yd}~ww xY w7 # t        $ r}| xj                  dz  c_        t        j	                  d|| j
                  | j                  |       | j                  dk\  r0t        j                  d| j
                  | j                         Y d}~yY d}~yd}~ww xY ww)	u  
        One capture cycle called every 30 s from the run() loop.

        1. Call _fetch_and_label_chunk() to get the latest transcript chunk.
        2. If a chunk is returned: RPUSH it (as JSON) to
           ``aiva:transcript:{session_id}`` in Redis.
        3. If _fetch_and_label_chunk() returns None: no-op (normal — no new content).
        4. Any exception (from fetch OR from Redis RPUSH) is caught, logged as a
           warning, and the agent continues running — NEVER raises.
        5. After 3 consecutive failed cycles: log at ERROR level (escalation signal).
        6. A successful cycle resets the consecutive-failure counter to 0.
        NrT   zUrlm_capture: _fetch_and_label_chunk raised unexpectedly (session=%s, failures=%d): %s   uT   rlm_capture: 3+ consecutive failures (session=%s, failures=%d) — escalation signalzaiva:transcript:r   zHrlm_capture: Redis RPUSH failed for key=%s (session=%s, failures=%d): %s)rj   r3   r   loggerwarningr
   errorr   rpushr1   dumps)r   ri   rf   	redis_keys       r   r'   zRLMCaptureAgent._capture_cycle   ss    	5577E( = 't&78		++##Itzz%/@AAA)*D&; 8 	&&!+&NN0** ))Q.(OO..	 #	4 B  	&&!+&NN0** ))Q.(OO..	  /	sz   FA0 A.A0 F2D !D"D -F.A0 0	C>9A;C94F9C>>FD 	FA<F
FFF
event_typeextrac                    	 t         j                  dd       t        j                  t        j
                        j                         d| | j                  | j                  d}|r|j                  |       t        t         dz  d      5 }|j                  t        j                  |      dz          ddd       y# 1 sw Y   yxY w# t        $ r Y yw xY w)	z=Append a JSONL event to the observability log.  Never raises.T)parentsexist_okrlm_capture_)	timestamprs   r
   r   zevents.jsonla
N)
EVENTS_DIRmkdirr   nowr   utc	isoformatr
   r   updateopenwriter1   rq   r3   )r   rs   rt   eventfs        r   r$   zRLMCaptureAgent._log_event  s    	TD9%\\(,,7AAC ,ZL9"oo#'#7#7	E U#j>137 21

5)D012 2 2 		s0   B
C	 (B=4C	 =CC	 C	 		CC)Ninbound)r   N)N)__name__
__module____qualname____doc__strr   r,   boolr&   r8   r   r`   rj   r'   r$    r   r   r	   r	      s    	 '-- -
 - 
-$#
Jd $ehtn eN<|S $ 4 r   r	   )r   r(   r1   loggingrV   r"   r   r   pathlibr   typingr   rY   	getLoggerr   rm   rX   r|   r%   r*   r	   r   r   r   <module>r      sa   
    	  '   			8	$;<=
  M Mr   