
    Kif0                     N   d dl mZ d dlmZ d dlmZ d dlmZmZmZm	Z	m
Z
mZ d dlmZ d dlmZmZ d dlmZmZmZ d dlmZmZmZmZ d d	lmZmZmZmZmZ d d
l m!Z! d dl"m#Z# d dl$m%Z% d dl&m'Z'm(Z( d dl)m*Z+ d dl,m-Z-m.Z. d dl/m0Z0 d dl1m2Z2 d dl3m4Z4  G d de.      Z5 G d de-e5      Z6y)    )abstractmethod)iscoroutinefunction)datetime)Any	AwaitableCallableListOptionalUnion)RedisCluster)PipelinePubSub)AsyncDatabaseDatabase	Databases)AsyncActiveDatabaseChanged&CloseConnectionOnActiveDatabaseChangedRegisterCommandFailure"ResubscribeOnActiveDatabaseChanged)DEFAULT_FAILOVER_ATTEMPTSDEFAULT_FAILOVER_DELAYAsyncFailoverStrategyDefaultFailoverStrategyExecutorFailoverStrategyExecutor)AsyncFailureDetector)record_geo_failover)Retry)AsyncOnCommandsFailEventEventDispatcherInterface)State)BaseCommandExecutorCommandExecutor)DEFAULT_AUTO_FALLBACK_INTERVAL)GeoFailoverReason)KeyTc                      e Zd Zeedefd              Zeedee   fd              Z	ededdfd       Z
eedee   fd              Zeded	eddfd
       Zeedee   fd              Zej$                  ededdfd              Zeedefd              Zeedefd              Zed        Zed        Zedefd       Zedeegdf   fd       Zedefd       Zede de!fd       Z"y)AsyncCommandExecutorreturnc                      y)zReturns a list of databases.N selfs    r/home/jay/workspace/scripts/.codegraph-venv/lib/python3.12/site-packages/redis/asyncio/multidb/command_executor.py	databaseszAsyncCommandExecutor.databases"        	    c                      y)z$Returns a list of failure detectors.Nr*   r+   s    r-   failure_detectorsz&AsyncCommandExecutor.failure_detectors(   r/   r0   failure_detectorNc                      y)z=Adds a new failure detector to the list of failure detectors.Nr*   r,   r3   s     r-   add_failure_detectorz)AsyncCommandExecutor.add_failure_detector.   s     	r0   c                      y)z"Returns currently active database.Nr*   r+   s    r-   active_databasez$AsyncCommandExecutor.active_database3   r/   r0   databasereasonc                    K   yw)zSets the currently active database.

        Args:
            database: The new active database.
            reason: The reason for the failover.
        Nr*   )r,   r9   r:   s      r-   set_active_databasez(AsyncCommandExecutor.set_active_database9   s      	   c                      y)z Returns currently active pubsub.Nr*   r+   s    r-   active_pubsubz"AsyncCommandExecutor.active_pubsubE   r/   r0   pubsubc                      y)zSets currently active pubsub.Nr*   r,   r@   s     r-   r?   z"AsyncCommandExecutor.active_pubsubK   r/   r0   c                      y)z#Returns failover strategy executor.Nr*   r+   s    r-   failover_strategy_executorz/AsyncCommandExecutor.failover_strategy_executorQ   r/   r0   c                      y)zReturns command retry object.Nr*   r+   s    r-   command_retryz"AsyncCommandExecutor.command_retryW   r/   r0   c                    K   yw)z:Initializes a PubSub object on a currently active databaseNr*   r,   kwargss     r-   r@   zAsyncCommandExecutor.pubsub]         	r=   c                    K   yw)z*Executes a command and returns the result.Nr*   )r,   argsoptionss      r-   execute_commandz$AsyncCommandExecutor.execute_commandb   rJ   r=   command_stackc                    K   yw)z)Executes a stack of commands in pipeline.Nr*   )r,   rO   s     r-   execute_pipelinez%AsyncCommandExecutor.execute_pipelineg   rJ   r=   transactionc                    K   yw)z1Executes a transaction block wrapped in callback.Nr*   )r,   rR   watchesrM   s       r-   execute_transactionz(AsyncCommandExecutor.execute_transactionl   s     
 	r=   method_namec                    K   yw)z*Executes a given method on active pub/sub.Nr*   )r,   rV   rL   rI   s       r-   execute_pubsub_methodz*AsyncCommandExecutor.execute_pubsub_methods   rJ   r=   
