
    [i|#                        d dl Z d dlZd dlmZ d dlmZ d dlmZ d dlm	Z	m
Z
 d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZmZmZmZ dZdZ G d dee      Z G d d      Z	 ddee   de	de	dededeeeef      ddfdZ  G d d      Z!y)    N)defaultdict)deepcopy)Enum)Queueget_context)BaseContext)BaseProcess)Synchronized)Empty)AnyIterableOptionalTypeiX     c                       e Zd ZdZdZdZy)QueueSignalsstopconfirmerrorN)__name__
__module____qualname__r   r   r        X/mnt/e/genesis-system/.venv/lib/python3.12/site-packages/fastembed/parallel_processor.pyr   r      s    DGEr   r   c                   Z    e Zd Zedededd fd       Zdeeeef      deeeef      fdZ	y)Workerargskwargsreturnc                     t               NNotImplementedError)clsr   r   s      r   startzWorker.start   s    !##r   itemsc                     t               r"   r#   )selfr'   s     r   processzWorker.process   s    !##r   N)
r   r   r   classmethodr   r&   r   tupleintr*   r   r   r   r   r      sV    $# $ $ $ $$XeCHo6 $8E#s(O;T $r   r   worker_classinput_queueoutput_queuenum_active_workers	worker_idr   r    c                     |i }t        j                  d| dt        j                          d|j	                  dd              	  | j
                  di |}dt        t           ffd}|j                   |             D ]  }|j                  |        	 j                          |j                          j                          |j                          |j!                         5  |xj"                  d	z  c_        ddd       t        j                  d
