
    Ki}Y                        d dl Z d dlZd dlZd dlmZmZmZmZ d dlm	Z	m
Z
 d dlmZ d dlmZ d dlmZ d dlmZmZ d dlmZ d d	lmZ d d
lmZ d dlmZ d dlmZmZmZm Z  d dl!m"Z"m#Z#m$Z$ d dl%m&Z&m'Z'm(Z( d dl)m*Z* d dl+m,Z, d dl-m.Z. d dl/m0Z0  ejb                  e2      Z3e0 G d dee             Z4defdZ5 G d dee      Z6 G d d      Z7y)    N)AnyCallableListOptional)HealthCheckHealthCheckPolicy)BackgroundScheduler)	NoBackoff)PubSubWorkerThread)CoreCommandsRedisModuleCommands)MaintNotificationsConfig)CircuitBreaker)State)DefaultCommandExecutor)DEFAULT_GRACE_PERIODDatabaseConfigInitialHealthCheckMultiDbConfig)Database	DatabasesSyncDatabase)InitialHealthCheckFailedErrorNoValidDatabaseExceptionUnhealthyDatabaseException)FailureDetector)GeoFailoverReason)Retry)experimentalc                      e Zd ZdZdefdZd Zd ZdefdZ	de
dd	fd
Z	 d%dedefdZde
de
fdZdefdZde
defdZdefdZdefdZd Zd Zdedgd	f   fdZd Zde
defdZdeeef   fdZd Z d e!d!e"d"e"fd#Z#d$ Z$y	)&MultiDBClientz
    Client that operates on multiple logical Redis databases.
    Should be used in Client-side geographic failover database setups.
    configc           
         |j                         | _        |j                  s|j                         n|j                  | _        |j
                  | _        |j                  j                         | _	        |j                  s|j                         n|j                  | _        |j                  |j                         n|j                  | _        | j                  j!                  | j                         |j"                  | _        |j&                  | _        |j*                  | _        | j,                  j/                  t0        f       t3        | j                  | j                  | j,                  | j                  |j4                  |j6                  | j(                  | j$                        | _        d| _        t=               | _        tA        jB                         | _"        || _#        y )N)failure_detectors	databasescommand_retryfailover_strategyfailover_attemptsfailover_delayevent_dispatcherauto_fallback_intervalF)$r%   
_databaseshealth_checksdefault_health_checks_health_checkshealth_check_interval_health_check_intervalhealth_check_policyvalue_health_check_policyr$   default_failure_detectors_failure_detectorsr'   default_failover_strategy_failover_strategyset_databasesr+   _auto_fallback_intervalr*   _event_dispatcherr&   _command_retryupdate_supported_errorsConnectionRefusedErrorr   r(   r)   command_executorinitializedr	   _bg_scheduler	threadingLock_hc_lock_config)selfr"   s     `/home/jay/workspace/scripts/.codegraph-venv/lib/python3.12/site-packages/redis/multidb/client.py__init__zMultiDBClient.__init__*   s    **, '' ((*%% 	
 '-&B&B#&&,,. 	!
 ++ ,,.)) 	 ''/ ,,.)) 	
 	--doo>'-'D'D$!'!8!8$22335K4MN 6"55oo--"55$66!00!33#'#?#?	!
 !02!(    c                 D    	 | j                          y # t        $ r Y y w xY wN)close	ExceptionrF   s    rG   __del__zMultiDBClient.__del__T   s$    	JJL 	 		    	c                    | j                   j                  | j                         | j                   j                  | j                  | j
                         d}| j                  D ]h  \  }}|j                  j                  | j                         |j                  j                  t        j                  k(  sS|rV|| j                  _        d}j |st        d      d| _        y)zT
        Perform initialization of databases to define their initial state.
        FTz4Initial connection failed - no active database foundN)rA   run_coro_sync_perform_initial_health_checkrun_recurring_coror1   _check_databases_healthr,   circuiton_state_changed!_on_circuit_state_change_callbackstateCBStateCLOSEDr?   _active_databaser   r@   )rF   is_active_db_founddatabaseweights       rG   
initializezMultiDBClient.initialize]   s     	(()K)KL 	--''((	

 # $ 		*Hf--d.T.TU %%7@R :B%%6%)"		* "*F   rI   returnc                     | j                   S )zE
        Returns a sorted (by weight) list of all databases.
        )r,   rN   s    rG   get_databaseszMultiDBClient.get_databases   s     rI   r^   Nc                    d}| j                   D ]  \  }}||k(  sd} n |st        d      | j                  j                  | j                  |       |j
                  j                  t        j                  k(  rC| j                   j                  d      d   \  }}|t        j                  f| j                  _        yt        d      )zL
        Promote one of the existing databases to become an active.
        NT/Given database is not a member of database list   r   z1Cannot set active database, database is unhealthy)r,   
