
    1iP.                     6   d dl mZ d dlmZ d dlmZ d dlmZmZmZm	Z	m
Z
mZ d dlmZ d dlmZmZ d dlmZmZmZ d dlmZmZmZmZ d d	lmZmZmZmZmZ d d
l m!Z! d dl"m#Z# d dl$m%Z%m&Z& d dl'm(Z) d dl*m+Z+m,Z, d dl-m.Z. d dl/m0Z0  G d de,      Z1 G d de+e1      Z2y)    )abstractmethod)iscoroutinefunction)datetime)Any	AwaitableCallableListOptionalUnion)RedisCluster)PipelinePubSub)AsyncDatabaseDatabase	Databases)AsyncActiveDatabaseChanged&CloseConnectionOnActiveDatabaseChangedRegisterCommandFailure"ResubscribeOnActiveDatabaseChanged)DEFAULT_FAILOVER_ATTEMPTSDEFAULT_FAILOVER_DELAYAsyncFailoverStrategyDefaultFailoverStrategyExecutorFailoverStrategyExecutor)AsyncFailureDetector)Retry)AsyncOnCommandsFailEventEventDispatcherInterface)State)BaseCommandExecutorCommandExecutor)DEFAULT_AUTO_FALLBACK_INTERVAL)KeyTc                      e Zd Zeedefd              Zeedee   fd              Z	ededdfd       Z
eedee   fd              Zededdfd	       Zeedee   fd
              Zej"                  ededdfd              Zeedefd              Zeedefd              Zed        Zed        Zedefd       Zedeegdf   fd       Zedefd       Zedede fd       Z!y)AsyncCommandExecutorreturnc                      y)zReturns a list of databases.N selfs    p/mnt/e/genesis-system/.venvs/voice-bridge/lib/python3.12/site-packages/redis/asyncio/multidb/command_executor.py	databaseszAsyncCommandExecutor.databases         	    c                      y)z$Returns a list of failure detectors.Nr(   r)   s    r+   failure_detectorsz&AsyncCommandExecutor.failure_detectors&   r-   r.   failure_detectorNc                      y)z=Adds a new failure detector to the list of failure detectors.Nr(   r*   r1   s     r+   add_failure_detectorz)AsyncCommandExecutor.add_failure_detector,   s     	r.   c                      y)z"Returns currently active database.Nr(   r)   s    r+   active_databasez$AsyncCommandExecutor.active_database1   r-   r.   databasec                    K   yw)z#Sets the currently active database.Nr(   )r*   r7   s     r+   set_active_databasez(AsyncCommandExecutor.set_active_database7         	   c                      y)z Returns currently active pubsub.Nr(   r)   s    r+   active_pubsubz"AsyncCommandExecutor.active_pubsub<   r-   r.   pubsubc                      y)zSets currently active pubsub.Nr(   r*   r>   s     r+   r=   z"AsyncCommandExecutor.active_pubsubB   r-   r.   c                      y)z#Returns failover strategy executor.Nr(   r)   s    r+   failover_strategy_executorz/AsyncCommandExecutor.failover_strategy_executorH   r-   r.   c                      y)zReturns command retry object.Nr(   r)   s    r+   command_retryz"AsyncCommandExecutor.command_retryN   r-   r.   c                    K   yw)z:Initializes a PubSub object on a currently active databaseNr(   r*   kwargss     r+   r>   zAsyncCommandExecutor.pubsubT   r:   r;   c                    K   yw)z*Executes a command and returns the result.Nr(   )r*   argsoptionss      r+   execute_commandz$AsyncCommandExecutor.execute_commandY   r:   r;   command_stackc                    K   yw)z)Executes a stack of commands in pipeline.Nr(   )r*   rL   s     r+   execute_pipelinez%AsyncCommandExecutor.execute_pipeline^   r:   r;   transactionc                    K   yw)z1Executes a transaction block wrapped in callback.Nr(   )r*   rO   watchesrJ   s       r+   execute_transactionz(AsyncCommandExecutor.execute_transactionc   s     
 	r;   method_namec                    K   yw)z*Executes a given method on active pub/sub.Nr(   )r*   rS   rI   rG   s       r+   execute_pubsub_methodz*AsyncCommandExecutor.execute_pubsub_methodj   r:   r;   
