
    Ki!                     :   U d dl Z d dlZd dlZd dlZd dlZd dlZd dlmZ d dlmZm	Z	m
Z
 d dlmZmZmZmZmZmZmZmZmZ d dlZd dlZd dlZd dlmZmZ ddlmZ d dlmZ d d	lm Z m!Z" dd
l#m$Z$m%Z%m&Z& ddl'm(Z( ddl)m*Z*m+Z+m,Z, erddl-m.Z.m/Z/  ej`                  e1      Z2ejf                  e4d<    G d de5      Z6edee   f   Z7e8e9e9f   Z: G d de      Z;dedef   dejx                  fdZ! G d dejz                        Z> G d d      Z?dejx                  de@fdZAy)    N)contextmanager)datetime	timedeltatimezone)	TYPE_CHECKINGAnyAsyncGenerator	AwaitableCallable	GeneratorMappingProtocolcast)	propagatetrace   )suppress_instrumentation)Self)_signature_cacheget_signature)ExecutionProgressProgressEvent
StateEvent)Logged)
CACHE_SIZEmessage_gettermessage_setter)DocketRedisMessageIDloggerc                       e Zd ZdZy)ExecutionCancelledz<Raised when get_result() is called on a cancelled execution.N)__name__
__module____qualname____doc__     \/home/jay/workspace/scripts/.codegraph-venv/lib/python3.12/site-packages/docket/execution.pyr"   r"   /   s    Fr(   r"   .c                   8    e Zd Zdee   deeez  ez     defdZy)_schedule_taskkeysargsreturnc                    K   y wNr'   )selfr,   r-   s      r)   __call__z_schedule_task.__call__:   s	     s   N)r#   r$   r%   liststrfloatbytesr2   r'   r(   r)   r+   r+   9   s1    I%)#+*=%>	r(   r+   functionr.   c                 f    t        |       }t        j                  t        t              ddi       |S )Ncache	signature)_uncalled_for_get_signaturer   setlenr   )r7   r:   s     r)   r   r   ?   s+    +H5INN3'(7K*@Ar(   c                   2    e Zd ZdZdZ	 dZ	 dZ	 dZ	 dZ	 dZ	y)	ExecutionStatez$Lifecycle states for task execution.	scheduledqueuedrunning	completedfailed	cancelledN)
r#   r$   r%   r&   	SCHEDULEDQUEUEDRUNNING	COMPLETEDFAILED	CANCELLEDr'   r(   r)   r?   r?   E   s5    .ILFSG7I/F I:r(   r?   c                   l   e Zd ZdZ	 	 	 	 d>dddedeedf   deeef   d	ed
e	de
dej                  j                  dz  dededz  de
ddfdZed?d       Zedefd       Zedeedf   fd       Zedeeef   fd       Zedefd       Zedefd       Zedej                  j                  dz  fd       Zedefd       Zede
fd       Zeded   fd       ZdefdZe	 	 d@dddedededz  de f
d        Z!de"eef   fd!Z#de"eee
z  f   fd"Z$d#edefd$Z%defd%Z&de'e(jR                     fd&Z*	 d@d'ed(d)ddfd*Z+d+edefd,Z,ddd-d.e-d/edz  d0edz  ddfd1Z.dAd0edz  ddfd2Z/	 dBd/edz  d0edz  ddfd3Z0dCd4Z1ddd5d6e2dz  d7e	dz  defd8Z3dCd9Z4defd:Z5d;eddfd<Z6de7e8e9z  df   fd=Z:y)D	ExecutionzRepresents a task execution with state management and progress tracking.

    Combines task invocation metadata (function, args, when, etc.) with
    Redis-backed lifecycle state tracking and user-reported progress.
    Ndocketr   r7   r-   .kwargskeywhenattempttrace_contextredeliveredfunction_name
generationr.   c                 ~   || _         || _        |
xs |j                  | _        || _        || _        || _        || _        || _        || _	        |	| _
        || _        t        j                  | _        d | _        d | _        d | _        d | _        d | _        t)        ||      | _        |j-                  d|       | _        y )Nzruns:)_docket	_functionr#   _function_name_args_kwargs_keyrQ   rR   _trace_context_redelivered_generationr?   rF   stateworker
started_atcompleted_aterror
result_keyr   progressrP   
_redis_key)r1   rN   r7   r-   rO   rP   rQ   rR   rS   rT   rU   rV   s               r)   __init__zExecution.__init__b   s     !+@x/@/@
	 	+'% &4%=%=