ValueErrorrA   rR   _check_db_healthrV   rY   rZ   r[   	get_top_nr   MANUALr?   active_databaser   )rF   r^   existsexisting_db_highest_weighted_dbs         rG   set_active_databasez!MultiDBClient.set_active_database   s     "oo 	NKh&	
 NOO(()>)>I!!W^^3%)__%>%>q%A!%D"!((5D!!1 &?
 	
rI   skip_initial_health_checkc                    t        dt                     |j                  d<   d|j                  vrt        d      |j                  d<   |j                  r< | j
                  j                  j                  |j                  fi |j                  }n|j                  r_|j                  j                  t        dt                            | j
                  j                  j                  |j                        }n& | j
                  j                  di |j                  }|j                  |j                         n|j                  }t        |||j                  |j                  	      }	 | j                  j                  | j                   |       | j$                  j'                  d
      d   \  }}| j$                  j)                  ||j                         | j+                  ||       y# t"        $ r |s Y hw xY w)z
        Adds a new database to the database list.

        Args:
            config: DatabaseConfig object that contains the database configuration.
            skip_initial_health_check: If True, adds the database even if it is unhealthy.
        r   )retriesbackoffretrymaint_notifications_configF)enabled)connection_poolN)clientrV   r_   health_check_urlrf    )r   r
   client_kwargsr   from_urlrE   client_class	from_pool	set_retryrV   default_circuit_breakerr   r_   rz   rA   rR   rh   r   r,   ri   add_change_active_database)rF   r"   rq   ry   rV   r^   ro   highest_weights           rG   add_databasezMultiDBClient.add_database   s    ).a(MW% (v/C/CC(7   !=> ??7T\\..77#)#7#7F &&uQ	'LM\\..88 & 0 0 9 F /T\\..F1E1EFF ~~% **, 	 ==#44	
	,,T-B-BHM
 /3oo.G.G.J1.M+^Hhoo6$$X/BC * 	, -	s   /&G/ /G>=G>new_databasehighest_weight_databasec                     |j                   |j                   kD  rJ|j                  j                  t        j                  k(  r"|t
        j                  f| j                  _        y y y rK   )	r_   rV   rY   rZ   r[   r   	AUTOMATICr?   rk   )rF   r   r   s      rG   r   z%MultiDBClient._change_active_database   sZ     "9"@"@@$$**gnn< !++5D!!1 = ArI   c                    | j                   j                  |      }| j                   j                  d      d   \  }}||k  rJ|j                  j                  t
        j                  k(  r"|t        j                  f| j                  _
        yyy)z<
        Removes a database from the database list.
        rf   r   N)r,   removeri   rV   rY   rZ   r[   r   rj   r?   rk   )rF   r^   r_   ro   r   s        rG   remove_databasezMultiDBClient.remove_database   s     ''1.2oo.G.G.J1.M+^ f$#++11W^^C $!((5D!!1 D %rI   r_   c                    d}| j                   D ]  \  }}||k(  sd} n |st        d      | j                   j                  d      d   \  }}| j                   j                  ||       ||_        | j                  ||       y)z<
        Updates a database from the database list.
        NTre   rf   r   )r,   rg   ri   update_weightr_   r   )rF   r^   r_   rl   rm   rn   ro   r   s           rG   update_database_weightz$MultiDBClient.update_database_weight   s     "oo 	NKh&	
 NOO.2oo.G.G.J1.M+^%%h7 $$X/BCrI   failure_detectorc                 :    | j                   j                  |       y)z>
        Adds a new failure detector to the database.
        N)r6   append)rF   r   s     rG   add_failure_detectorz"MultiDBClient.add_failure_detector  s     	&&'78rI   healthcheckc                 |    | j                   5  | j                  j                  |       ddd       y# 1 sw Y   yxY w)z:
        Adds a new health check to the database.
        N)rD   r/   r   )rF   r   s     rG   add_health_checkzMultiDBClient.add_health_check  s4     ]] 	4&&{3	4 	4 	4s   2;c                 r    | j                   s| j                           | j                  j                  |i |S )zB
        Executes a single command and return its result.
        )r@   r`   r?   execute_commandrF   argsoptionss      rG   r   zMultiDBClient.execute_command  s5     OO4t$$44dFgFFrI   c                     t        |       S )z:
        Enters into pipeline mode of the client.
        )PipelinerN   s    rG   pipelinezMultiDBClient.pipeline"  s     ~rI   funcr   c                 x    | j                   s| j                           | j                  j                  |g|| S )z3
        Executes callable as transaction.
        )r@   r`   r?   execute_transaction)rF   r   watchesr   s       rG   transactionzMultiDBClient.transaction(  s:     OO8t$$88RR'RRrI   c                 R    | j                   s| j                          t        | fi |S )z
        Return a Publish/Subscribe object. With this object, you can
        subscribe to channels and listen for messages that get published to
        them.
        )r@   r`   PubSub)rF   kwargss     rG   pubsubzMultiDBClient.pubsub1  s'     OOd%f%%rI   c                   K   | j                   5  t        | j                        }ddd       | j                  j	                  |       d{   }|sH|j
                  j                  t        j                  k7  rt        j                  |j
                  _        |S |rF|j
                  j                  t        j                  k7  rt        j                  |j
                  _        |S # 1 sw Y   xY w7 w)zO
        Runs health checks on the given database until first failure.
        N)
rD   listr/   r4   executerV   rY   rZ   OPENr[   )rF   r^   r-   
is_healthys       rG   rh   zMultiDBClient._check_db_health<  s      ]] 	6 !4!45M	6  44<<]HUU
%%5)0  &H,,22gnnD%,^^H"	6 	6 Vs(   C3C%'C3C1BC3%C.*C3c                 z  K   i }g | _         | j                  D ]I  \  }}t        j                  | j	                  |            }|||<   | j                   j                  |       K t        j                  | j                   ddi d{   }t        | j                   |      D ci c]  \  }}||   | }}}|j                         D ]g  \  }}t        |t              s|j                  }t        j                  |j                  _        t         j#                  d|j$                         d||<   i |S 7 c c}}w w)zk
        Runs health checks as a recurring task.
        Runs health checks against all databases.
        return_exceptionsTNz%Health check failed, due to exception)exc_infoF)	_hc_tasksr,   asynciocreate_taskrh   r   gatherzipitems
isinstancer   r^   rZ   r   rV   rY   loggerdebugoriginal_exception)	rF   
task_to_dbr^   rn   taskresultsresult
db_resultsunhealthy_dbs	            rG   rU   z%MultiDBClient._check_databases_healthO  s4    
 46
?? 	(KHa&&t'<'<X'FGD'JtNN!!$'	(
  O$OO :=T^^W9U
)5vJtf$

 
 !+ 0 0 2 
	1Hf&"<=%-4\\$$*;#66  
 ,1
<(
	1 ' P
s+   BD;D3D;$D54)D;AD;5D;c                   K   | j                          d{   }d}| j                  j                  t        j                  k(  rd|j                         v}n| j                  j                  t        j                  k(  r)t        |j                               t        |      dz  kD  }n9| j                  j                  t        j                  k(  rd|j                         v }|s"t        d| j                  j                         y7 w)zj
        Runs initial health check and evaluate healthiness based on initial_health_check_policy.
        NTF   z:Initial health check failed. Initial health check policy: )rU   rE   initial_health_check_policyr   ALL_AVAILABLEvaluesMAJORITY_AVAILABLEsumlenONE_AVAILABLEr   )rF   r   r   s      rG   rS   z+MultiDBClient._perform_initial_health_checkq  s      4466
<<337I7W7WWgnn&66JLL44!445 W^^-.W1AAJLL448J8X8XX!11J/LT\\MuMuLvw   7s   DDC/DrV   	old_state	new_statec                    |t         j                  k(  r1| j                  j                  | j                  |j
                         y |t         j                  k(  r[|t         j                  k(  rHt        j                  d|j
                   d       | j                  j                  t        t        |       |t         j                  k7  r8|t         j                  k(  r$t        j                  d|j
                   d       y y y )Nz	Database z- is unreachable. Failover has been initiated.z is reachable again.)rZ   	HALF_OPENrA   run_coro_fire_and_forgetrh   r^   r[   r   r   warningrun_oncer   _half_open_circuitinfo)rF   rV   r   r   s       rG   rX   z/MultiDBClient._on_circuit_state_change_callback  s     )))77%%w'7'7 &9+DNNG,,--Z[ ''$&8' &9+FKK)G$4$4#55IJK ,G&rI   c                 X   | j                   rJ	 | j                   j                  | j                  j                         | j                   j                          | j                  j                  r/| j                  j                  j                  j                          yy# t        $ r Y kw xY w)z:
        Closes the client and all its resources.
        N)	rA   rR   r4   rL   rM   stopr?   rk   ry   rN   s    rG   rL   zMultiDBClient.close  s     ""001J1J1P1PQ ##%  00!!1188>>@ 1	  s   /B 	B)(B))T)%__name__
__module____qualname____doc__r   rH   rO   r`   r   rc   r   rp   r   boolr   r   r   r   floatr   r   r   r   r   r   r   r   r   r   rh   dictrU   rS   r   rZ   rX   rL   r{   rI   rG   r!   r!   #   s,   
(} (T$ Ly 
L 
T 
: IM6D$6DAE6Dp
(
CO
  D| DU D&9_ 94K 4GS*t); < S	&|  & tHdN/C  D0L%L29LFML*ArI   r!   rV   c                 .    t         j                  | _        y rK   )rZ   r   rY   )rV   s    rG   r   r     s    %%GMrI   c                   x    e Zd ZdZdefdZddZd Zd Zde	fdZ
