
    KiS                        d dl Z d dlZd dlmZmZmZmZmZmZ d dl	m
Z
 d dlmZ d dlmZmZmZmZ d dlmZmZmZ d dlmZ d dlmZmZ d d	lmZ d d
lmZ d dlm Z  d dl!m"Z"m#Z# d dl$m%Z% d dl$m&Z' d dl(m)Z)m*Z*m+Z+ d dl,m-Z- d dl.m/Z/m0Z0m1Z1 d dl2m3Z3  ejh                  e5      Z6e3 G d de#e"             Z7de%fdZ8 G d de#e"      Z9 G d d      Z:y)    N)Any	AwaitableCallableListOptionalUnion)PubSubHandler)DefaultCommandExecutor)DEFAULT_GRACE_PERIODDatabaseConfigInitialHealthCheckMultiDbConfig)AsyncDatabaseDatabase	Databases)AsyncFailureDetector)HealthCheckHealthCheckPolicy)Retry)BackgroundScheduler)	NoBackoff)AsyncCoreCommandsAsyncRedisModuleCommands)CircuitBreaker)State)InitialHealthCheckFailedErrorNoValidDatabaseExceptionUnhealthyDatabaseException)GeoFailoverReason)ChannelT
EncodableTKeyT)experimentalc                   J   e Zd ZdZdefdZd,dZd Zd Zd Z	de
fd	Zd
eddfdZ	 d-dedefdZdedefdZd
efdZd
edefdZdefdZdefdZd Zd Zdddddedgeeee   f   f   ded e e!   d!ed"e e   f
d#Z"d$ Z#de$e%ef   fd%Z&d& Z'd
edefd'Z(d(e)d)e*d*e*fd+Z+y).MultiDBClientz
    Client that operates on multiple logical Redis databases.
    Should be used in Client-side geographic failover database setups.
    configc           
         |j                         | _        |j                  s|j                         n|j                  | _        |j
                  | _        |j                  j                         | _	        |j                  s|j                         n|j                  | _        |j                  |j                         n|j                  | _        | j                  j!                  | j                         |j"                  | _        |j&                  | _        |j*                  | _        | j,                  j/                  t0        g       t3        | j                  | j                  | j,                  | j                  |j4                  |j6                  | j(                  | j$                        | _        d| _        t=        j>                         | _         tC               | _"        || _#        d | _$        g | _%        d | _&        y )N)failure_detectors	databasescommand_retryfailover_strategyfailover_attemptsfailover_delayevent_dispatcherauto_fallback_intervalF)'r)   
_databaseshealth_checksdefault_health_checks_health_checkshealth_check_interval_health_check_intervalhealth_check_policyvalue_health_check_policyr(   default_failure_detectors_failure_detectorsr+   default_failover_strategy_failover_strategyset_databasesr/   _auto_fallback_intervalr.   _event_dispatcherr*   _command_retryupdate_supported_errorsConnectionRefusedErrorr
   r,   r-   command_executorinitializedasyncioLock_hc_lockr   _bg_scheduler_config_recurring_hc_task	_hc_tasks_half_open_state_task)selfr&   s     h/home/jay/workspace/scripts/.codegraph-venv/lib/python3.12/site-packages/redis/asyncio/multidb/client.py__init__zMultiDBClient.__init__)   s    **, '' ((*%% 	
 '-&B&B#&&,,. 	!
 ++ ,,.)) 	 ''/ ,,.)) 	
 	--doo>'-'D'D$!'!8!8$22335K4LM 6"55oo--"55$66!00!33#'#?#?	!
 !02"&%)"    returnc                 Z   K   | j                   s| j                          d {    | S 7 wN)rD   