sleep_timec                    K   yw)z!Executes pub/sub run in a thread.Nr*   )r,   rY   rI   s      r-   execute_pubsub_runz'AsyncCommandExecutor.execute_pubsub_runx   rJ   r=   )#__name__
__module____qualname__propertyr   r   r.   r	   r   r2   r6   r
   r   r8   r$   r<   r   r?   setterr   rD   r   rF   r@   rN   tuplerQ   r   r   rU   strrX   floatr   r[   r*   r0   r-   r'   r'   !   s
   9    4(<#=    5I d   -!8    	%	/@			 	 x/    F t    ,D    u        E   #XJ$45  s   5 s  r0   r'   c                       e Zd Zeeefdee   dede	de
dedededef fd	Zed
efd       Zed
ee   fd       Zded
dfdZed
ee   fd       Zdeded
dfdZed
ee   fd       Zej4                  ded
dfd       Zed
efd       Zed
e	fd       Zd Zd Zde fdZ!ddddde"d ge#e$e%e$   f   f   d!e&d"ee'   d#e(d$ee   f
d%Z)d&e'fd'Z*	 d2d(ed
e$fd)Z+	 d3d*e"d+e fd,Z,d- Z-d. Z.d/e fd0Z/d1 Z0 xZ1S )4DefaultCommandExecutorr2   r.   rF   failover_strategyevent_dispatcherfailover_attemptsfailover_delayauto_fallback_intervalc	                    t         
|   |       |D ]  }	|	j                  |         || _        || _        || _        t        |||      | _        || _        d| _	        d| _
        i | _        | j                          | j                          y)a  
        Initialize the DefaultCommandExecutor instance.

        Args:
            failure_detectors: List of failure detector instances to monitor database health
            databases: Collection of available databases to execute commands on
            command_retry: Retry policy for failed command execution
            failover_strategy: Strategy for handling database failover
            event_dispatcher: Interface for dispatching events
            failover_attempts: Number of failover attempts
            failover_delay: Delay between failover attempts
            auto_fallback_interval: Time interval in seconds between attempts to fall back to a primary database
        )command_executorN)super__init__set_command_executor
_databases_failure_detectors_command_retryr   _failover_strategy_executor_event_dispatcher_active_database_active_pubsub_active_pubsub_kwargs_setup_event_dispatcher_schedule_next_fallback)r,   r2   r.   rF   rf   rg   rh   ri   rj   fd	__class__s             r-   rn   zDefaultCommandExecutor.__init__   s    0 	/0# 	;B##T#:	; $"3++J0.,
( "24804%'"$$&$$&r0   r(   c                     | j                   S N)rp   r+   s    r-   r.   z DefaultCommandExecutor.databases   s    r0   c                     | j                   S r}   )rq   r+   s    r-   r2   z(DefaultCommandExecutor.failure_detectors   s    &&&r0   r3   Nc                 :    | j                   j                  |       y r}   )rq   appendr5   s     r-   r6   z+DefaultCommandExecutor.add_failure_detector   s    &&'78r0   c                     | j                   S r}   )ru   r+   s    r-   r8   z&DefaultCommandExecutor.active_database   s    $$$r0   r9   r:   c                    K   | j                   }|| _         |_||urZt        |||       d {    | j                  j                  t	        || j                   | fi | j
                         d {    y y y 7 J7 	w)N)	fail_fromfail_tor:   )ru   r   rt   dispatch_asyncr   rw   )r,   r9   r:   
