
    ͢iB                       d Z ddlmZ ddlZddlZddlZddlZddlZddlZddl	Z	ddl
Z
ddlmZmZ ddlmZ ddlmZ ddlmZmZ ddlZ ed      Zed	z  Zej/                  d
       dZddddddddddd
Zi Z	 edz  dz  Z ee      5 ZeD ]}  Zej?                         Zdev sejA                  d      r*ejC                  d      \  Z"Z#Z$e$j?                         j?                  d      j?                  d      ee"j?                         <    	 ddd       ejM                  dejN                  jM                  dd            Z( ejR                  ejT                  dd ejV                  ed z         ejX                         g!        ejZ                  d"      Z.d#Z/d$Z0d% Z1d& Z2d5d'Z3d6d(Z4d7d)Z5d8d*Z6 G d+ d,      Z7 G d- d.      Z8 G d/ d0      Z9 G d1 d2      Z:d3 Z;e<d4k(  r e;        yy# 1 sw Y   xY w# e%$ r Y w xY w)9u  
RLM Army — Persistent Daemon Coordinator for Genesis Memory
============================================================
Maintains a pool of 3 worker processes that continuously pull from
PostgreSQL event queues and crystallize knowledge into the Sunaiva vault.

Workers:
  1. AIVAInteractionWorker   — pulls aiva_rlm.aiva_interactions → extracts entities
  2. BloodstreamWorker       — pulls bloodstream_knowledge → enriches vault
  3. GHLActivityWorker       — polls GHL for new contacts/activity → vault decisions

Design:
  - Supervisor loop: checks workers every 30s, respawns if dead
  - SKIP LOCKED queue for safe distributed processing
  - Gemini Flash (OpenRouter) for LLM extraction — cost-efficient
  - All output to Sunaiva vault (entity/decision tables)
  - Graceful shutdown on SIGTERM

Usage:
  python3 /mnt/e/genesis-system/core/rlm_army.py
  python3 /mnt/e/genesis-system/core/rlm_army.py --workers 5 --interval 30

Keep alive via bridge_watchdog.sh (cron) or tmux genesis-rlm-army.
    )annotationsN)datetimetimezone)Process)Path)AnyOptionalz/mnt/e/genesis-systemlogsT)exist_okz$41f4785c-e9e0-43ac-b6e8-eedb571fba57z(postgresql-genesis-u50607.vm.elestio.appiXc  postgreszetY0eog17tD-dDuj--IRH
            )
hostportuserpassworddatabaseconnect_timeout
keepaliveskeepalives_idlekeepalives_intervalkeepalives_countconfigzsecrets.env=#"'OPENROUTER_API_KEY u7   %(asctime)s [%(name)-20s] %(levelname)s — %(message)sz%Y-%m-%d %H:%M:%Szrlm_army.log)levelformatdatefmthandlersRLMArmyCoordinatora  
CREATE TABLE IF NOT EXISTS public.rlm_event_queue (
    id          SERIAL PRIMARY KEY,
    source      TEXT NOT NULL,
    event_type  TEXT NOT NULL,
    payload     JSONB NOT NULL DEFAULT '{}',
    status      TEXT NOT NULL DEFAULT 'pending',
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    processed_at TIMESTAMPTZ
);
CREATE INDEX IF NOT EXISTS idx_rlm_eq_status ON public.rlm_event_queue(status, created_at);
CREATE INDEX IF NOT EXISTS idx_rlm_eq_source ON public.rlm_event_queue(source);
z
CREATE TABLE IF NOT EXISTS public.rlm_army_cursors (
    source      TEXT PRIMARY KEY,
    last_id     BIGINT NOT NULL DEFAULT 0,
    updated_at  TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
c                 4    t        j                  di t        S )N )psycopg2connectDBr(       &/mnt/e/genesis-system/core/rlm_army.pyget_connr.   w   s    !b!!r,   c                     t               5 } | j                         }|j                  t               |j                  t               | j                          d d d        t        j                  d       y # 1 sw Y   xY w)NzSchema ensured)r.   cursorexecute	QUEUE_DDLAIVA_CURSOR_DDLcommitloginfo)conncurs     r-   ensure_schemar9   {   sY    	 tkkmIO$	
 HH s   AA44A=c                    | j                         }t        j                  t        j                        }|j                  dt        t        j                               t        |dd ||dd |||f       y)z?Write an entity to the Sunaiva vault (upsert by name+vault_id).z
        INSERT INTO sunaiva.vault_entities
            (id, vault_id, name, type, context, source_conversation, first_seen, last_seen)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT DO NOTHING
    N     )