sleep_timec                    K   yw)z!Executes pub/sub run in a thread.Nr(   )r*   rV   rG   s      r+   execute_pubsub_runz'AsyncCommandExecutor.execute_pubsub_runo   r:   r;   )"__name__
__module____qualname__propertyr   r   r,   r	   r   r0   r4   r
   r   r6   r9   r   r=   setterr   rB   r   rD   r>   rK   tuplerN   r   r   rR   strrU   floatr   rX   r(   r.   r+   r%   r%      s    9    4(<#=    5I d   -!8    - D   x/    F t    ,D    u        E   #XJ$45  s   5 s  r.   r%   c                       e Zd Zeeefdee   dede	de
dedededef fd	Zed
efd       Zed
ee   fd       Zded
dfdZed
ee   fd       Zded
dfdZed
ee   fd       Zej2                  ded
dfd       Zed
efd       Zed
e	fd       Zd Zd ZdefdZ ddddde!dge"e#e$e#   f   f   d e%d!ee&   d"e'd#ee   f
d$Z(d%e&fd&Z)	 d1d'ed
e#fd(Z*	 d2d)e!d*efd+Z+d, Z,d- Z-d.efd/Z.d0 Z/ xZ0S )3DefaultCommandExecutorr0   r,   rD   failover_strategyevent_dispatcherfailover_attemptsfailover_delayauto_fallback_intervalc	                    t         
|   |       |D ]  }	|	j                  |         || _        || _        || _        t        |||      | _        || _        d| _	        d| _
        i | _        | j                          | j                          y)a  
        Initialize the DefaultCommandExecutor instance.

        Args:
            failure_detectors: List of failure detector instances to monitor database health
            databases: Collection of available databases to execute commands on
            command_retry: Retry policy for failed command execution
            failover_strategy: Strategy for handling database failover
            event_dispatcher: Interface for dispatching events
            failover_attempts: Number of failover attempts
            failover_delay: Delay between failover attempts
            auto_fallback_interval: Time interval in seconds between attempts to fall back to a primary database
        )command_executorN)super__init__set_command_executor
_databases_failure_detectors_command_retryr   _failover_strategy_executor_event_dispatcher_active_database_active_pubsub_active_pubsub_kwargs_setup_event_dispatcher_schedule_next_fallback)r*   r0   r,   rD   rc   rd   re   rf   rg   fd	__class__s             r+   rk   zDefaultCommandExecutor.__init__v   s    0 	/0# 	;B##T#:	; $"3++J0.,
( "24804%'"$$&$$&r.   r&   c                     | j                   S N)rm   r)   s    r+   r,   z DefaultCommandExecutor.databases   s    r.   c                     | j                   S rz   )rn   r)   s    r+   r0   z(DefaultCommandExecutor.failure_detectors   s    &&&r.   r1   Nc                 :    | j                   j                  |       y rz   )rn   appendr3   s     r+   r4   z+DefaultCommandExecutor.add_failure_detector   s    &&'78r.   c                     | j                   S rz   )rr   r)   s    r+   r6   z&DefaultCommandExecutor.active_database   s    $$$r.   r7   c                    K   | j                   }|| _         |I||urD| j                  j                  t        || j                   | fi | j                         d {    y y y 7 wrz   )rr   rq   dispatch_asyncr   rt   )r*   r7   
old_actives      r+   r9   z*DefaultCommandExecutor.set_active_database   sy     **
 (!j&@((77*)) 00	   'A!s   AA$A"A$c                     | j                   S rz   rs   r)   s    r+   r=   z$DefaultCommandExecutor.active_pubsub       """r.   r>   c                     || _         y rz   r   r@   s     r+   r=   z$DefaultCommandExecutor.active_pubsub   s
    $r.   c                     | j                   S rz   )rp   r)   s    r+   rB   z1DefaultCommandExecutor.failover_strategy_executor   s    ///r.   c                     | j                   S rz   )ro   r)   s    r+   rD   z$DefaultCommandExecutor.command_retry   r   r.   c                     | j                   bt        | j                  j                  t              rt        d       | j                  j                  j                  di || _         || _        y y )Nz(PubSub is not supported for RedisClusterr(   )rs   
