
    Kir0                     N   d dl mZmZ d dlmZmZ d dlmZmZmZm	Z	m
Z
 d dlmZmZmZ d dlmZmZ d dlmZ d dlmZ d dlmZmZmZ d d	lmZmZmZmZ d d
l m!Z!m"Z"m#Z#m$Z$m%Z% d dl&m'Z' d dl(m)Z) d dl*m+Z+ d dl,m-Z-  G d de      Z. G d de.      Z/ G d de.      Z0 G d de0e/      Z1y)    )ABCabstractmethod)datetime	timedelta)AnyCallableListOptionalTuple)PipelinePubSubPubSubWorkerThread)EventDispatcherInterfaceOnCommandsFailEvent)State)DEFAULT_AUTO_FALLBACK_INTERVAL)Database	DatabasesSyncDatabase)ActiveDatabaseChanged&CloseConnectionOnActiveDatabaseChangedRegisterCommandFailure"ResubscribeOnActiveDatabaseChanged)DEFAULT_FAILOVER_ATTEMPTSDEFAULT_FAILOVER_DELAYDefaultFailoverStrategyExecutorFailoverStrategyFailoverStrategyExecutor)FailureDetector)GeoFailoverReason)record_geo_failover)Retryc                   d    e Zd Zeedefd              Zej                  ededdfd              Zy)CommandExecutorreturnc                      y)zReturns auto-fallback interval.N selfs    j/home/jay/workspace/scripts/.codegraph-venv/lib/python3.12/site-packages/redis/multidb/command_executor.pyauto_fallback_intervalz&CommandExecutor.auto_fallback_interval        	    r+   Nc                      y)zSets auto-fallback interval.Nr'   r)   r+   s     r*   r+   z&CommandExecutor.auto_fallback_interval$   r,   r-   )__name__
__module____qualname__propertyr   floatr+   setterr'   r-   r*   r$   r$      sS        ""U t   #r-   r$   c                   h    e Zd ZefdefdZedefd       Zej                  de	ddfd       ZddZ
y)	BaseCommandExecutorr+   c                     || _         |  y N_auto_fallback_intervalr/   s     r*   __init__zBaseCommandExecutor.__init__,   s     (>$r-   r%   c                     | j                   S r9   r:   r(   s    r*   r+   z*BaseCommandExecutor.auto_fallback_interval3   s    +++r-   Nc                     || _         y r9   r:   r/   s     r*   r+   z*BaseCommandExecutor.auto_fallback_interval7   s
    '=$r-   c                     | j                   dk  ry t        j                         t        | j                         z   | _        y )Nr   )seconds)r;   r   nowr   _next_fallback_attemptr(   s    r*   _schedule_next_fallbackz+BaseCommandExecutor._schedule_next_fallback;   s6    ''!+&.llny008
 '
#r-   )r%   N)r0   r1   r2   r   r4   r<   r3   r+   r5   intrC   r'   r-   r*   r7   r7   +   s]     )G. %. , , , "">S >T > #>
r-   r7   c                      e Zd Zeedefd              Zeedee   fd              Z	ededdfd       Z
eedee   fd              Zej                  edeeef   ddfd	              Zeedee   fd
              Zej                  ededdfd              Zeedefd              Zeedefd              Zed        Zed        Zedefd       Zedeegdf   fd       Zedefd       Z ede!de"fd       Z#y)SyncCommandExecutorr%   c                      y)zReturns a list of databases.Nr'   r(   s    r*   	databaseszSyncCommandExecutor.databasesE   r,   r-   c                      y)z$Returns a list of failure detectors.Nr'   r(   s    r*   failure_detectorsz%SyncCommandExecutor.failure_detectorsK   r,   r-   failure_detectorNc                      y)z=Adds a new failure detector to the list of failure detectors.Nr'   r)   rK   s     r*   add_failure_detectorz(SyncCommandExecutor.add_failure_detectorQ        	r-   c                      y)z"Returns currently active database.Nr'   r(   s    r*   active_databasez#SyncCommandExecutor.active_databaseV   r,   r-   valuec                      y)zSets the currently active database.

        Args:
            value: A tuple of (database, reason) where database is the new active
                   database and reason is the GeoFailoverReason for the change.
        Nr'   )r)   rR   s     r*   rQ   z#SyncCommandExecutor.active_database\   s     	r-   c                      y)z Returns currently active pubsub.Nr'   r(   s    r*   active_pubsubz!SyncCommandExecutor.active_pubsubg   r,   r-   pubsubc                      y)zSets currently active pubsub.Nr'   r)   rV   s     r*   rU   z!SyncCommandExecutor.active_pubsubm   r,   r-   c                      y)z#Returns failover strategy executor.Nr'   r(   s    r*   failover_strategy_executorz.SyncCommandExecutor.failover_strategy_executors   r,   r-   c                      y)zReturns command retry object.Nr'   r(   s    r*   command_retryz!SyncCommandExecutor.command_retryy   r,   r-   c                      y)z:Initializes a PubSub object on a currently active databaseNr'   )r)   kwargss     r*   rV   zSyncCommandExecutor.pubsub   rO   r-   c                      y)z*Executes a command and returns the result.Nr'   )r)   argsoptionss      r*   execute_commandz#SyncCommandExecutor.execute_command   rO   r-   command_stackc                      y)z)Executes a stack of commands in pipeline.Nr'   )r)   rc   s     r*   execute_pipelinez$SyncCommandExecutor.execute_pipeline   rO   r-   transactionc                      y)z1Executes a transaction block wrapped in callback.Nr'   )r)   rf   watchesra   s       r*   execute_transactionz'SyncCommandExecutor.execute_transaction   s    
 	r-   method_namec                      y)z*Executes a given method on active pub/sub.Nr'   )r)   rj   r`   r^   s       r*   execute_pubsub_methodz)SyncCommandExecutor.execute_pubsub_method   rO   r-   
