
    Ki0                       U d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
Z
d dlZd dlZd dlZd dlZd dlmZmZmZ d dlmZ d dlmZmZmZmZ esej6                  dk\  rd dlZnd dlmZ er,d dlmZ d d	lm Z m!Z!m"Z" d d
l#m$Z$m%Z%m&Z&  ede       Z' e$d      Z(e)e*e+   dz  e+dz  edz  f   Z,de-d<    ed      Z. edd      Z/ edd      Z0ejb                  d%d       Z2ejb                  d&d'd       Z3d Z4	 	 	 	 	 	 d(dZ5 e4ejl                  e5      Z7	 	 	 	 d)dZ8 e8e2e3      Z9	 	 	 	 	 	 	 	 	 d*dZ:d+dZ;ejb                  e	jx                  fd,d       Z= ej|                  e= e;             Z?ejb                  dde?f	 	 	 	 	 	 	 	 	 d-d       Z@ G d d       ZA G d! d"ej                  ej                        ZB G d# d$ej                        ZDy).    )annotationsN)Callable	GeneratorIterator)TracebackType)TYPE_CHECKINGLiteralTypeVarcast)      )tarfile)	TypeAlias)FileDescriptorOrPath
OptExcInfoStrPath)	ParamSpecSelfUnpack_FileDescriptorOrPathT)bound_Pr   _UnpackableOptExcInfo_R_T1_coT)	covariant_T2_coc              #     K   t        j                         }t        j                  |        	 |  t        j                  |       y# t        j                  |       w xY ww)z
    >>> tmp_path = getfixture('tmp_path')
    >>> with pushd(tmp_path):
    ...     assert os.getcwd() == os.fspath(tmp_path)
    >>> assert os.getcwd() != os.fspath(tmp_path)
    N)osgetcwdchdir)dirorigs     c/home/jay/workspace/scripts/.codegraph-venv/lib/python3.12/site-packages/jaraco/context/__init__.pypushdr%   4   s>      99;DHHSM	
s   *A!A A!AA!c              #    K   |?t         j                  j                  |       j                  dd      j                  dd      }t        j                  |       	 t
        j                  j                  |       }t        j                  |d      5 }|j                  |t               ddd       | t        j                  |       y# 1 sw Y   #xY w# t        j                  |       w xY ww)au  
    Get a URL to a tarball, download, extract, yield, then clean up.

    Assumes everything in the tarball is prefixed with a common
    directory. That common path is stripped and the contents
    are extracted to ``target_dir``, similar to passing
    ``-C {target} --strip-components 1`` to the ``tar`` command.

    Uses the streaming protocol to extract the contents from a
    stream in a single pass without loading the whole file into
    memory.

    >>> import urllib.request
    >>> url = getfixture('tarfile_served')
    >>> target = getfixture('tmp_path') / 'out'
    >>> tb = tarball(url, target_dir=target)
    >>> import pathlib
    >>> with tb as extracted:
    ...     contents = pathlib.Path(extracted, 'contents.txt').read_text(encoding='utf-8')
    >>> assert not os.path.exists(extracted)

    If the target is not specified, contents are extracted to a
    directory relative to the current working directory named after
    the name of the file as extracted from the URL.

    >>> target = getfixture('tmp_path')
    >>> with pushd(target), tarball(url):
    ...     target.joinpath('served').is_dir()
    True
    Nz.tar.gz z.tgzzr|*)fileobjmode)pathfilter)r   r*   basenamereplacemkdirurllibrequesturlopenr   open
