
    Ͽi                       d Z ddlmZ ddlZddlZddlZddlZddlZddlZddl	Z	ddl
m
Z
 ddlmZ ddlmZmZmZmZmZ  ed      Zedz  d	z  Zedz  d
z  Zedz  dz  Zedz  dz  Zedz  dz  Zedz  dz  Zedz  Zedz  Zedz  Zedz  Zedz  Zedz  Zej@                  jC                  d e"edz  dz               ej@                  jC                  d e"e              ejF                  ejH                  dd        ejJ                  d      Z&dZ'd3d4dZ(d5dZ)d5dZ*d6dZ+d7dZ, e-h d       Z.d8d!Z/d"d#	 	 	 	 	 	 	 d9d$Z0	 	 	 	 	 	 d7d%Z1 G d& d      Z2e3d'k(  rz ejh                  d()      Z5e5jm                  d*d+d,-       e5jm                  d.d/d01       e5jo                         Z8 e2e8jr                  e8jt                  2      Z;e;jy                          yy):u  
RLM Bloodstream Pipeline
=========================
Autonomous pipeline that keeps Genesis memory alive and flowing after every
session. Runs nightly via cron (or manually) to:

  Step 1 — Load new KG entities/axioms into PostgreSQL (delta, no duplicates)
  Step 2 — Embed new entries into Qdrant genesis_memories collection
  Step 3 — Refresh Redis hot cache (top 50 by confidence, 24h TTL)
  Step 4 — Run MemoryDigestion night-cycle (prune stale episodic memories)
  Step 5 — Generate mentor preference pairs for unprocessed AIVA interactions
  Step 6 — Write structured run report to data/bloodstream_last_run.log

Design principles:
  - Never crash: every step is try/except guarded; log error and continue
  - Run flag prevents concurrent executions
  - Delta load: duplicate detection via (source, title) composite key
  - Dry-run mode: report stats without writing anything

Usage:
  python3 /mnt/e/genesis-system/core/rlm_bloodstream_pipeline.py
  python3 /mnt/e/genesis-system/core/rlm_bloodstream_pipeline.py --dry-run
  python3 /mnt/e/genesis-system/core/rlm_bloodstream_pipeline.py --source cron_nightly

Author: Genesis Orchestrator
Created: 2026-02-20
    )annotationsN)datetime)Path)AnyDictListOptionalTuplez/mnt/e/genesis-systemKNOWLEDGE_GRAPHentitiesaxiomscreator_mindresearchrelationshipsclaude_conversationsdeep_think_resultsplansdocsdataz.bloodstream_runningzbloodstream_last_run.logzgenesis-memoryu4   %(asctime)s [%(levelname)s] %(name)s — %(message)s%Y-%m-%d %H:%M:%S)levelformatdatefmtBloodstreamPipelinea  
CREATE TABLE IF NOT EXISTS bloodstream_knowledge (
    id SERIAL PRIMARY KEY,
    source TEXT NOT NULL,
    type TEXT NOT NULL,
    title TEXT NOT NULL,
    content TEXT NOT NULL,
    tags TEXT[] DEFAULT '{}',
    confidence FLOAT DEFAULT 0.8,
    embedding_id TEXT,
    embedding_text TEXT,
    metadata JSONB DEFAULT '{}',
    created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_bk_source     ON bloodstream_knowledge(source);
CREATE INDEX IF NOT EXISTS idx_bk_type       ON bloodstream_knowledge(type);
CREATE INDEX IF NOT EXISTS idx_bk_confidence ON bloodstream_knowledge(confidence DESC);
CREATE INDEX IF NOT EXISTS idx_bk_tags       ON bloodstream_knowledge USING GIN(tags);
CREATE INDEX IF NOT EXISTS idx_bk_content_fts
    ON bloodstream_knowledge USING GIN(to_tsvector('english', content));
CREATE INDEX IF NOT EXISTS idx_bk_title_fts
    ON bloodstream_knowledge USING GIN(to_tsvector('english', title));
c                j   | |S t        | t        t        f      rt        t	        t        |       d      d      S t        |       j                         j                         }dddddddddd	d	d
}||v r||   S 	 t        |      }|dkD  r|dz  }t        t	        |d      d      S # t        $ r |cY S w xY w)z
    Parse a confidence value that may be a float, int, or string.

    String variants handled:
      - Numeric strings: "0.9", "90", "0.97"
      - Qualitative labels: "HIGH", "MEDIUM", "LOW", "VERY_HIGH", etc.
                  ?gffffff?333333?g      ?g?g?g?g333333?)	VERY_HIGHz	VERY HIGHHIGHMEDIUM_HIGHzMEDIUM HIGHMEDIUM
