
    .מi#                        d Z ddlmZ ddlZddlZddlZddlZddlZddlm	Z	 ddl
mZmZmZ ddlmZ  ej                   e      ZdZ G d d	      Zdd
Zy)u  
core/graph/sync.py — JSONL-to-FalkorDB sync engine.

KGSyncer reads the canonical KNOWLEDGE_GRAPH JSONL files and upserts each
entry as a node in FalkorDB.  The JSONL files remain the source of truth;
FalkorDB is the queryable read-optimised projection.

Conventions applied:
    - "id"             → node ID  (falls back to line hash if absent)
    - "type" / "category" → node label  (falls back to "entity" / "axiom")
    - all other fields → node properties (complex values JSON-encoded)
    - malformed lines  → skipped, error count incremented

# VERIFICATION_STAMP
# Story: M9.03 — core/graph/sync.py — KGSyncer
# Verified By: parallel-builder
# Verified At: 2026-02-25T00:00:00Z
# Tests: 8/8
# Coverage: 100%
    )annotationsN)Path)AnyDictOptional)GenesisGraphz%/mnt/e/genesis-system/KNOWLEDGE_GRAPHc                  x    e Zd ZdZef	 	 	 	 	 d
dZdddZdddZddZddZ		 d	 	 	 	 	 	 	 ddZ
	 	 	 	 	 	 dd	Zy)KGSyncera#  
    Synchronises KNOWLEDGE_GRAPH JSONL files into FalkorDB.

    Parameters
    ----------
    graph:
        An initialised GenesisGraph instance.
    kg_base_path:
        Absolute path to the KNOWLEDGE_GRAPH root directory.
        Defaults to /mnt/e/genesis-system/KNOWLEDGE_GRAPH.
    c                2    || _         t        |      | _        y N)graphr   kg_base)selfr   kg_base_paths      (/mnt/e/genesis-system/core/graph/sync.py__init__zKGSyncer.__init__3   s    
 
L)    c                (    | j                  |d      S )z
        Sync all *.jsonl files in *kg_base/folder* as entity nodes.

        Each JSONL line becomes a node labelled by its "type" or "category"
        field (falling back to "entity").

        Returns a stats dict: {"synced": N, "errors": N}.
        entitydefault_label_sync_folderr   folders     r   sync_entitieszKGSyncer.sync_entities?   s       x @@r   c                (    | j                  |d      S )z
        Sync all *.jsonl files in *kg_base/folder* as axiom nodes.

        Each JSONL line becomes a node labelled by its "type" or "category"
        field (falling back to "axiom").

        Returns a stats dict: {"synced": N, "errors": N}.
        axiomr   r   r   s     r   sync_axiomszKGSyncer.sync_axiomsJ   s       w ??r   c                    | j                         }| j                         }|j                  dd      |j                  dd      |j                  dd      |j                  dd      z   dS )z
        Run sync_entities() and sync_axioms() and combine their stats.

        Returns:
            {
                "entities_synced": N,
                "axioms_synced": N,
                "errors": N,
            }
        syncedr   errorsentities_syncedaxioms_syncedr"   )r   r   get)r   entity_statsaxiom_statss      r   sync_allzKGSyncer.sync_allU   sk     ))+&&(  ,//!<(__Xq9"&&x3koohPQ6RR
 	
r   c                    | j                  dd|      }| j                  dd|      }|j                  dd      |j                  dd      |j                  dd      |j                  dd      z   d	S )
af  
        Only sync JSONL files whose mtime is newer than *since_file_mtime*.

        ``since_file_mtime`` is a Unix timestamp (float), e.g. from
        ``time.time()`` or ``os.path.getmtime(path)``.

        Returns:
            {
                "entities_synced": N,
                "axioms_synced": N,
                "errors": N,
            }
        entitiesr   )r   since_mtimeaxiomsr   r!   r   r"   r#   )r   r&   )r   since_file_mtimer'   r(   s       r   incremental_synczKGSyncer.incremental_synci   s     ((h<L ) 
 ''G9I ( 
  ,//!<(__Xq9"&&x3koohPQ6RR
 	
r   Nc                   | j                   |z  }d}d}|j                         st        j                  d|       dddS t	        |j                  d            }|D ]D  }|!	 |j                         j                  }	|	|k  r&| j                  ||      \  }
}||
z  }||z  }F t        j                  d|||       ||dS # t        $ r Y ow xY w)a{  
        Core sync loop: iterate .jsonl files in *folder*, upsert each line.

        Parameters
        ----------
        folder:
            Sub-directory name relative to self.kg_base.
        default_label:
            Label to apply when a line has no "type" or "category" field.
        since_mtime:
            If provided, skip files whose mtime <= since_mtime.
        r   z#KGSyncer: folder does not exist: %s)r!   r"   z*.jsonlz.KGSyncer._sync_folder(%s): synced=%d errors=%d)r   existsloggerwarningsortedglobstatst_mtimeOSError
