o
    tPfN                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlmZ d dl	Z	d dlm
Z
 d dl	mZ eeds6de_ee dZe jd  dkZejrHdZnd	Zee jd
ZeoTejZedG dd dejZedG dd dejZG dd dejZedkre  dS dS )    N)mock)
subprocess	mswindowsFpypy_version_info   z@import msvcrt; msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY); newlinesr   c                   @   s<  e Zd ZdZdd Zdd Zdd Zdd	 Ze	d
dd Z
eejdeddd Zeddd Ze	d
dd Ze	d
dd Zeddd Zdd Ze	d
dd Zedd d! Zejed"d#d$ Ze	d
d%d& Zed'd(d) Zeejd*d+ d,k d-d.d/ Zd0d1 Z d*S )2	TestPopenFc                 C   s&   t tjddg}| | d d S )N-czimport sys; sys.exit(10)
   )r   Popensys
executableassertEqualwaitselfpopen r   ^/var/www/html/humari/django-venv/lib/python3.10/site-packages/gevent/tests/test__subprocess.py	test_exit*   s   zTestPopen.test_exitc                 C   s2   t tjddg}t|g | | d d S )Nr
   zimport sys; sys.exit(11)   )r   r   r   r   geventr   r   pollr   r   r   r   	test_wait.   s   zTestPopen.test_waitc                 C   sN   |  t}tdg  W d    n1 sw   Y  | |jjd d S )N*   )assertRaisesOSErrorr   r   r   r   	exceptionerrnor   excr   r   r   test_child_exception3   s   zTestPopen.test_child_exceptionc                 C   sL   t  }tjtjddgtjd}|  |j	  ~t  }| 
|| d S )Nr
   zprint()stdout)	greentestget_number_open_filesr   r   r   r   PIPEr   r%   closer   )r   
num_beforep	num_afterr   r   r   	test_leak9   s   
zTestPopen.test_leakhangsc                 C   sl   t jtjddddgt jt jt jd}|d\}}| |d tjdr.|ds,J d S | |d d S )	N-Wignorer
   zNimport sys,os;sys.stderr.write("pineapple");sys.stdout.write(sys.stdin.read()))stdinr%   stderrs   bananaz-dbgs	   pineapple)	r   r   r   r   r(   communicater   endswith
startswithr   r+   r%   r2   r   r   r   test_communicateD   s   zTestPopen.test_communicatezWindows does weird things herezSometimes segfaultsc                 C   sh   t jtjddddgt jt jt jdd}|d\}}| |t | |t | |d | |d	 d S )
Nr/   r0   r
   zfimport sys,os;sys.stderr.write("pineapple\r\n\xff\xff\xf2\xf9\r\n");sys.stdout.write(sys.stdin.read())T)r1   r%   r2   universal_newlinesu   banana
ÿÿòù
u   banana
ÿÿòù
u   pineapple
ÿÿòù
)	r   r   r   r   r(   r3   assertIsInstancestrr   r6   r   r   r   test_communicate_universalU   s(   z$TestPopen.test_communicate_universalz'Windows IO is weird; this doesn't raisec              	   C   s   t jtjddddgt jt jt jddd*}| t |  W d    n1 s)w   Y  W d    d S W d    d S 1 sAw   Y  d S )Nr/   r0   r
   z6import os, sys; os.write(sys.stdout.fileno(), b"\xff")T)r1   r%   r2   textr8   )r   r   r   r   r(   r   UnicodeDecodeErrorr3   )r   r+   r   r   r   test_communicate_undecodableq   s"   
"z&TestPopen.test_communicate_undecodablec                 C      t jtjddt d gt jddd6}|j }tr*t	s#| 
|d n| 
|d n| 
|d W d    d S W d    d S W d    d S 1 sKw   Y  d S )	Nr
   import sys,os;a	  sys.stdout.write("line1\n");sys.stdout.flush();sys.stdout.write("line2\r");sys.stdout.flush();sys.stdout.write("line3\r\n");sys.stdout.flush();sys.stdout.write("line4\r");sys.stdout.flush();sys.stdout.write("\nline5");sys.stdout.flush();sys.stdout.write("\nline6");   r%   r8   bufsize#line1