"&+/-1!%
&* ,=VS+I !**uSE]3r(   c                     | j                   S )zParent docket instance.)rX   r1   s    r)   rN   zExecution.docket        ||r(   c                     | j                   S )zTask function to execute.)rY   rk   s    r)   r7   zExecution.function   s     ~~r(   c                     | j                   S )z"Positional arguments for the task.)r[   rk   s    r)   r-   zExecution.args   s     zzr(   c                     | j                   S )zKeyword arguments for the task.)r\   rk   s    r)   rO   zExecution.kwargs   rl   r(   c                     | j                   S )zUnique task identifier.)r]   rk   s    r)   rP   zExecution.key   s     yyr(   c                     | j                   S )z_Name of the task function (from message, may differ from function.__name__ for fallback tasks).)rZ   rk   s    r)   rU   zExecution.function_name        """r(   c                     | j                   S )zOpenTelemetry trace context.)r^   rk   s    r)   rS   zExecution.trace_context   rr   r(   c                     | j                   S )z%Whether this message was redelivered.)r_   rk   s    r)   rT   zExecution.redelivered   s        r(   c                     | j                   S )z9Scheduling generation counter for supersession detection.)r`   rk   s    r)   rV   zExecution.generation   s     r(   )NNNc              #      K   | j                   j                  st               5  d ddd       yd y# 1 sw Y   yxY ww)zASuppress OTel auto-instrumentation for internal Redis operations.N)rX   enable_internal_instrumentationr   rk   s    r)   _maybe_suppress_instrumentationz)Execution._maybe_suppress_instrumentation   s<      ||;;)+    s   !A6A?Ac           	         | j                   j                         | j                  j                         j                         | j                  j                         t        j                  | j                        t        j                  | j                        t        | j                        j                         t        | j                        j                         dS )N)   key   when   function   args   kwargs   attempt
   generation)rP   encoderQ   	isoformatrU   cloudpickledumpsr-   rO   r4   rR   rV   rk   s    r)   
as_messagezExecution.as_message   s    HHOO%YY((*113++224 &&tyy1"((5DLL)002t/668
 	
r(   messagefallback_taskc                 H  K   |d   j                         }|j                  j                  |      x}s|t        d|d      |} | ||t	        j
                  |d         t	        j
                  |d         |d   j                         t        j                  |d   j                               t        |d   j                               t        j                  |t        	      ||t        |j                  d
d                  }|j                          d {    |S 7 w)Nr|   zTask function z* is not registered with the current docketr}   r~   rz   r{   r   )getterr      0)rN   r7   r-   rO   rP   rQ   rR   rS   rT   rU   rV   )decodetasksget
ValueErrorr   loadsr   fromisoformatintr   extractr   sync)clsrN   r   rT   r   rU   r7   instances           r)   from_messagezExecution.from_message   s      ,335"LL,,];;;$ $]$55_`  %H""77#34$$WY%78&&(''(8(?(?(AB
+2245#++GNK#'7;;}d;<
 mmo 	s   DD"D D"c                     d| j                   iS )Ndocket.task)rU   rk   s    r)   general_labelszExecution.general_labels   s    t1122r(   c                 |    | j                   | j                  | j                  j                         | j                  dS )N)r   z
docket.keyzdocket.whenzdocket.attempt)rU   rP   rQ   r   rR   rk   s    r)   specific_labelszExecution.specific_labels   s3    --((99..0"ll	
 	
r(   	parameterc                     t        | j                        } |j                  | j                  i | j                  }|j
                  |   S r0   )r   r7   bindr-   rO   	arguments)r1   r   r:   
bound_argss       r)   get_argumentzExecution.get_argument   s>    !$--0	#Y^^TYY>$++>
##I..r(   c                    g }| j                   }t        | j                        }t        j                  |      }t        |j                  j                               }t        | j                  d t        |             D ]O  \  }}||   }|j                  |      x}	r!|j                  |	j                  |             ?|j                  d       Q | j                  j                         D ]R  \  }}|j                  |      x}	r&|j                  | d|	j                  |              ?|j                  | d       T | ddj!                  |       d| j"                   dS )N...=z=...(, z){})rU   r   r7   r   annotated_parametersr3   
parametersr,   	enumerater-   r=   r   appendformatrO   itemsjoinrP   )
r1   r   rU   r:   logged_parametersparameter_namesiargumentparameter_nameloggeds
             r)   	call_reprzExecution.call_repr   sT   !	**!$--0	"77	By3388:;$TYY/E_1E%FG 	(KAx,Q/N*..~>>v>  x!89  '	( )-(9(9(; 	:$NH*..~>>v>  N#31V]]85L4M!NO  N#34!89		:  $))I"6!7s488*BGGr(   c                     t        j                  | j                        }|j                         }|j                  rt        j
                  |      gS g S r0   )r   get_current_spanrS   get_span_contextis_validLink)r1   initiating_spaninitiating_contexts      r)   incoming_span_linkszExecution.incoming_span_links  sH    001C1CD,==?3E3N3N