extractall_default_filtershutilrmtree)url
target_dirreqtfs       r$   tarballr;   E   s     @ WW%%c*229bAII&RTU
HHZ"nn$$S)\\#E2 	CbMMz/MB	Cj!		C 	C 	j!s6   AC06C C
(C 4C0
CC C--C0c                 8    d }t        j                  || d       S )Nc                      fdS )Nc                $      | |      |      S N )memberr*   f1f2s     r$   <lambda>z?_compose_tarfile_filters.<locals>.compose_two.<locals>.<lambda>s   s    Br&$'7$>     r@   )rB   rC   s   ``r$   compose_twoz-_compose_tarfile_filters.<locals>.compose_twor   s	    >>rE   c                    | S r?   r@   )rA   r*   s     r$   rD   z*_compose_tarfile_filters.<locals>.<lambda>u   s    v rE   )	functoolsreduce)filtersrF   s     r$   _compose_tarfile_filtersrK   q   s    ? K2MNNrE   c                N    | j                   j                  dd      \  }| _         | S )N/   )namesplit)rA   r*   _s      r$   strip_first_componentrR   x   s%     [[&&sA.NAv{MrE   c                 T    	 	 	 	 	 	 dd}t        j                  |t        |             S )a  
    Compose any number of dependent context managers into a single one.

    The last, innermost context manager may take arbitrary arguments, but
    each successive context manager should accept the result from the
    previous as a single parameter.

    Like :func:`jaraco.functools.compose`, behavior works from right to
    left, so the context manager should be indicated from outermost to
    innermost.

    Example, to create a context manager to change to a temporary
    directory:

    >>> temp_dir_as_cwd = _compose(pushd, temp_dir)
    >>> with temp_dir_as_cwd() as dir:
    ...     assert os.path.samefile(os.getcwd(), dir)
    c                >     d fd}t        j                  |      S )Nc               ?     K    | i |5 } |      5 }| d d d        d d d        y # 1 sw Y   xY w# 1 sw Y   y xY wwr?   r@   )argskwargssavedresinnerouters       r$   composedz/_compose.<locals>.compose_two.<locals>.composed   sL     '' 5%, #	     s(   	A	7+7	A4	7A A)rV   _P.argsrW   	_P.kwargsreturnzGenerator[_T2_co])
contextlibcontextmanager)rZ   r[   r\   s   `` r$   rF   z_compose.<locals>.compose_two   s    	 ((22rE   )rZ   z7Callable[_P, contextlib.AbstractContextManager[_T1_co]]r[   z=Callable[[_T1_co], contextlib.AbstractContextManager[_T2_co]]r_   9Callable[_P, contextlib._GeneratorContextManager[_T2_co]])rH   rI   reversed)cmgrsrF   s     r$   _composere      s;    83F3L3 
C3 K%99rE   c                J   |\  }}}| t         j                  t         j                  t         j                  fv rl|j                  t        j
                  k(  rOt        j                  |t        j                  t        j                  z  t        j                  z          | |       y )z>
    Add support for removing read-only files on Windows.
    N)r   rmdirremoveunlinkerrnoEACCESchmodstatS_IRWXUS_IRWXGS_IRWXO)funcr*   exc_inforQ   excs        r$   remove_readonlyrt      sg     IAsA"))RYY//CII4M
t||dll2T\\ABT
rE   c                     t        j                         dk(  r3t        j                  t	        dt
        j                        t              S t
        j                  S )NWindowsCallable[..., None])onerror)platformsystemrH   partialr   r5   r6   rt   r@   rE   r$   robust_removerr|      sH     ??	) 	&6#	
 ]]rE   c              #  h   K   t        j                         }	 |  | |       y#  | |       w xY ww)z
    Create a temporary directory context. Pass a custom remover
    to override the removal behavior.

    >>> import pathlib
    >>> with temp_dir() as the_dir:
    ...     assert os.path.isdir(the_dir)
    >>> assert not os.path.exists(the_dir)
    N)tempfilemkdtemp)removertemp_dirs     r$   r   r      s0      !Hs   2% 	2
/2)r   c              #     K   d| v rdnd} |       5 }|d| |g}|j                  d|gt        |      z         |rt        j                  nd}t        j                  |||       | ddd       y# 1 sw Y   yxY ww)a  
    Check out the repo indicated by url.

    If dest_ctx is supplied, it should be a context manager
    to yield the target directory for the check out.

    >>> getfixture('ensure_git')
    >>> getfixture('needs_internet')
    >>> repo = repo_context('https://github.com/jaraco/jaraco.context')
    >>> with repo as dest:
    ...     listing = os.listdir(dest)
    >>> 'README.rst' in listing
    True
    githgclonez--branchN)stdoutstderr)extendbool