r0   r   nowr   utcr1   struuiduuid4VAULT_ID)r7   nameetypecontextsourcer8   r=   s          r-   upsert_entityrG      sa    
++-C
,,x||
$CKK  	DJJL8Tc
E75D>6S
	r,   c                L    | j                         }|j                  d||f       y )Nz
        INSERT INTO public.rlm_army_cursors (source, last_id, updated_at)
        VALUES (%s, %s, NOW())
        ON CONFLICT (source) DO UPDATE
            SET last_id = EXCLUDED.last_id, updated_at = NOW()
    )r0   r1   )r7   rF   last_idr8   s       r-   update_cursorrJ      s(    
++-CKK 
 '	r,   c                z    | j                         }|j                  d|f       |j                         }|r|d   S dS )Nz=SELECT last_id FROM public.rlm_army_cursors WHERE source = %sr   )r0   r1   fetchone)r7   rF   r8   rows       r-   
get_cursorrN      s<    
++-CKKORXQZ[
,,.C3q6ar,   c                   t         rt        |       dk  rg S 	 ddl}t        j                  ddd| dd  dgd	d
d      j                         }|j                  j                  d|dt          ddd      }|j                  j                  |d      5 }t        j                  |      }|d   d   d   d   }|j                  d      }|j                  d      dz   }	|dk\  r&|	|kD  r!t        j                  |||	       cddd       S ddd       g S # 1 sw Y   g S xY w# t        $ r#}
t        j                  d|
        Y d}
~
g S d}
~
ww xY w)z>Use Gemini Flash via OpenRouter to extract entities from text.2   r   Nzgoogle/gemini-2.5-flashr   zExtract key entities (people, tools, decisions, systems, projects) from this text. Return JSON array: [{"name": str, "type": str, "context": str}]. Keep context under 200 chars. Max 5 entities.

Text:
i  )rolecontenti  g?)modelmessages
max_tokenstemperaturez-https://openrouter.ai/api/v1/chat/completionszBearer zapplication/jsonzhttps://genesis.agileadapt.com)AuthorizationzContent-TypezHTTP-Referer)dataheaders   timeoutchoicesmessagerR   []r   zLLM extraction failed: )OPENROUTER_KEYlenurllib.requestjsondumpsencoderequestRequesturlopenloadfindrfindloads	Exceptionr5   debug)textrF   urllibpayloadreqrespresultrR   startendes              r-   extract_entities_llmry      s~   SY^	#1**.O PTTYUY{m]  
  68 	 nn$$;#*>*:!; 2 @ % 
 ^^##C#4 	6YYt_FY'*95i@GLL%E--$q(CzcEkzz'%"45	6 	6 	6 I	6 I  1		+A3/00I1s=   A<D A*D>	D D DD D 	E(EEc                  $    e Zd ZdZdZdZd Zd Zy)AIVAInteractionWorkeruL   Polls aiva_rlm.aiva_interactions for new voice call data → vault entities.aiva_interaction_worker<   c                   t        j                  | j                        }|j                  d       	 	 | j	                          t        j                  | j                         1# t
        $ r}|j                  d|        Y d }~Bd }~ww xY wNStartedzBatch error: 
logging	getLoggerNAMEr6   _process_batchrn   warningtimesleepINTERVALselfr5   rx   s      r-   runzAIVAInteractionWorker.run   u    		*1##% JJt}}%   1mA3/001   A# #	B
,BB
c                N   t               5 }t        |d      }|j                         }|j                  d       |j	                         d   s
	 d d d        y |j                  d|f       |j                         }|s
	 d d d        y t        j                  dt        |       d       |D ]  \  }}}}}	|xs d d|xs d }
t        |
d	|       }|r;t        |t              r|n|gD ]#  }t        |t        |      d d
 dd|	 d|        % |D ]G  }t        ||j                  dd      d d
 |j                  dd      |j                  dd      d|        I |} t        |d|       |j!                          t        j                  d|        d d d        y # 1 sw Y   y xY w)Naiva_interactionsz
                SELECT EXISTS (
                    SELECT FROM information_schema.tables
                    WHERE table_schema = 'aiva_rlm'
                    AND table_name = 'aiva_interactions'
                )
            r   z
                SELECT id, caller_summary, key_topics, sentiment, timestamp
                FROM aiva_rlm.aiva_interactions
                WHERE id > %s
                ORDER BY id ASC
                LIMIT 20
            Processing z new AIVA interactionsr!   z	 Topics: aiva_interaction_r;   topiczMentioned in AIVA call at aiva_rC   typefactrE   zProcessed up to interaction ID )r.   rN   r0   r1   rL   fetchallr5   r6   rb   ry   