MEDIUM_LOWz
MEDIUM LOWLOWVERY_LOWzVERY LOWg      Y@)	
isinstancefloatintminmaxstrstripupper
ValueError)	raw_valuedefaultsqualitative_mapvals        6/mnt/e/genesis-system/core/rlm_bloodstream_pipeline.py_parse_confidencer5   d   s     )eS\*3uY'-s33I$$&AD$SO 	Oq!!Ah9+C3sC=#&& s   9*B$ $B21B2c                t   | j                  d      xs( | j                  d      xs | j                  d      xs d}|sy| j                  d      xs | j                  d      xs d}|s8| j                  d      }t        |t              rt        j                  |d	
      }|sy| j                  dd      }t        | j                  d      d      }h d}| j                         D 	ci c]  \  }}	||vs||	 }
}}	| j                  d      r| d   |
d<   | j                  dg       }t        |t              r=|j                  d      D cg c]#  }|j                         s|j                         % }}d| |t        |      dd t        |      dd |t        t        |d      d      |
dS c c}	}w c c}w )z
    Map a raw KG entity JSON object to the bloodstream schema.

    KG entities use varied field names; we normalise them here.
    Returns None if no usable title + content can be found.
    nametitleid Ndescriptioncontent
propertiesF)ensure_asciitypeentity
confidencer   r0   >   r9   r7   tagsr?   r8   r<   rA   r;   kg_idrC   ,z
kg_entity:  '  r   r   sourcer?   r8   r<   rC   rA   metadata)getr&   dictjsondumpsr5   itemsr+   splitr,   r)   r*   )raw	file_stemr8   r<   propsentity_typerA   	skip_keyskvrJ   rC   ts                r4   _extract_entity_fieldsrY      s    	 	777	774=	 	 
  ggm$@	(:@bG%eT"jjU;G''&(+K"377<#8$GJ-I  A(:1 H   wwt}I7762D$#'::c?@aaggi	@@ yk*UDS!w<'#j#.4   As   )F/6F/F5"F5c                   | j                  d      xs( | j                  d      xs | j                  d      xs d}| j                  d      xs+ | j                  d      xs | j                  d      xs |dd	 }|sy|xs | j                  d
      xs d}|sy| j                  d      xs | j                  d      xs d}t        | j                  d      d      }h d}| j                         D 	ci c]  \  }}	||vs||	 }
}}	| j                  d      r| d   |
d<   | j                  dg       }t        |t              r=|j                  d      D cg c]#  }|j                         s|j                         % }}d| |t	        |      dd t	        |      dd |t        t        |d      d      |
dS c c}	}w c c}w )z
    Map a raw KG axiom JSON object to the bloodstream schema.

    Axioms typically have: id, axiom/statement, source, confidence, category/type
    axiom	statementr<   r:   r8   r7   r9   NP   r;   r?   categoryrA   r   rB   >   r9   r7   rC   r?   r[   r8   r<   r^   r\   rA   r;   rD   rC   rE   z	kg_axiom:rF   rG   r   r   rH   )	rK   r5   rO   r&   r+   rP   r,   r)   r*   )rQ   rR   
axiom_textr8   r<   rT   rA   rU   rV   rW   rJ   rC   rX   s                r4   _extract_axiom_fieldsr`      s    !USWW[%9USWWY=OUSUJ 	776?	774=	 cr?	 
 8CGGM28bG''&/CSWWZ%8CGK"377<#8$GJAI  A(:1 H   wwt}I7762D$#'::c?@aaggi	@@ i[)UDS!w<'#j#.4   As   4F:F:G -G c                    d| d    d| d   | d   g}| j                  dg       }|r*|j                  ddj                  d	 |D              z          d
j                  |      S )z?Build embedding-optimised text string from a bloodstream entry.[r?   ]r8   r<   rC   zTags: z, c              3  2   K   | ]  }t        |        y wN)r+   ).0rX   s     r4   	<genexpr>z(_build_embedding_text.<locals>.<genexpr>   s     )?Q#a&)?s   z | )rK   appendjoin)entrypartsrC   s      r4   _build_embedding_textrl      sf    vq!5>53CDE99VR DX		)?$)? ??@::e    c           
        g }| j                         st        j                  d|        |S t        | j	                  d            D ]  }|j
                  }	 t        |dd      5 }t        |d      D ]`  \  }}|j                         }|s	 t        j                  |      }|d	k(  rt        ||      }
nt        ||      }
|
sP|j!                  |
       b 	 ddd        |S # t        j                  $ r,}	t        j                  d|j                  ||	       Y d}	~	d}	~	ww xY w# 1 sw Y   OxY w# t"        $ r"}	t        j                  d
||	       Y d}	~	d}	~	ww xY w)u   
    Scan a directory of .jsonl files and parse each line.

    kind: 'entity' or 'axiom' — controls which extractor runs.
    Silently skips non-.jsonl files (e.g. .md report files).
    KG directory not found: %s*.jsonlrutf-8encoding      Bad JSON in %s:%d — %sNr@      Cannot read %s — %s)existsloggerwarningsortedglobstemopen	enumerater,   rM   loadsJSONDecodeErrordebugr7   rY   r`   rh   OSError)	directorykindresults
jsonl_filer}   fhline_nolinerQ   excrj   s              r4   _scan_jsonl_dirr      sL    %'G3Y?Y^^I67 E
	Ej#8 .B%.r1%5 .MGT::<D !"jj.
 x' 6sD A 5c4 @u-!..E0 N  // !%?RY[^_ !. .$  	ENN2JDD	Es`   D.#&D"
