
    iL                         d Z ddlZddlZddlZddlmc mZ ddlm	Z	m
Z
 ddlmZmZ ddlmZ 	 ddlZ ej$                  e      Ze	 G d d             Z G d d	      Zy# e$ r dZY 5w xY w)
u  
AIVA RLM Nexus — BinduHydrator Base Class
Scatter/gather engine that fans out parallel tasks to fetch AIVA's context
before she picks up a call.

Story 4.01 — Track A
File: core/hydrators/bindu_hydrator.py

VERIFICATION_STAMP
Story: 4.01
Verified By: parallel-builder
Verified At: 2026-02-25
Tests: 24/24
Coverage: 100%

Story 4.02 — Track A (extension)
VERIFICATION_STAMP
Story: 4.02
Verified By: parallel-builder
Verified At: 2026-02-25
Tests: 6/6
Coverage: 100%

Story 4.03 — Track A (extension)
VERIFICATION_STAMP
Story: 4.03
Verified By: parallel-builder
Verified At: 2026-02-25
Tests: 11/11
Coverage: 100%

Story 4.04 — Track A (extension)
VERIFICATION_STAMP
Story: 4.04
Verified By: parallel-builder
Verified At: 2026-02-25
Tests: 9/9
Coverage: 100%

Story 4.05 — Track A (extension)
VERIFICATION_STAMP
Story: 4.05
Verified By: parallel-builder
Verified At: 2026-02-25
Tests: 10/10
Coverage: 100%
    N)	dataclassfield)datetimetimezone)Optionalc                   V    e Zd ZU dZeed<   eed<   eed<   eed<   dZeed<   dZ	eed<   y	)
HydrationSessionzs
    Tracks the state of a single call hydration cycle.
    One HydrationSession is created per incoming call.
    
