
    ϟi_0                       d Z ddlmZ ddlZddlZddlmZ ddlmZmZm	Z	 ddl
Z
ddlZ
 ej                  dd       e ej                  dd	             ej                  d
d       ej                  dd       ej                  dd      dZdZdZdZd!dZd"dZd#d$dZdZ	 	 d%	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 d&dZ	 	 	 	 d'dZdZdZdZd#d(dZd)dZd*d+dZdZ	 d#	 	 	 	 	 d,d Zy)-ug  
MODULE 6: PostgreSQL Store — Genesis KB Ingestion Pipeline
===========================================================
Handles page metadata tracking, ingestion logging, and content hash
lookup for the platform KB pipeline.

Design decisions:
- customer_id=None is stored as '' (empty string) for global KB pages.
  This avoids the NULL != NULL edge case in PostgreSQL unique constraints
  while keeping the API clean (callers pass None for global).
- All functions accept an optional `conn` parameter; if not provided,
  a new connection is created and closed after use (auto-commit pattern).
- UNIQUE constraint on (platform, customer_id, url) works cleanly because
  customer_id is never NULL in the DB — it's always at least ''.

# VERIFICATION_STAMP
# Story: 6.01–6.05
# Verified By: parallel-builder
# Verified At: 2026-02-26
# Tests: 13/13
# Coverage: 100%
    )annotationsN)datetime)ListOptionalTuplePG_HOSTz(postgresql-genesis-u50607.vm.elestio.appPG_PORT25432PG_USERpostgresPG_PASSzCiBjh6LM7Yuqkq-jo2r7eQDwPG_DB)hostportuserpassworddbname a  
CREATE TABLE IF NOT EXISTS platform_kb_pages (
    id              SERIAL PRIMARY KEY,
    platform        VARCHAR(50)     NOT NULL,
    customer_id     VARCHAR(100)    NOT NULL DEFAULT '',
    url             TEXT            NOT NULL,
    title           TEXT,
    content_hash    VARCHAR(64)     NOT NULL,
    chunk_count     INTEGER         DEFAULT 0,
    last_ingested   TIMESTAMPTZ     DEFAULT NOW(),
    metadata        JSONB           DEFAULT '{}',
    UNIQUE(platform, customer_id, url)
);
CREATE INDEX IF NOT EXISTS idx_pkp_platform ON platform_kb_pages(platform);
CREATE INDEX IF NOT EXISTS idx_pkp_customer  ON platform_kb_pages(customer_id);
a}  
CREATE TABLE IF NOT EXISTS platform_kb_ingestion_log (
    id               SERIAL PRIMARY KEY,
    platform         VARCHAR(50)     NOT NULL,
    customer_id      VARCHAR(100)    NOT NULL DEFAULT '',
    started_at       TIMESTAMPTZ     DEFAULT NOW(),
    completed_at     TIMESTAMPTZ,
    pages_fetched    INTEGER         DEFAULT 0,
    pages_changed    INTEGER         DEFAULT 0,
    chunks_created   INTEGER         DEFAULT 0,
    vectors_upserted INTEGER         DEFAULT 0,
    errors           INTEGER         DEFAULT 0,
    status           VARCHAR(20)     DEFAULT 'running',
    metadata         JSONB           DEFAULT '{}'
);
c                 4    t        j                  di t        S )z3Return a new PostgreSQL connection using PG_CONFIG. )psycopg2connect	PG_CONFIGr       )/mnt/e/genesis-system/core/kb/pg_store.pyget_connectionr   W   s    (i((r   c                    | | S t         S )u;   Convert None → '' so the UNIQUE constraint works cleanly.)_GLOBAL_CUSTOMER_ID)customer_ids    r   _normalize_customer_idr    \   s    %1;J7JJr   c                :   | du }|r
t               } 	 | j                         5 }|j                  t               |j                  t               ddd       | j                          |r| j                          yy# 1 sw Y   -xY w# |r| j                          w w xY w)u(  
    Create platform_kb_pages and platform_kb_ingestion_log tables if they
    do not already exist.  Idempotent — safe to call on every startup.

    Parameters
    ----------
    conn : psycopg2 connection, optional
        If not supplied, a new connection is created, used, and closed.
    N)r   cursorexecuteCREATE_PAGES_TABLE_SQLCREATE_LOG_TABLE_SQLcommitclose)conn_owncurs      r   ensure_schemar+   a   s     4<D[[] 	.cKK./KK,-	. 	JJL 	. 	.
 JJL s"   B +A9B 9B>B Ba  
INSERT INTO platform_kb_pages
    (platform, customer_id, url, title, content_hash, chunk_count, last_ingested, metadata)
VALUES
    (%(platform)s, %(customer_id)s, %(url)s, %(title)s,
     %(content_hash)s, %(chunk_count)s, NOW(), %(metadata)s)
ON CONFLICT (platform, customer_id, url) DO UPDATE SET
    title          = EXCLUDED.title,
    content_hash   = EXCLUDED.content_hash,
    chunk_count    = EXCLUDED.chunk_count,
    last_ingested  = NOW(),
    metadata       = EXCLUDED.metadata
RETURNING id;
c                   t        |      }t        j                  |xs i       }	| j                         5 }
|
j	                  t
        |||||||	d       |