initializerM   s    rN   
__aenter__zMultiDBClient.__aenter__V   s)     //### $s    +)+c                   K   | j                   r| j                   j                          | j                  r| j                  j                          | j                  D ]  }|j                           | j                  j                          d {    | j                  j                  r7| j                  j                  j                  j                          d {    y y 7 R7 wrS   )
rJ   cancelrL   rK   r8   closerC   active_databaseclientaclose)rM   hc_tasks     rN   r\   zMultiDBClient.aclose[   s     ""##**,%%&&--/~~ 	GNN	 ''--///   00''77>>EEGGG 1 	0 Hs%   BC$C AC$C"C$"C$c                 @   K   | j                          d {    y 7 wrS   r\   rM   exc_type	exc_value	tracebacks       rN   	__aexit__zMultiDBClient.__aexit__k        kkm   c                   K   | j                          d{    t        j                  | j                  j	                  | j
                  | j                              | _        d}| j                  D ]h  \  }}|j                  j                  | j                         |j                  j                  t        j                  k(  sS|rV|| j                  _        d}j |st#        d      d| _        y7 ڭw)zT
        Perform initialization of databases to define their initial state.
        NFTz4Initial connection failed - no active database found)_perform_initial_health_checkrE   create_taskrH   run_recurring_asyncr5   _check_databases_healthrJ   r0   circuiton_state_changed!_on_circuit_state_change_callbackstateCBStateCLOSEDrC   _active_databaser   rD   )rM   is_active_db_founddatabaseweights       rN   rT   zMultiDBClient.initializen   s      00222 #*"5"522++,,#
 # $ 		*Hf--d.T.TU %%7@R :B%%6%)"		* "*F   9 	3s   C3C1B,C3C3+C3c                     | j                   S )zE
        Returns a sorted (by weight) list of all databases.
        )r0   rU   s    rN   get_databaseszMultiDBClient.get_databases   s     rP   rt   Nc                   K   d}| j                   D ]  \  }}||k(  sd} n |st        d      | j                  |       d{    |j                  j                  t
        j                  k(  rT| j                   j                  d      d   \  }}| j                  j                  |t        j                         d{    yt        d      7 7 w)zL
        Promote one of the existing databases to become an active.
        NT/Given database is not a member of database list   r   z1Cannot set active database, database is unhealthy)r0   