session_idaiva_call_id
started_atstatusr   tasks_dispatchedtasks_completedN)
__name__
__module____qualname____doc__str__annotations__r   r   intr        6/mnt/e/genesis-system/core/hydrators/bindu_hydrator.pyr	   r	   @   s4     OKcOSr   r	   c                       e Zd ZdZdZdZdZddZdeded	e	fd
Z
ded	efdZd	ee   fdZded	ee   fdZddeded	ee   fdZdeded	efdZy)BinduHydratoraP  
    Scatter/gather engine: fans out N parallel Redis-backed tasks,
    aggregates into a single ROYAL_CHAMBER_CONTEXT XML envelope.

    This is the BASE class. Stories 4.02-4.07 will add fetch methods
    (memory, PG transcript history, Qdrant semantic recall, etc.).
    All hydration must complete within HYDRATION_DEADLINE_MS.
    i  zaiva:context:{session_id}i,  Nc                 .    || _         || _        || _        y)z
        Args:
            redis_client:    Async/sync redis client (must support setex).
            postgres_client: Optional asyncpg or psycopg2 connection/pool.
            qdrant_client:   Optional Qdrant async client.
        N)redispgqdrant)selfredis_clientpostgres_clientqdrant_clients       r   __init__zBinduHydrator.__init__`   s     "
!#r   r
   call_idreturnc                   K   t        j                  t        j                        }t	        |||ddd      }| j
                  j                  |      }	 | j                  j                  || j                  d       d{    t        j                  d|| j                         |S 7 '# t        $ r"}t        j                  d||       Y d}~|S d}~ww xY ww)aP  
        Creates a HydrationSession and marks it 'pending' in Redis.

        Writes key aiva:context:{session_id} = "pending" with a 300s TTL
        using SETEX so the key auto-expires if the call never completes.

        Redis failure is NON-FATAL: the session is still returned so the call
        can proceed (degraded but live).

        Args:
            session_id: Unique identifier for this hydration cycle.
            call_id:    Telnyx call_control_id (or internal call UUID).

        Returns:
            HydrationSession with status="pending" and started_at=UTC now.
        pendingr   )r
   r   r   r   r   r   r
   Nz8BinduHydrator: Redis key '%s' set to 'pending' (TTL=%ds)uO   BinduHydrator: Redis SETEX failed for key '%s': %s — proceeding without cache)r   nowr   utcr	   _CONTEXT_KEY_TEMPLATEformatr   setex_CONTEXT_TTL_SECONDSloggerdebug	Exceptionerror)r    r
   r%   now_utcsession	redis_keyexcs          r   start_hydrationzBinduHydrator.start_hydrationk   s     " ,,x||,"! 
 ..555L		**""9d.G.GSSSLLJ))  T  	LLa  	sB   AC*B( >B&?%B( $C&B( (	C1CCCCc                 |  K   d| }d}	 t        j                  | j                  j                  |      | j                  j                  |             d{   \  }}i }|Q	 t        j                  |      }t        |t              s+t
        j                  dt        |      j                         i }g }|Q	 t        j                  |      }t        |t               s+t
        j                  dt        |      j                         g }||dS 7 # t        $ r%}t
        j                  d|       i g dcY d}~S d}~ww xY w# t        j                  t        f$ r"}t
        j                  d|       i }Y d}~d}~ww xY w# t        j                  t        f$ r"}t
        j                  d	|       g }Y d}~d}~ww xY ww)
u  
        Fires 2 parallel Redis reads via asyncio.gather (L1 fetch — fastest tier).

        Keys read:
          1. aiva:state:{session_id}   → AIVA working state (JSON object)
          2. kinan:directives:active   → Kinan's current active directives (JSON array)

        Returns:
            {
                "aiva_state":        dict   — parsed JSON; {} if key missing or parse error
                "kinan_directives":  list   — parsed JSON; [] if key missing or parse error
            }

        Redis failure is NON-FATAL: returns safe defaults so the call can proceed
        without crashing the hydration pipeline.
        zaiva:state:zkinan:directives:activeNuR   BinduHydrator._scatter_redis_tasks: Redis gather failed: %s — returning defaults
aiva_statekinan_directivesuA   BinduHydrator: aiva:state is not a dict (%s) — defaulting to {}uG   BinduHydrator: Failed to parse aiva:state JSON: %s — defaulting to {}uN   BinduHydrator: kinan:directives:active is not a list (%s) — defaulting to []uT   BinduHydrator: Failed to parse kinan:directives:active JSON: %s — defaulting to [])asynciogatherr   getr2   r0   r3   jsonloads
isinstancedictwarningtyper   JSONDecodeError
ValueErrorlist)	r    r
   aiva_keydirectives_keyaiva_rawdirectives_rawr7   r;   r<   s	            r   _scatter_redis_tasksz"BinduHydrator._scatter_redis_tasks   s    " !-2
	>-4^^

x(

~.. ($Hn 
 !ZZ1
!*d3NN[Z(11 "$J "$%&#'::n#= !"2D9NNh-.77 (*$ )>NOO_(  	>LLd #%"==	>$ ((*5  ]  
 $ ((*5 &j $& &s   F<AD DD F<#AE  3F<8AE> F<D 	D=D82D=3F<8D==F< E;E61F<6E;;F<>F9F4/F<4F99F<c                     K   dt         t           f fd}t        j                         }|j	                  d|       d{   S 7 w)u~  
        Fetches the most recent Kinan conversation from the royal_conversations table.

        SQL (parameterized):
            SELECT conversation_id, started_at, summary, decisions_made,
                   action_items, kinan_directives, emotional_signal, key_facts
            FROM royal_conversations
            WHERE participants->>'kinan' = 'true'
            ORDER BY started_at DESC
            LIMIT 1

        The postgres_client is expected to expose a psycopg2 connection pool with
        getconn()/putconn() methods.  The sync DB call is wrapped in
        run_in_executor() so it does not block the event loop.

        Returns:
            dict  — row from royal_conversations (RealDictCursor) when found.
            None  — when no Kinan conversation exists, or on any DB error.

        DB failure is NON-FATAL: degraded context is better than a crashed call.
        r&   c                     d} 	 j                   j                         } | j                  t        j                  j
                        5 }|j                  dd       |j                         }|(	 ddd       | j                   j                  |        yyt        |      cddd       | j                   j                  |        S S # 1 sw Y   nxY wnJ# t        $ r>}t        j                  d|       Y d}~| j                   j                  |        yyd}~ww xY w	 | j                   j                  |        yy# | j                   j                  |        w w xY w)u7   Synchronous psycopg2 fetch — runs in executor thread.N)cursor_factorya?  
                        SELECT conversation_id,
                               started_at,
                               summary,
                               decisions_made,
                               action_items,
                               kinan_directives,
                               emotional_signal,
                               key_facts
                        FROM royal_conversations
                        WHERE participants->>'kinan' = %s
                        ORDER BY started_at DESC
                        LIMIT 1
                        )trueuL   BinduHydrator._scatter_postgres_task: DB fetch failed: %s — returning None)r   getconncursorpsycopg2extrasRealDictCursorexecutefetchoneputconnrC   r2   r0   r3   )conncurrowr7   r    s       r   _sync_fetchz9BinduHydrator._scatter_postgres_task.<locals>._sync_fetch   sL   D!*ww([[0N0N[O %SVKK "" ,,.C{#)%< #GGOOD) $  9-% %< #GGOOD) $=% % %.  b #GGOOD) $/%< #GGOOD) $4#GGOOD) $sT   AC 	&C/C 
C 	C CC D? 	DD4D? DD? ? EN)r   rC   r=   get_event_looprun_in_executor)r    r]   loops   `  r   _scatter_postgres_taskz$BinduHydrator._scatter_postgres_task   sA     .$	*Xd^ $	*L %%'))$<<<<s   =A AA
query_textc                     ddl }d}|j                  |j                  d            j                         }g }t	        |      D ]*  }||t        |      z     }|j                  |dz  dz
         , |S )a^  
        Produces a deterministic 768-dimensional embedding vector from query_text.

        In production this would call Gemini text-embedding-004 (768-dim output).
        For testing and offline use, a hash-based approach is used so that:
          - The same query always produces the same vector (deterministic).
          - The vector is always exactly 768 dimensions.
          - Values are in [-1.0, 1.0] (unit-scaled from hash bytes).

        Args:
            query_text: The natural-language query to embed.

        Returns:
            A list of 768 floats in the range [-1.0, 1.0].
        r   Ni   zutf-8g     _@g      ?)hashlibsha512encodedigestrangelenappend)r    rb   rd   
