
    ՞i                    D   d Z ddlmZ ddlZddlmZ  ej                  e      Z	 ddl	Z	ddl	m
Z dZ ed	d
dd      dd       Z edddd      dd       Z edddd      dd       Z ed	d
dd      d d       Z edddd      d!d       Zy# e$ r dZd ZY lw xY w)"u  
Genesis Task Queue — Dramatiq actor definitions.

Each actor maps to a priority queue and has explicit retry + backoff settings.
Workers are imported by the Dramatiq CLI worker process (``dramatiq core.task_queue.workers``).

Queue assignments
-----------------
critical  voice/realtime webhooks — tight SLA, max_retries=1
high      customer-facing operations — max_retries=5, fast back-off
default   background processing — max_retries=3, standard back-off
low       analytics / reporting — max_retries=5, long back-off (runs off-peak)

Backoff values are in milliseconds (Dramatiq convention).

When dramatiq is not installed the module still imports cleanly. Each actor
function gets a no-op ``.send()`` and ``.send_with_options()`` stub so call
sites don't need to guard against ImportError.

# VERIFICATION_STAMP
# Story: M6.02 — workers.py — Dramatiq actor definitions
# Verified By: parallel-builder
# Verified At: 2026-02-25T00:00:00Z
# Tests: 8/8
# Coverage: 100%
    )annotationsN)Any)actorTFc                     dd}|S )z?No-op ``@actor`` decorator used when dramatiq is not installed.c                8     d fd _         fd _         S )Nc                 F    t         j                  dj                         y )Nub   dramatiq not installed — task '%s' was not dispatched. Install with: pip install dramatiq[redis])loggerwarning__name__)argskwargsfns     0/mnt/e/genesis-system/core/task_queue/workers.py
_noop_sendz._actor.<locals>._decorator.<locals>._noop_send1   s    @KK    c                      | i |S )N )akr   s     r   <lambda>z,_actor.<locals>._decorator.<locals>.<lambda>9   s    :q3FA3F r   )r   r   r   r   returnNone)sendsend_with_options)r   r   s   `@r   
_decoratorz_actor.<locals>._decorator0   s     !BG#FB Ir   )r   r   r   r   r   )_kwargsr   s     r   _actorr   .   s    
	 r   default   i  i`  )
queue_namemax_retriesmin_backoffmax_backoffc                    t         j                  d|        ddl}ddlm}  |       }|j                  |j                                y)ac  
    Run a nightly epoch processing cycle.

    Delegates to ``NightlyEpochRunner.run_epoch()`` which aggregates the past
    7 days of royal_conversations, distils axioms via Gemini Pro, and persists
    results to Qdrant + KG jsonl files.

    Parameters
    ----------
    epoch_id : str
        Identifier for the epoch run (e.g. ``"2026-W09"``).
    z"Processing epoch task: epoch_id=%sr   N)NightlyEpochRunner)r	   infoasynciocore.epoch.nightly_epoch_runnerr%   run	run_epoch)epoch_idr'   r%   runners       r   process_epoch_taskr-   D   s6     KK4h?B!FKK  "#r   high   i  i0u  c                2    t         j                  d| |       y)aB  
    Dispatch a notification via the configured channel.

    Parameters
    ----------
    notification_type : str
        One of ``"email"``, ``"sms"``, ``"webhook"``.
    recipient : str
        Destination address or phone number.
    data : dict
        Notification payload (subject, body, template vars, etc.).
    zSending %s notification to %sNr	   r&   )notification_type	recipientdatas      r   send_notificationr5   Z   s     KK'r   critical   d   c                z    | j                  d      xs | j                  dd      }t        j                  d|       y)ay  
    Handle an incoming Telnyx voice webhook with minimal latency.

    Uses ``queue_name="critical"`` so dedicated workers drain this queue before
    any other. ``max_retries=1`` avoids double-processing a call event.

    Parameters
    ----------
    payload : dict
        Telnyx webhook JSON body.  Must contain ``call_control_id`` or
        ``call_id`` at minimum.
    call_control_idcall_idunknownz$Processing voice webhook: call_id=%sN)getr	   r&   )payloadr;   s     r   process_voice_webhookr?   p   s2     kk+,QIy0QG
KK6@r   c                2    t         j                  d| |       y)a  
    Write an entity update to the Genesis Knowledge Graph.

    Parameters
    ----------
    entity_type : str
        KG entity category (e.g. ``"conversation"``, ``"axiom"``).
    entity_id : str
        Unique entity identifier.
    data : dict
        Entity fields to upsert.
    z&KG update: entity_type=%s entity_id=%sNr1   )entity_type	entity_idr4   s      r   update_knowledge_graphrC      s     KK8+yQr   lowi  i c                2    t         j                  d| |       y)a  
    Generate an analytics report during off-peak hours.

    Parameters
    ----------
    report_type : str
        Report category (e.g. ``"weekly_revenue"``, ``"epoch_summary"``).
    period : str
        Reporting period string (e.g. ``"2026-W09"``).
    z5Generating analytics report: report_type=%s period=%sNr1   )report_typeperiods     r   generate_analytics_reportrH      s     KK?r   )r+   strr   r   )r2   rI   r3   rI   r4   dictr   r   )r>   rJ   r   r   )rA   rI   rB   rI   r4   rJ   r   r   )rF   rI   rG   rI   r   r   )__doc__
__future__r   loggingtypingr   	getLoggerr   r	   dramatiqr   r   DRAMATIQ_AVAILABLEImportErrorr-   r5   r?   rC   rH   r   r   r   <module>rS      s   4 #  			8	$
(6 9!FS$ T$* 6qcvN O* :1#5QA RA$ 9!FSR TR" 5aUP QW  s   B 
BB