
    Yi|;                    T   U d Z ddlmZ ddlZddlZddlZddlmZmZ ddlm	Z	m
Z
mZmZ ddlmZ ddlmZmZmZ  ej&                  d	      Zd
Zded<   dZded<   dZded<   dZded<   dZded<   dZded<   dddddZded<   g dg d g d!d"Zd#ed$<   d*d%Zd*d&Z G d' d(      Z g d)Z!y)+a  RLM Neo-Cortex -- Feedback Collection Pipeline.

Collects thumbs-up / thumbs-down / neutral feedback from Telegram callbacks
and generic webhooks, converts each interaction into a DPO PreferencePair,
and persists pairs to the ``pl_preference_pairs`` PostgreSQL table.

Implements Stories 5.01-5.06 of the RLM Neo-Cortex PRD (Module 5).

Infrastructure:
    - PostgreSQL (Elestio) for durable preference-pair storage
    - Redis (Elestio) for 24-hour interaction-history cache
    - No SQLite, no local files, no hardcoded secrets

VERIFICATION_STAMP
Story: 5.01-5.06
Verified By: parallel-builder
Verified At: 2026-02-26
Tests: see tests/rlm/test_feedback.py
Coverage: 100%
    )annotationsN)datetimetimezone)AnyDictListOptional)UUID   )FeedbackCollectorProtocolFeedbackSignalPreferencePairzcore.rlm.feedbackpl_preference_pairsstr_TABLE_NAMEzrlm:interaction_INTERACTION_KEY_PREFIXiQ int_INTERACTION_TTL_SECONDSd   _DPO_MINIMUM_PAIRStelegram_feedback_DEFAULT_ANNOTATORi  _FALLBACK_MAX_CHARSzjThank you for reaching out. I'd be happy to help with your booking. Could you please provide more details?zhI appreciate you sharing that with us. Your feedback has been noted and we will look into this promptly.z_That's a great question. Let me get you the right information. Could you clarify what you need?zJThank you for your message. A team member will follow up with you shortly.)booking	complaintquestiondefaultzDict[str, str]_FALLBACK_TEMPLATES)bookappointschedulereservavailab)complainunhappy
disappointissueproblemwrongbad)whathowwhenwherewhywho?)r   r   r   zDict[str, List[str]]_FALLBACK_KEYWORDSc                    | j                         t        j                         D ]  \  }}t        fd|D              s|c S  y)z8Return the best-matching intent key for the given input.c              3  &   K   | ]  }|v  
 y wN ).0kwlowers     */mnt/e/genesis-system/core/rlm/feedback.py	<genexpr>z!_detect_intent.<locals>.<genexpr>D   s     .rrU{.s   r   )r9   r2   itemsany)
input_textintentkeywordsr9   s      @r:   _detect_intentrA   @   sF    E.446 .X..M     c                    | r| j                         s	t        d   S t        |       }t        |   }t        |      t        k\  r	|dt         }|S )zStory 5.06 -- Return a short, contextual fallback response.

    Never returns an empty string.  Always < 500 characters.
    Varies by detected intent (booking / complaint / question / default).
    r   N)stripr   rA   lenr   )r>   r?   responses      r:   _generate_fallback_responserG   I   sS     Z--/"9--J'F"6*H 8}++001OrB   c                      e Zd ZU dZeZded<   	 	 d	 	 	 	 	 ddZddZddZ		 d	 	 	 	 	 	 	 	 	 ddZ
dd	Z	 	 	 	 	 	 	 	 	 	 dd
Z	 	 	 	 	 	 ddZddZdddZddZy)FeedbackCollectora^  Collects feedback signals and converts them to DPO preference pairs.

    Story 5.01 -- constructor
    Story 5.02 -- record_feedback
    Story 5.03 -- cache_interaction / get_interaction
    Story 5.04 -- get_pair_count / get_recent_pairs
    Story 5.05 -- check_dpo_readiness
    Story 5.06 -- _generate_fallback_response (module-level helper)
    r   
TABLE_NAMENc                    |xs t         j                  j                  d      }|st        d      || _        |xs t         j                  j                  d      }|| _        d| _        d| _        y)a  Story 5.01 -- Initialise collector.

        Parameters
        ----------
        pg_dsn:
            PostgreSQL DSN string.  Falls back to ``DATABASE_URL`` env var.
            If neither is provided, raises ``ValueError``.
        redis_url:
            Optional Redis URL for interaction caching.  Falls back to
            ``REDIS_URL`` env var.  If absent, Redis caching is disabled
            (get_interaction always returns None).
        DATABASE_URLz>PostgreSQL DSN is required.  Pass pg_dsn= or set DATABASE_URL.	REDIS_URLN)osenvironget