C  D" D"D. D	3"D	D"D	D""D+	'D..	E7EE>   .venv	generated__pycache__node_moduleslightrag_indexc                :    t        d | j                  D              S )z=Return True if any component of the path is in the skip list.c              3  ,   K   | ]  }|t         v   y wre   )
_SKIP_DIRS)rf   parts     r4   rg   z#_is_skipped_path.<locals>.<genexpr>&  s     6dtz!6s   )anyrk   )ps    r4   _is_skipped_pathr   $  s    6agg666rm   T)	recursivec                  g }| j                         st        j                  d|        |S |rdnd}t        | j	                  |            D ]  }t        |      r	 |j                  dd      }t        |      d	k  r2d
}|j                         D ]8  }	|	j                         }
|
j                  d      s%|
dd j                         } n |s,|j                  j                  dd      j                  dd      }t        |j!                  |             }|dd }|dd }|j#                  | d| d|dd |g d|dt        |      id        |S # t        $ r"}t        j                  d||       Y d}~(d}~ww xY w)a  
    Walk *directory* for .md files and build bloodstream entries.

    Entry shape:
      source:         f"{source_prefix}:{relative_path}"
      type:           "document"
      title:          first H1 heading, or filename stem
      content:        first 2000 chars of file
      embedding_text: first 1000 chars of file
      confidence:     0.8

    Skips:
      - Any path component in _SKIP_DIRS
      - Files with fewer than 200 characters (stubs)
    z Markdown directory not found: %sz**/*.mdz*.mdrr   replace)rt   errorsrw   N   r:   z#    _ -  i  :documentrF   g?path)rI   r?   r8   r<   rC   rA   embedding_textrJ   )rx   ry   rz   r{   r|   r   	read_textr   r   len
splitlinesr,   
startswithr}   r   r+   relative_torh   )r   source_prefixr   r   patternmd_filetextr   r8   raw_linestrippedrelr<   r   s                 r4   _scan_md_filesr   -  s   * %'G99E$i&G)..12 #G$	$$gi$HD
 t9s? ) 	H~~'H""4( **,		
 LL((c2::3DE'%%i01u+et&q.4C[,W.	
 		5#J NA  	LL0'3?	s   E	E8E33E8c           	        g }| j                         st        j                  d|        |S t        d | j	                  d      D              }|D ]7  }|j
                  }	 t        |dd      5 }|j                         }ddd       |j                  d	k(  r}t        j                         d
      D ]_  \  }	}
|
j                         }
|
s	 t        j                  |
      }|dk(  rt%        ||      nt'        ||      }|sO|j)                  |       a 	 t        j                        }t+        |t,              r|n|g}|D ]D  }t+        |t.              s|dk(  rt%        ||      nt'        ||      }|s4|j)                  |       F : |S # 1 sw Y   xY w# t        $ r"}t        j                  d||       Y d}~pd}~ww xY w# t        j                  $ r-}t        j!                  d|j"                  |	|       Y d}~Rd}~ww xY w# t        j                  $ r,}t        j!                  d|j"                  |       Y d}~d}~ww xY w)a  
    Recursively scan *directory* for .jsonl and .json files and parse them
    as structured KG entities using the existing extract helpers.

    kind: 'entity' or 'axiom'

    Skips any path component in _SKIP_DIRS.
    For .json files that contain a top-level list, iterates the list.
    For .json files that contain a top-level dict, treats as a single record.
    .jsonl files are parsed line-by-line (existing behaviour).
    ro   c              3  r   K   | ]/  }|j                         r|j                  d v rt        |      s| 1 yw)>   .json.jsonlN)is_filesuffixr   )rf   fs     r4   rg   z(_scan_jsonl_recursive.<locals>.<genexpr>  s5      #99;188':: # 	
#s   57*rq   rr   rs   Nrw   r   ru   rv   r@   u   Bad JSON in %s — %s)rx   ry   rz   r{   rglobr}   r~   readr   r   r   r   r,   rM   r   r   r   r7   rY   r`   rh   r&   listrL   )r   r   r   	all_fileskfiler}   r   raw_textr   r   r   rQ   rj   r   recordss                  r4   _scan_jsonl_recursiver   t  s(    %'G3Y?" #??3'# I  -*zz	eS73 %r779% <<8#!*8+>+>+@!!D *zz|**T*C x' +35.sD9 
 NN5)*$zz(+
 )t4d4&G 	*!#t, x' +35.sD9 
 NN5)	*I-*^ NY% % 	NN2E3?	 ++ LL!;UZZRUV '' 4ejj#Fs`   F-F>F	GH	F	F	G!F>>GH"HH	I!IIc                  h    e Zd ZdZdddZddZddZddZd ZddZ	ddZ
dd	Zdd
ZddZddZy)r   z
    Autonomous Genesis memory synchronisation pipeline.

    Each step is self-contained and failure-tolerant. The pipeline always
    completes and always writes a run log.
    c           	         || _         || _        t        j                         | _        t        j                         | _        dddddddg d| _        y )Nr   )files_scannedentries_foundnew_entries_loadedqdrant_embeddedredis_cacheddigestion_prunedmentor_pairsr   )	rI   dry_runtime	monotonic
start_timer   nowstart_tsstats)selfrI   r   s      r4   __init__zBloodstreamPipeline.__init__  sN    ..*  "#  !	&

rm   c                   | j                         sy	 t        j                  d| j                  | j                         | j                         }| j                  |       | j                          | j                          | j                          | j                          | j                          t        j                  d       y# | j                          | j                          t        j                  d       w xY w)zExecute the full pipeline.Nz:=== Bloodstream Pipeline START (source=%s, dry_run=%s) ===z%=== Bloodstream Pipeline COMPLETE ===)_acquire_run_flagry   inforI   r   _step1_load_kg_step2_embed_qdrant_step3_refresh_redis_step4_memory_digestion_step5_mentor_pairs_step6_write_report_release_run_flag)r   entriess     r4   runzBloodstreamPipeline.run  s    %%'	AKKTT\\3 ))+G$$W-%%'((*$$& $$&""$KK?@ $$&""$KK?@s   A<C 7C<c                   t         j                  d       t        t        d      }t        t        d      }t        t        d      }t        t        d      }t        t        d      }t        t        d      }t        t        d      }t        t        d      }t        t        d      }	||z   |z   |z   |z   |z   |z   |z   |	z   }
t        j                         r't        t!        t        j#                  d                  nd	}t        j                         r't        t!        t        j#                  d                  nd	}||z   | j$                  d
<   t        |
      | j$                  d<   t         j                  dt        |      |       t         j                  dt        |      |       t         j                  dt        |             t         j                  dt        |             t         j                  dt        |             t         j                  dt        |             t         j                  dt        |             t         j                  dt        |             t         j                  dt        |	             | j&                  rt         j                  d       |
S |
st         j                  d       g S 	 d	dl}d	dl}d	dlm}  |j0                  d0i |j3                         }d|_        |j7                         }|j9                  t:               |j=                          |j9                  d       t?               }|jA                         D ]  }|jC                  |d	   |d   f        t         j                  dt        |             |
D cg c]  }|d   |d   f|vr| }}t         j                  dt        |             |s"|jE                          |jE                          g S d }g }d!}tG        d	t        |      |      D ]  }||||z    }g }|D ][  }tI        |      }|jK                  |d   |d"   |d   |d#   |d$   |d%   |tM        jN                  |jQ                  d&i             f       ] |D ]M  }|j9                  ||       |jS                         }|s(|jK                  |d	   |d   |d'   |d(   |d)   d*       O |j=                          t         j                  d+tU        ||z   t        |            t        |      t        |              |jE                          |jE                          t        |      | j$                  d,<   t         j                  d-t        |             |S c c}w # tV        $ rD}d.| } t         jY                  |        | j$                  d/   jK                  |        g cY d}~S d}~ww xY w)1z
        Scan KG entity and axiom directories, delta-load new entries into
        PostgreSQL bloodstream_knowledge table.

        Returns the list of newly inserted entries (for downstream embedding).
        z#[Step 1] Scanning KG directories...r@   r[   zmd:claude_conversationszmd:deep_thinkzmd:planszmd:docsrp   r   r   r   z)  Entities:      %d entries from %d filesz)  Axioms:        %d entries from %d filesz  Creator mind:  %d entriesz  Research:      %d entriesz  Relationships: %d entriesz  Conversations: %d entriesz  Deep think:    %d entriesz  Plans:         %d entriesz  Docs:          %d entriesz&  [DRY RUN] Skipping PostgreSQL write.z  No entries to process.NPostgresConfigFz/SELECT source, title FROM bloodstream_knowledgeru   z  Existing rows in DB: %drI   r8   z  New entries after dedup: %da,  
                INSERT INTO bloodstream_knowledge
                    (source, type, title, content, tags, confidence, embedding_text, metadata)
                VALUES
                    (%s, %s, %s, %s, %s, %s, %s, %s)
                RETURNING id, source, type, title, embedding_text
            rF   r?   r<   rC   rA   rJ   r         )db_idrI   r?   r8   r   z"  Inserted batch %d/%d (%d so far)r   z%[Step 1] Done. %d new entries loaded.zStep 1 PostgreSQL error: r    )-ry   r   r   KG_ENTITIES_DIRKG_AXIOMS_DIRr   KG_CREATOR_MIND_DIRKG_RESEARCH_DIRKG_RELATIONSHIPS_DIRr   KG_CONVERSATIONS_DIRDEEP_THINK_DIR	PLANS_DIRDOCS_DIRrx   r   r   r|   r   r   psycopg2psycopg2.extraselestio_configr   connectget_connection_params
autocommitcursorexecuteCREATE_TABLE_SQLcommitsetfetchalladdcloserangerl   rh   rM   rN   rK   fetchoner)   	Exceptionerror)!r   entity_entriesaxiom_entriescreator_mind_entriesresearch_entriesrelationship_entriesconvo_md_entriesdeep_think_entriesplans_entriesdocs_entriesall_entriesentity_filesaxiom_filesr   r   conncurexisting_keysrowenew_entries
insert_sqlinserted_entries
batch_sizeichunkrowsrj   emb_text	row_tuplereturnedr   msgs!                                    r4   r   z"BloodstreamPipeline._step1_load_kg  sG    	9:.I.}gF  55H(S4_hO45I8T  ..BD]^-noN-iD-h	B "#  #	#
  !!   	 FUE[E[E]s4 4 4Y ?@AcdCPCWCWCYs4 2 29 =>?_`&2[&@

?#&)+&6

?#?^ATVbc?]ASU`a137K3LM137G3HI137K3LM137G3HI137I3JK13}3EF13|3DE<<KK@AKK23IZ	"5#8##Mn&J&J&LMD#DO++-C KK()KKM KKIJEM||~ 4!!3q63q6"234 KK3S5GH 'hK7,MA K  KK7[9IJ		

	J 68J1c+.
;  3#AJ7$&" E4U;HKKhfgi(fl+ 

599Z#<=	! 	 "& 
IKK
I6"||~H(//%-a[&.qk$,QK%-a[.6qk1 	
 @JK0@A3{CS 013= 3D IIKJJL/23C/DDJJ+,KK?EUAVW##}@  	-cU3CLLJJx '',I		s@   :CU7 U2%AU7 *B3U7 CU7 2U7 7	W 9V?9W?Wc                   t         j                  dt        |             | j                  rt         j                  d       y|st         j                  d       y	 ddlm} ddlm} ddlm	} d
}d}	  |       } ||j                  |j                  d      }		 |	j                  |       | j)                         }
|
t         j                  d       yd}	 ddl}ddlm}  |j.                  d&i |j1                         }d|_        |j5                         }d}t7        j8                  d      }|D ]  }|j;                  dd      }|s	  |
|      }|s$t=        t7        j>                  ||            }|	jA                  ||jC                  |||j;                  dd      |j;                  dd      |j;                  dd      |j;                  d      d      g       |r)|j;                  d      r	 |jE                  d ||d   f       |d!z  } |r|jI                          || jJ                  d$<   t         j                  d%|       y# t        $ r }t         j                  d	|       Y d}~yd}~ww xY w# t        $ rR |	j!                  ||j#                  ||j$                  j&                               t         j                  d|       Y "w xY w# t        $ r }t         j                  d|       Y d}~yd}~ww xY w# t        $ r#}t         j                  d|       d}Y d}~d}~ww xY w# t        $ r Y 0w xY w# t        $ r2}t         jG                  d"|j;                  dd#      |       Y d}~Bd}~ww xY w)'z
        Generate embeddings for newly inserted entries and upsert into the
        Qdrant 'genesis_memories' collection.

        Updates embedding_id in PostgreSQL after successful upsert.
        Skips gracefully if Qdrant is unavailable.
        z0[Step 2] Embedding %d new entries into Qdrant...z"  [DRY RUN] Skipping Qdrant write.Nz  Nothing to embed.r   )QdrantClient)models)QdrantConfigz4  Qdrant client not available (%s). Skipping Step 2.genesis_memoriesi      )urlapi_keytimeout)sizedistance)collection_namevectors_configz   Created Qdrant collection '%s'z0  Qdrant connection failed: %s. Skipping Step 2.z:  No embedding function available. Skipping Qdrant upsert.r   Tz2  PG connection for embedding_id update failed: %sz$e2a6d7f8-b3c4-4d5e-8f90-a1b2c3d4e5f6r   r:   rI   r?   r8   r   )rI   r?   r8   r   )r9   vectorpayload)r+  pointsz@UPDATE bloodstream_knowledge SET embedding_id = %s WHERE id = %sru   z  Embed failed for '%s': %s?r   z/[Step 2] Done. %d entries embedded into Qdrant.r   )&ry   r   r   r   qdrant_clientr!  qdrant_client.httpr"  r   r#  ImportErrorrz   r&  r'  get_collectionr  create_collectionVectorParamsDistanceCOSINE_get_embed_functionr   r   r   r   r   r   uuidUUIDrK   r+   uuid5upsertPointStructr   r   r   r   )r   r  r!  qdrant_modelsr#  r   
COLLECTION
VECTOR_DIMqcfgqdrantembed_fnpg_connr   r   pg_curembedded_count
GENESIS_NSrj   r  r-  point_ids                        r4   r   z'BloodstreamPipeline._step2_embed_qdrant  sP    	FKHXY<<KK<=KK-.	2B3
 (

	>D!dhhbQF
L%%j1  ++-NNWX 	5&h&&P)M)M)OPG!%G^^%F
 YYEF
  '	ZEyy!126H#Z!(+tzz*h?@$.%11'#)*/))Hb*A(-		&"(=).7B)?).7);	% 2 	  " eii0^%uW~6 !#I'	ZR MMO(6

$%E~V}  	NNQSVW	  L(($.#0#=#='!.!7!7!>!> $> $ )  >
KL  	NNMsS	$  	NNOQTUF	P % 
  Z:EIIgs<SUXYYZs   I 2&K I2 AK< 
L;BL;0L+L;	I/I**I/2AK	K KK 	K9K44K9<	L(L##L(+	L84L;7L88L;;	M6'M11M6c                  	 	 ddl m} t        j                  j	                  d      xs  t        j                  j	                  dd      }|st
        dz  dz  }|j                         rt        |      5 }|D ]  }|j                         }|j                  d      s|j                  d	      s6|j                  d