-./VTVVr(   replacereschedule_messagezRedisMessageID | Nonec                   K   | j                         }t        j                  |t               | j                  }| j
                  }| j                  j                  |      }|t        j                  t        j                        k  }| j                  j                         4 d{   }|j                  | dd      4 d{    t        t        |j!                  d            }	 |	| j                  j"                  || j                  j%                  |      | j                  j&                  | j                  j)                  |      | j*                  g|t-        |j/                               |rdnd|rdnd|xs d	| j                  j0                  g|j3                         D 
cg c]  \  }
}|
|fD ]  }|  c}}}

       d{    ddd      d{    ddd      d{    |rXt4        j6                  | _        | j;                  t4        j6                  j<                  |j?                         d       d{    y|rXt4        j@                  | _        | j;                  t4        j@                  j<                  |j?                         d       d{    yt4        j6                  | _        | j;                  t4        j6                  j<                  |j?                         d       d{    y7 V7 :c c}}}
w 7 >7 1# 1 d{  7  sw Y   BxY w7 :# 1 d{  7  sw Y   KxY w7 7 7 Kw)a\  Schedule this task atomically in Redis.

        This performs an atomic operation that:
        - Adds the task to the stream (immediate) or queue (future)
        - Writes the execution state record
        - Tracks metadata for later cancellation

        Usage patterns:
        - Normal add: schedule(replace=False)
        - Replace existing: schedule(replace=True)
        - Reschedule from stream: schedule(reschedule_message=message_id)
          This atomically acknowledges and deletes the stream message, then
          reschedules the task to the queue. Prevents both task loss and
          duplicate execution when rescheduling tasks (e.g., due to concurrency limits).

        Args:
            replace: If True, replaces any existing task with the same key.
                    If False, raises an error if the task already exists.
            reschedule_message: If provided, atomically acknowledges and deletes
                    this stream message ID before rescheduling the task to the queue.
                    Used when a task needs to be rescheduled from an active stream message.
        )setterNz:lock
   timeouta   
                            local stream_key = KEYS[1]
                            -- TODO: Remove in next breaking release (v0.14.0) - legacy key locations
                            local known_key = KEYS[2]
                            local parked_key = KEYS[3]
                            local queue_key = KEYS[4]
                            local stream_id_key = KEYS[5]
                            local runs_key = KEYS[6]

                            local task_key = ARGV[1]
                            local when_timestamp = ARGV[2]
                            local is_immediate = ARGV[3] == '1'
                            local replace = ARGV[4] == '1'
                            local reschedule_message_id = ARGV[5]
                            local worker_group_name = ARGV[6]

                            -- Extract message fields from ARGV[7] onwards
                            local message = {}
                            local function_name = nil
                            local args_data = nil
                            local kwargs_data = nil
                            local generation_index = nil

                            for i = 7, #ARGV, 2 do
                                local field_name = ARGV[i]
                                local field_value = ARGV[i + 1]
                                message[#message + 1] = field_name
                                message[#message + 1] = field_value

                                -- Extract task data fields for runs hash
                                if field_name == 'function' then
                                    function_name = field_value
                                elseif field_name == 'args' then
                                    args_data = field_value
                                elseif field_name == 'kwargs' then
                                    kwargs_data = field_value
                                elseif field_name == 'generation' then
                                    generation_index = #message
                                end
                            end

                            -- Handle rescheduling from stream: atomically ACK message and reschedule to queue
                            -- This prevents both task loss (ACK before reschedule) and duplicate execution
                            -- (reschedule before ACK with slow reschedule causing redelivery)
                            if reschedule_message_id ~= '' then
                                -- Acknowledge and delete the message from the stream
                                redis.call('XACK', stream_key, worker_group_name, reschedule_message_id)
                                redis.call('XDEL', stream_key, reschedule_message_id)

                                -- Increment generation counter
                                local new_gen = redis.call('HINCRBY', runs_key, 'generation', 1)
                                if generation_index then
                                    message[generation_index] = tostring(new_gen)
                                end

                                -- Park task data for future execution
                                redis.call('HSET', parked_key, unpack(message))

                                -- Add to sorted set queue
                                redis.call('ZADD', queue_key, when_timestamp, task_key)

                                -- Update state in runs hash (clear stream_id since task is no longer in stream)
                                redis.call('HSET', runs_key,
                                    'state', 'scheduled',
                                    'when', when_timestamp,
                                    'function', function_name,
                                    'args', args_data,
                                    'kwargs', kwargs_data
                                )
                                redis.call('HDEL', runs_key, 'stream_id')

                                return 'OK'
                            end

                            -- Handle replacement: cancel existing task if needed
                            if replace then
                                -- Get stream ID from runs hash (check new location first)
                                local existing_message_id = redis.call('HGET', runs_key, 'stream_id')

                                -- TODO: Remove in next breaking release (v0.14.0) - check legacy location
                                if not existing_message_id then
                                    existing_message_id = redis.call('GET', stream_id_key)
                                end

                                if existing_message_id then
                                    redis.call('XDEL', stream_key, existing_message_id)
                                end

                                redis.call('ZREM', queue_key, task_key)
                                redis.call('DEL', parked_key)

                                -- TODO: Remove in next breaking release (v0.14.0) - clean up legacy keys
                                redis.call('DEL', known_key, stream_id_key)

                                -- Note: runs_key is updated below, not deleted
                            else
                                -- Check if task already exists (check new location first, then legacy)
                                local known_exists = redis.call('HEXISTS', runs_key, 'known') == 1
                                if not known_exists then
                                    -- Check if task is currently running (known field deleted at claim time)
                                    local state = redis.call('HGET', runs_key, 'state')
                                    if state == 'running' then
                                        return 'EXISTS'
                                    end
                                    -- TODO: Remove in next breaking release (v0.14.0) - check legacy location
                                    known_exists = redis.call('EXISTS', known_key) == 1
                                end
                                if known_exists then
                                    return 'EXISTS'
                                end
                            end

                            -- Increment generation counter
                            local new_gen = redis.call('HINCRBY', runs_key, 'generation', 1)
                            if generation_index then
                                message[generation_index] = tostring(new_gen)
                            end

                            if is_immediate then
                                -- Add to stream for immediate execution
                                local message_id = redis.call('XADD', stream_key, '*', unpack(message))

                                -- Store state and metadata in runs hash
                                redis.call('HSET', runs_key,
                                    'state', 'queued',
                                    'when', when_timestamp,
                                    'known', when_timestamp,
                                    'stream_id', message_id,
                                    'function', function_name,
                                    'args', args_data,
                                    'kwargs', kwargs_data
                                )
                            else
                                -- Park task data for future execution
                                redis.call('HSET', parked_key, unpack(message))

                                -- Add to sorted set queue
                                redis.call('ZADD', queue_key, when_timestamp, task_key)

                                -- Store state and metadata in runs hash
                                redis.call('HSET', runs_key,
                                    'state', 'scheduled',
                                    'when', when_timestamp,
                                    'known', when_timestamp,
                                    'function', function_name,
                                    'args', args_data,
                                    'kwargs', kwargs_data
                                )
                            end

                            return 'OK'
                            10r(   r,   r-   )ra   rQ   )!r   r   injectr   rP   rQ   rN   known_task_keyr   nowr   utcredislockr   r+   register_script
stream_keyparked_task_key	queue_keystream_id_keyrh   r4   	timestampworker_group_namer   r?   rF   ra   _publish_statevaluer   rG   )r1   r   r   r   rP   rQ   r   is_immediater   schedule_scriptfieldr   items                r)   schedulezExecution.schedule  s    2 '+oo&78hhyy33C8x||HLL99;;$$& z	 z	%zz^$4E":BzG x x"&"))W [^#@ &..&33C8--11#6 DNN,-+&C*1c55 18  ,u). !% !   Ex xz	 z	z '11DJ%%(2288$..BRS   '..DJ%%(//55t~~?OP   (11DJ%%(2288$..BRS  Uz	xd!Ex x x xz	 z	 z	 z	@

s   B#M%K;&M)L'K>L'CL'L<L
LLL'LL'M*L$+AML=AML?AM5M6M>L'LL'L!	LL!	L'$M'L:-L0.L:5	M?MMrb   c           
        K   t        j                  t        j                        }|j	                         }| j                         5  | j                  j                         4 d{   }|j                  d      } || j                  | j                  j                  | j                  j                  | j                        | j                  j                  | j                        g||t        | j                        g       d{   }ddd      d{    ddd       dk(  ryt         j"                  | _        || _        || _        d| j                  _        d| j                  _        | j/                  t         j"                  j0                  ||d       d{    y	7 ?7 7 # 1 d{  7  sw Y   xY w# 1 sw Y   xY w7 -w)
a  Atomically check supersession and claim task in a single round-trip.

        This consolidates worker operations when claiming a task into a single
        atomic Lua script that:
        - Checks if the task has been superseded by a newer generation
        - Sets state to RUNNING with worker name and timestamp
        - Initializes progress tracking (current=0, total=100)
        - Deletes known/stream_id fields to allow task rescheduling
        - Cleans up legacy keys for backwards compatibility

        Args:
            worker: Name of the worker claiming the task

        Returns:
            True if the task was claimed, False if it was superseded.
        Na  
                    local runs_key = KEYS[1]
                    local progress_key = KEYS[2]
                    -- TODO: Remove in next breaking release (v0.14.0) - legacy key locations
                    local known_key = KEYS[3]
                    local stream_id_key = KEYS[4]

                    local worker = ARGV[1]
                    local started_at = ARGV[2]
                    local generation = tonumber(ARGV[3])

                    -- Check supersession: generation > 0 means tracking is active
                    if generation > 0 then
                        local current = redis.call('HGET', runs_key, 'generation')
                        if not current then
                            -- Runs hash was cleaned up (execution_ttl=0 after
                            -- a newer generation completed).  This message is stale.
                            return 'SUPERSEDED'
                        end
                        if tonumber(current) > generation then
                            return 'SUPERSEDED'
                        end
                    end

                    -- Update execution state to running
                    redis.call('HSET', runs_key,
                        'state', 'running',
                        'worker', worker,
                        'started_at', started_at
                    )

                    -- Initialize progress tracking
                    redis.call('HSET', progress_key,
                        'current', '0',
                        'total', '100'
                    )

                    -- Delete known/stream_id fields to allow task rescheduling
                    redis.call('HDEL', runs_key, 'known', 'stream_id')

                    -- TODO: Remove in next breaking release (v0.14.0) - legacy key cleanup
                    redis.call('DEL', known_key, stream_id_key)

                    return 'OK'
                    r   s
   SUPERSEDEDFr   d   )ra   rb   rc   T)r   r   r   r   r   rx   rN   r   r   rh   rg   r   rP   r   r4   r`   r?   rH   ra   rb   rc   currenttotalr   r   )r1   rb   rc   started_at_isor   claim_scriptresults          r)   claimzExecution.claim  s    " \\(,,/
#--/113 <	{{((* ; ;e$44,0 d  ,0022488<11$((;	 !.#d6F6F2GH  g; ;<	| ]" $++
$ !! !!'//55 ,
 	
 	
 a;fg; ; ; ;<	 <	R	
s   AGG %F$&G )BF+F'F+G F)G BGGG$G 'F+)G +F=	1F42F=	9G  G	Gre   rf   ra   re   rf   c                  K   t        j                  t        j                        j	                         }g }|r|j                  d|g       ||j                  d|g       | j                  j                  r-t        | j                  j                  j                               nd}| j                         5  | j                  j                         4 d{   }|j                  d      } || j                  gt        | j                        |j                   |t        |      g|       d{    ddd      d{    ddd       || _        ||| _        | j&                  j)                          d{    |j                   |d}	|r||	d<   | j+                  |	       d{    y7 7 7 r# 1 d{  7  sw Y   xY w# 1 sw Y   xY w7 [7 .w)a  Mark task as having reached a terminal state.

        Args:
            state: The terminal state (COMPLETED, FAILED, or CANCELLED)
            error: Optional error message (for FAILED state)
            result_key: Optional key where the result/exception is stored

        Uses a Lua script to atomically check supersession and write the
        terminal state in a single round-trip.  If the runs hash has been
        claimed by a successor (e.g. a Perpetual on_complete already called
        docket.replace()), the hash is left untouched.

        Progress data and the pub/sub completion event are always handled
        regardless of supersession.
        re   Nrf   r   a.  
                    local runs_key = KEYS[1]
                    local generation = tonumber(ARGV[1])
                    local state = ARGV[2]
                    local completed_at = ARGV[3]
                    local ttl_seconds = tonumber(ARGV[4])

                    -- Check supersession (generation 0 = pre-tracking, always write)
                    if generation > 0 then
                        local current = redis.call('HGET', runs_key, 'generation')
                        if current and tonumber(current) > generation then
                            return 'SUPERSEDED'
                        end
                    end

                    -- Build HSET args: state + completed_at + any extras
                    local hset_args = {'state', state, 'completed_at', completed_at}
                    for i = 5, #ARGV, 2 do
                        hset_args[#hset_args + 1] = ARGV[i]
                        hset_args[#hset_args + 1] = ARGV[i + 1]
                    end
                    redis.call('HSET', runs_key, unpack(hset_args))

                    if ttl_seconds > 0 then
                        redis.call('EXPIRE', runs_key, ttl_seconds)
                    else
                        redis.call('DEL', runs_key)
                    end

                    return 'OK'
                    r   )ra   rd   )r   r   r   r   r   extendrN   execution_ttlr   total_secondsrx   r   r   rh   r4   r`   r   ra   rf   rg   deleter   )