sleep_timec                      y)z!Executes pub/sub run in a thread.Nr'   )r)   rm   r^   s      r*   execute_pubsub_runz&SyncCommandExecutor.execute_pubsub_run   rO   r-   )$r0   r1   r2   r3   r   r   rH   r	   r   rJ   rN   r
   r   rQ   r5   r   r   r    r   rU   r   rZ   r"   r\   rV   rb   tuplere   r   r   ri   strrl   r4   r   ro   r'   r-   r*   rF   rF   D   s   9    4#8    _    (!3    U<9J+J%K PT    x/    F t    ,D    u        e   #XJ$45     U   r-   rF   c                       e Zd Zeeefdee   dede	de
dedededef fd	Zed
efd       Zed
ee   fd       Zded
dfdZed
e	fd       Zed
ee   fd       Zej.                  deeef   d
dfd       Zed
ee   fd       Zej.                  ded
dfd       Zed
efd       Zd ZdefdZ de!e"gdf   fdZ#d Z$de%fdZ&d)d Z'd*d!e!d"efd#Z(d$ Z)d% Z*d&efd'Z+d( Z, xZ-S )+DefaultCommandExecutorrJ   rH   r\   failover_strategyevent_dispatcherfailover_attemptsfailover_delayr+   c	                    t         
|   |       |D ]  }	|	j                  |         || _        || _        || _        t        |||      | _        || _        d| _	        d| _
        i | _        | j                          | j                          y)a  
        Initialize the DefaultCommandExecutor instance.

        Args:
            failure_detectors: List of failure detector instances to monitor database health
            databases: Collection of available databases to execute commands on
            command_retry: Retry policy for failed command execution
            failover_strategy: Strategy for handling database failover
            event_dispatcher: Interface for dispatching events
            failover_attempts: Number of failover attempts
            failover_delay: Delay between failover attempts
            auto_fallback_interval: Time interval in seconds between attempts to fall back to a primary database
        )command_executorN)superr<   set_command_executor
