
    xi?                        d Z ddlmZ ddlZddlZddlmZmZ ddlm	Z	m
Z
 ddlmZmZmZmZ  ej                   d      Z G d d	      Zy)
u   
Retriever Orchestrator — fan-out to all registered retrievers, merge, rank.
PRD: _bmad-output/RLM_NERVOUS_SYSTEM_PRD.md (Story 2.5)
    )annotationsN)ThreadPoolExecutoras_completed)DictList)
IRetrieverRetrievalRequestRetrievalResultRetrievedChunkznervous_system.orchestratorc                  P    e Zd ZdZdd	dZd
dZddZddZe	 	 	 	 	 	 dd       Z	y)RetrieverOrchestratorzDFans out queries to registered retrievers, merges and ranks results.c                &    i | _         |dz  | _        y )Ng     @@)_retrievers
_timeout_s)self
timeout_mss     9/mnt/e/genesis-system/core/nervous_system/orchestrator.py__init__zRetrieverOrchestrator.__init__   s    24$v-    c                6    || j                   |j                  <   y)zRegister a retriever backend.N)r   source_name)r   	retrievers     r   register_retrieverz(RetrieverOrchestrator.register_retriever   s    2;../r   c           
        t        j                         }g }g }g }| j                  j                         D ci c]&  \  }t	        fdj
                  D              r|( }}}|s0t        g dg t        | j                  j                                     S dfd}	t        t        |            5 }
|j                         D ci c]  \  }}|
j                  |	||      | }}}	 t        || j                        D ]g  }|j                         \  }}|r-t        j!                  d d|        |j#                         F|j#                         |j%                  |       i 	 |j                         D ]7  \  }|j)                         r|j#                         |j+                          9 	 ddd       | j-                  |j.                        }t        j                         |z
  dz  }t        |t1        |d      ||      S c c}}w c c}}w # t&        $ r& t        j!                  d	| j                   d
       Y w xY w# 1 sw Y   xY w)z<Execute retrieval across all registered sources in parallel.c              3  @   K   | ]  }j                  |        y wN)
startswith).0snames     r   	<genexpr>z.RetrieverOrchestrator.query.<locals>.<genexpr>,   s     ?!4??1%?s   r   )chunks
latency_mssources_queriedsources_failedc                h    	 |j                        }| |d fS # t        $ r}| g |fcY d }~S d }~ww xY wr   )retrieve	Exception)r    r   r"   excrequests       r   _fetchz+RetrieverOrchestrator.query.<locals>._fetch5   sA    '"++G4fd++ 'b#&'s    	1,11)max_workers)timeoutz
Retriever z	 failed: zRetrieval timed out after r   Ni     )r    strr   r   returntuple)time	monotonicr   itemsanysourcesr
   listkeysr   lensubmitr   r   resultloggerwarningappendextendTimeoutErrordonecancel_merge_and_ranktop_kround)r   r*   start
all_chunksr$   r%   r    retactiver+   poolfuturesfuturer"   errorranked
elapsed_mss    `    `          r   queryzRetrieverOrchestrator.query"   s:    +-
%'$& (,'7'7'='='?
 
#$?w?? #I
 

 "a "48H8H8M8M8O3P 
	'  CK8 	$D "(D# FD#.4G 
P*7DOOL 2F*0--/'D&%D65''JK&--d3'..t4"))&12 !( ${{}"))$/MMO$%	$. %%j'--@nn&.$6
Z++)	
 	
Y
&   P!;DOO;LANOP	$ 	$sN   +H6I
H(I,BH-'I#II,II
IIIc                    | j                   j                         D ci c]  \  }}||j                          c}}S c c}}w )z2Return health status of all registered retrievers.)r   r4   health_check)r   r    rH   s      r   healthzRetrieverOrchestrator.health]   s7    :>:J:J:P:P:RSYT3c&&((SSSs   =c                    t               }g }| D ]A  }t        |j                  dd       }||v r |j                  |       |j	                  |       C |j                  d d       |d| S )zADeduplicate by content hash, rank by relevance_score, take top_k.N   c                    | j                   S r   )relevance_score)cs    r   <lambda>z7RetrieverOrchestrator._merge_and_rank.<locals>.<lambda>n   s    !"3"3 r   T)keyreverse)sethashcontentaddr>   sort)r"   rD   seenuniquechunkrZ   s         r   rC   z%RetrieverOrchestrator._merge_and_ranka   st    
 E') 	!Eu}}Tc*+Cd{HHSMMM% 	! 	3TBfu~r   N)i  )r   int)r   r   r0   None)r*   r	   r0   r
   )r0   zDict[str, bool])r"   List[RetrievedChunk]rD   rd   r0   rf   )
__name__
__module____qualname____doc__r   r   rP   rS   staticmethodrC    r   r   r   r      sG    N.<9
vT $-0	 r   r   )rj   
__future__r   loggingr2   concurrent.futuresr   r   typingr   r   core.nervous_system.contractsr   r	   r
   r   	getLoggerr<   r   rl   r   r   <module>rs      sF   
 #   ?   
		8	9X Xr   