defd	ZddZddZddZd Zdee   fdZy
)r   zG
    Pipeline implementation for multiple logical Redis databases.
    ry   c                      g | _         || _        y rK   )_command_stack_client)rF   ry   s     rG   rH   zPipeline.__init__  s     rI   ra   c                     | S rK   r{   rN   s    rG   	__enter__zPipeline.__enter__      rI   c                 $    | j                          y rK   reset)rF   exc_type	exc_value	tracebacks       rG   __exit__zPipeline.__exit__      

rI   c                 D    	 | j                          y # t        $ r Y y w xY wrK   r   rM   rN   s    rG   rO   zPipeline.__del__  s"    	JJL 		rP   c                 ,    t        | j                        S rK   )r   r   rN   s    rG   __len__zPipeline.__len__  s    4&&''rI   c                      y)z1Pipeline instances should always evaluate to TrueTr{   rN   s    rG   __bool__zPipeline.__bool__  s    rI   Nc                     g | _         y rK   )r   rN   s    rG   r   zPipeline.reset  s
     rI   c                 $    | j                          y)zClose the pipelineNr   rN   s    rG   rL   zPipeline.close  s    

rI   c                 @    | j                   j                  ||f       | S )ar  
        Stage a command to be executed when execute() is next called

        Returns the current Pipeline object back so commands can be
        chained together, such as:

        pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')

        At some other point, you can then run: pipe.execute(),
        which will execute all commands queued in the pipe.
        )r   r   r   s      rG   pipeline_execute_commandz!Pipeline.pipeline_execute_command  s!     	""D'?3rI   c                 &     | j                   |i |S )zAdds a command to the stack)r   rF   r   r   s      rG   r   zPipeline.execute_command  s    ,t,,d=f==rI   c                    | j                   j                  s| j                   j                          	 | j                   j                  j	                  t        | j                              | j                          S # | j                          w xY w)z0Execute all the commands in the current pipeline)r   r@   r`   r?   execute_pipelinetupler   r   rN   s    rG   r   zPipeline.execute  s_    ||''LL##%	<<00AAd))* JJLDJJLs   7A: :B)ra   r   ra   N)r   r   r   r   r!   rH   r   r   rO   intr   r   r   r   rL   r   r   r   r   r   r{   rI   rG   r   r     s^    } ( ($ !>