j                         }| j                          |d   cddd       S # 1 sw Y   yxY w)a$  
    Upsert page metadata into platform_kb_pages.

    On conflict (same platform + customer_id + url), all mutable fields are
    updated.  customer_id=None is treated as global (stored as '').

    Returns
    -------
    int
        The database row id of the inserted / updated page.
    platformr   urltitlecontent_hashchunk_countmetadatar   N)r    jsondumpsr"   r#   UPSERT_PAGE_SQLfetchoner&   )r(   r.   r/   r0   r1   r2   r   r3   cid	meta_jsonr*   rows               r   upsert_pager;      s    * !
-C

8>r*I	 #$" ,*%	
 lln1v  s   ABBc                >   |sy| j                         5 }|D ]L  \  }}}}}}}	t        |      }
t        j                  |	xs i       }|j	                  t
        ||
|||||d       N | j                          ddd       t        |      S # 1 sw Y   t        |      S xY w)z
    Batch upsert multiple pages in a single transaction.

    Each tuple in *pages* is:
        (platform, url, title, content_hash, chunk_count, customer_id, metadata)

    Returns the number of rows upserted.
    r   r-   N)r"   r    r4   r5   r#   r6   r&   len)r(   pagesr*   r.   r/   r0   r1   r2   r   r3   r8   r9   s               r   upsert_pages_batchr?      s     	 #',	 $XsE<((5C

8>r2IKK (#&"$0#. )		  	#$ u:%$ u:s   A"B		Bz
INSERT INTO platform_kb_ingestion_log (platform, customer_id, status)
VALUES (%(platform)s, %(customer_id)s, 'running')
RETURNING id;
aq  
UPDATE platform_kb_ingestion_log SET
    completed_at     = NOW(),
    pages_fetched    = %(pages_fetched)s,
    pages_changed    = %(pages_changed)s,
    chunks_created   = %(chunks_created)s,
    vectors_upserted = %(vectors_upserted)s,
    errors           = %(errors)s,
    status           = %(status)s,
    metadata         = %(metadata)s
WHERE id = %(log_id)s;
a  
SELECT id, platform, customer_id, started_at, completed_at,
       pages_fetched, pages_changed, chunks_created, vectors_upserted,
       errors, status, metadata
FROM platform_kb_ingestion_log
WHERE platform = %(platform)s
ORDER BY started_at DESC
LIMIT %(limit)s;
c                    t        |      }| j                         5 }|j                  t        ||d       |j	                         }| j                          |d   cddd       S # 1 sw Y   yxY w)z
    Record the start of an ingestion run.

    Returns
    -------
    int
        The log row id, to be passed to log_ingestion_complete().
    r.   r   r   N)r    r"   r#   LOG_START_SQLr7   r&   )r(   r.   r   r8   r*   r:   s         r   log_ingestion_startrC      s\     !
