
    Ki4                        d Z ddlmZ ddlZddlZddlmZmZmZ ddlm	Z	m
Z
mZ ddlmZmZ dd	lmZmZmZmZmZ  ej*                  d
      Ze	rddlmZ ddlmZ dZdZ G d de      Z G d ded         Zy)z Concurrency limiting dependency.    )annotationsN)datetime	timedeltatimezone)TYPE_CHECKINGAnyoverload   )CANCEL_MSG_CLEANUPcancel_task   )AdmissionBlocked
Dependencycurrent_docketcurrent_executioncurrent_workerzdocket.dependencies)Redis)	Execution   c                  $     e Zd ZdZd fdZ xZS )ConcurrencyBlockedz:Raised when a task cannot start due to concurrency limits.c                T    || _         || _        d| d| }t        |   ||       y )Nzconcurrency limit (z	 max) on )reason)concurrency_keymax_concurrentsuper__init__)self	executionr   r   r   	__class__s        l/home/jay/workspace/scripts/.codegraph-venv/lib/python3.12/site-packages/docket/dependencies/_concurrency.pyr   zConcurrencyBlocked.__init__(   s9    .,&~&6i?PQ62    )r   r   r   strr   int)__name__
__module____qualname____doc__r   __classcell__)r    s   @r!   r   r   %   s    D3 3r"   r   c                     e Zd ZU dZdZded<   edd	 	 	 	 	 dd       Ze	 	 d	 	 	 	 	 	 	 dd	       Zeddd
	 	 	 	 	 dd       Z	 	 	 d	 	 	 	 	 	 	 ddZddZddZ		 	 	 	 	 	 	 	 ddZ
	 	 	 	 	 	 	 	 ddZddZddZedd       Zy) ConcurrencyLimita  Configures concurrency limits for task execution.

    Can limit concurrency globally for a task, or per specific argument value.

    Works both as a default parameter and as ``Annotated`` metadata::

        # Default-parameter style
        async def process_customer(
            customer_id: int,
            concurrency: ConcurrencyLimit = ConcurrencyLimit("customer_id", 1),
        ) -> None: ...

        # Annotated style (parameter name auto-inferred)
        async def process_customer(
            customer_id: Annotated[int, ConcurrencyLimit(1)],
        ) -> None: ...

        # Per-task (no argument grouping)
        async def expensive(
            concurrency: ConcurrencyLimit = ConcurrencyLimit(max_concurrent=3),
        ) -> None: ...
    TboolsingleN)scopec                   y)z9Annotated style: ``Annotated[int, ConcurrencyLimit(1)]``.N r   r   r.   s      r!   r   zConcurrencyLimit.__init__I       r"   r   c                     y)z/Default-param style with per-argument grouping.Nr0   r   argument_namer   r.   s       r!   r   zConcurrencyLimit.__init__S   r2   r"   r   r.   c                    y)z,Per-task concurrency (no argument grouping).Nr0   r1   s      r!   r   zConcurrencyLimit.__init__\   r2   r"   c                    t        |t              rd | _        || _        n|| _        || _        || _        d | _        d| _        d | _        d | _        y )NF)	
isinstancer$   r5   r   r.   _concurrency_key_initialized	_task_key_renewal_taskr4   s       r!   r   zConcurrencyLimit.__init__e   sT     mS)-1D'4D!.D"0D
,0"'%)8<r"   c                z    | j                   | j                   n|}t        || j                  | j                        S )zGBind to an ``Annotated`` parameter, inferring argument_name if not set.r6   )r5   r+   r   r.   )r   namevaluer5   s       r!   bind_to_parameterz"ConcurrencyLimit.bind_to_parameterw   s;    .2.@.@.L**RV..**
 	
r"   c           
       K   ddl m} t        j                         }t	        j                         }t        j                         }| j                  xs |j                  }| j                  1	 |j                  | j                        }| d| j                   d| }n| d|j                   }t!        | j                  | j"                  | j                        }	||	_        d|	_        |j(                  |	_        |j-                         4 d {   }