c 
rI   r   c                       e Zd ZdZdefdZddZddZddZdd	Z	e
defd
       Zd Zd Zd Zd Zd Zd Zd Z	 ddedefdZ	 ddedefdZ	 	 	 	 ddededee   deddf
dZy) r   z2
    PubSub object for multi database client.
    ry   c                 ^    || _          | j                   j                  j                  di | y)zInitialize the PubSub object for a multi-database client.

        Args:
            client: MultiDBClient instance to use for pub/sub operations
            **kwargs: Additional keyword arguments to pass to the underlying pubsub implementation
        Nr{   )r   r?   r   )rF   ry   r   s      rG   rH   zPubSub.__init__  s(     ,%%,,6v6rI   ra   c                     | S rK   r{   rN   s    rG   r   zPubSub.__enter__  r   rI   Nc                 D    	 | j                          y # t        $ r Y y w xY wrK   r   rN   s    rG   rO   zPubSub.__del__  s$    	 JJL 		rP   c                 L    | j                   j                  j                  d      S )Nr   r   r?   execute_pubsub_methodrN   s    rG   r   zPubSub.reset  s    ||,,BB7KKrI   c                 $    | j                          y rK   r   rN   s    rG   rL   zPubSub.close  r   rI   c                 V    | j                   j                  j                  j                  S rK   )r   r?   active_pubsub
subscribedrN   s    rG   r  zPubSub.subscribed  s    ||,,::EEErI   c                 P     | j                   j                  j                  dg| S )Nr   r  rF   r   s     rG   r   zPubSub.execute_command  s,    Bt||,,BB
 $
 	
rI   c                 V     | j                   j                  j                  dg|i |S )aE  
        Subscribe to channel patterns. Patterns supplied as keyword arguments
        expect a pattern name as the key and a callable as the value. A
        pattern's callable will be invoked automatically when a message is
        received on that pattern rather than producing a message via
        ``listen()``.
        