old_actives       r-   r<   z*DefaultCommandExecutor.set_active_database   s      **
 (!j&@%$   
 ((77*)) 00	   'A!
s"   +A<A8AA<0A:1A<:A<c                     | j                   S r}   rv   r+   s    r-   r?   z$DefaultCommandExecutor.active_pubsub       """r0   r@   c                     || _         y r}   r   rB   s     r-   r?   z$DefaultCommandExecutor.active_pubsub   s
    $r0   c                     | j                   S r}   )rs   r+   s    r-   rD   z1DefaultCommandExecutor.failover_strategy_executor   s    ///r0   c                     | j                   S r}   )rr   r+   s    r-   rF   z$DefaultCommandExecutor.command_retry   r   r0   c                     | j                   bt        | j                  j                  t              rt        d       | j                  j                  j                  di || _         || _        y y )Nz(PubSub is not supported for RedisClusterr*   )rv   
isinstanceru   clientr   
ValueErrorr@   rw   rH   s     r-   r@   zDefaultCommandExecutor.pubsub   sa    &$//66E !KLL"E$"7"7">">"E"E"O"OD)/D& 'r0   c                 V    K    fd} j                  |       d {   S 7 w)Nc                     K    j                   j                  j                  i  d {   } j                         d {    | S 7 7 wr}   )ru   r   rN   _register_command_execution)responserL   rM   r,   s    r-   callbackz8DefaultCommandExecutor.execute_command.<locals>.callback   s[     IT2299II  H 224888O	 9s!   *AAAAAA_execute_with_failure_detection)r,   rL   rM   r   s   ``` r-   rN   z&DefaultCommandExecutor.execute_command   s'     	 99(DIIIIs   )')rO   c                 R    K    fd} j                  |       d {   S 7 w)Nc                  `  K   j                   j                  j                         4 d {   } D ]  \  }} | j                  |i |  | j	                          d {   }j                         d {    |cd d d       d {    S 7 d7 27 7 # 1 d {  7  sw Y   y xY wwr}   )ru   r   pipelinerN   executer   )pipecommandrM   r   rO   r,   s       r-   r   z9DefaultCommandExecutor.execute_pipeline.<locals>.callback   s     ,,33<<>    $(5 >$GW(D(('=W=> "&/66}EEE      0E       sh   )B.BB.0B B!B9B:B?B.BB.BBB.B+B" B+'B.r   )r,   rO   r   s   `` r-   rQ   z'DefaultCommandExecutor.execute_pipeline   s'     	  99(MRRRRs   '%'F
shard_hintvalue_from_callablewatch_delayfuncr   rT   r   r   r   c                `    K    fd} j                  |       d {   S 7 w)Nc                     K    j                   j                  j                  gd d {   } j                  d       d {    | S 7 7 w)Nr   r*   )ru   r   rR   r   )r   r   r,   r   r   r   rT   s    r-   r   z<DefaultCommandExecutor.execute_transaction.<locals>.callback  sk     ET2299EE &$7' H 222666O 7s!   0AAAAAAr   )r,   r   r   r   r   rT   r   s   `````` r-   rU   z*DefaultCommandExecutor.execute_transaction   s*     		 		 99(CCCCs   .,.rV   c                 \    K    fd}  j                   |g  d {   S 7 w)Nc                     K   t        j                        } t        |       r | i  d {   }n | i }j                         d {    |S 7 (7 wr}   )getattrr?   r   r   )methodr   rL   rI   rV   r,   s     r-   r   z>DefaultCommandExecutor.execute_pubsub_method.<locals>.callback  sf     T//=F"6*!'!8!88!4262224888O 9 9s!   -AA!AAAAr   )r,   rV   rL   rI   r   s   ```` r-   rX   z,DefaultCommandExecutor.execute_pubsub_method  s,     	 :T99(JTJJJJs   ,*,rY   c                 X    K    fd} j                  |       d {   S 7 w)Nc                  \   K   j                   j                          d {   S 7 w)N)poll_timeoutexception_handlerr@   )rv   run)r   r@   r,   rY   s   r-   r   z;DefaultCommandExecutor.execute_pubsub_run.<locals>.callback   s8     ,,00'"3 1    s   ",*,r   )r,   rY   r   r@   r   s   ```` r-   r[   z)DefaultCommandExecutor.execute_pubsub_run  s%     	 99(CCCCs   *(*r   cmdsc                 x    K    fd j                   j                  fd fd       d{   S 7 w)zO
        Execute a commands execution callback with failure detection.
        c                  b   K   j                          d {              d {   S 7 7 wr}   )_check_active_database)r   r,   s   r-   wrapperzGDefaultCommandExecutor._execute_with_failure_detection.<locals>.wrapper0  s-     --///!## 0#s   /+/-//c                               S r}   r*   )r   s   r-   <lambda>zHDefaultCommandExecutor._execute_with_failure_detection.<locals>.<lambda>6  s	    GI r0   c                 *     j                   | g S r}   )_on_command_fail)errorr   r,   s    r-   r   zHDefaultCommandExecutor._execute_with_failure_detection.<locals>.<lambda>7  s    /$//== r0   N)rr   call_with_retry)r,   r   r   r   s   ```@r-   r   z6DefaultCommandExecutor._execute_with_failure_detection)  s7     	$
 ((88=
 
 	
 
s   -:8:c                   K   | j                   a| j                   j                  j                  t        j                  k7  s0| j
                  dkD  r{| j                  t        j                         k  rY| j                  | j                  j                          d{   t        j                         d{    | j                          yyy7 27 w)zB
        Checks if active a database needs to be updated.
        Nr   )ru   circuitstateCBStateCLOSED_auto_fallback_interval_next_fallback_attemptr   nowr<   rs   r   r$   	AUTOMATICry   r+   s    r-   r   z-DefaultCommandExecutor._check_active_database:  s     
 !!)$$,,22gnnD,,q0//8<<>A **66>>@@!++   ((* B 1
 As$   BCCC3C4CCc                 j   K   | j                   j                  t        ||             d {    y 7 wr}   )rt   r   r   )r,   r   rL   s      r-   r   z'DefaultCommandExecutor._on_command_failL  s.     $$33$T51
 	
 	