|	j/                  |
|j0                  |j2                         d {   }|st5        ||| j"                        d d d       d {    t7        j8                  |	j;                  |j2                        |j                   d|j(                   	      |	_        |j>                  j                         }|jA                  |	jB                         |jA                  tD        |	j<                  tF               |	S # t        $ rB}t        d| j                   dt        |j                  j                                      |d }~ww xY w7 U7 )7 # 1 d {  7  sw Y   xY ww)
Nr   )_DependszConcurrencyLimit argument 'z*' not found in task arguments. Available: z:concurrency::Tz - concurrency lease:)r?   )$_functionalrC   r   getr   r   r.   r?   r5   get_argumentKeyError
ValueErrorlistkwargskeysfunction_namer+   r   r:   r;   keyr<   redis_acquire_slotredeliveredredelivery_timeoutr   asynciocreate_task_renew_lease_loopr=   stackpush_async_callback_release_slotr   r   )r   rC   r   docketworkerr.   argument_valueer   limitrO   acquiredrV   s                r!   
__aenter__zConcurrencyLimit.__aenter__   sM    )%))+	##%##% 

)fkk)!*!7!78J8J!K 't'9'9&:!N;KL 
 "'}Y5L5L4MNO
 !!3!3T5H5H$**U!0!#-- <<> 	 	U"00y,,f.G.G H (0C0C 	 	 &11##F$=$=>KK= 5imm_E
 ""$!!%"5"56!!+u/B/BDVWW   1$2D2D1E F226y7G7G7L7L7N2O1PR *		 	 	 	s   A)I?,H BI?I I?+I):I#;I)I?#I&$B.I?	I=III?#I)&I?)I</I20I<7I?c                   K   y w)Nr0   )r   exc_type	exc_value	tracebacks       r!   	__aexit__zConcurrencyLimit.__aexit__   s      	s   c           	       K   |j                  d      }t        j                  t        j                        j                         }||j                         z
  }t        t        t        |j                         t        z              } || j                  g| j                  | j                  ||rdnd||g       d{   }t        |      S 7 w)a  Atomically acquire a concurrency slot.

        Uses a Redis sorted set to track concurrency slots per task. Each entry
        is keyed by task_key with the timestamp as the score.

        When XAUTOCLAIM reclaims a message (because the original worker stopped
        renewing its lease), is_redelivery=True signals that slot takeover is safe.
        If the message is NOT a redelivery and a slot already exists, we block to
        prevent duplicate execution.

        Slots are refreshed during lease renewal every redelivery_timeout/4.
        If all slots are full, we scavenge any slot older than redelivery_timeout
        (meaning it hasn't been refreshed and the worker must be dead).
        a9  
            local key = KEYS[1]
            local max_concurrent = tonumber(ARGV[1])
            local task_key = ARGV[2]
            local current_time = tonumber(ARGV[3])
            local is_redelivery = tonumber(ARGV[4])
            local stale_threshold = tonumber(ARGV[5])
            local key_ttl = tonumber(ARGV[6])

            -- Check if this task already has a slot (from a previous delivery attempt)
            local slot_time = redis.call('ZSCORE', key, task_key)
            if slot_time then
                slot_time = tonumber(slot_time)
                if is_redelivery == 1 and slot_time <= stale_threshold then
                    -- Redelivery AND slot is stale: original worker stopped renewing,
                    -- safe to take over the slot.
                    redis.call('ZADD', key, current_time, task_key)
                    redis.call('EXPIRE', key, key_ttl)
                    return 1
                else
                    -- Either not a redelivery, or slot is still fresh (original worker
                    -- is just slow, not dead). Don't take over.
                    return 0
                end
            end

            -- No existing slot for this task - check if we can acquire a new one
            if redis.call('ZCARD', key) < max_concurrent then
                redis.call('ZADD', key, current_time, task_key)
                redis.call('EXPIRE', key, key_ttl)
                return 1
            end

            -- All slots are full. Scavenge any stale slot (not refreshed recently).
            -- Slots are refreshed every redelivery_timeout/4, so anything older than
            -- redelivery_timeout hasn't been refreshed and the worker must be dead.
            local stale_slots = redis.call('ZRANGEBYSCORE', key, 0, stale_threshold, 'LIMIT', 0, 1)
            if #stale_slots > 0 then
                redis.call('ZREM', key, stale_slots[1])
                redis.call('ZADD', key, current_time, task_key)
                redis.call('EXPIRE', key, key_ttl)
                return 1
            end

            return 0
            r   r   rL   argsN)register_scriptr   nowr   utc	timestamptotal_secondsmaxMINIMUM_TTL_SECONDSr$   LEASE_RENEWAL_FACTORr:   r   r<   r,   )	r   rO   is_redeliveryrR   acquire_scriptcurrent_timestale_thresholdkey_ttlresults	            r!   rP   zConcurrencyLimit._acquire_slot   s     * ..-/