d      d   j                         j                  d      j                  d      }|sz|dk7  s|} n ddd       |r4|dk7  r/|j                  |      	d	fd}t        j                  d       |S t        j                  d       y# 1 sw Y   UxY w# t        $ r }t        j                  d|       Y d}~Fd}~ww xY w)z
        Return a callable that maps text -> List[float].
        Tries Gemini API first, falls back to no-op.
        Returns None if no embedding provider is available.
        r   )genaiGEMINI_API_KEYGEMINI_API_KEY_NEWr:   configzsecrets.envzGEMINI_API_KEY_NEW=zGEMINI_API_KEY==ru   "'YOUR_GEMINI_API_KEYN)r'  c                    j                   j                  d|       }t        |j                  d   j                        S )Nzgemini-embedding-001)modelcontentsr   )r"  embed_contentr   
embeddingsvalues)r   result_genai_clients     r4   gemini_embedz=BloodstreamPipeline._get_embed_function.<locals>.gemini_embed!  sC    *11??4!% @ F   1 1! 4 ; ;<<rm   z@  Embedding provider: gemini-embedding-001 (google.genai, 3072d)z$  Gemini embedding not available: %szB  No embedding provider configured. Qdrant upsert will be skipped.)r   r+   returnzList[float])googlerK  osenvironrK   GENESIS_ROOTrx   r~   r,   r   rP   Clientry   r   r  rz   )
r   google_genair'  secrets_pathr   r   r3   r[  r   rZ  s
            @r4   r9  z'BloodstreamPipeline._get_embed_function  s^   	H4jjnn%56b"**..I]_a:bG+h6F&&(l+ *q$% *D#'::<D#/DEYjIk&*jja&8&;&A&A&C&I&I#&N&T&TUX&Y#&32G+G.1G$)** 7&;; , 3 3G 3 D= ^_## 	[\3* **  	HNNA3GG	HsC   A0E 38E,AE0E6E;=E EE 	F$E??Fc                J   t         j                  d       | j                  rt         j                  d       y	 ddl}ddlm}  |j                  di |j                         }|j                         }|j                  d       |j                         }|j                          |j                          	 ddl}dd	lm}	  |j$                  di |	j                         }