DIMENSIONSrg   valuesibyte_vals           r   _embed_queryzBinduHydrator._embed_query!  sz      	
 
 1 1' :;BBD !z" 	4Aa#f+o.HMM8e+s23	4
 r   top_kc           
      >  K   d}d}| j                   t        j                  d       g S 	 | j                  |      }| j                   j	                  ||||       d{   }g }|D ]  }t        |d      r|j                  r|j                  ni }	|j                  |	j                  dd      t        |j                        |	j                  d	d      |	j                  d
d      d        |S 7 # t        $ r"}
t        j                  d|
       g cY d}
~
S d}
~
ww xY ww)uI  
        Semantic search in the aiva_conversations Qdrant collection (L3 fetch — semantic scars).

        Steps:
          1. Embed query_text using _embed_query() (deterministic 768-dim vector for testing;
             in production this would be Gemini text-embedding-004).
          2. Search the aiva_conversations collection with score_threshold=0.7.
          3. Return up to top_k results whose score exceeds the threshold.

        Each result dict contains:
            - chunk_text      (str)  — the matched conversation chunk
            - score           (float) — similarity score from Qdrant
            - conversation_id (str)  — source conversation identifier
            - timestamp       (str)  — ISO-8601 timestamp of the chunk

        Returns:
            list[dict] — up to top_k results above the 0.7 threshold.
            []         — if no results above threshold, or on any exception (non-fatal).

        Args:
            query_text: Natural-language query to search for.
            top_k:      Maximum number of results to return (default 3).
        gffffff?aiva_conversationsNuP   BinduHydrator._scatter_qdrant_task: No Qdrant client configured — returning [])collection_namequery_vectorlimitscore_thresholdpayload
