a
    'f"O                     @   s
  d dl Z d dlZd dlZd dlZd dlZd dlZd dlmZ d dl	Z	d dlm
Z
 d dl	mZ eedslde_ee dZe jd  dkZejrdZnd	Zee jd
ZeoejZedG dd dejZedG dd dejZG dd dejZedkre  dS )    N)mock)
subprocess	mswindowsFpypy_version_info   z@import msvcrt; msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY); newlinesr   c                   @   sF  e Zd ZdZdd Zdd Zdd Zdd	 Ze	d
dd Z
eejdeddd Zededdd Ze	d
dd Ze	d
dd Zeddd Zdd Ze	d
dd Zed d!d" Zejed#d$d% Ze	d
d&d' Zed(d)d* Zeejd+d, d-k d.d/d0 Z d1d2 Z!d+S )3	TestPopenFc                 C   s&   t tjddg}| | d d S )N-czimport sys; sys.exit(10)
   )r   Popensys
executableassertEqualwaitselfpopen r   k/var/www/staging/api/virtual_environments/venv/lib/python3.9/site-packages/gevent/tests/test__subprocess.py	test_exit*   s    zTestPopen.test_exitc                 C   s2   t tjddg}t|g | | d d S )Nr
   zimport sys; sys.exit(11)   )r   r   r   r   geventr   r   pollr   r   r   r   	test_wait.   s    zTestPopen.test_waitc                 C   sN   |  t }tdg  W d    n1 s00    Y  | |jjd d S )N*   )assertRaisesOSErrorr   r   r   r   	exceptionerrnor   excr   r   r   test_child_exception3   s    .zTestPopen.test_child_exceptionc                 C   sL   t  }tjtjddgtjd}|  |j	  ~t  }| 
|| d S )Nr
   zprint()stdout)	greentestZget_number_open_filesr   r   r   r   PIPEr   r%   closer   )r   Z
num_beforepZ	num_afterr   r   r   	test_leak9   s    
zTestPopen.test_leakhangsc                 C   sj   t jtjddddgt jt jt jd}|d\}}| |d tjdrZ|dsfJ n| |d d S )	N-Wignorer
   zNimport sys,os;sys.stderr.write("pineapple");sys.stdout.write(sys.stdin.read()))stdinr%   stderrs   bananaz-dbgs	   pineapple)	r   r   r   r   r'   communicater   endswith
startswithr   r)   r%   r/   r   r   r   test_communicateD   s    zTestPopen.test_communicatezWindows does weird things herezSometimes segfaultsc                 C   sh   t jtjddddgt jt jt jdd}|d\}}| |t | |t | |d | |d	 d S )
Nr,   r-   r
   zfimport sys,os;sys.stderr.write("pineapple\r\n\xff\xff\xf2\xf9\r\n");sys.stdout.write(sys.stdin.read())T)r.   r%   r/   universal_newlinesu   banana
ÿÿòù
u   banana
ÿÿòù
u   pineapple
ÿÿòù
)	r   r   r   r   r'   r0   assertIsInstancestrr   r3   r   r   r   test_communicate_universalU   s(    z$TestPopen.test_communicate_universalz'Windows IO is weird; this doesn't raisezOnly Python 2 decodesc              	   C   s~   t jtjddddgt jt jt jdddB}| t |  W d    n1 sR0    Y  W d    n1 sp0    Y  d S )Nr,   r-   r
   z6import os, sys; os.write(sys.stdout.fileno(), b"\xff")T)r.   r%   r/   textr5   )r   r   r   r   r'   r   UnicodeDecodeErrorr0   )r   r)   r   r   r   test_communicate_undecodableq   s    z&TestPopen.test_communicate_undecodablec                 C   s   t jtjddt d gt jdddJ}|j }trTt	sF| 