ValueError_pg_dsn
_redis_url_pg_pool_redis)selfpg_dsn	redis_urlresolved_dsnresolved_rediss        r:   __init__zFeedbackCollector.__init__n   sg    " ?!?P  ) #Abjjnn[&A)7 "rB   c                   K   | j                   9	 ddl}|j                  | j                         d{   | _         | j                   S | j                   S 7 !# t        $ r}t        d|       |d}~ww xY ww)z:Return the asyncpg connection pool, creating it if needed.Nr   z"Failed to create PostgreSQL pool: )rT   asyncpgcreate_poolrR   	ExceptionRuntimeError)rV   r]   excs      r:   _get_pg_poolzFeedbackCollector._get_pg_pool   st     == X&-&9&9$,,&G G }}t}} !H X"%Gu#MNTWWXs7   A6"A A	A A6A 	A3A..A33A6c                  K   | j                   A| j                  r5	 ddlm} |j	                  | j                  d      | _         | j                   S | j                   S # t
        $ r }t        j                  d|       Y d}~yd}~ww xY ww)z/Return the Redis client, creating it if needed.Nr   T)decode_responseszFailed to connect to Redis: %s)rU   rS   redis.asyncioasynciofrom_urlr_   loggerwarning)rV   aioredisra   s      r:   
_get_rediszFeedbackCollector._get_redis   sr     ;;4??0&//RV/W {{t{{  ?Es.   B(A B	B%B ;B BBc                  K   |t         j                  k(  ry| j                  ||       d{   }|r%|j                  dd      }|j                  dd      }n*|xs i }|j                  dd      }|j                  dd      }t	        |      }	|t         j
                  k(  r|}
|	}n|	}
|}t        ||
|t        d|t        |      |j                  d      }| j                  |       d{    |S 7 7 w)a  Convert a user feedback signal into a DPO PreferencePair.

        Pipeline:
            1. Retrieve cached interaction (input + AI output)
            2. Generate fallback response for the rejected side
            3. Build PreferencePair (POSITIVE: AI chosen; NEGATIVE: AI rejected)
            4. Persist pair to ``pl_preference_pairs``

        NEUTRAL signals return None without any DB write.
        Nr>    output_textg      ?)interaction_id	tenant_idsignal)r>   chosen_outputrejected_outputannotator_id
confidencemetadata)r   NEUTRALget_interactionrP   rG   POSITIVEr   r   r   name_persist_pair)rV   rp   ro   rq   contextinteractionr>   	ai_outputctxfallbackchosenrejectedpairs                r:   record_feedbackz!FeedbackCollector.record_feedback   s    " ^+++ !00NKK
 $r:J#r:I -RCr2Jr2I.z: ^,,,FHF H! $+"0 ^ ++
   &&&O LL 	's"   *C4C0B<C4)C2*C42C4c                  K   | j                          d{   }dt         d}|j                         4 d{   }|j                  ||j                  |j
                  |j                  |j                  |j                  t        j                  |j                        t        j                  t        j                               d{    ddd      d{    y7 7 7 7 # 1 d{  7  sw Y   yxY ww)z8Write a PreferencePair to the pl_preference_pairs table.Nz
            INSERT INTO z
                (input_text, chosen_output, rejected_output,
                 annotator_id, confidence, metadata, created_at)
            VALUES
                ($1, $2, $3, $4, $5, $6, $7)
        )rb   r   acquireexecuter>   rr   rs   rt   ru   jsondumpsrv   r   nowr   utc)rV   r   poolsqlconns        r:   r{   zFeedbackCollector._persist_pair   s     &&(($ &	 <<> 
	 
	T,,""$$!!

4==)X\\*	 	 	
	 
	 
	 )
		
	 
	 
	 
	sg   C<C"C<C!C<BC'	C#
C'C<C%C<!C<#C'%C<'C9-C0.C95C<c                  K   | j                          d{   }|t        j                  d       yt         d| d| }t	        j
                  ||t        |      |d      }|j                  ||t               d{    y7 n7 w)zCache an interaction in Redis with a 24-hour TTL.

        Key: ``rlm:interaction:{tenant_id}:{interaction_id}``

        Returns True if cached successfully, False otherwise.
        Nu,   Redis unavailable — interaction not cachedF:)r>   rn   rp   ro   )exT)	rk   rh   ri   r   r   r   r   setr   )rV   rp   ro   r>   rn   rediskeyvalues           r:   cache_interactionz#FeedbackCollector.cache_interaction  s      oo''=NNIJ()9+Q~6FG