psubscriber  r   s      rG   r
  zPubSub.psubscribe#  7     Ct||,,BB

#)
 	
rI   c                 P     | j                   j                  j                  dg| S )zj
        Unsubscribe from the supplied patterns. If empty, unsubscribe from
        all patterns.
        punsubscriber  r  s     rG   r  zPubSub.punsubscribe/  /    
 Ct||,,BB
!
 	
rI   c                 V     | j                   j                  j                  dg|i |S )aR  
        Subscribe to channels. Channels supplied as keyword arguments expect
        a channel name as the key and a callable as the value. A channel's
        callable will be invoked automatically when a message is received on
        that channel rather than producing a message via ``listen()`` or
        ``get_message()``.
        	subscriber  r   s      rG   r  zPubSub.subscribe8  s7     Ct||,,BB

"(
 	
rI   c                 P     | j                   j                  j                  dg| S )zi
        Unsubscribe from the supplied channels. If empty, unsubscribe from
        all channels
        unsubscriber  r  s     rG   r  zPubSub.unsubscribeD  s(    
 Ct||,,BB=XSWXXrI   c                 V     | j                   j                  j                  dg|i |S )az  
        Subscribes the client to the specified shard channels.
        Channels supplied as keyword arguments expect a channel name as the key
        and a callable as the value. A channel's callable will be invoked automatically
        when a message is received on that channel rather than producing a message via
        ``listen()`` or ``get_sharded_message()``.
        
ssubscriber  r   s      rG   r  zPubSub.ssubscribeK  r  rI   c                 P     | j                   j                  j                  dg| S )zu
        Unsubscribe from the supplied shard_channels. If empty, unsubscribe from
        all shard_channels
        sunsubscriber  r  s     rG   r  zPubSub.sunsubscribeW  r  rI   ignore_subscribe_messagestimeoutc                 R    | j                   j                  j                  d||      S )a  
        Get the next message if one is available, otherwise None.

        If timeout is specified, the system will wait for `timeout` seconds
        before returning. Timeout should be specified as a floating point
        number, or None, to wait indefinitely.
        get_messager  r  r  rF   r  r  s      rG   r  zPubSub.get_message`  s0     ||,,BB&? C 
 	
rI   c                 R    | j                   j                  j                  d||      S )a&  
        Get the next message if one is available in a sharded channel, otherwise None.

        If timeout is specified, the system will wait for `timeout` seconds
        before returning. Timeout should be specified as a floating point
        number, or None, to wait indefinitely.
        get_sharded_messager  r  r  s      rG   r  zPubSub.get_sharded_messagep  s0     ||,,BB!&? C 
 	
rI   
sleep_timedaemonexception_handlersharded_pubsubr   c                 V    | j                   j                  j                  |||| |      S )N)r   r!  r   r"  )r   r?   execute_pubsub_run)rF   r  r   r!  r"  s        rG   run_in_threadzPubSub.run_in_thread  s6     ||,,??/) @ 
 	
rI   )ra   r   r   )F        )r&  FNF)r   r   r   r   r!   rH   r   rO   r   rL   propertyr   r  r   r
  r  r  r  r  r  r   r  r  r   r   r%  r{   rI   rG   r   r     s    	7} 	7L FD F F






Y


 IL
)-
@E
" IL
)-
@E
$  04$

 
 $H-	

 
 

rI   r   )8r   loggingrB   typingr   r   r   r   !redis.asyncio.multidb.healthcheckr   r   redis.backgroundr	   redis.backoffr
   redis.clientr   redis.commandsr   r   redis.maint_notificationsr   redis.multidb.circuitr   r   rZ   redis.multidb.command_executorr   redis.multidb.configr   r   r   r   redis.multidb.databaser   r   r   redis.multidb.exceptionr   r   r   redis.multidb.failure_detectorr   redis.observability.attributesr   redis.retryr   redis.utilsr   	getLoggerr   r   r!   r   r   r   r{   rI   rG   <module>r:     s       0 0 L 0 # + < > 0 2 A  E D 
 ; <  $			8	$ JA' JA JAZ& &@"L @FU
 U
rI   