isinstancerr   clientr   
ValueErrorr>   rt   rF   s     r+   r>   zDefaultCommandExecutor.pubsub   sa    &$//66E !KLL"E$"7"7">">"E"E"O"OD)/D& 'r.   c                 V    K    fd} j                  |       d {   S 7 w)Nc                     K    j                   j                  j                  i  d {   } j                         d {    | S 7 7 wrz   )rr   r   rK   _register_command_execution)responserI   rJ   r*   s    r+   callbackz8DefaultCommandExecutor.execute_command.<locals>.callback   s[     IT2299II  H 224888O	 9s!   *AAAAAA_execute_with_failure_detection)r*   rI   rJ   r   s   ``` r+   rK   z&DefaultCommandExecutor.execute_command   s'     	 99(DIIIIs   )')rL   c                 R    K    fd} j                  |       d {   S 7 w)Nc                  `  K   j                   j                  j                         4 d {   } D ]  \  }} | j                  |i |  | j	                          d {   }j                         d {    |cd d d       d {    S 7 d7 27 7 # 1 d {  7  sw Y   y xY wwrz   )rr   r   pipelinerK   executer   )pipecommandrJ   r   rL   r*   s       r+   r   z9DefaultCommandExecutor.execute_pipeline.<locals>.callback   s     ,,33<<>    $(5 >$GW(D(('=W=> "&/66}EEE      0E       sh   )B.BB.0B B!B9B:B?B.BB.BBB.B+B" B+'B.r   )r*   rL   r   s   `` r+   rN   z'DefaultCommandExecutor.execute_pipeline   s'     	  99(MRRRRs   '%'F
shard_hintvalue_from_callablewatch_delayfuncr   rQ   r   r   r   c                `    K    fd} j                  |       d {   S 7 w)Nc                     K    j                   j                  j                  gd d {   } j                  d       d {    | S 7 7 w)Nr   r(   )rr   r   rO   r   )r   r   r*   r   r   r   rQ   s    r+   r   z<DefaultCommandExecutor.execute_transaction.<locals>.callback   sk     ET2299EE &$7' H 222666O 7s!   0AAAAAAr   )r*   r   r   r   r   rQ   r   s   `````` r+   rR   z*DefaultCommandExecutor.execute_transaction   s*     		 		 99(CCCCs   .,.rS   c                 \    K    fd}  j                   |g  d {   S 7 w)Nc                     K   t        j                        } t        |       r | i  d {   }n | i }j                         d {    |S 7 (7 wrz   )getattrr=   r   r   )methodr   rI   rG   rS   r*   s     r+   r   z>DefaultCommandExecutor.execute_pubsub_method.<locals>.callback  sf     T//=F"6*!'!8!88!4262224888O 9 9s!   -AA!AAAAr   )r*   rS   rI   rG   r   s   ```` r+   rU   z,DefaultCommandExecutor.execute_pubsub_method   s,     	 :T99(JTJJJJs   ,*,rV   c                 X    K    fd} j                  |       d {   S 7 w)Nc                  \   K   j                   j                          d {   S 7 w)N)poll_timeoutexception_handlerr>   )rs   run)r   r>   r*   rV   s   r+   r   z;DefaultCommandExecutor.execute_pubsub_run.<locals>.callback  s8     ,,00'"3 1    s   ",*,r   )r*   rV   r   r>   r   s   ```` r+   rX   z)DefaultCommandExecutor.execute_pubsub_run  s%     	 99(CCCCs   *(*r   cmdsc                 x    K    fd j                   j                  fd fd       d{   S 7 w)zO
        Execute a commands execution callback with failure detection.
        c                  b   K   j                          d {              d {   S 7 7 wrz   )_check_active_database)r   r*   s   r+   wrapperzGDefaultCommandExecutor._execute_with_failure_detection.<locals>.wrapper   s-     --///!## 0#s   /+/-//c                               S rz   r(   )r   s   r+   <lambda>zHDefaultCommandExecutor._execute_with_failure_detection.<locals>.<lambda>&  s	    GI r.   c                 *     j                   | g S rz   )_on_command_fail)errorr   r*   s    r+   r   zHDefaultCommandExecutor._execute_with_failure_detection.<locals>.<lambda>'  s    /$//== r.   N)ro   call_with_retry)r*   r   r   r   s   ```@r+   r   z6DefaultCommandExecutor._execute_with_failure_detection  s7     	$
 ((88=
 
 	
 
s   -:8:c                   K   | j                   a| j                   j                  j                  t        j                  k7  s0| j
                  dkD  rl| j                  t        j                         k  rJ| j                  | j                  j                          d{          d{    | j                          yyy7 #7 w)zB
        Checks if active a database needs to be updated.
        Nr   )rr   circuitstateCBStateCLOSED_auto_fallback_interval_next_fallback_attemptr   nowr9   rp   r   rv   r)   s    r+   r   z-DefaultCommandExecutor._check_active_database*  s     
 !!)$$,,22gnnD,,q0//8<<>A **66>>@@   ((* B 1
 As$   BC B<C $B>%C >C c                 j   K   | j                   j                  t        ||             d {    y 7 wrz   )rq   r   r   )r*   r   rI   s      r+   r   z'DefaultCommandExecutor._on_command_fail;  s.     $$33$T51
 	
 	