r1   ra   re   rf   rd   extra_fieldsttl_secondsr   terminal_script
state_datas
             r)   _mark_as_terminalzExecution._mark_as_terminalr  s    ,  ||HLL1;;= #%% 01!z :; {{(( ))779: 	 113 0	{{((* / /e"'"7"7	##J &//*D,,-$K(	
 &	 	 	M/ /0	d 
!(DOmm""$$$ [[(&

 "'Jw!!*---{/L	M/ / / /0	 0	l 	% 	.s   B3G5GF-GAF3.F//F33G>F1?G5G8G9.G'G(G-G/F31G3G	9F<:G	GGGGc                 b   K   | j                  t        j                  |       d{    y7 w)zMark task as completed successfully.

        Args:
            result_key: Optional key where the task result is stored
        )rf   N)r   r?   rI   )r1   rf   s     r)   mark_as_completedzExecution.mark_as_completed  s'      $$^%=%=*$UUUs   %/-/c                 d   K   | j                  t        j                  ||       d{    y7 w)zMark task as failed.

        Args:
            error: Optional error message describing the failure
            result_key: Optional key where the exception is stored
        r   N)r   r?   rJ   )r1   re   rf   s      r)   mark_as_failedzExecution.mark_as_failed  s2      $$!!: % 
 	
 	