line2
line3
line4
line5
line6%line1
line2
line3

line4

line5
line6%line1
line2line3
line4
line5
line6r   r   r   r   	SETBINARYr(   r%   readpython_universal_newlines python_universal_newlines_brokenr   r   r+   r%   r   r   r   test_universal1   s8   
"zTestPopen.test_universal1c                 C   r?   )	Nr
   r@   zsys.stdout.write("line1\n");sys.stdout.flush();sys.stdout.write("line2\r");sys.stdout.flush();sys.stdout.write("line3\r\n");sys.stdout.flush();sys.stdout.write("line4\r\nline5");sys.stdout.flush();sys.stdout.write("\nline6");rA   rB   rD   rE   rF   rG   rL   r   r   r   test_universal2   s8   
"zTestPopen.test_universal2zUses 'grep' commandc              
   C   s   t  \}}t|}tjddg|d@}z"td | | d  W | d u r.|	  |
  t 
| n| d u rB|	  |
  t 
| w W d    d S 1 sWw   Y  d S )Ngrepr<   )r1   皙?)ospiper   
FileObjectr   timesleepr   r   killr)   )r   rwr1   r+   r   r   r   test_nonblock_removed   s   

"zTestPopen.test_nonblock_removedc              
   C   sx   t dD ]5}| t}td W d    n1 sw   Y  W d    n1 s+w   Y  | |jjtj qd S )N   this_name_must_not_exist)	ranger   r   r   r   r   r   r    ENOENT)r   _r"   r   r   r   test_issue148   s   zTestPopen.test_issue148c                 C   sR   |  tj}ttjddg W d    n1 sw   Y  | |jjd d S )Nr
   zimport sys; sys.exit(44),   )	r   r   CalledProcessErrorcheck_outputr   r   r   r   
returncoder!   r   r   r   test_check_output_keyword_error   s   z)TestPopen.test_check_output_keyword_errorz!The default buffer changed in Py3c                 C   sd   t jtjdddgt jt jd}|jd |j }W d    n1 s%w   Y  | 	|d d S )Nz-ur
   z2import sys; sys.stdout.write(sys.stdin.readline())r1   r%   s   foobar
)
r   r   r   r   r(   r1   writer%   readliner   )r   r+   rW   r   r   r   test_popen_bufsize   s   zTestPopen.test_popen_bufsizezNot sure why?c                    s   ddl m} g  |dd} fdd}||d}|  |  t d t d t	   d j
d d	 d S )
Nr   )monkey	threadingThreadc                     sJ    t} tjjddd W d    n1 sw   Y   | j d S )Nzecho 123T)shell)r   	TypeErrorr   r   r   appendr   )r"   exr   r   r   fn  s   z6TestPopen.test_subprocess_in_native_thread.<locals>.fn)targetrA   z5child watchers are only available on the default loop)r   ri   get_originalstartjoinr   len
assertTrue
isinstancerm   args)r   ri   rk   rq   threadr   ro   r    test_subprocess_in_native_thread   s   
z*TestPopen.test_subprocess_in_native_threadc                 C   sf   t jtjddgfdt ji|}| \}}W d    n1 s!w   Y  | || | | d S )Nr
   passr%   )r   r   r   r   r(   r3   r9   assertIsNone)r   kwargskindprocr%   r2   r   r   r   __test_no_output  s   