ValueError_check_db_healthrl   ro   rp   rq   	get_top_nrC   set_active_databaser   MANUALr   )rM   rt   existsexisting_db_highest_weighted_dbs         rN   r~   z!MultiDBClient.set_active_database   s      "oo 	NKh&	
 NOO##H---!!W^^3%)__%>%>q%A!%D"'';;+22   &?
 	
 	.s)   C&CCA9C=C>CCskip_initial_health_checkc                   K   |j                   j                  dt        dt                     i       |j                  r< | j
                  j                  j                  |j                  fi |j                   }n|j                  r_|j                  j                  t        dt                            | j
                  j                  j                  |j                        }n& | j
                  j                  di |j                   }|j                  |j                         n|j                  }t        |||j                  |j                        }	 | j                  |       d{    | j                   j#                  d      d   \  }}| j                   j%                  ||j                         | j'                  ||       d{    y7 f# t        $ r |s Y rw xY w7 w)	z
        Adds a new database to the database list.

        Args:
            config: DatabaseConfig object that contains the database configuration.
            skip_initial_health_check: If True, adds the database even if it is unhealthy.
        retryr   )retriesbackoff)connection_poolN)r[   rl   ru   health_check_urlrz    )client_kwargsupdater   r   from_urlrI   client_class	from_pool	set_retryrl   default_circuit_breakerr   ru   r   r|   r   r0   r}   add_change_active_database)rM   r&   r   r[   rl   rt   r   highest_weights           rN   add_databasezMultiDBClient.add_database   s     	##WeAy{.S$TU??7T\\..77#)#7#7F &&uQ	'LM\\..88 & 0 0 9 F /T\\..F1E1EFF ~~% **, 	 ==#44	
	''111
 /3oo.G.G.J1.M+^Hhoo6**85HIII 2) 	, -	 	JsI   EG)G ,G-G 1AG)G'G)G G$!G)#G$$G)new_databasehighest_weight_databasec                    K   |j                   |j                   kD  r[|j                  j                  t        j                  k(  r3| j
                  j                  |t        j                         d {    y y y 7 wrS   )	ru   rl   ro   rp   rq   rC   r~   r   	AUTOMATIC)rM   r   r   s      rN   r   z%MultiDBClient._change_active_database   so      "9"@"@@$$**gnn<'';;/99   = As   A.A:0A81A:c                 H  K   | j                   j                  |      }| j                   j                  d      d   \  }}||k  r[|j                  j                  t
        j                  k(  r3| j                  j                  |t        j                         d{    yyy7 w)z<
        Removes a database from the database list.
        rz   r   N)r0   remover}   rl   ro   rp   rq   rC   r~   r   r   )rM   rt   ru   r   r   s        rN   remove_databasezMultiDBClient.remove_database   s      ''1.2oo.G.G.J1.M+^ f$#++11W^^C'';;#%6%=%=   D %s   BB"B B"ru   c                 $  K   d}| j                   D ]  \  }}||k(  sd} n |st        d      | j                   j                  d      d   \  }}| j                   j                  ||       ||_        | j                  ||       d{    y7 w)z<
        Updates a database from the database list.
        NTry   rz   r   )r0   r{   r}   update_weightru   r   )rM   rt   ru   r   r   r   r   r   s           rN   update_database_weightz$MultiDBClient.update_database_weight   s      "oo 	NKh&	
 NOO.2oo.G.G.J1.M+^%%h7 **85HIIIs   BA+BB	Bfailure_detectorc                 :    | j                   j                  |       y)z>
        Adds a new failure detector to the database.
        N)r:   append)rM   r   s     rN   add_failure_detectorz"MultiDBClient.add_failure_detector  s     	&&'78rP   healthcheckc                    K   | j                   4 d{    | j                  j                  |       ddd      d{    y7 07 # 1 d{  7  sw Y   yxY ww)z:
        Adds a new health check to the database.
        N)rG   r3   r   )rM   r   s     rN   add_health_checkzMultiDBClient.add_health_check  sR      == 	4 	4&&{3	4 	4 	4 	4 	4 	4 	4sA   AAAAAAAAAAAAc                    K   | j                   s| j                          d{     | j                  j                  |i | d{   S 7 (7 w)zB
        Executes a single command and return its result.
        N)rD   rT   rC   execute_commandrM   argsoptionss      rN   r   zMultiDBClient.execute_command  sK      //###:T**::DLGLLL $Ls!    AA#AAAAc                     t        |       S )z:
        Enters into pipeline mode of the client.
        )PipelinerU   s    rN   pipelinezMultiDBClient.pipeline'  s     ~rP   F
shard_hintvalue_from_callablewatch_delayfuncr   watchesr   r   r   c                   K   | j                   s| j                          d{     | j                  j                  |g||||d d{   S 7 .7 w)z3
        Executes callable as transaction.
        Nr   )rD   rT   rC   execute_transaction)rM   r   r   r   r   r   s         rN   transactionzMultiDBClient.transaction-  sg      //###>T**>>

 " 3#
 
 	
 $
s!    AA)AAAAc                 n   K   | j                   s| j                          d{    t        | fi |S 7 w)z
        Return a Publish/Subscribe object. With this object, you can
        subscribe to channels and listen for messages that get published to
        them.
        N)rD   rT   PubSub)rM   kwargss     rN   pubsubzMultiDBClient.pubsubC  s6      //###d%f%% $s    535c                 z  K   i }g | _         | j                  D ]I  \  }}t        j                  | j	                  |            }|||<   | j                   j                  |       K t        j                  | j                   ddi d{   }t        | j                   |      D ci c]  \  }}||   | }}}|j                         D ]g  \  }}t        |t              s|j                  }t        j                  |j                  _        t         j#                  d|j$                         d||<   i |S 7 c c}}w w)zk
        Runs health checks as a recurring task.
        Runs health checks against all databases.
        return_exceptionsTNz%Health check failed, due to exception)exc_infoF)rK   r0   rE   ri   r|   r   gatherzipitems