|d q`| 
|d n| 
|d W d    n1 st0    Y  d S )	Nr
   import sys,os;a	  sys.stdout.write("line1\n");sys.stdout.flush();sys.stdout.write("line2\r");sys.stdout.flush();sys.stdout.write("line3\r\n");sys.stdout.flush();sys.stdout.write("line4\r");sys.stdout.flush();sys.stdout.write("\nline5");sys.stdout.flush();sys.stdout.write("\nline6");   r%   r5   bufsize#line1
line2
line3
line4
line5
line6%line1
line2
line3

line4

line5
line6%line1
line2line3
line4
line5
line6r   r   r   r   	SETBINARYr'   r%   readpython_universal_newlines python_universal_newlines_brokenr   r   r)   r%   r   r   r   test_universal1   s.    
zTestPopen.test_universal1c                 C   s   t jtjddt d gt jdddJ}|j }trTt	sF| 
|d q`| 
|d n| 
|d W d    n1 st0    Y  d S )	Nr
   r<   zsys.stdout.write("line1\n");sys.stdout.flush();sys.stdout.write("line2\r");sys.stdout.flush();sys.stdout.write("line3\r\n");sys.stdout.flush();sys.stdout.write("line4\r\nline5");sys.stdout.flush();sys.stdout.write("\nline6");r=   r>   r@   rA   rB   rC   rH   r   r   r   test_universal2   s.    
zTestPopen.test_universal2zUses 'grep' commandc              
   C   s   t  \}}t|}tjddg|d~}zDtd | | d  W | d u r\|	  |
  t 
| n(| d u r|	  |
  t 
| 0 W d    n1 s0    Y  d S )NZgrepr9   )r.   皙?)ospiper   Z
FileObjectr   timesleepr   r   killr(   )r   rwr.   r)   r   r   r   test_nonblock_removed   s    

zTestPopen.test_nonblock_removedc              
   C   sx   t dD ]j}| t:}td W d    n1 s80    Y  W d    n1 sV0    Y  | |jjtj qd S )N   Zthis_name_must_not_exist)	ranger   r   r   r   r   r   r    ENOENT)r   _r"   r   r   r   test_issue148   s
    <zTestPopen.test_issue148c                 C   sR   |  tj"}ttjddg W d    n1 s40    Y  | |jjd d S )Nr
   zimport sys; sys.exit(44),   )	r   r   CalledProcessErrorcheck_outputr   r   r   r   
returncoder!   r   r   r   test_check_output_keyword_error   s    0z)TestPopen.test_check_output_keyword_errorz!The default buffer changed in Py3c                 C   sd   t jtjdddgt jt jd&}|jd |j }W d    n1 sJ0    Y  | 	|d d S )Nz-ur
   z2import sys; sys.stdout.write(sys.stdin.readline())r.   r%   s   foobar
)
r   r   r   r   r'   r.   writer%   readliner   )r   r)   rQ   r   r   r   test_popen_bufsize   s    (zTestPopen.test_popen_bufsizezNot sure why?c                    s   ddl m} g  |dd} fdd}||d}|  |  t d t d t	   d j
d d	 d S )
Nr   )monkey	threadingThreadc                     sJ    t } tjjddd W d    n1 s00    Y   | j d S )Nzecho 123T)shell)r   	TypeErrorr   r   r   appendr   )r"   exr   r   r   fn  s    .z6TestPopen.test_subprocess_in_native_thread.<locals>.fn)targetr=   z5child watchers are only available on the default loop)r   rb   Zget_originalstartjoinr   len
assertTrue
isinstancerf   args)r   rb   rd   rj   threadr   rh   r    test_subprocess_in_native_thread  s    
z*TestPopen.test_subprocess_in_native_threadc                 C   sf   t jtjddgfdt ji|}| \}}W d    n1 sB0    Y  | || | | d S )Nr
   passr%   )r   r   r   r   r'   r0   r6   ZassertIsNone)r   kwargskindprocr%   r/   r   r   r   Z__test_no_output  s    