s   &0.0c                 ^   K   | j                  t        j                         d{    y7 w)zMark task as cancelled.N)r   r?   rK   rk   s    r)   mark_as_cancelledzExecution.mark_as_cancelled  s      $$^%=%=>>>s   #-+-)r   deadliner   r   c                    
K   ||t        d      |&t        j                  t        j                        |z   }t
        j                  t
        j                  t
        j                  f
 j                  
vrd}|R|t        j                  t        j                        z
  j                         }|dk  rt        d j                   d      	  
fd}t        j                   |       |       d{     j                  t
        j                  k(  rt        d j                   d	       j                  t
        j                  k(  r j                   rl j"                  j$                  j'                   j                          d{   }|r3d
|v r/t)        j*                  |d
         }t-        j.                  |      }| j0                  xs d}t3        |       j                   rj j"                  j$                  j'                   j                          d{   }|1d
|v r-t)        j*                  |d
         }	t-        j.                  |	      S y7 b# t        j                  $ r t        d j                   d      w xY w7 7 mw)a'  Retrieve the result of this task execution.

        If the execution is not yet complete, this method will wait using
        pub/sub for state updates until completion.

        Args:
            timeout: Optional duration to wait before giving up.
                    If None and deadline is None, waits indefinitely.
            deadline: Optional absolute datetime when to stop waiting.
                     If None and timeout is None, waits indefinitely.

        Returns:
            The result of the task execution, or None if the task returned None.

        Raises:
            ValueError: If both timeout and deadline are provided
            Exception: If the task failed, raises the stored exception
            TimeoutError: If timeout/deadline is reached before execution completes
        Nz(Cannot specify both timeout and deadliner   zTimeout waiting for execution z to completec                     K   j                         2 3 d {   } | d   dk(  st        | d         }|v s%j                          d {     y 7 :7 6 y w)Ntypera   )	subscriber?   r   )eventra   r1   terminal_statess     r)   wait_for_completionz1Execution.get_result.<locals>.wait_for_completion&  sb     '+~~'7 & &e =G3$25>$BE$7&*iik 1 1 %&
 !2 (8sA   AAAA	AAAAAAAAr   z
Execution z was cancelleddatazTask execution failed)r   r   r   r   r   r?   rI   rJ   rK   ra   r   TimeoutErrorrP   asynciowait_forr"   rf   rN   result_storager   base64	b64decoder   r   re   	Exception)r1   r   r   timeout_secondsr   result_datapickled_exception	exception	error_msgpickled_resultr   s   `         @r)   
get_resultzExecution.get_result  sJ    4 8#7GHH ||HLL1G;H $$!!$$
 ::_,"O#x||HLL99-/   #a'&8