isinstancer   rt   rp   OPENrl   ro   loggerdebugoriginal_exception)	rM   
task_to_dbrt   r   taskresultsresult
db_resultsunhealthy_dbs	            rN   rk   z%MultiDBClient._check_databases_healthN  s4    
 46
?? 	(KHa&&t'<'<X'FGD'JtNN!!$'	(
  O$OO :=T^^W9U
)5vJtf$

 
 !+ 0 0 2 
	1Hf&"<=%-4\\$$*;#66  
 ,1
<(
	1 ' P
s+   BD;D3D;$D54)D;AD;5D;c                   K   | j                          d{   }d}| j                  j                  t        j                  k(  rd|j                         v}n| j                  j                  t        j                  k(  r)t        |j                               t        |      dz  kD  }n9| j                  j                  t        j                  k(  rd|j                         v }|s"t        d| j                  j                         y7 w)zj
        Runs initial health check and evaluate healthiness based on initial_health_check_policy.
        NTF   z:Initial health check failed. Initial health check policy: )rk   rI   initial_health_check_policyr   ALL_AVAILABLEvaluesMAJORITY_AVAILABLEsumlenONE_AVAILABLEr   )rM   r   
is_healthys      rN   rh   z+MultiDBClient._perform_initial_health_checkp  s      4466
<<337I7W7WWgnn&66JLL44!445 W^^-.W1AAJLL448J8X8XX!11J/LT\\MuMuLvw   7s   DDC/Dc                   K   | j                   j                  | j                  |       d{   }|sH|j                  j                  t
        j                  k7  rt
        j                  |j                  _        |S |rF|j                  j                  t
        j                  k7  rt
        j                  |j                  _        |S 7 w)zO
        Runs health checks on the given database until first failure.
        N)r8   executer3   rl   ro   rp   r   rq   )rM   rt   r   s      rN   r|   zMultiDBClient._check_db_health  s     
  44<<
 

 %%5)0  &H,,22gnnD%,^^H"
s   *CCBCrl   	old_state	new_statec                    t        j                         }|t        j                  k(  r4t        j                  | j                  |j                              | _        y |t        j                  k(  rQ|t        j                  k(  r>t        j                  d|j                   d       |j                  t        t        |       |t        j                  k7  r8|t        j                  k(  r$t        j                  d|j                   d       y y y )Nz	Database z- is unreachable. Failover has been initiated.z is reachable again.)rE   get_running_looprp   	HALF_OPENri   r|   rt   rL   rq   r   r   warning
call_laterr   _half_open_circuitinfo)rM   rl   r   r   loops        rN   rn   z/MultiDBClient._on_circuit_state_change_callback  s     '')))))0)<)<%%g&6&67*D& &9+DNNG,,--Z[ OO02DgN&9+FKK)G$4$4#55IJK ,G&rP   )rM   r%   rQ   r%   )T),__name__
__module____qualname____doc__r   rO   rV   r\   rd   rT   r   rw   r   r~   r   boolr   r   r   floatr   r   r   r   r   r   r   r   r   r   r   r"   r   strr   r   dictr   rk   rh   r|   r   rp   rn   r   rP   rN   r%   r%   "   sz   
+*} +*Z
H " Hy 
- 
D 
8 IM/J$/JAE/Jb	)	DQ	m J] JE J&95I 94+ 4M %)$)'+

|U3	#+>%??@
 
 SM	

 "
 e_
