
    ci                     H   d dl Z d dlZd dlZd dlZd dlZd dlmZmZmZ d dl	m
Z d dlmZ d dlmZ d dlmZ d dlmZ d dlZd dlmZ d dlmZ d d	lmZmZ 	 d d
lmZ dZdZej@                  dz   Z! ejD                  e!de!       Z#edZ$y G d de      Z$y# e$ r dZY Dw xY w)    N)AnyIterableOptional)client_info)
exceptions)bigquery)initializer)TaskContext)cached_remote_fn)BlockBlockAccessor)Datasink
      z+vertex_rayzray-on-vertex/)gapic_version
user_agentc                   ^    e Zd Zdedfdedee   dedee   ddf
dZdd	Z	d
e
e   dedefdZy)_BigQueryDatasinkNTdataset
project_idmax_retry_cntoverwrite_tablereturnc                 t    || _         |xs t        j                  j                  | _        || _        || _        y )N)r   r	   global_configprojectr   r   r   )selfr   r   r   r   s        C/tmp/pip-target-z3e9_cxr/lib/python/vertex_ray/bigquery_datasink.py__init__z_BigQueryDatasink.__init__<   s3     #DL(MK,E,E,M,MDO!.D#2D     c                 &   t        j                  | j                  t              }| j                  j                  dd      d   }	 |j                  |       | j                  rHt        d| j                   d	z          |j                  | j                   d| j                   d
       y t        d| j                   dz   dz          y # t        j                  $ r3 |j                  | j                   d| d       t        d|z          Y w xY w)Nr   r   .   r      )timeoutz$[Ray on Vertex AI]: Created dataset z/[Ray on Vertex AI]: Attempting to delete table z9 if it already exists since kwarg overwrite_table = True.T)not_found_okz3[Ray on Vertex AI]: The write will append to table z if it already exists z$since kwarg overwrite_table = False.)r   Clientr   bq_infor   splitget_datasetr   NotFoundcreate_datasetprintr   delete_table)r   client
dataset_ids      r   on_write_startz _BigQueryDatasink.on_write_startH   s   __T__'RF++C3A6JK"":. ##Edll^TQR ##'q7d $  I&<=><= && K%%(9:,&GQS%T<zIJKs   C
 
ADDblocksctxc           
           dt         dt        dt        dd f fd}t        |      }t        j                  |D cg c])  }|j                  | j                   j                        + c}       yc c}w )Nblockr   r   r   c                    t        j                  |       j                         } t        j                  |t
              }t        j                  d      }t        j                  j                  |_	        t        j                  j                  |_        t        j                         5 }t        j                   j#                  |dt%        j&                          d      }t)        j*                  | |d       d}|j,                  k  rNt/        |d	      5 }|j1                  |||
      }	d d d        	 t3        j4                  	j7                                	 |j,                  kD  r3t=        dj,                   ddz   dz          tG        d| dz   dz         	 d d d        y # 1 sw Y   zxY w# t8        j:                  $ rf}
|dz  }|j,                  kD  rY d }
~
t=        dd| dz   dz          t3        j>                  |
       tA        jB                  tD               Y d }
~
nd }
~
ww xY w|j,                  k  r3# 1 sw Y   y xY w)Nr"   T)
autodetectblock_z.parquetSNAPPY)compressionr   rb)
job_configr$   z-[Ray on Vertex AI]: A block write encounteredz a rate limit exceeded error z	 time(s).z Sleeping to try again.z[Ray on Vertex AI]: Maximum (z) retry count exceeded.z? Ray will attempt to retry the block write via fault tolerance.z\ For more information, see https://docs.ray.io/en/latest/ray-core/fault_tolerance/tasks.htmlz([Ray on Vertex AI]: Write failed due to z5 repeated API rate limit exceeded responses. Considerz9 specifiying the max_retry_cnt kwarg with a higher value.)$r   	for_blockto_arrowr   r(   r)   LoadJobConfigSourceFormatPARQUETsource_formatWriteDispositionWRITE_APPENDwrite_dispositiontempfileTemporaryDirectoryospathjoinuuiduuid4pqwrite_tabler   openload_table_from_filelogginginforesultr   	Forbiddenr.   debugtimesleepRATE_LIMIT_EXCEEDED_SLEEP_TIMERuntimeError)r6   r   r   r0   r=   temp_dirfp	retry_cntsource_filejober   s              r   _write_single_blockz4_BigQueryDatasink.write.<locals>._write_single_blockg   s     &//6??A!Q%33tD
+3+@+@+H+H
(/7/H/H/U/U
,002 $hh&h0OPBNN5"(C !I#t'9'99!"d^ {"("="= +W #> #CG#LL6! !4#5#55;D<N<N;OOfg_`|}
 +FykRUVYZ  65$ $   *33 
G%NI(4+=+== %! O$A)I"V!W";!<
 $MM!, JJ'EFF
G $t'9'99$ $s]   (A,H<F")H<2#F.AH<"F+	'H<.H'H"H<AH"H<"H''H<<Iok)r   strr   raygetremoter   r   )r   r3   r4   ra   r6   s   `    r   writez_BigQueryDatasink.writeb   s~    
..*-.8;..` #33F"G GG "( (..udoot||T s   .A0)r   N)__name__
__module____qualname__DEFAULT_MAX_RETRY_CNTrc   r   intboolr   r2   r   r   r
   r   rg    r    r   r   r   ;   st     )-!6.2
	3
	3 !
	3 	
	3
 &d^
	3 
	3	4?	UO?	 ?	 	?	r    r   )%rR   rI   rG   rW   rL   typingr   r   r   pyarrow.parquetparquetrN   google.api_corer   r   google.cloudr   google.cloud.aiplatformr	   rd   'ray.data._internal.execution.interfacesr
   ray.data._internal.remote_fnr   ray.data.blockr   r   ray.data.datasource.datasinkr   ImportErrorrk   rY   __version___BQ_GAPIC_VERSION
ClientInfor)   r   rn   r    r   <module>r}      s   &  	    * *  ' & ! / 
 ? 9 /5  !# ((=8 
 +
 
 #.AR@S0T fH f'  Hs   B B! B!