*zTestPopen.__test_no_outputzGSometimes segfaults; https://travis-ci.org/gevent/gevent/jobs/327357682c                 C   s   |  ddit d S )Nr5   T_TestPopen__test_no_outputr7   r   r   r   r   9test_universal_newlines_text_mode_no_output_is_always_str&  s    zCTestPopen.test_universal_newlines_text_mode_no_output_is_always_strNr   )r      zNeed encoding argumentc                 C   s   |  ddit d S )Nencodingzutf-8rx   rz   r   r   r   'test_encoded_text_mode_no_output_is_str.  s    z1TestPopen.test_encoded_text_mode_no_output_is_strc                 C   s   |  i t d S )N)ry   bytesrz   r   r   r   )test_default_mode_no_output_is_always_str5  s    z3TestPopen.test_default_mode_no_output_is_always_str)"__name__
__module____qualname__Zerror_fatalr   r   r#   r*   r&   skipOnLibuvOnPyPyOnWinr4   ZskipIfr   r   ZskipOnLibuvOnCIOnPyPyr8   skipOnWindowsZ	skipOnPy2r;   rI   rJ   rS   rX   r]   Z	skipOnPy3ra   Zignores_leakcheckrs   ry   r{   r   version_infor~   r   r   r   r   r   r	   #   sH   

$
"





r	   zTesting POSIX fd closingc                   @   s   e Zd Zedededdd Zededdd	 Zededededd
d Zededdd Zedededdd Z	dS )TestFDszos.closerangez"gevent.subprocess._set_inheritablezos.closec                 C   s^   d}t j|d  |tddtdt jg |tddtddg |d d S )N      rT   r   r      Tr   r|   )r   r   Z_close_fds_brute_forceassert_has_callsr   callZMAXFDassert_called_once_with)r   r(   set_inheritable
closerangekeepr   r   r   test_close_fds_brute_force>  s    


z"TestFDs.test_close_fds_brute_forcez.gevent.subprocess.Popen._close_fds_brute_forcez
os.listdirc                 C   s&   d|_ tjdg d |g d d S )NzNot an Integerpath*   )return_valuer   r   _close_fds_from_pathr   )r   listdirbrute_forcer   r   r   #test_close_fds_from_path_bad_valuesV  s    z+TestFDs.test_close_fds_from_path_bad_valuesc                 C   sh   d}g d|_ tjd|d | g |j |tddtddg |tdtd	g d S )
Nr   )16Z37r   r   r   TrT   r|   %   )	r   r   r   r   r   
mock_callsr   r   r   )r   r(   r   r   r   r   r   r   r   test_close_fds_from_path^  s    


z TestFDs.test_close_fds_from_pathzos.path.isdirc                 C   s>   d|_ tjg d |g d |tdtdg d S )NFr   /proc/self/fdz/dev/fd)r   r   r   
_close_fdsr   r   r   r   )r   isdirr   r   r   r   test_close_fds_no_diry  s    zTestFDs.test_close_fds_no_dirz,gevent.subprocess.Popen._close_fds_from_pathc                 C   s8   d|_ tjdgd | g |j |ddgd d S )NTrT   r   r   )r   r   r   r   r   r   r   )r   r   r   	from_pathr   r   r   test_close_fds_with_dir  s    zTestFDs.test_close_fds_with_dirN)
r   r   r   r   patchr   r   r   r   r   r   r   r   r   r   ;  s&   
r   c                   @   s   e Zd ZejZeddd Zdd Zdd Z	dd	 Z
d
d Zeddd Zeddd Zeddd Zeddd Zeddd Zeddd Zdd Zeddd ZdS )RunFuncTestCaser   c                 K   s   t jd|g}tj|fi |S )z4Run Python code in a subprocess using subprocess.runr
   )r   r   r   run)r   coderu   argvr   r   r   
