
    Xi                         d dl Z d dlZd dlZd dlmZ d dlmZmZmZ 	 d dl	m
Z
 dZdZ G d de      Zy# e$ r	 d dlm
Z
 Y w xY w)	    N)Thread)APIErrorDatetimeSerializer
batch_post)Emptyi  i  P c                   j    e Zd ZdZ ej
                  d      Z	 	 	 	 	 	 	 	 d
dZd Zd Z	d Z
d Zd	 Zy)Consumerz.Consumes the messages from the client's queue.posthogNc                     t        j                  |        d| _        || _        || _        || _        || _        || _        || _        || _	        d| _
        || _        |	| _        |
| _        y)zCreate a consumer thread.TN)r   __init__daemonflush_atflush_intervalapi_keyhoston_errorqueuegziprunningretriestimeouthistorical_migration)selfr   r   r   r   r   r   r   r   r   r   s              Z/mnt/e/genesis-system/.venvs/browser-army/lib/python3.12/site-packages/posthog/consumer.pyr   zConsumer.__init__   sh     	 ,	 
	
 $8!    c                     | j                   j                  d       | j                  r| j                          | j                  r| j                   j                  d       y)zRuns the consumer.zconsumer is running...zconsumer exited.N)logdebugr   uploadr   s    r   runzConsumer.run;   s?    /0llKKM ll 	)*r   c                     d| _         y)zPause the consumer.FN)r   r    s    r   pausezConsumer.pauseC   s	    r   c                 
   d}| j                         }t        |      dk(  ry	 | j                  |       d}|D ]  }| j                  j                           |S # t        $ rz}| j                  j                  d|       d}| j                  rF	 | j                  ||       n2# t        $ r&}| j                  j                  d|       Y d}~nd}~ww xY wY d}~d}~ww xY w# |D ]  }| j                  j                           w xY w)z:Upload the next batch of items, return whether successful.Fr   Tzerror uploading: %szon_error handler failed: %sN)	nextlenrequest	Exceptionr   errorr   r   	task_done)r   successbatcheitems        r   r   zConsumer.uploadG   s    		u:?	'LLG  '

$$&'   	EHHNN0!4G}}EMM!U+  EHHNN#@!DDE	E  '

$$&'sS   A 	C"*CB C 	C)C
C
CCC CC #Dc                    | j                   }g }t        j                         }d}t        |      | j                  k  rt        j                         |z
  }|| j
                  k\  r	 |S 	 |j                  d| j
                  |z
        }t        t        j                  |t              j                               }|t        kD  r&| j                  j                  dt        |             |j                  |       ||z  }|t         k\  r| j                  j#                  d|       	 |S 	 t        |      | j                  k  r|S # t$        $ r Y |S w xY w)z)Return the next batch of items to upload.r   T)blockr   )clsz)Item exceeds 900kib limit, dropping. (%s)zhit batch size limit (size: %d))r   time	monotonicr&   r   r   getjsondumpsr   encodeMAX_MSG_SIZEr   r)   strappendBATCH_SIZE_LIMITr   r   )r   r   items
start_time
total_sizeelapsedr.   	item_sizes           r   r%   zConsumer.next`   s4   

^^%

%j4==(nn&3G$---" !yytT5H5H75RyS

45G H O O QR	|+HHNNCSY T"i'
!11HHNN#DjQ  2 %j4==((   s   (B E );E 	EEc           	         d }d}t        | j                  dz         D ]G  }	 t        | j                  | j                  | j
                  | j                  || j                          y |r|y# t        $ rp}|} ||      s || j                  k  rLt        |dd      }|r|dkD  rt        j                  |       n"t        j                  t        d|z  d             Y d}~d}~ww xY w)	z=Attempt to upload the batch and retry before raising an errorc                     t        | t              r;| j                  dk(  ryd| j                  cxk  xr dk  nc xr | j                  dv S y)NzN/AFi  i  )i  i  T)
isinstancer   status)excs    r   is_retryablez&Consumer.request.<locals>.is_retryable   sG    #x( ::&  CJJ44V#**J:VWW r   N   )r   r   r,   r   retry_afterr         )ranger   r   r   r   r   r   r   r(   getattrr2   sleepmin)r   r,   rF   last_excattemptr-   rH   s          r   r'   zConsumer.request~   s    
	 T\\A-. 	8G8LLII LL)-)B)B 	8. N   
8#AT\\)")!]D"AK"{Q

;/

3q'z2#67
8s   AA--	C&6A&C!!C&)d   NNg      ?F
      F)__name__
__module____qualname____doc__logging	getLoggerr   r   r!   r#   r   r%   r'    r   r   r	   r	      sP    8
'

I
&C "9B+2<(r   r	   )r5   rX   r2   	threadingr   posthog.requestr   r   r   r   r   ImportErrorQueuer8   r;   r	   rZ   r   r   <module>r_      sP        D D
  # Qv Q  s   4 AA