|
j'                          	 |
j+                         }|j-                  d       |D ]m  }|\  }}}}}}}}d| }t/        j0                  |||||dd |xs g ||ddt2              }|j5                  ||d       |j7                  dt3        |             o |j9                  dd       |j                          t;        |      | j                  d<   t         j                  dt;        |             y# t        $ rB}d| }t         j                  |       | j                  d   j                  |       Y d}~yd}~ww xY w# t        $ r }t         j)                  d
|       Y d}~yd}~ww xY w# t        $ rB}d| }t         j                  |       | j                  d   j                  |       Y d}~yd}~ww xY w)u  
        Query PostgreSQL for top 50 entries by confidence and cache them
        in Redis with a 24h TTL.

        Key scheme:
          bloodstream:{id}    — JSON blob of the entry
          bloodstream:hot_ids — Redis list of ids (ordered by confidence DESC)
        z&[Step 3] Refreshing Redis hot cache...z!  [DRY RUN] Skipping Redis write.Nr   r   z
                SELECT id, source, type, title, content, tags, confidence, metadata
                FROM bloodstream_knowledge
                ORDER BY confidence DESC
                LIMIT 50
                zStep 3 PostgreSQL query error: r   )RedisConfigz*  Redis unavailable (%s). Skipping Step 3.zbloodstream:hot_idszbloodstream:r   )r9   rI   r?   r8   r<   rC   rA   rJ   F)r>   r0   iQ )exr   z*[Step 3] Done. %d entries cached in Redis.zStep 3 Redis write error: r   )ry   r   r   r   r   r   r   r   r   r   r   r   r  r  r   rh   redisre  Redispingrz   pipelinedeleterM   rN   r+   r   rpushexpirer   )r   r   r   r  r  r  r   r  	redis_libre  rq   piper  row_idrI   rtyper8   r<   rC   rA   rJ   keyblobs                          r4   r   z(BloodstreamPipeline._step3_refresh_redis2  sW    	<=<<KK;<	5#8##Mn&J&J&LMD++-CKK <<>DIIKJJL	%2	F+"C"C"EFAFFH
	-::<DKK-. ?TWQugtZ$VH-zz $!"&u~ JB", (	# !&s	4 du-

0#f+>?  KK-u5LLN),TDJJ~&KKDc$iPQ  	3C59CLLJJx '',		  	NNGM	>  	-.se4CLLJJx '',,	-sI   A;G 5:H+ 0C,I 	H(&8H##H(+	I4II	J" 8JJ"c                   t         j                  d       | j                  rt         j                  d       y	 ddlm}  || j                         d| j
                  d<   t         j                  d	       y# t        $ rQ}d
| }t         j                  |       | j
                  d   j                  |       d| j
                  d<   Y d}~yd}~ww xY w)zv
        Run the MemoryDigestion night-cycle to prune low-impact episodic
        memories older than 7 days.
        z#[Step 4] Running MemoryDigestion...z&  [DRY RUN] Skipping memory digestion.Nr   )r   )r   	completedr   z/[Step 4] Done. Memory digestion cycle complete.zStep 4 MemoryDigestion error: r   r  )	ry   r   r   core.memory_digestionr   r   r  r  rh   )r   memory_digestion_runr   r  s       r4   r   z+BloodstreamPipeline._step4_memory_digestion  s    
 	9:<<KK@A		5I 6-8DJJ)*KKIJ 	523%8CLLJJx '',-4DJJ)*		5s   =A7 7	C ACCc                8
   t         j                  d       | j                  rt         j                  d       yg }	 ddl}ddl}ddlm}  |j                  d&i |j                         }|j                  |j                  j                        }|j                  d       |j                         D cg c]  }t        |       }}|j                          |j                          t         j                  dt!        |             |st         j                  d
       y	 t&        j(                  j+                  dt-        t.        dz  dz               ddlm}  |       }		 ddl}ddlm}  |j                  d&i |j                         }|j                         }|j                  d       |j;                         du}|j                          |j                          |st         j%                  d       yd}	 ddl}ddl}ddlm}  |j                  d&i |j                         }d|_        |j                         }|D ]I  }	 |	t?        |	d      r|	jA                  |      }n(|	t?        |	d      r|	jC                  |      }n |      }|sPtE        |t              r|g}|D ]  }|j                  d|jG                  d      tI        jJ                  |jG                  di             tI        jJ                  |jG                  di             tI        jJ                  |jG                  di             |jG                  dd      tM        |jG                  dd            f       |dz  } |j                  d |jG                  d      f       L |jQ                          |j                          |j                          || jT                  d$<   t         j                  d%|       yc c}w # t"        $ r }t         j%                  d	|       Y d}~yd}~ww xY w# t4        t6        f$ r@}	 ddlm}
 d}	|
}n)# t4        $ r t         j%                  d|       Y Y d}~yw xY wY d}~d}~ww xY w# t"        $ r }t         j%                  d|       Y d}~yd}~ww xY w# t"        $ r1}t         jO                  d!|jG                  d      |       Y d}~d}~ww xY w# t"        $ rC}d"| }t         jS                  |       | jT                  d#   jW                  |       Y d}~Yd}~ww xY w)'aY  
        Query unprocessed AIVA interactions and run them through the mentor
        feedback engine to generate preference pairs for the RLM training loop.

        Skips gracefully if:
          - aiva_interactions table doesn't exist
          - pl_preference_pairs table doesn't exist
          - mentor_feedback_engine import fails
        z.[Step 5] Generating mentor preference pairs...z,  [DRY RUN] Skipping mentor pair generation.Nr   r   )cursor_factoryz
                SELECT *
                FROM aiva_interactions
                WHERE mentor_evaluated = FALSE OR mentor_evaluated IS NULL
                LIMIT 100
                z)  Found %d unprocessed AIVA interactions.z7  aiva_interactions query failed (%s). Skipping Step 5.z/[Step 5] No unprocessed interactions. Skipping.AIVAmentors)MentorFeedbackEngine)run_evaluationz=  mentor_feedback_engine import failed (%s). Skipping Step 5.z
                SELECT 1 FROM information_schema.tables
                WHERE table_name = 'pl_preference_pairs'
                LIMIT 1
                z5  Table existence check failed (%s). Skipping Step 5.z7  pl_preference_pairs table not found. Skipping Step 5.Fevaluategenerate_preference_pairsa6  
                            INSERT INTO pl_preference_pairs
                                (interaction_id, chosen, rejected, context, mentor, score, created_at)
                            VALUES (%s, %s, %s, %s, %s, %s, NOW())
                            ON CONFLICT DO NOTHING
                            r9   chosenrejectedcontextmentorunknownscoreg      ?ru   zBUPDATE aiva_interactions SET mentor_evaluated = TRUE WHERE id = %sz/  Pair generation failed for interaction %s: %sz Step 5 mentor pair write error: r   r   z-[Step 5] Done. %d preference pairs generated.r   ),ry   r   r   r   r   r   r   r   r   r   extrasRealDictCursorr   r   rL   r   r   r  rz   sysr   insertr+   r`  mentor_feedback_enginer|  r3  AttributeErrorr}  r  r   hasattrr~  r  r&   rK   rM   rN   r'   r   r   r  r   rh   )r   interactionsr   r   r  r  r  r   r|  enginer}  run_fntable_existspairs_generatedinteractionpairspairr  s                     r4   r   z'BloodstreamPipeline._step5_mentor_pairs  sa    	DE<<KKFG .0	"5#8##Mn&J&J&LMD++X__-K-K+LCKK 25@#DI@L@IIKJJLKKCSEVW KKIJ	HHOOAs<&#89#DEFC)+F	5#8##Mn&J&J&LMD++-CKK <<>5LIIKJJL
 NNTU <	-"5#8##Mn&J&J&LMD#DO++-C+ +=*=)gfj.I & <+@[0\ & @ @ M &{ 3   "%.!& % -  !, 5 $

488Hb+A B $

488J+C D $

488Ir+B C $9 = %dhhw&< =  (1,#-( KK\$.0I+=Z KKMIIKJJL &5

>"C_Uk A  	NNTVYZ	 ^, 	A' ^`cd 	4  	NNRTWX	v ! =LL!R!,!6= ==  	-4SE:CLLJJx '',,	-s   A7O& 2O!AO& A P A=Q$ 7AS A
RS C:R3S !O& &	P/P