| d       y# t        $ r>}	t        j                  |	       |j                  t        j                         Y d}	~	d}	~	ww xY w# 1 sw Y   mxY w# j                          |j                          j                          |j                          |j!                         5  |xj"                  d	z  c_        ddd       n# 1 sw Y   nxY wt        j                  d
| d       w xY w)z
    A worker that pulls data pints off the input queue, and places the execution result on the output queue.
    When there are no data pints left on the input queue, it decrements
    num_active_workers to signal completion.
    NzReader worker: z PID: z	 Device: 	device_idCPUr    c               3   `   K   	 j                         } | t        j                  k(  ry |  )wr"   )getr   r   )itemr/   s    r   input_queue_iterablez%_worker.<locals>.input_queue_iterable;   s1     "(<,,,
	 s   +.   zReader worker z	 finishedr   )logginginfoosgetpidr7   r&   r   r   r*   put	Exception	exceptionr   r   closejoin_threadget_lockvalue)
r.   r/   r0   r1   r2   r   workerr9   processed_itemes
    `        r   _workerrI   #   s    ~LL
)F299;-yKY^A_@`a!<###-f-	hsm 	 %nn-A-CD 	-N^,	- 	!  "((* 	*$$)$	* 	~i[	:;)  -!++,,-"	* 	* 	!  "((* 	*$$)$	* 	* 	* 	~i[	:;sV   AD! E7 )E+!	E(*4E#E7 #E((E7 +E47AHG'	H'G0,!Hc                       e Zd Z	 	 	 ddedee   dee   deee      de	f
dZ
ded	dfd
Zdee   deded	ee   fdZdee   deded	eeeef      fdZddZddee   d	dfdZddZddZy)ParallelWorkerPoolNnum_workersrF   start_method
device_idscudac                     || _         || _        d | _        d | _        t	        |      | _        g | _        | j                  t        z  | _        d| _	        || _
        || _        d | _        y )NF)r.   rL   r/   r0   r   ctx	processesmax_internal_batch_size
queue_sizeemergency_shutdownrN   rO   r1   )r)   rL   rF   rM   rN   rO   s         r   __init__zParallelWorkerPool.__init__\   sh     #&,0-1 +L 9,.**-DD"'$	7;r   r   r    c           
      $   | j                   j                  | j                        | _        | j                   j                  | j                        | _        | j                   j                  d| j                        }t        |t              sJ || _	        t        d| j                        D ]  }t        |      }| j                  r9| j                  |t        | j                        z     }||d<   | j                  |d<   t        | j                   d      sJ | j                   j!                  t"        | j$                  | j                  | j                  | j                  ||f      }|j'                          | j(                  j+                  |        y )Nir   r4   rO   Process)targetr   )rQ   r   rT   r/   r0   ValuerL   
isinstance	BaseValuer1   ranger   rN   lenrO   hasattrrY   rI   r.   r&   rR   append)r)   r   	ctx_valuer2   worker_kwargsr4   r*   s          r   r&   zParallelWorkerPool.startp   sC   88>>$//: HHNN4??;HHNN3(8(89	)Y///"+q$"2"23 	+I$V,M OOIDOO8L,LM	-6k*(,		f%488Y///hh&&%%$$%%++! ' 
G MMONN!!'*)	+r   streamr   c              /      K   t        t              }d} | j                  |g|i |D ],  \  }}|||<   ||v s|j                  |       |dz  }||v r. y w)Nr   r:   )r   r   semi_ordered_mappop)r)   rd   r   r   buffernext_expectedidxr8   s           r   ordered_mapzParallelWorkerPool.ordered_map   ss     (3C(8...vGGG 	#ICF3K6)jj//"  6)	#s   7AAAc              /   z  K   	  | j                   d	i | | j                  J d       | j                  J d       d}d}t        |      D ]  \  }}| j	                          ||z
  | j
                  k  r	 | j                  j                         }n!	 | j                  j                  t              }|7|t        j                  k(  r| j                          t        d      | |dz  }| j                  j                  ||f       |dz  } t        | j                         D ]+  }
| j                  j                  t        j"                         - ||k  rm| j	                          | j                  j                  t              }|t        j                  k(  r| j                          t        d      | |dz  }||k  rm| j                  J d       | j                  J d       | j%                          | j                  j'                          | j                  j'                          | j(                  r5| j                  j+                          | j                  j+                          y | j                  j-                          | j                  j-                          y # t        $ r d }Y  w xY w# t        $ r}	| j                          |	d }	~	ww xY w# | j                  J d       | j                  J d       | j%                          | j                  j'                          | j                  j'                          | j(                  r5| j                  j+                          | j                  j+                          w | j                  j-                          | j                  j-                          w xY ww)
NzInput queue was not initializedz Output queue was not initializedr   timeoutzThread unexpectedly terminatedr:   zInput queue is NonezOutput queue is Noner   )r&   r/   r0   	enumeratecheck_worker_healthrT   
get_nowaitr   r7   processing_timeoutjoin_or_terminater   r   RuntimeErrorr?   r^   rL   r   joinrB   rU   cancel_join_threadrC   )r)   rd   r   r   pushedreadrj   r8   out_itemrH   _s              r   rf   z#ParallelWorkerPool.semi_ordered_map   sZ    6	0DJJ  ##/R1RR/$$0T2TT0FD&v. 	T((*D=4??2(#'#4#4#?#?#A #'#4#4#8#8AS#8#T
 '<#5#55..0*+KLL"NAID  $$c4[1!-0 4++, 8  $$\%6%678 -((*,,009K0L|111**,&'GHH	 - ##/F1FF/$$0H2HH0IIK""$##%&&  335!!446  ,,.!!--/S ! (#'(
 !  ..0 4 ##/F1FF/$$0H2HH0IIK""$##%&&  335!!446  ,,.!!--/sh   N;A/K 4J"K  J40DK C N;"J1-K 0J11K 4	K=KKK C!N88N;c                     | j                   D ]^  }|j                         r|j                  dk7  s$d| _        | j	                          t        d|j                   d|j                          y)zJ
        Checks if any worker process has terminated unexpectedly
        r   TzWorker PID: z# terminated unexpectedly with code N)rR   is_aliveexitcoderU   rs   rt   pidr)   r*   s     r   rp   z&ParallelWorkerPool.check_worker_health   sn     ~~ 	G##%'*:*:a*?*.'&&(""7;;-/RSZScScRde 		r   rn   c                     | j                   D ]5  }|j                  |       |j                         s&|j                          7 | j                   j	                          y)zM
        Emergency shutdown
        @param timeout:
        @return:
        rm   N)rR   ru   r|   	terminateclear)r)   rn   r*   s      r   rs   z$ParallelWorkerPool.join_or_terminate   sO     ~~ 	$GLLL)!!!#	$ 	r   c                 z    | j                   D ]  }|j                           | j                   j                          y r"   )rR   ru   r   r   s     r   ru   zParallelWorkerPool.join   s.    ~~ 	GLLN	r   c                 h    | j                   D ]#  }|j                         s|j                          % y)a  
        Terminate processes if the user hasn't joined. This is necessary as
        leaving stray processes running can corrupt shared state. In brief,
        we've observed shared memory counters being reused (when the memory was
        free from the perspective of the parent process) while the stray
        workers still held a reference to them.
        For a discussion of using destructors in Python in this manner, see
        https://eli.thegreenplace.net/2009/06/12/safely-using-destructors-in-python/.
        N)rR   r|   r   r   s     r   __del__zParallelWorkerPool.__del__   s/     ~~ 	$G!!!#	$r   )NNF)r    N)r:   )r   r   r   r-   r   r   r   strlistboolrV   r   r&   r   rk   r,   rf   rp   rs   ru   r   r   r   r   rK   rK   [   s    
 '+*.<< V< sm	<
 T#Y'< <(+c +d +<#(3- # #s #xX[} #90sm90,/90;>90	%S/	"90v

# 
t 

$r   rK   r"   )"r;   r=   collectionsr   copyr   enumr   multiprocessingr   r   multiprocessing.contextr   multiprocessing.processr	   multiprocessing.sharedctypesr
   r]   queuer   typingr   r   r   r   rr   rS   r   r   r   r-   dictrI   rK   r   r   r   <module>r      s     	 #   . / / B  0 0   3 $ $ (,5<v,5<5< 5< "	5<
 5< T#s(^$5< 
5<pa$ a$r   