run_python  s    zRunFuncTestCase.run_pythonc                 C   sP   |  d}| |jd | tj |  W d    n1 sB0    Y  d S )Nimport sys; sys.exit(47)/   )r   r   r\   r   r   rZ   check_returncoder   cpr   r   r   test_returncode  s    
zRunFuncTestCase.test_returncodec                 C   sN   |  tj}| jddd W d    n1 s00    Y  | |jjd d S )Nr   Tcheckr   )r   r   rZ   r   r   r   r\   r   cr   r   r   
test_check  s    ,zRunFuncTestCase.test_checkc                 C   s    | j ddd}| |jd d S )Nzimport sys; sys.exit(0)Tr   r   )r   r   r\   r   r   r   r   test_check_zero  s    zRunFuncTestCase.test_check_zeroc                 C   s>   |  tj | jddd W d    n1 s00    Y  d S )Nzwhile True: passg-C6?)timeout)r   r   TimeoutExpiredr   rz   r   r   r   test_timeout  s    zRunFuncTestCase.test_timeoutr+   c                 C   s"   | j dtjd}| d|j d S )Nzprint('BDFL')r$      BDFLr   r   r'   assertInr%   r   r   r   r   test_capture_stdout  s    z#RunFuncTestCase.test_capture_stdoutc                 C   s"   | j dtjd}| d|j d S )Nz$import sys; sys.stderr.write('BDFL'))r/   r   )r   r   r'   r   r/   r   r   r   r   test_capture_stderr  s    z#RunFuncTestCase.test_capture_stderrc                 C   s`   t  D}|d |d | jd|tjd}| d|j W d    n1 sR0    Y  d S )N   pearr   6import sys; sys.stdout.write(sys.stdin.read().upper())r^      PEAR)	tempfileTemporaryFiler_   seekr   r   r'   r   r%   )r   tfr   r   r   r   test_check_output_stdin_arg  s    


z+RunFuncTestCase.test_check_output_stdin_argc                 C   s$   | j ddtjd}| d|j d S )Nr   r   )inputr%   r   r   r   r   r   r   test_check_output_input_arg  s
    z+RunFuncTestCase.test_check_output_input_argc              	   C   s   t  }|d |d | jtdd }| jd|dd W d    n1 sR0    Y  | d|jj	d  | d	|jj	d  W d    n1 s0    Y  d S )
Nr   r   z7Expected ValueError when stdin and input args supplied.)msgzprint('will not be run')s   hare)r.   r   r.   r   )
r   r   r_   r   r   
ValueErrorr   r   r   rq   )r   r   r   r   r   r   &test_check_output_stdin_with_input_arg  s    


$z6RunFuncTestCase.test_check_output_stdin_with_input_argc                 C   sb   |  tj"}| jddtjd W d    n1 s40    Y  | |jjd | |jjd d S )NzMimport sys, time
sys.stdout.write('BDFL')
sys.stdout.flush()
time.sleep(3600)r   )r   r%   r   )	r   r   r   r   r'   r   r   outputr%   r   r   r   r   test_check_output_timeout  s    $
z)RunFuncTestCase.test_check_output_timeoutc                 C   s2   t j }d|d< | jd|d}| |jd d S )NbananaZFRUITzCimport sys, os;sys.exit(33 if os.getenv("FRUIT")=="banana" else 31))env!   )rL   environcopyr   r   r\   )r   Znewenvr   r   r   r   test_run_kwargs  s    
zRunFuncTestCase.test_run_kwargsz)requires posix like 'sleep' shell commandc              	   C   sl   |  dN | tj" tjddddd W d    n1 s@0    Y  W d    n1 s^0    Y  d S )NrK   zsleep 3T)re   r   capture_output)Zruns_in_given_timer   r   r   r   rz   r   r   r   .test_run_with_shell_timeout_and_capture_output  s
    
z>RunFuncTestCase.test_run_with_shell_timeout_and_capture_outputN)r   r   r   r&   ZLARGE_TIMEOUTZ__timeout__skipWithoutResourcer   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r     s,   







r   __main__)r   rL   r    ZunittestrN   r   Zgevent.testingtestingr&   r   r   r   hasattrr   ZPYPYr   PY3rD   r%   rF   rG   r   ZTestCaser	   r   r   r   r   mainr   r   r   r   <module>   s6   


  Tu