subprocessDEVNULL
check_call)r7   branchquietdest_ctxexerepo_dircmdstreams           r$   repo_contextr      s~     * C<%TC	 xGS(+

J'$v,67',##$c&@  s   A=AA1(	A=1A:6A=c                      e Zd ZU dZdZded<   effddZddZe	dd       Z
e	dd       Ze	dd	       Z	 	 	 	 dd
ZddZed	 	 	 	 	 ddZddZy)ExceptionTrapa  
    A context manager that will catch certain exceptions and provide an
    indication they occurred.

    >>> with ExceptionTrap() as trap:
    ...     raise Exception()
    >>> bool(trap)
    True

    >>> with ExceptionTrap() as trap:
    ...     pass
    >>> bool(trap)
    False

    >>> with ExceptionTrap(ValueError) as trap:
    ...     raise ValueError("1 + 1 is not 3")
    >>> bool(trap)
    True
    >>> trap.value
    ValueError('1 + 1 is not 3')
    >>> trap.tb
    <traceback object at ...>

    >>> with ExceptionTrap(ValueError) as trap:
    ...     raise Exception()
    Traceback (most recent call last):
    ...
    Exception

    >>> bool(trap)
    False
    )NNNr   rr   c                    || _         y r?   )
exceptions)selfr   s     r$   __init__zExceptionTrap.__init__+  s	    $rE   c                    | S r?   r@   r   s    r$   	__enter__zExceptionTrap.__enter__.      rE   c                     | j                   d   S Nr   rr   r   s    r$   typezExceptionTrap.type1      }}QrE   c                     | j                   d   S )NrN   r   r   s    r$   valuezExceptionTrap.value5  r   rE   c                     | j                   d   S )N   r   r   s    r$   tbzExceptionTrap.tb9  r   rE   c                V    |d   }|xr t        || j                        }|r|| _        |S r   )
issubclassr   rr   )r   rr   exc_typematchess       r$   __exit__zExceptionTrap.__exit__=  s1     A;Dz(DOOD$DMrE   c                ,    t        | j                        S r?   )r   r   r   s    r$   __bool__zExceptionTrap.__bool__G  s    DIIrE   _testc               L     t        j                        d fd       }|S )a>  
        Wrap func and replace the result with the truth
        value of the trap (True if an exception occurred).

        Decorate a function that always fails.

        >>> @ExceptionTrap(ValueError).raises
        ... def fail():
        ...     raise ValueError('failed')
        >>> fail()
        True
        c                 x    t        j                        5 } | i | d d d               S # 1 sw Y   xY wr?   )r   r   )rV   rW   trapr   rq   r   s      r$   wrapperz%ExceptionTrap.raises.<locals>.wrapperZ  s=    t/ &4d%f%&;& &s   	09)rV   r]   rW   r^   r_   r   )rH   wraps)r   rq   r   r   s   ``` r$   raiseszExceptionTrap.raisesJ  s'      
		 
	
 rE   c                D    | j                  |t        j                        S )a7  
        Wrap func and replace the result with the truth
        value of the trap (True if no exception).

        Decorate a function that always fails.

        >>> @ExceptionTrap(ValueError).passes
        ... def fail():
        ...     raise ValueError('failed')

        >>> fail()
        False
        r   )r   operatornot_)r   rq   s     r$   passeszExceptionTrap.passesb  s     {{4x}}{55rE   N)r   ztuple[type[BaseException], ...]r_   r   )r_   type[BaseException] | None)r_   BaseException | None)r_   TracebackType | None)rr   zUnpack[_UnpackableOptExcInfo]r_   z*builtins.type[BaseException] | None | bool)r_   r   )rq   Callable[_P, _R]r   zCallable[[ExceptionTrap], bool]r_   $functools._Wrapped[_P, _R, _P, bool])rq   r   r_   r   )__name__