isinstancelistrG   r?   getrJ   r4   )r   r7   rI   r8   rowsrow_idsummarytopics	sentiment	timestamprp   entitiesr   rx   s                 r-   r   z$AIVAInteractionWorker._process_batch   s   Z 0	B4 ':;G ++-CKK   <<>!$0	B 0	B KK   <<>D30	B 0	B6 HH{3t9+-CDEAE !=I!-R	&,B@/8I&6RS ,6vt,D&6( b%dCJt,<g(B9+&NRWX^W_P`bb
 " JA!$fb(9$3(?vvAV"#%%	2"6%x8HJJ !!" $ 3W=KKMHH6wi@Aa0	B 0	B 0	Bs   AF&FDFF$N__name__
__module____qualname____doc__r   r   r   r   r(   r,   r-   r{   r{      s    V$DH&1Br,   r{   c                  $    e Zd ZdZdZdZd Zd Zy)BloodstreamWorkeruR   Pulls bloodstream_knowledge entries → enriches vault with high-confidence items.bloodstream_workerx   c                   t        j                  | j                        }|j                  d       	 	 | j	                          t        j                  | j                         1# t
        $ r}|j                  d|        Y d }~Bd }~ww xY wr   r   r   s      r-   r   zBloodstreamWorker.run  r   r   c           
     h   t               5 }t        |d      }|j                         }|j                  d       |j	                         d   s
	 d d d        y |j                  d|f       |j                         }|s
	 d d d        y t        j                  dt        |       d       |D ]4  \  }}}}}	|dk(  rdnd}
d	|	d
d|d d  }t        ||d d |
|d|        |}6 t        |d|       |j                          t        j                  dt        |       d       d d d        y # 1 sw Y   y xY w)Nbloodstream_knowledgez
                SELECT EXISTS (
                    SELECT FROM information_schema.tables
                    WHERE table_schema = 'public'
                    AND table_name = 'bloodstream_knowledge'
                )
            r   z
                SELECT id, type, title, content, confidence
                FROM public.bloodstream_knowledge
                WHERE id > %s AND confidence >= 0.7
                ORDER BY id ASC
                LIMIT 30
            r   z bloodstream entriesaxiom	knowledgez[confidence=z.2fz] i  r;   bloodstream_zEnriched vault with z bloodstream items)r.   rN   r0   r1   rL   r   r5   r6   rb   rG   rJ   r4   )r   r7   rI   r8   r   r   btypetitlerR   
confidenceentity_typerE   s               r-   r   z BloodstreamWorker._process_batch)  sZ   Z $	K4 '>?G++-CKK   <<>!$$	K $	K KK   <<>D1$	K $	K4 HH{3t9+-ABC=A !9ugz).')9g{(C(874C=/JdE$3KgV\U]G^_ 	! $ 7AKKMHH+CI;6HIJI$	K $	K $	Ks   AD(&D(BD((D1Nr   r(   r,   r-   r   r     s    \DH&%Kr,   r   c                  ,    e Zd ZdZdZdZd Zd ZddZy)	KGCrystallizerWorkeruE   Reads recent KG entities/axioms from file system → writes to vault.kg_crystallizeri,  c                   t        j                  | j                        }|j                  d       	 	 | j	                          t        j                  | j                         1# t
        $ r}|j                  d|        Y d }~Bd }~ww xY w)Nr   zCrystallize error: )
r   r   r   r6   _crystallizern   r   r   r   r   r   s      r-   r   zKGCrystallizerWorker.runW  sv    		*7!!# JJt}}%   71!5667r   c           
     4   t         dz  }g }|dz  |dz  fD ]  }|j                         st        |j                  d            D ]Z  }t	        j                         |j                         j                  z
  dz  }|dkD  r;|j                  | j                  |             \  |rht               5 }|D ]  }t        ||d   |d   |d	   |d
           |j                          d d d        t        j                  dt        |       d       y y # 1 sw Y   -xY w)NKNOWLEDGE_GRAPHr   axiomsz*.jsonli     rC   r   rE   rF   zCrystallized z KG items to vault)GENESIS_ROOTexistssortedglobr   statst_mtimeextend_process_jsonlr.   rG   r4   r5   r6   rb   )r   kg_root	processed	jsonl_dir
jsonl_file	age_hoursr7   items           r-   r   z!KGCrystallizerWorker._crystallizea  s(   !22	!J.(0BC 	BI##%$Y^^I%>? B
!YY[:??+<+E+EEM	r>  !4!4Z!@AB	B  t% CD!$Vd6l"&y/4>CC 	
 HH}S^$44FGH  s   /3DDc           	     :   g }	 t        |      5 }|D ]#  }|j                         }|s	 t        j                  |      }|j	                  d      xs$ |j	                  d      xs |j	                  d      }|sg|j	                  d      xs( |j	                  d      xs |j	                  d      xs d}t        |t              rt        j                  |      }|j                  t        |      d d |j	                  d	      xs d