,O & &&':'<oVVV ::111$z$((>%JKK ::...$(KK$>$>$B$B4??$SS6[#8(.(8(8V9L(M% + 1 12C DI#O

=&=II&& ?? $ : : > >t OOK&6[+@!'!1!1+f2E!F"((88 C W'' "4TXXJlK  T PsP   CJ%I 5I6I :BJJ
BJJ8JI -JJJc                   K   | j                         5  | j                  j                         4 d{   }|j                  | j                         d{   }|r |j                  d      }|r0t        |t              r|j                         }t        |      | _
        d|v r|d   j                         nd| _        d|v r&t        j                  |d   j                               nd| _        d|v r&t        j                  |d   j                               nd| _        d|v r|d   j                         nd| _        d|v r|d   j                         nd| _        n8t        j$                  | _
        d| _        d| _        d| _        d| _        d| _        ddd      d{    ddd       | j&                  j)                          d{    y7 7 ~7 5# 1 d{  7  sw Y   ExY w# 1 sw Y   IxY w7 .w)zSynchronize instance attributes with current execution data from Redis.

        Updates self.state, execution metadata, and progress data from Redis.
        Sets attributes to None if no data exists.
        Ns   states   workers
   started_ats   completed_ats   errors
   result_key)rx   rN   r   hgetallrh   r   
isinstancer6   r   r?   ra   rb   r   r   rc   rd   re   rf   rF   rg   r   )r1   r   r   state_values       r)   r   zExecution.syncS  s     113 $	+{{((* #+ #+e"]]4??;;"&((8"4K"%k59*5*<*<*>K%3K%@
 5>4EY..04 K
 )D0 !..tM/B/I/I/KL! O +d2 !..tO/D/K/K/MN! %
 =E<Lh!6!6!8RVDJ8E8M]+224SW O
 "0!9!9DJ"&DK&*DO(,D%!%DJ&*DOG#+ #+$	+N mm  """M#+;#+ #+ #+ #+$	+ $	+N 	#s   G;G-GG-GGD?GG- G!G-%%G;
G9G;G-GG-G*	G!G*	&G--G62G;c                   K   | j                   dk(  ry| j                         5  | j                  j                         4 d{   }|j	                  | j
                  d       d{   }ddd      d{    ddd       t        |      nd}|| j                   kD  S 7 ^7 <7 .# 1 d{  7  sw Y   >xY w# 1 sw Y   BxY ww)a9  Check whether a newer schedule has superseded this execution.

        Compares this execution's generation against the current generation
        stored in the runs hash. If the stored generation is strictly greater,
        this execution has been superseded by a newer schedule() call.

        Generation 0 means the message predates generation tracking (e.g. it
        was moved from queue to stream by an older worker's scheduler that
        doesn't pass through the generation field). These are never considered
        superseded since we can't tell.
        r   FNrV   )r`   rx   rN   r   hgetrh   r   )r1   r   r   current_gens       r)   is_supersededzExecution.is_superseded  s      q 113 	J{{((* J Je %

4??L IIJ J	J '.&9c'lqT----JIJ J J J	J 	Jsv   !CB<B!B< B'&B#'B'+B<6B%7B<;&C!B<#B'%B<'B9	-B0.B9	5B<<CCr   c                    K   | j                   j                  d| j                         }d| j                  d|}| j                   j                  |t        j                  |             d{    y7 w)znPublish state change to Redis pub/sub channel.

        Args:
            data: State data to publish
        state:ra   )r   rP   N)rN   rP   _publishjsonr   )r1   r   channelpayloads       r)   r   zExecution._publish_state  se      ++//F488*"5688
 

 kk""7DJJw,?@@@s   A,A6.A4/A6c           	       K   | j                          d{    d| j                  | j                  | j                  j	                         | j
                  | j                  r| j                  j	                         nd| j                  r| j                  j	                         nd| j                  d}| d| j                  | j                  j                  | j                  j                  | j                  j                  | j                  j                  r$| j                  j                  j	                         ndd}| | j                  j                  d| j                         }| j                  j                  d| j                         }| j                  j                         4 d{   }|j!                  ||       d{    |j#                         2 3 d{   }|d   d	k(  st%        j&                  |d
         }|d   dk(  rt)        |d         |d<   | I7 7 z7 b7 K6 ddd      d{  7   y# 1 d{  7  sw Y   yxY ww)a  Subscribe to both state and progress updates for this task.

        Emits the current state as the first event, then subscribes to real-time
        state and progress updates via Redis pub/sub.

        Yields:
            Dict containing state or progress update events with a 'type' field:
            - For state events: type="state", state, worker, timestamps, error
            - For progress events: type="progress", current, total, message, updated_at
        Nra   )r   rP   ra   rQ   rb   rc   rd   re   rg   )r   rP   r   r   r   
updated_atr  z	progress:r   r   r   )r   rP   ra   rQ   r   rb   rc   rd   re   rg   r   r   r   r  rN   _pubsubr   listenr  r   r?   )r1   initial_stateprogress_eventstate_channelprogress_channelpubsubr   message_datas           r)   r   zExecution.subscribe  s     iik 88ZZII'')kk9=$//335d151B1B!!++-ZZ%
  88}},,]]((}},,}}'' --22<<>	)
  &
(;<;;??Ytxxj+AB;;&&( 	' 	'F""=2BCCC!' ' 'g6?i/#'::gfo#>L#F+w60>|G?T0UW-&&S 	D	'C'	' 	' 	' 	' 	's   IH+FI3H.4I7IH0I"H4&H2'H4*	I47I+I.I0I2H44I5I IIIIII)NFNr   )r.   r   )FNr0   )NN)r.   N);r#   r$   r%   r&   TaskFunctiontupler   dictr4   r   r   opentelemetrycontextContextboolri   propertyrN   r7   r-   rO   rP   rU   rS   rT   rV   r   r   rx   Messager   classmethodr   r   r   r   r   r   r   r3   r   r   r   r   r   r?   r   r   r   r   r   r  r   r  r   r	   r   r   r   r'   r(   r)   rM   rM   [   s    ?C!$()4)4 )4 CHo	)4
 S#X)4 )4 )4 )4 %,,44t;)4 )4 Tz)4 )4 
)4X   ,   eCHo   S#X   S   #s # #
 #}44<<tC # # !T ! !  C     ;K1L  	
G 	
 
 "-1  	
 $d* 
 >3S 1 3
cCi!8 
/c /c /
H3 H.WT%**%5 W TXmm9Pm	m^e# e$ eV !!%c.c. Tz	c.
 $Jc. 
c.JV#* V V BF
4Z
47$J
	
? %)$(	a T!a T/	a
 
aF-#^.T .(A A$ A5'
]0JD0P!Q 5'r(   rM   r:   c                 B   g }d}| j                   j                         D ]  }t        |j                  t        j
                        r|dz  }-|j                  }|j                  |j                  urN|j                  }t        |d      r|j                  d   }t        |dt        |            }|j                   d| }|j                  |j                  ur| d|j                  }|j                  |        |dkD  r|j                  d       dj                  |      S )	Nr   r   
__origin__r#   z: z = r   r   )r   valuesr  defaultuncalled_for
Dependencyname
annotationemptyhasattr__args__getattrr4   r   r   )r:   r   dependenciesr   parameter_definitionr5  	type_names          r)   compact_signaturer=    s   JL))002 0	i'')@)@AAL(~~y6"--Jz<0'003

JJHI&/nn%5R	{#C IOO3&:%;3y?P?P>S#T ./#0& a% 99Z  r(   )Br   r  enuminspectr  logging
contextlibr   r   r   r   typingr   r   r	   r
   r   r   r   r   r   r   opentelemetry.contextr'  r2  r   r   
_telemetryr   typing_extensionsr   uncalled_for.introspectionr   r   r;   _execution_progressr   r   r   annotationsr   instrumentationr   r   r   rN   r   r   	getLoggerr#   r    Logger__annotations__r  r"   r$  r&  r6   r,  r+   	SignatureEnumr?   rM   r4   r=  r'   r(   r)   <module>rO     s         % 2 2
 
 
    * 0 "
 N M  G G.***84 4	 	 Ys^+,
ue|
X HS#X. 73D3D ;TYY ;,~' ~'B!!2!2 !s !r(   