__module____qualname____doc__rr   __annotations__	Exceptionr   r   propertyr   r   r   r   r   r   r   r   r@   rE   r$   r   r     s    B ,Hj+FO\ %            0 
4 SW$0O	-06rE   r   c                      e Zd ZdZy)suppressz
    A version of contextlib.suppress with decorator support.

    >>> @suppress(KeyError)
    ... def key_error():
    ...     {}['']
    >>> key_error()
    N)r   r   r   r   r@   rE   r$   r   r   s  s    rE   r   c                  B    e Zd ZdZ	 d	 	 	 ddZddZ	 	 	 	 	 	 	 	 d	dZy)
on_interruptaF  
    Replace a KeyboardInterrupt with SystemExit(1).

    Useful in conjunction with console entry point functions.

    >>> def do_interrupt():
    ...     raise KeyboardInterrupt()
    >>> on_interrupt('error')(do_interrupt)()
    Traceback (most recent call last):
    ...
    SystemExit: 1
    >>> on_interrupt('error', code=255)(do_interrupt)()
    Traceback (most recent call last):
    ...
    SystemExit: 255
    >>> on_interrupt('suppress')(do_interrupt)()
    >>> with __import__('pytest').raises(KeyboardInterrupt):
    ...     on_interrupt('ignore')(do_interrupt)()
    c                    || _         || _        y r?   )actioncode)r   r   r   s      r$   r   zon_interrupt.__init__  s     	rE   c                    | S r?   r@   r   s    r$   r   zon_interrupt.__enter__  r   rE   c                    |t         us| j                  dk(  ry | j                  dk(  rt        | j                        || j                  dk(  S )Nignoreerrorr   )KeyboardInterruptr   
SystemExitr   )r   exctypeexcinstexctbs       r$   r   zon_interrupt.__exit__  sG     ++t{{h/F[[G#TYY'W4{{j((rE   N)r   rN   )r   intr   z&Literal['ignore', 'suppress', 'error']r   )r   r   r   r   r   r   r_   zNone | bool)r   r   r   r   r   r   r   r@   rE   r$   r   r   ~  sS    * XYQT<
)+
) &
) $	
)
 

)rE   r   )r"   r   r_   Iterator[StrPath]r?   )r7   strr8   zStrPath | Noner_   r   )rA   tarfile.TarInfor*   objectr_   r   )rd   zUnpack[tuple[Callable[[_T1_co], contextlib.AbstractContextManager[_T2_co]], Callable[_P, contextlib.AbstractContextManager[_T1_co]]]]r_   rb   )rq   z*Callable[[_FileDescriptorOrPathT], object]r*   r   rr   ztuple[object, OSError, object]r_   None)r_   rw   )r   zCallable[[str], object]r_   Generator[str])
r7   r   r   z
str | Noner   r   r   z4Callable[[], contextlib.AbstractContextManager[str]]r_   r   )E
__future__r   builtinsr`   rj   rH   r   r   ry   r5   rm   r   sysr~   urllib.requestr/   collections.abcr   r   r   typesr   typingr   r	   r
   r   version_infor   	backportsr   	_typeshedr   r   r   typing_extensionsr   r   r   r   r   tupler   BaseExceptionr   r   r   r   r   ra   r%   r;   rK   rR   data_filterr4   re   tarball_cwdrt   r|   r6   r   r{   robust_temp_dirr   r   r   ContextDecoratorr   r@   rE   r$   <module>r      sp   "      	     
   9 9   C$$/! CC99$ (< 
4B#($DD$ y 
 T]	T	*	T	*    (" ("VO
  +7+>+>@UV*:*: ?*:Z ug&

4
  - 
	$	 06  " $)##Hn6FG  ET		  C	
  :i6 i6Xz""J$?$? ():.. ()rE   