zTestPopen.__test_no_outputzGSometimes segfaults; https://travis-ci.org/gevent/gevent/jobs/327357682c                 C      |  ddit d S )Nr8   T_TestPopen__test_no_outputr:   r   r   r   r   9test_universal_newlines_text_mode_no_output_is_always_str%  s   zCTestPopen.test_universal_newlines_text_mode_no_output_is_always_strNr   )r      zNeed encoding argumentc                 C   r   )Nencodingzutf-8r   r   r   r   r   'test_encoded_text_mode_no_output_is_str-  s   z1TestPopen.test_encoded_text_mode_no_output_is_strc                 C   s   |  i t d S )N)r   bytesr   r   r   r   )test_default_mode_no_output_is_always_str4  s   z3TestPopen.test_default_mode_no_output_is_always_str)!__name__
__module____qualname__error_fatalr   r   r#   r-   r&   skipOnLibuvOnPyPyOnWinr7   skipIfr   r   skipOnLibuvOnCIOnPyPyr;   skipOnWindowsr>   rM   rN   rY   r_   rd   	skipOnPy3rh   ignores_leakcheckr{   r   r   r   version_infor   r   r   r   r   r   r	   #   sH    


$
"





r	   zTesting POSIX fd closingc                   @   s   e Zd Zedededdd Zededdd	 Zededededd
d Zededdd Zedededdd Z	dS )TestFDszos.closerangez"gevent.subprocess._set_inheritablezos.closec                 C   s^   d}t j|d  |tddtdt jg |tddtddg |d d S )N      rZ   r   r      Tr   r   )r   r   _close_fds_brute_forceassert_has_callsr   callMAXFDassert_called_once_with)r   r)   set_inheritable
closerangekeepr   r   r   test_close_fds_brute_force=  s   


z"TestFDs.test_close_fds_brute_forcez.gevent.subprocess.Popen._close_fds_brute_forcez
os.listdirc                 C   s&   d|_ tjdg d |g d d S )NzNot an Integerpath*   )return_valuer   r   _close_fds_from_pathr   )r   listdirbrute_forcer   r   r   #test_close_fds_from_path_bad_valuesU  s   z+TestFDs.test_close_fds_from_path_bad_valuesc                 C   sh   d}g d|_ tjd|d | g |j |tddtddg |tdtd	g d S )
Nr   )1637r   r   r   TrZ   r   %   )	r   r   r   r   r   
mock_callsr   r   r   )r   r)   r   r   r   r   r   r   r   test_close_fds_from_path]  s   



z TestFDs.test_close_fds_from_pathzos.path.isdirc                 C   s>   d|_ tjg d |g d |tdtdg d S )NFr   /proc/self/fdz/dev/fd)r   r   r   
_close_fdsr   r   r   r   )r   isdirr   r   r   r   test_close_fds_no_dirx  s   
zTestFDs.test_close_fds_no_dirz,gevent.subprocess.Popen._close_fds_from_pathc                 C   s8   d|_ tjdgd | g |j |ddgd d S )NTrZ   r   r   )r   r   r   r   r   r   r   )r   r   r   	from_pathr   r   r   test_close_fds_with_dir  s   zTestFDs.test_close_fds_with_dirN)
r   r   r   r   patchr   r   r   r   r   r   r   r   r   r   :  s(    
r   c                   @   s   e Zd ZejZeddd Zdd Zdd Z	dd	 Z
d
d Zeddd Zeddd Zeddd Zeddd Zeddd Zeddd Zdd Zeddd ZdS )RunFuncTestCaser   c                 K   s   t jd|g}tj|fi |S )z4Run Python code in a subprocess using subprocess.runr
   )r   r   r   run)r   coder~   argvr   r   r   
run_python  s   zRunFuncTestCase.run_pythonc                 C   sR   |  d}| |jd | tj |  W d    d S 1 s"w   Y  d S )Nimport sys; sys.exit(47)/   )r   r   rc   r   r   ra   check_returncoder   cpr   r   r   test_returncode  s
   