_sync_fileinfo)r   r   r   r,   folder_pathr!   r"   jsonl_files
jsonl_path
file_mtimefile_syncedfile_errorss               r   r   zKGSyncer._sync_folder   s    $ llV+!!#NN@+N1--[--i89% 	"J&!+!2!;!;J ,'+M($K k!Fk!F	"  	<		
 !F33#  s   !B??	C
Cc                   d}d}	 |j                  dd      }t	        |j                         d	      D ]  \  }}|j                         }	|	s	 t        j                  |	      }
t        |
t              s't        j                  d|j                  |       |dz  }gt        |
|j                  |      }|
j!                  d      xs |
j!                  d      xs |}t        |t"              r|s|}|
j%                         D ci c]  \  }}|dvr|| }}}|j                  |d<   | j&                  j)                  |||      }|r|dz  }|dz  } ||fS # t        $ r!}t        j                  d||       Y d}~yd}~ww xY w# t        j                  $ r2}t        j                  d
|j                  ||       |dz  }Y d}~d}~ww xY wc c}}w )z
        Parse a single .jsonl file and upsert each valid line into FalkorDB.

        Returns (synced_count, error_count).
        r   zutf-8replace)encodingr"   zKGSyncer: cannot read %s: %sN)r      rD   )startu(   KGSyncer: malformed JSON at %s:%d — %sz*KGSyncer: non-dict line at %s:%d, skippingtypecategory)id_source_file)	read_textr8   r2   r3   	enumerate
splitlinesstripjsonloadsJSONDecodeErrordebugname
isinstancedict_extract_idstemr&   stritemsr   
add_entity)r   pathr   r!   r"   textexcline_noraw_linelinedata	entity_identity_typekvpropsoks                    r   r9   zKGSyncer._sync_file   s    	>>79>ED
 "+4??+<A!F /	GX>>#D
zz$' dD)@II
 !#D$))W=I  !88J'!  
 k3/{+
 !JJL%AqG# 1%E %
 %)IIE.!&&y+uEB!!_/	b v~k  	NN94E	 '' >II	 !:%s5   E FG	F E;;F G'GG)r   r   r   rW   returnNone)r+   )r   rW   rg   Dict[str, int])r-   )rg   ri   )r.   floatrg   ri   r   )r   rW   r   rW   r,   zOptional[float]rg   ri   )rZ   r   r   rW   rg   ztuple[int, int])__name__
__module____qualname____doc___DEFAULT_KG_BASEr   r   r   r)   r/   r   r9    r   r   r
   r
   &   s    
 -** * 
	*	A	@
(
D (,	2424 24 %	24
 
24hDD D 
	Dr   r
   c                    d| v rt        | d   t              r
| d   r| d   S d| v rt        | d   t              r| d   S | d| }dt        j                  |j	                               j                         dd z   S )u   
    Extract a stable node ID from a JSONL record.

    Priority order:
    1. data["id"] — explicit ID field
    2. data["entity_id"] — alternate common field name
    3. Deterministic hash of file_stem + line_no (fallback for lines without IDs)
    rH   ra   :auto_N   )rS   rW   hashlibmd5encode	hexdigest)r`   	file_stemr]   
hash_inputs       r   rU   rU     s     t|
4:s3T
Dzdz${*;SAK  ;ay)JW[[!2!2!45??A#2FFFr   )r`   zDict[str, Any]ry   rW   r]   intrg   rW   )rn   
__future__r   ru   rN   loggingostimepathlibr   typingr   r   r   core.graph.clientr   	getLoggerrk   r2   ro   r
   rU   rp   r   r   <module>r      sU   ( #    	   & & *			8	$: Y YBGr   