_databases_failure_detectors_command_retryr   _failover_strategy_executor_event_dispatcher_active_database_active_pubsub_active_pubsub_kwargs_setup_event_dispatcherrC   )r)   rJ   rH   r\   rt   ru   rv   rw   r+   fd	__class__s             r*   r<   zDefaultCommandExecutor.__init__   s    0 	/0# 	;B##T#:	; $"3++J0.,
( "24804%'"$$&$$&r-   r%   c                     | j                   S r9   )r|   r(   s    r*   rH   z DefaultCommandExecutor.databases   s    r-   c                     | j                   S r9   )r}   r(   s    r*   rJ   z(DefaultCommandExecutor.failure_detectors   s    &&&r-   rK   Nc                 :    | j                   j                  |       y r9   )r}   appendrM   s     r*   rN   z+DefaultCommandExecutor.add_failure_detector   s    &&'78r-   c                     | j                   S r9   )r~   r(   s    r*   r\   z$DefaultCommandExecutor.command_retry       """r-   c                     | j                   S r9   )r   r(   s    r*   rQ   z&DefaultCommandExecutor.active_database   s    $$$r-   rR   c                     |\  }}| j                   }|| _         |O||urJt        |||       | j                  j                  t	        || j                   | fi | j
                         y y y )N)	fail_fromfail_toreason)r   r!   r   dispatchr   r   )r)   rR   databaser   
old_actives        r*   rQ   z&DefaultCommandExecutor.active_database   s     &**
 (!j&@$ 
 ""++%)) 00	 'A!r-   c                     | j                   S r9   r   r(   s    r*   rU   z$DefaultCommandExecutor.active_pubsub   r   r-   rV   c                     || _         y r9   r   rX   s     r*   rU   z$DefaultCommandExecutor.active_pubsub   s
    $r-   c                     | j                   S r9   )r   r(   s    r*   rZ   z1DefaultCommandExecutor.failover_strategy_executor   s    ///r-   c                 :      fd} j                  |      S )Nc                  v     j                   j                  j                  i } j                         | S r9   )r   clientrb   _register_command_execution)responser`   ra   r)   s    r*   callbackz8DefaultCommandExecutor.execute_command.<locals>.callback   s9    Ct,,33CCTUWUH,,T2Or-   _execute_with_failure_detection)r)   r`   ra   r   s   ``` r*   rb   z&DefaultCommandExecutor.execute_command   s    	
 33HdCCr-   rc   c                 6      fd} j                  |      S )Nc                      j                   j                  j                         5 } D ]  \  }} | j                  |i |  | j	                         }j                         |cd d d        S # 1 sw Y   y xY wr9   )r   r   pipelinerb   executer   )pipecommandra   r   rc   r)   s       r*   r   z9DefaultCommandExecutor.execute_pipeline.<locals>.callback  sy    &&--668  D(5 >$GW(D(('=W=>  <<>00?     s   ?A//A8r   )r)   rc   r   s   `` r*   re   z'DefaultCommandExecutor.execute_pipeline  s    	  33HmLLr-   rf   c                 <      fd} j                  |      S )Nc                  ~     j                   j                  j                  gi } j                  d       | S Nr'   )r   r   rf   r   )r   ra   r)   rf   rh   s    r*   r   z<DefaultCommandExecutor.execute_transaction.<locals>.callback  sI    ?t,,33??%)0H ,,R0Or-   r   )r)   rf   rh   ra   r   s   ```` r*   ri   z*DefaultCommandExecutor.execute_transaction  s    	 33H==r-   c                 4      fd} j                  |      S )Nc                      j                   2 j                  j                  j                  di  _          _        y r   )r   r   r   rV   r   )r^   r)   s   r*   r   z/DefaultCommandExecutor.pubsub.<locals>.callback   s>    ""*&Id&;&;&B&B&I&I&SF&S#-3*r-   r   )r)   r^   r   s   `` r*   rV   zDefaultCommandExecutor.pubsub  s    	 33H==r-   rj   c                 @      fd}  j                   |g S )Nc                  f    t        j                        }  | i }j                         |S r9   )getattrrU   r   )methodr   r`   r^   rj   r)   s     r*   r   z>DefaultCommandExecutor.execute_pubsub_method.<locals>.callback)  s7    T//=Ft.v.H,,T2Or-   r   )r)   rj   r`   r^   r   s   ```` r*   rl   z,DefaultCommandExecutor.execute_pubsub_method(  s"    	 4t33HDtDDr-   c                 8      fd} j                  |      S )Nc                  >     j                   j                  fi  S r9   )r   run_in_thread)r^   r)   rm   s   r*   r   z;DefaultCommandExecutor.execute_pubsub_run.<locals>.callback2  s!    44&&44ZJ6JJr-   r   )r)   rm   r^   r   s   ``` r*   ro   z)DefaultCommandExecutor.execute_pubsub_run1  s    	K 33H==r-   r   cmdsc                 \      fd j                   j                  fd fd      S )zO
        Execute a commands execution callback with failure detection.
        c                  2    j                                   S r9   )_check_active_database)r   r)   s   r*   wrapperzGDefaultCommandExecutor._execute_with_failure_detection.<locals>.wrapper<  s    ''):r-   c                               S r9   r'   )r   s   r*   <lambda>zHDefaultCommandExecutor._execute_with_failure_detection.<locals>.<lambda>B  s	    GI r-   c                 *     j                   | g S r9   )_on_command_fail)errorr   r)   s    r*   r   zHDefaultCommandExecutor._execute_with_failure_detection.<locals>.<lambda>C  s    /$//== r-   )r~   call_with_retry)r)   r   r   r   s   ```@r*   r   z6DefaultCommandExecutor._execute_with_failure_detection7  s)    
	
 ""22=
 	