s   )313cmdc                 d   K   | j                   D ]  }|j                  |       d {     y 7 wr}   )rq   register_command_execution)r,   r   detectors      r-   r   z2DefaultCommandExecutor._register_command_executionQ  s2     // 	;H55c:::	;:s   $0.0c                     t        | j                        }t               }t               }| j                  j                  t        |gt        ||gi       y)z0
        Registers necessary listeners.
        N)r   rq   r   r   rt   register_listenersr   r   )r,   failure_listenerresubscribe_listenerclose_connection_listeners       r-   rx   z.DefaultCommandExecutor._setup_event_dispatcherU  sW     2$2I2IJAC$J$L!11(+;*<*-(-	
r0   )NN)r*   )2r\   r]   r^   r   r   r#   r	   r   r   r   r   r   intrc   rn   r_   r.   r2   r6   r
   r   r8   r$   r<   r   r?   r`   r   rD   rF   r@   rN   ra   rQ   r   r   r   r   r%   rb   boolrU   rX   r[   r   r   r   r   rx   __classcell__)r{   s   @r-   re   re   ~   sW    "; 6(F(' 45(' (' 	('
 1(' 3(' (' (' !&('T 9   '4(<#= ' '95I 9d 9 %-!8 % %%/@	* #x/ # # %F %t % % 0,D 0 0 #u # #0J
SE 
S  %)$)'+D
|U3	#+>%??@D D SM	D
 "D e_D*Ks K AE
D
D	
D 13
 
(-
"+$

;U ;
r0   re   N)7abcr   asyncior   r   typingr   r   r   r	   r
   r   redis.asyncior   redis.asyncio.clientr   r   redis.asyncio.multidb.databaser   r   r   redis.asyncio.multidb.eventr   r   r   r   redis.asyncio.multidb.failoverr   r   r   r   r   &redis.asyncio.multidb.failure_detectorr   $redis.asyncio.observability.recorderr   redis.asyncio.retryr   redis.eventr   r   redis.multidb.circuitr    r   redis.multidb.command_executorr!   r"   redis.multidb.configr#   redis.observability.attributesr$   redis.typingr%   r'   re   r*   r0   r-   <module>r      ss     '  B B & 1 M M   H D % J 2 O ? < Z? Zzf
02F f
r0   