,	& tHdN/C  D0}  $L%L29LFMLrP   r%   rl   c                 .    t         j                  | _        y rS   )rp   r   ro   )rl   s    rN   r   r     s    %%GMrP   c                   ~    e Zd ZdZdefdZddZd Zd Zd Z	de
fd	Zdefd
ZddZddZddZd Zdee   fdZy)r   zG
    Pipeline implementation for multiple logical Redis databases.
    r[   c                      g | _         || _        y rS   )_command_stack_client)rM   r[   s     rN   rO   zPipeline.__init__  s     rP   rQ   c                    K   | S wrS   r   rU   s    rN   rV   zPipeline.__aenter__          c                    K   | j                          d {    | j                  j                  |||       d {    y 7 *7 wrS   )resetr   rd   r`   s       rN   rd   zPipeline.__aexit__  s:     jjlll$$Xy)DDD 	Ds   AA$AAAAc                 >    | j                         j                         S rS   )_async_self	__await__rU   s    rN   r   zPipeline.__await__  s    !++--rP   c                    K   | S wrS   r   rU   s    rN   r   zPipeline._async_self  r   r   c                 ,    t        | j                        S rS   )r   r   rU   s    rN   __len__zPipeline.__len__  s    4&&''rP   c                      y)z1Pipeline instances should always evaluate to TrueTr   rU   s    rN   __bool__zPipeline.__bool__  s    rP   Nc                    K   g | _         y wrS   )r   rU   s    rN   r   zPipeline.reset  s      s   	c                 @   K   | j                          d{    y7 w)zClose the pipelineN)r   rU   s    rN   r\   zPipeline.aclose  s     jjlrf   c                 @    | j                   j                  ||f       | S )ar  
        Stage a command to be executed when execute() is next called

        Returns the current Pipeline object back so commands can be
        chained together, such as:

        pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')

        At some other point, you can then run: pipe.execute(),
        which will execute all commands queued in the pipe.
        )r   r   r   s      rN   pipeline_execute_commandz!Pipeline.pipeline_execute_command  s!     	""D'?3rP   c                 &     | j                   |i |S )zAdds a command to the stack)r  rM   r   r   s      rN   r   zPipeline.execute_command  s    ,t,,d=f==rP   c                 t  K   | j                   j                  s"| j                   j                          d{    	 | j                   j                  j	                  t        | j                               d{   | j                          d{    S 7 ]7 7 	# | j                          d{  7   w xY ww)z0Execute all the commands in the current pipelineN)r   rD   rT   rC   execute_pipelinetupler   r   rU   s    rN   r   zPipeline.execute  s     ||'',,))+++	66GGd))*  **, , $**,sV   4B8BB8;B 7B8B ;B8BB8B B8B5.B1/B55B8)rM   r   rQ   r   rQ   N)rQ   r   )r   r   r   r   r%   rO   rV   rd   r   r   intr   r   r   r   r\   r  r   r   r   r   r   rP   rN   r   r     sd    } E.( ($ !>
tCy 
rP   r   c                       e Zd ZdZdefdZddZddZd Ze	de
fd	       Zd
efdZd
edefdZd
efdZd
edefdZd Z	 dde
dee   fdZddddeddfdZy)r   z2
    PubSub object for multi database client.
    r[   c                 ^    || _          | j                   j                  j                  di | y)zInitialize the PubSub object for a multi-database client.

        Args:
            client: MultiDBClient instance to use for pub/sub operations
            **kwargs: Additional keyword arguments to pass to the underlying pubsub implementation
        Nr   )r   rC   r   )rM   r[   r   s      rN   rO   zPubSub.__init__  s(     ,%%,,6v6rP   rQ   c                    K   | S wrS   r   rU   s    rN   rV   zPubSub.__aenter__  r   r   Nc                 @   K   | j                          d {    y 7 wrS   r_   r`   s       rN   rd   zPubSub.__aexit__
  re   rf   c                 h   K   | j                   j                  j                  d       d {   S 7 w)Nr\   r   rC   execute_pubsub_methodrU   s    rN   r\   zPubSub.aclose  s'     \\22HHRRRRs   )202c                 V    | j                   j                  j                  j                  S rS   )r   rC   active_pubsub
subscribedrU   s    rN   r  zPubSub.subscribed  s    ||,,::EEErP   r   c                 l   K    | j                   j                  j                  dg|  d {   S 7 w)Nr   r  rM   r   s     rN   r   zPubSub.execute_command  s:     HT\\22HH
 $
 
 	
 
   +424r   c                 r   K    | j                   j                  j                  dg|i | d{   S 7 w)aE  
        Subscribe to channel patterns. Patterns supplied as keyword arguments
        expect a pattern name as the key and a callable as the value. A
        pattern's callable will be invoked automatically when a message is
        received on that pattern rather than producing a message via
        ``listen()``.
        
psubscribeNr  r  s      rN   r  zPubSub.psubscribe  sE      IT\\22HH

#)
 
 	
 
   .757c                 l   K    | j                   j                  j                  dg|  d{   S 7 w)zj
        Unsubscribe from the supplied patterns. If empty, unsubscribe from
        all patterns.
        punsubscribeNr  r  s     rN   r  zPubSub.punsubscribe%  s=     
 IT\\22HH
!
 
 	
 
r  c                 r   K    | j                   j                  j                  dg|i | d{   S 7 w)aR  
        Subscribe to channels. Channels supplied as keyword arguments expect
        a channel name as the key and a callable as the value. A channel's
        callable will be invoked automatically when a message is received on
        that channel rather than producing a message via ``listen()`` or
        ``get_message()``.
        	subscribeNr  r  s      rN   r  zPubSub.subscribe.  sE      IT\\22HH

"(
 
 	
 
r  c                 l   K    | j                   j                  j                  dg|  d{   S 7 w)zi
        Unsubscribe from the supplied channels. If empty, unsubscribe from
        all channels
        unsubscribeNr  r  s     rN   r  zPubSub.unsubscribe:  s=     
 IT\\22HH
 
 
 	
 
r  ignore_subscribe_messagestimeoutc                 n   K   | j                   j                  j                  d||       d{   S 7 w)a  
        Get the next message if one is available, otherwise None.

        If timeout is specified, the system will wait for `timeout` seconds
        before returning. Timeout should be specified as a floating point
        number or None to wait indefinitely.
        get_message)r  r   Nr  )rM   r  r   s      rN   r"  zPubSub.get_messageC  s>      \\22HH&? I 
 
 	
 
   ,535g      ?)exception_handlerpoll_timeoutr%  c                n   K   | j                   j                  j                  |||        d{   S 7 w)a  Process pub/sub messages using registered callbacks.

        This is the equivalent of :py:meth:`redis.PubSub.run_in_thread` in
        redis-py, but it is a coroutine. To launch it as a separate task, use
        ``asyncio.create_task``:

            >>> task = asyncio.create_task(pubsub.run())

        To shut it down, use asyncio cancellation:

            >>> task.cancel()
            >>> await task
        )