chunk_text conversation_id	timestamp)rx   scorerz   r{   uM   BinduHydrator._scatter_qdrant_task: Qdrant search failed: %s — returning [])r   r0   rD   ro   searchhasattrrw   rj   r?   floatr|   r2   r3   )r    rb   rp   SCORE_THRESHOLDCOLLECTION_NAMErt   raw_resultsresultshitrw   r7   s              r   _scatter_qdrant_taskz"BinduHydrator._scatter_qdrant_taskA  s     0 .;;NNb I	,,Z8L $ 2 2 /) /	 !3 ! K #%G" 	)0i)@S[[#++VX&-kk,&C!&syy!1+2;;7H"+M%,[[b%A		 N'*  	LL_ I	sG   (D3C/ C-BC/ ,D-C/ /	D8DDDDDc           	      ,  K   d}i g d}d}g }	 t        j                  t        j                  | j                  |      | j	                         | j                  |      d      |       d{   }|\  }}	}
t        |t              rt        j                  d|       n|}t        |	t              rt        j                  d|	       n|	}t        |
t              rt        j                  d	|
       n|
}t        j                  |j                  di             }t        j                  |j                  dg             }|t        j                  |      nd}t        j                  |      }d| d| d| d| d	}t        j                   |       | j"                  j%                  |      }	 | j&                  j)                  || j*                  |       d{    t        j-                  d|| j*                         |S 7 # t         j                  $ r& t        j                  d
t        |dz               Y Ft        $ r!}t        j                  d|       Y d}~jd}~ww xY w7 # t        $ r"}t        j                  d||       Y d}~|S d}~ww xY ww)a  
        Fires all 3 scatter tasks in parallel, assembles a ROYAL_CHAMBER_CONTEXT
        XML envelope, caches it in Redis, and returns the XML string.

        Execution:
          1. Fires _scatter_redis_tasks, _scatter_postgres_task, and
             _scatter_qdrant_task concurrently via asyncio.gather with a 450ms
             timeout (under the 500ms HYDRATION_DEADLINE_MS budget).
          2. Any task that fails or times out is replaced with its safe default
             (aiva_state={}, kinan_directives=[], postgres_result=None,
             qdrant_results=[]) so the remaining results are still assembled.
          3. Assembles a ROYAL_CHAMBER_CONTEXT XML envelope:
               <ROYAL_CHAMBER_CONTEXT>
                 <AIVA_WORKING_STATE>{json}</AIVA_WORKING_STATE>
                 <KING_DIRECTIVES>{json}</KING_DIRECTIVES>
                 <LAST_CONVERSATION>{json or "none"}</LAST_CONVERSATION>
                 <RELATED_SCARS>{json}</RELATED_SCARS>
               </ROYAL_CHAMBER_CONTEXT>
          4. Writes the assembled XML to Redis key aiva:context:{session_id}
             with TTL 300s (SETEX). Redis failure is non-fatal.
          5. Updates the HydrationSession status to "ready" (via start_hydration).

        Args:
            session_id: Unique identifier for this hydration cycle.
            call_id:    Telnyx call_control_id (or internal call UUID).

        Returns:
            The assembled ROYAL_CHAMBER_CONTEXT XML string.
        g?r:   NT)return_exceptions)timeoutuF   gather_and_assemble: _scatter_redis_tasks raised %s — using defaultsuD   gather_and_assemble: _scatter_postgres_task raised %s — using Noneu@   gather_and_assemble: _scatter_qdrant_task raised %s — using []uW   gather_and_assemble: gather timed out after %dms — proceeding with collected defaultsi  uO   gather_and_assemble: unexpected gather failure: %s — proceeding with defaultsr;   r<   nonez+<ROYAL_CHAMBER_CONTEXT><AIVA_WORKING_STATE>z&</AIVA_WORKING_STATE><KING_DIRECTIVES>z%</KING_DIRECTIVES><LAST_CONVERSATION>z#</LAST_CONVERSATION><RELATED_SCARS>z(</RELATED_SCARS></ROYAL_CHAMBER_CONTEXT>r)   zCgather_and_assemble: cached ROYAL_CHAMBER_CONTEXT at '%s' (TTL=%ds)uJ   gather_and_assemble: Redis SETEX failed for key '%s': %s — cache skipped)r=   wait_forr>   rM   ra   r   rB   r2   r0   r3   TimeoutErrorr   r@   dumpsr?   ET
fromstringr,   r-   r   r.   r/   r1   )r    r
   r%   _GATHER_TIMEOUT_Sredis_resultpostgres_resultqdrant_resultsoutcomes	raw_redisraw_postgres
raw_qdrantr7   aiva_state_jsonking_directives_jsonlast_conversation_jsonrelated_scars_jsonxml_strr6   s                     r   gather_and_assemblez!BinduHydrator.gather_and_assemble  s    < " -/BG*.!.	$----j9//1--g6&*	 * H 3;/I|Z)Y/\
  ),	2Z 
 #/*i0V
 ", **\%5%5lB%GH#zz,*:*:;Mr*RS+:+FDJJ'F 	 "ZZ7##2"3 4  45 6""8!9 :01 2'	' 	 	g ..555L		**""9d.G.GQQQLLU)) gH ## 	LLi%,-  	LLa 	: R  	LL\  	s   JAH  ,G=-BH  2B8J+*I& I$%I& ;J=H   5I!5J8I! IJI!!J$I& &	J/JJJJ)NN)   )r   r   r   r   HYDRATION_DEADLINE_MSr,   r/   r$   r   r	   r8   rC   rM   r   ra   rH   r   ro   r   r   r   r   r   r   r   r   N   s       8 	$. .c .FV .`DPS DPT DPL>=htn >=@s tE{ @>S > >TRVZ >@zC z# z# zr   r   )r   r=   r@   loggingxml.etree.ElementTreeetreeElementTreer   dataclassesr   r   r   r   typingr   psycopg2.extrasrT   ImportError	getLoggerr   r0   r	   r   r   r   r   <module>r      s|   .^    " " ( '  
		8	$ 
 
 
m m)  Hs   A A)(A)