r-   c                 N    | j                   j                  t        ||             y r9   )r   r   r   )r)   r   r`   s      r*   r   z'DefaultCommandExecutor._on_command_failF  s    ''(;D%(HIr-   c                 `   | j                   a| j                   j                  j                  t        j                  k7  s0| j
                  dkD  rb| j                  t        j                         k  r@| j                  j                         t        j                  f| _        | j                          yyy)zB
        Checks if active a database needs to be updated.
        Nr   )r   circuitstateCBStateCLOSEDr;   rB   r   rA   r   r   r    	AUTOMATICrQ   rC   r(   s    r*   r   z-DefaultCommandExecutor._check_active_databaseI  s    
 !!)$$,,22gnnD,,q0//8<<>A 0088:!++$D  ((* B 1r-   cmdc                 H    | j                   D ]  }|j                  |        y r9   )r}   register_command_execution)r)   r   detectors      r*   r   z2DefaultCommandExecutor._register_command_execution[  s%    // 	5H//4	5r-   c                     t        | j                        }t               }t               }| j                  j                  t        |gt        ||gi       y)z0
        Registers necessary listeners.
        N)r   r}   r   r   r   register_listenersr   r   )r)   failure_listenerresubscribe_listenerclose_connection_listeners       r*   r   z.DefaultCommandExecutor._setup_event_dispatcher_  sW     2$2I2IJAC$J$L!11#&6%7%-((	
r-   )r%   r   )r'   ).r0   r1   r2   r   r   r   r	   r   r   r"   r   r   rD   r4   r<   r3   rH   rJ   rN   r\   r
   r   rQ   r5   r   r    r   rU   r   rZ   rb   rp   re   r   r   ri   rV   rq   rl   ro   r   r   r   r   r   __classcell__)r   s   @r*   rs   rs      s    "; 6(F('0(' (' 	('
 ,(' 3(' (' (' !&('T 9   '4#8 ' '9_ 9 9 #u # # %,!7 % % U<9J+J%K PT  ( #x/ # # %F %t % % 0,D 0 0D
Me 
M
>#XJ$45
>>E E>
 
 
J+$5u 5
r-   rs   N)2abcr   r   r   r   typingr   r   r	   r
   r   redis.clientr   r   r   redis.eventr   r   redis.multidb.circuitr   r   redis.multidb.configr   redis.multidb.databaser   r   r   redis.multidb.eventr   r   r   r   redis.multidb.failoverr   r   r   r   r   redis.multidb.failure_detectorr   redis.observability.attributesr    redis.observability.recorderr!   redis.retryr"   r$   r7   rF   rs   r'   r-   r*   <module>r      s    # ( 7 7 = = E 2 ? D D   ; < < c 
/ 
2Y/ YxN
02E N
r-   