sleep_timer$  r   N)r   rC   execute_pubsub_run)rM   r$  r%  s      rN   runz
PubSub.runS  s>     & \\22EE#7HQU F 
 
 	
 
r#  )rQ   r   r  )Fg        )r   r   r   r   r%   rO   rV   rd   r\   propertyr   r  r!   r   r    r	   r  r  r   r  r  r   r   r"  r)  r   rP   rN   r   r     s    	7} 	7S FD F F
: 



h 

- 


 


X 

 


 SV
)-
@H
& !	
 	

 

rP   r   );rE   loggingtypingr   r   r   r   r   r   redis.asyncio.clientr	   &redis.asyncio.multidb.command_executorr
   redis.asyncio.multidb.configr   r   r   r   redis.asyncio.multidb.databaser   r   r   &redis.asyncio.multidb.failure_detectorr   !redis.asyncio.multidb.healthcheckr   r   redis.asyncio.retryr   redis.backgroundr   redis.backoffr   redis.commandsr   r   redis.multidb.circuitr   r   rp   redis.multidb.exceptionr   r   r   redis.observability.attributesr   redis.typingr    r!   r"   redis.utilsr#   	getLoggerr   r   r%   r   r   r   r   rP   rN   <module>r=     s      B B . I  N M G L % 0 # F 0 2 
 = 3 3 $			8	$ IL,.? IL ILX& &A'): AHq
 q
rP   