s   )313cmdc                 d   K   | j                   D ]  }|j                  |       d {     y 7 wrz   )rn   register_command_execution)r*   r   detectors      r+   r   z2DefaultCommandExecutor._register_command_execution@  s2     // 	;H55c:::	;:s   $0.0c                     t        | j                        }t               }t               }| j                  j                  t        |gt        ||gi       y)z0
        Registers necessary listeners.
        N)r   rn   r   r   rq   register_listenersr   r   )r*   failure_listenerresubscribe_listenerclose_connection_listeners       r+   ru   z.DefaultCommandExecutor._setup_event_dispatcherD  sW     2$2I2IJAC$J$L!11(+;*<*-(-	
r.   )NN)r(   )1rY   rZ   r[   r   r   r"   r	   r   r   r   r   r   intr`   rk   r\   r,   r0   r4   r
   r   r6   r9   r   r=   r]   r   rB   rD   r>   rK   r^   rN   r   r   r   r   r#   r_   boolrR   rU   rX   r   r   r   r   ru   __classcell__)rx   s   @r+   rb   rb   u   sM    "; 6(F(' 45(' (' 	('
 1(' 3(' (' (' !&('T 9   '4(<#= ' '95I 9d 9 %-!8 % %- D  #x/ # # %F %t % % 0,D 0 0 #u # #0J
SE 
S  %)$)'+D
|U3	#+>%??@D D SM	D
 "D e_D*Ks K AE
D
D	
D 13
 
(-
"+"

;U ;
r.   rb   N)3abcr   asyncior   r   typingr   r   r   r	   r
   r   redis.asyncior   redis.asyncio.clientr   r   redis.asyncio.multidb.databaser   r   r   redis.asyncio.multidb.eventr   r   r   r   redis.asyncio.multidb.failoverr   r   r   r   r   &redis.asyncio.multidb.failure_detectorr   redis.asyncio.retryr   redis.eventr   r   redis.multidb.circuitr   r   redis.multidb.command_executorr    r!   redis.multidb.configr"   redis.typingr#   r%   rb   r(   r.   r+   <module>r      sm     '  B B & 1 M M   H % J 2 O ? S? Sl^
02F ^
r.   