"zRunFuncTestCase.test_returncodec                 C   sN   |  tj}| jddd W d    n1 sw   Y  | |jjd d S )Nr   Tcheckr   )r   r   ra   r   r   r   rc   r   cr   r   r   
test_check  s   zRunFuncTestCase.test_checkc                 C   s    | j ddd}| |jd d S )Nzimport sys; sys.exit(0)Tr   r   )r   r   rc   r   r   r   r   test_check_zero  s   zRunFuncTestCase.test_check_zeroc                 C   s@   |  tj | jddd W d    d S 1 sw   Y  d S )Nzwhile True: passg-C6?)timeout)r   r   TimeoutExpiredr   r   r   r   r   test_timeout  s   "zRunFuncTestCase.test_timeoutr.   c                 C   "   | j dtjd}| d|j d S )Nzprint('BDFL')r$      BDFLr   r   r(   assertInr%   r   r   r   r   test_capture_stdout  s   z#RunFuncTestCase.test_capture_stdoutc                 C   r   )Nz$import sys; sys.stderr.write('BDFL'))r2   r   )r   r   r(   r   r2   r   r   r   r   test_capture_stderr  s   z#RunFuncTestCase.test_capture_stderrc                 C   sb   t  #}|d |d | jd|tjd}| d|j W d    d S 1 s*w   Y  d S )N   pearr   6import sys; sys.stdout.write(sys.stdin.read().upper())re      PEAR)	tempfileTemporaryFilerf   seekr   r   r(   r   r%   )r   tfr   r   r   r   test_check_output_stdin_arg  s   


"z+RunFuncTestCase.test_check_output_stdin_argc                 C   s$   | j ddtjd}| d|j d S )Nr   r   )inputr%   r   r   r   r   r   r   test_check_output_input_arg  s
   z+RunFuncTestCase.test_check_output_input_argc              	   C   s   t  F}|d |d | jtdd}| jd|dd W d    n1 s)w   Y  | d|jj	d  | d	|jj	d  W d    d S 1 sMw   Y  d S )
Nr   r   z7Expected ValueError when stdin and input args supplied.)msgzprint('will not be run')s   hare)r1   r   r1   r   )
r   r   rf   r   r   
ValueErrorr   r   r   ry   )r   r   r   r   r   r   &test_check_output_stdin_with_input_arg  s   


"z6RunFuncTestCase.test_check_output_stdin_with_input_argc                 C   sb   |  tj}| jddtjd W d    n1 sw   Y  | |jjd | |jjd d S )NzMimport sys, time
sys.stdout.write('BDFL')
sys.stdout.flush()
time.sleep(3600)r   )r   r%   r   )	r   r   r   r   r(   r   r   outputr%   r   r   r   r   test_check_output_timeout  s   z)RunFuncTestCase.test_check_output_timeoutc                 C   s2   t j }d|d< | jd|d}| |jd d S )NbananaFRUITzCimport sys, os;sys.exit(33 if os.getenv("FRUIT")=="banana" else 31))env!   )rQ   environcopyr   r   rc   )r   newenvr   r   r   r   test_run_kwargs  s   
zRunFuncTestCase.test_run_kwargsz)requires posix like 'sleep' shell commandc              	   C   s~   |  d0 | tj tjddddd W d    n1 s w   Y  W d    d S W d    d S 1 s8w   Y  d S )NrP   zsleep 3T)rl   r   capture_output)runs_in_given_timer   r   r   r   r   r   r   r   .test_run_with_shell_timeout_and_capture_output  s   
"z>RunFuncTestCase.test_run_with_shell_timeout_and_capture_outputN)r   r   r   r&   LARGE_TIMEOUT__timeout__skipWithoutResourcer   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r     s.    







r   __main__)r   rQ   r    unittestrT   r   gevent.testingtestingr&   r   r   r   hasattrr   PYPYr   PY3rH   r%   rJ   rK   r   TestCaser	   r   r   r   r   mainr   r   r   r   <module>   s:    


  Tu