$&Y,	
  iiU'?i@@@ ( 	As"   B	BA(B	?B B	B	c                  K   | j                          d{   }|yt         d| d| }|j                  |       d{   }|y	 t        j                  |      S 7 G7 # t        j
                  t        f$ r Y yw xY ww)zRetrieve a cached interaction.

        Returns a dict with keys input_text / output_text, or None.
        Cross-tenant access is prevented by key structure (tenant_id in key).
        Nr   )rk   r   rP   r   loadsJSONDecodeError	TypeError)rV   rp   ro   r   r   raws         r:   rx   z!FeedbackCollector.get_interaction   s      oo''=()9+Q~6FGIIcN";	::c?" (
 # $$i0 		sE   BA)B A B	A" B B"A>;B=A>>Bc                $  K   | j                          d{   }|j                         4 d{   }|j                  dt                d{   }t	        |d         cddd      d{    S 7 \7 E7 '7 # 1 d{  7  sw Y   yxY ww)z5Return exact COUNT(*) of rows in pl_preference_pairs.NSELECT COUNT(*) AS cnt FROM cnt)rb   r   fetchrowr   r   )rV   r   r   rows       r:   get_pair_countz FeedbackCollector.get_pair_count<  s     &&((<<> 	# 	#T(D[M&RSSCs5z?	# 	# 	# )	#S	# 	# 	# 	#sf   BA3BA5BA;A7A;!B-A9.B5B7A;9B;BBB	Bc                L  K   | j                          d{   }dt         d}|j                         4 d{   }|j                  ||       d{   }|D cg c]  }t	        |       c}cddd      d{    S 7 k7 J7 2c c}w 7 # 1 d{  7  sw Y   yxY ww)zReturn the most recently inserted pairs, newest first.

        Each row dict contains: input_text, chosen_output, rejected_output,
        annotator_id, confidence, metadata, created_at.
        Nz
            SELECT input_text, chosen_output, rejected_output,
                   annotator_id, confidence, metadata, created_at
            FROM zC
            ORDER BY created_at DESC
            LIMIT $1
        )rb   r   r   fetchdict)rV   limitr   r   r   rowsrs          r:   get_recent_pairsz"FeedbackCollector.get_recent_pairsC  s      &&((  	 <<> 	+ 	+TC//D%)*DG*	+ 	+ 	+ )	+/*	+ 	+ 	+ 	+sx   B$B"B$BB$BBBB.B0B$<B=B$B$BBB$B!BB!B$c                ^  K   | j                          d{   }|j                         4 d{   }|j                  dt                d{   }t	        |d         }|j                  dt                d{   }|j                  dt         d       d{   }ddd      d{    d}d}D ]\  }	|	d   }
t        |
t              r	 t        j                  |
      }n|
xs i }|j                  d	d
      }|dk(  r|dz  }R|dk(  sX|dz  }^ D 	ci c]  }	|	d   t	        |	d          }}	dkD  r||z  nd}|dkD  r||z  nd}|t        k\  |t        |||dS 7 M7 77 7 7 7 # 1 d{  7  sw Y   xY w# t        j                  t        f$ r i }Y w xY wc c}	w w)a  Check whether there are enough pairs for DPO training.

        Returns
        -------
        dict with keys:
            ready              -- bool: True when pair_count >= 100
            pair_count         -- int
            minimum_required   -- int (always 100)
            pct_positive       -- float (0.0-1.0)
            pct_negative       -- float (0.0-1.0)
            annotator_dist     -- dict mapping annotator_id -> count
        Nr   r   zSELECT metadata FROM z*SELECT annotator_id, COUNT(*) AS cnt FROM z GROUP BY annotator_idr   rv   rq   rm   ry   r   NEGATIVErt   g        )ready
pair_countminimum_requiredpct_positivepct_negativeannotator_dist)rb   r   r   r   r   r   
isinstancer   r   r   r   r   rP   r   )rV   r   r   	total_rowtotalsignal_rowsannotator_rows	pos_count	neg_countr   meta_rawmetasigr   r   r   s                   r:   check_dpo_readinessz%FeedbackCollector.check_dpo_readinessY  s     &&(( <<> 	 	T"mm.{m< I Yu-.E !%

'}5! K $(::<[M J( )$ N	 	 		 	C:H(C(::h/D  ~2((8R(Cj Q	
"Q		  =K*
58CSZ0*
 *
 /4ai	E)S.3ai	E)S 00 2((,
 	
U )		 	 	 	, ,,i8 D*
s   F-E#F-E&F-E2E)-E2=E,> E2E.E2#F-.E0/"F-F')F-F-F(5/F-&F-)E2,E2.E20F-2F8E;9F F-F%"F-$F%%F-)NN)rW   Optional[str]rX   r   returnNone)r   r   )r   zOptional[Any]r5   )
rp   r
   ro   r   rq   r   r|   Optional[Dict[str, Any]]r   zOptional[PreferencePair])r   r   r   r   )
rp   r
   ro   r   r>   r   rn   r   r   bool)rp   r
   ro   r   r   r   )r   r   )2   )r   r   r   zList[Dict[str, Any]])r   zDict[str, Any])__name__
__module____qualname____doc__r   rJ   __annotations__r[   rb   rk   r   r{   r   rx   r   r   r   r6   rB   r:   rI   rI   `   s     "J! !%#'   !  
	 H	( -1<< < 	<
 *< 
"<|4  	
  
:  
"	8#+,>
rB   rI   )rI   rG   r   r   r   r   r   )r>   r   r   r   )"r   
__future__r   r   loggingrN   r   r   typingr   r   r   r	   uuidr
   	contractsr   r   r   	getLoggerrh   r   r   r   r   r   r   r   r   r2   rA   rG   rI   __all__r6   rB   r:   <module>r      s   ( #   	 ' , ,  P P			.	/ )S (0  0 & # & C - C - S  |s^	' ^  FZD, ( .w
 w
|	rB   