PQ!"
P-,Q-QQQQQ!$	R-RR	S
&S?S S

S 	T8TTc                   t        j                         | j                  z
  }| j                  j	                  dd      }t        |t              rd| d}n|}d| j                  j                  d       d| j                   d| j                   d	| j                  d
    d| j                  d    d| j                  d    d| j                  d    d| j                  d    d| d| j                  d    d|dd}| j                  d   r<|dt        | j                  d          dz  }| j                  d   D ]  }|d| dz  } |dz  }| j                  rt        j                  d|       y	 t        j                  d d !       t!        t"        d"d#$      5 }|j%                  |       ddd       t        j                  d%t"               t+        |d'(       y# 1 sw Y   1xY w# t&        $ r }t        j)                  d&|       Y d}~>d}~ww xY w))z<Append a structured run summary to bloodstream_last_run.log.r   r   zpruned z	 memoriesrb   r   z"] Pipeline run complete
  Source: z
  Dry run: z
  KG files scanned: r   z
  Entries found: r   z
  New entries loaded: r   z
  Qdrant embeddings: r   z
  Redis cache items: r   z
  Digestion: z
  Mentor pairs generated: r   z
  Duration: z.1fzs
r   z
  Errors (z):
z    - 
z
--- DRY RUN REPORT ---
%sNTparentsexist_okarr   rs   z![Step 6] Run report written to %sz$[Step 6] Failed to write run log: %sr:   )end)r   r   r   r   rK   r&   r(   r   strftimerI   r   r   ry   r   DATA_DIRmkdirr~   LOG_FILEwriter   r  print)r   elapseddigestion_valdigestion_strreporterrr   r   s           r4   r   z'BloodstreamPipeline._step6_write_report3  s$   .."T__4