-C	 #M#MNlln1v	  s   >A$$A-c                   | j                         5 }|j                  t        ||j                  dd      |j                  dd      |j                  dd      |j                  dd      |j                  dd      |j                  dd      t	        j
                  |j                  d	i             d
       | j                          ddd       y# 1 sw Y   yxY w)ax  
    Finalise an ingestion log row with statistics.

    Parameters
    ----------
    conn
        Active psycopg2 connection.
    log_id
        ID returned by log_ingestion_start().
    stats
        Dict with keys: pages_fetched, pages_changed, chunks_created,
        vectors_upserted, errors, status ('completed' | 'failed'),
        and optionally metadata (dict).
    pages_fetchedr   pages_changedchunks_createdvectors_upsertederrorsstatus	completedr3   )log_idrE   rF   rG   rH   rI   rJ   r3   N)r"   r#   LOG_COMPLETE_SQLgetr4   r5   r&   )r(   rL   statsr*   s       r   log_ingestion_completerP     s     
 # $)IIoq$A$)IIoq$A$)II.>$B$)II.@!$D$)IIh$:$)IIh$D$(JJuyyR/H$I		
 	  s   B3CCc                   | j                  t        j                  j                        5 }|j	                  t
        ||d       |j                         }ddd       D cg c]  }t        |       c}S # 1 sw Y   "xY wc c}w )z
    Return the most recent ingestion runs for a platform, newest first.

    Returns
    -------
    list[dict]
        Each dict has keys matching the platform_kb_ingestion_log columns.
    )cursor_factory)r.   limitN)r"   r   extrasRealDictCursorr#   LOG_HISTORY_SQLfetchalldict)r(   r.   rS   r*   rowsrs         r   get_ingestion_historyr[   -  sl     
HOO$B$B	C sO(U%KL||~ ""DG""  #s   *A6!B6A?zt
SELECT url, content_hash
FROM platform_kb_pages
WHERE platform = %(platform)s
  AND customer_id = %(customer_id)s;
c                    t        |      }| j                         5 }|j                  t        ||d       |j	                         }ddd       D ci c]  \  }}||
 c}}S # 1 sw Y   xY wc c}}w )a  
    Return a {url: content_hash} mapping for all pages belonging to
    the given platform (and optionally scoped to a customer).

    Used by the orchestrator for change-detection: only pages whose hash
    differs from the stored value need re-ingestion.
    rA   N)r    r"   r#   GET_HASHES_SQLrW   )r(   r.   r   r8   r*   rY   r/   r1   s           r   get_content_hashesr^   H  sm     !
-C	 #N#$NO||~ 8<<"3#|C<<  =s   *A$A0$A-)returnzpsycopg2.extensions.connection)r   Optional[str]r_   str)N)r_   None)NN)r.   ra   r/   ra   r0   ra   r1   ra   r2   intr   r`   r3   zOptional[dict]r_   rc   )r>   zCList[Tuple[str, str, str, str, int, Optional[str], Optional[dict]]]r_   rc   )r.   ra   r   r`   r_   rc   )rL   rc   rO   rX   r_   rb   )
   )r.   ra   rS   rc   r_   z
list[dict])r.   ra   r   r`   r_   zdict[str, str]) __doc__
__future__r   r4   osr   typingr   r   r   r   psycopg2.extrasgetenvrc   r   r   r$   r%   r   r    r+   r6   r;   r?   rB   rM   rV   rC   rP   r[   r]   r^   r   r   r   <module>rk      sz  . #  	  ( (   BIIi!KL			)W-.BIIi,		)%?@bii,	   " $)
K
6. "&#'' 
' 	'
 ' ' ' ' 	'T!N! 	!P "@#& "&== = 	=r   