t        |      v rdndt        |      d d d|j                   d       & 	 d d d        |S # t        j                  $ r Y Hw xY w# 1 sw Y   |S xY w# t        $ r&}t        j                  d| d|        Y d }~|S d }~ww xY w)NrC   idr   descriptionrR   patternr!   r;   r   r   r   entityr<   zkg:)rC   r   rE   rF   zError reading z: )openstriprd   rm   r   r   dictre   appendr?   rC   JSONDecodeErrorrn   r5   ro   )	r   pathitemsflineobjrC   rE   rx   s	            r-   r   z#KGCrystallizerWorker._process_jsonlw  s   	4d q D::<D "jj."wwvS#''$-S3777CS#$"%''-"8"jCGGI<N"jRURYRYZcRd"jhj%gt4&*jj&9G$'IdsO$'GGFO$g8sSWyCX^f'*7|ET':(+DII;&7	& . 	  // '.   	4IItfBqc233	4se   E+ EAE8E9B=E6E:E+ EEEEE(#E+ (E+ +	F4FFN)r   r   return
list[dict])	r   r   r   r   r   r   r   r   r   r(   r,   r-   r   r   Q  s    ODH&I,r,   r   c                  4    e Zd ZdZeeegZd Zd Z	ddZ
d Zy)RLMArmyz5Supervisor that spawns and maintains the worker pool.c                    i | _         d| _        t        j                  t        j                  | j                         t        j                  t        j
                  | j                         y )NF)	processes	_shutdownsignalSIGTERM_handle_shutdownSIGINT)r   s    r-   __init__zRLMArmy.__init__  s@    -/fnnd&;&;<fmmT%:%:;r,   c                D    t         j                  d| d       d| _        y )NzReceived signal u    — graceful shutdownT)r5   r6   r   )r   signumframes      r-   r   zRLMArmy._handle_shutdown  s     #F8+ABCr,   c                     |       }t        |j                  |j                  d      }|j                          t        j                  d|j                   d|j                   d       |S )NT)targetrC   daemonzSpawned z (pid ))r   r   r   rv   r5   r6   pid)r   WorkerClassworkerps       r-   _spawnzRLMArmy._spawn  sU    6::K,<,<TJ		8K,,-VAEE7!<=r,   c                j   t         j                  d       t                | j                  D ]*  }| j	                  |      | j
                  |j                  <   , | j                  st        j                  d       | j                  D ]  }| j
                  j                  |j                        }||j                         r;t         j                  d|j                   d       |r|j                          | j	                  |      | j
                  |j                  <    | j                  st         j                  d       | j
                  j                         D ]@  \  }}|j                          |j                  d       t         j                  d| d	       B t         j                  d
       y )Nz=== RLM Army starting ===r   zWorker u    is dead — respawningzShutting down workers...r   r[   z  z stoppedz=== RLM Army stopped ===)r5   r6   r9   WORKER_CLASSESr   r   r   r   r   r   r   is_aliver   	terminater   join)r   wcr   rC   s       r-   r   zRLMArmy.run  sF   ,- %% 	6B&*kk"oDNN277#	6 ..JJrN)) >NN&&rww/9AJJLKK'"''2I JK.2kk"oDNN277+> .. 	+,~~++- 	*GD!KKMFF1FHHr$x()	* 	+,r,   N)r   r   )r   r   r   r   r{   r   r   r   r   r   r   r   r(   r,   r-   r   r     s+    ? 	N<-r,   r   c                     t        j                  d      } | j                  dt        dd       | j	                         }t               }|j                          y )Nu.   Genesis RLM Army — Persistent Memory Workers)r   z
--intervalr   z#Supervisor check interval (seconds))r   defaulthelp)argparseArgumentParseradd_argumentint
parse_argsr   r   )parserargsarmys      r-   mainr    sI    $$1abF
3AfgD9DHHJr,   __main__)
rC   r?   rD   r?   rE   r?   rF   r?   r   None)rF   r?   rI   r   r   r  )rF   r?   r   r   )rp   r?   rF   r?   r   r   )=r   
__future__r   r   rd   r   osr   sysr   r@   r   r   multiprocessingr   pathlibr   typingr   r	   r)   r   LOG_DIRmkdirrB   r+   _secretssecrets_pathr   r   r   r   
startswith	partitionk_vrn   r   environra   basicConfigINFOFileHandlerStreamHandlerr   r5   r2   r3   r.   r9   rG   rJ   rN   ry   r{   r   r   r   r  r   r(   r,   r-   <module>r     s?  2 #    	  
   ' #     +,

 t 1 7' 		(*]:L	l	 Fq 	FD::<Dd{4??3#7..-1a&'ggiooc&:&@&@&E#		FF 2BJJNNCWY[4\]   
,,DGn45	 g,-
	"  )\AB ABH5K 5Kp@ @J3- 3-p zF gF F  		s7   :G7 
G+%G+7AG+G7 +G40G7 7G?>G?