'91=mS)%m_I>M)M &&':;< = &,, (##'::o#>"? @  $

? ;< =%%)ZZ0D%E$F G$$(JJ/@$A#B C$$(JJ~$>#? @)? +))-N)C(D E"3-s
, 	 ::h
3tzz(';#<"=TBBFzz(+ +F3%r**+ 	$<<KK6?	FNN4$N7hg6 !" !KK;XF
 	f"! !  	FLL?EE	Fs0   +)G" G&"G" GG" "	H+HHc                   	 t         j                  dd       t        j                         r|t	        j                         t        j                         j                  z
  }|dk  rt        j                  d|       yt        j                  d|dz         t        j                          t        j                  t        j                  t        j                         | j                  j!                         d      d	
       t        j#                  dt        j                                y# t$        $ r }t        j'                  d|       Y d}~yd}~ww xY w)zz
        Write a run flag to prevent concurrent executions.
        Returns False if pipeline is already running.
        Tr  i   z4Pipeline already running (flag age: %.0fs). Exiting.Fz0Stale run flag found (%.0f hours old). Removing.i  )pidstartedrr   rs   zRun flag acquired (PID %d).zCannot acquire run flag: %sN)r  r  RUN_FLAGrx   r   statst_mtimery   rz   unlink
write_textrM   rN   r^  getpidr   	isoformatr   r   r  )r   ager   s      r4   r   z%BloodstreamPipeline._acquire_run_flagc  s    
	NN4$N7 iikHMMO$<$<<:NN#Y[^_ NN#UWZ]aWabOO%