b  ||HLL1;;=&);)I)I)KK"0025IIJ

 &''(##"

 

 F|

s   B:C<C=Cc                b  K   | j                   r| j                  sJ t        j                         }|j	                         4 d{   }|j                  d      } || j                   g| j                  g       d{    ddd      d{    y7 N7 7 	# 1 d{  7  sw Y   yxY ww)z/Release a concurrency slot when task completes.Nz
                redis.call('ZREM', KEYS[1], ARGV[1])
                if redis.call('ZCARD', KEYS[1]) == 0 then
                    redis.call('DEL', KEYS[1])
                end
                rf   )r:   r<   r   rF   rO   rh   )r   rY   rO   release_scripts       r!   rX   zConcurrencyLimit._release_slot   s      $$77##%<<> 	V 	VU #22N !t'<'<&=T^^DTUUU	V 	V 	V V	V 	V 	V 	VsZ   AB/BB/	5B>B?BB/BB/BB/B, B#!B,(B/c                  K   t        j                         }|j                         t        z  }t	        t
        t        |j                         t        z              }	 t        j                  |       d{    	 |j                         4 d{   }t        j                  t        j                        j                         }|j                  | j                   | j"                  |i       d{    |j%                  | j                   |       d{    ddd      d{    7 7 7 =7 7 # 1 d{  7  sw Y   xY w# t&        $ r% t(        j+                  d| j                   d       Y Mw xY ww)z:Periodically refresh slot timestamp to prevent expiration.TNz'Concurrency lease renewal failed for %s)exc_info)r   rF   rl   ro   rm   rn   r$   rS   sleeprO   r   ri   r   rj   rk   zaddr:   r<   expire	Exceptionloggerwarning)r   rR   rY   renewal_intervalrt   rO   rr   s          r!   rU   z"ConcurrencyLimit._renew_lease_loop3  sX    ##%-;;=@TT"0025IIJ

 -- 0111!<<> G GU#+<<#=#G#G#IL**--6    ,,t'<'<gFFFG G 1G GG G G G   =))!  s   A3E:5D*6E:;E	 D,E	 AD40D.1#D4D0D4E	 $D2%E	 )E:,E	 .D40D42E	 4E:D=;EE	 	+E74E:6E77E:c                d    | j                   st        d      | j                  J | j                  S )zRedis key used for tracking concurrency for this specific argument value.
        Raises RuntimeError if accessed before initialization.z:ConcurrencyLimit not initialized - use within task context)r;   RuntimeErrorr:   )r   s    r!   r   z ConcurrencyLimit.concurrency_keyO  s=       L  $$000$$$r"   )r   r$   r.   
str | NonereturnNone)r   N)r5   r#   r   r$   r.   r   r   r   )Nr   N)r5   zstr | int | Noner   r$   r.   r   r   r   )r?   r#   r@   r   r   r+   )r   r+   )ra   type[BaseException] | Nonerb   zBaseException | Nonerc   r   r   r   )rO   r   rp   r,   rR   r   r   r,   )r   r   )rR   r   r   r   )r   r#   )r%   r&   r'   r(   r-   __annotations__r	   r   rA   r_   rd   rP   rX   rU   propertyr   r0   r"   r!   r+   r+   /   s}   . FD !HH
 H 
H H    	>> > 	>
 
> >    	; ; 	;
 
; ; +/ 	='= = 	=
 
=$
8t	,	 (	 .		
 
	YY+/YENY	YvV&8 % %r"   r+   )r(   
__future__r   rS   loggingr   r   r   typingr   r   r	   _cancellationr   r   _baser   r   r   r   r   	getLoggerr~   redis.asyncior   r   r   ro   rn   r   r+   r0   r"   r!   <module>r      sy    & "   2 2 / / ;  
		0	1#%    3) 3i%z"45 i%r"   