299;4==;R;R;TUV     KK5ryy{C 	LL6<	s   A9D) <B,D) )	E2EEc                    	 t         j                         r*t         j                          t        j	                  d       yy# t
        $ r Y yw xY w)z"Remove the run flag on clean exit.zRun flag released.N)r  rx   r  ry   r   r   )r   s    r4   r   z%BloodstreamPipeline._release_run_flag~  sA    	 !12 !  		s   =A 	AAN)manualF)rI   r+   r   bool)r\  None)r\  List[Dict[str, Any]])r  r  r\  r  )r\  r  )__name__
__module____qualname____doc__r   r   r   r   r9  r   r   r   r   r   r   r   rm   r4   r   r     sK    
(A,TpuWn(XL-`50UVr,`6rm   __main__u;   Genesis RLM Bloodstream Pipeline — autonomous memory sync)r;   z--sourcer  z@What triggered this run (e.g. manual, cron_nightly, session_end))r0   helpz	--dry-run
store_truez,Report stats without writing to any database)actionr  )rI   r   )r   )r/   r   r0   r'   r\  r'   )rQ   Dict[str, Any]rR   r+   r\  zOptional[Dict[str, Any]])rj   r  r\  r+   )r   r   r   r+   r\  r  )r   r   r\  r  )r   r   r   r+   r   r  r\  r  )=r  
__future__r   argparserM   loggingr^  r  r   r:  r   pathlibr   typingr   r   r   r	   r
   r`  r   r   r   r   r   r   r   r   r   r  r  r  r   r  r+   basicConfigINFO	getLoggerry   r   r5   rY   r`   rl   r   	frozensetr   r   r   r   r   r  ArgumentParserparseradd_argument
parse_argsargsrI   r   rj  r   r   rm   r4   <module>r     s|  8 #    	 
     3 3 +,!22Z?008;$'88>I $'88:E$'88?J $'88;QQ $';;$w.	$v-& ,,00 3|f,/??@ A 3|$ %   
,,A
 
		0	1 : F3l,^$T   

7 	@@@ 	@
 @NII
I Ib@ @N z$X$$QF O  
 ;  
 D"$++t||LHLLN# rm   