@@ -78,8 +78,6 @@ def __print_progress_a2vybg(progress: float) -> None:
 
 
 def load_data_dict_a2vybg(record: Dict[str, Any]) -> Dict[str, Any]:
-    global vocab_a2vybg
-
     if record["bytes"][:2] == "\\x":
         record["bytes"] = record["bytes"][2:]
     else:
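Review note on the hunk above: in Python, a `global` declaration is only needed when a function rebinds a module-level name; plain reads (and in-place mutations) resolve to module scope on their own, which is why `global vocab_a2vybg` could be dropped here. A minimal sketch of the rule, using illustrative names rather than this module's:

```python
vocab = {"a": 1}  # module-level name

def read_only():
    # No `global` needed: name lookup falls through to module scope.
    return vocab["a"]

def rebind():
    # `global` is required only because the name itself is reassigned.
    global vocab
    vocab = {"b": 2}

print(read_only())  # 1
rebind()
print(vocab)        # {'b': 2}
```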
@@ -109,17 +107,15 @@ def parse_data_to_record_dict_a2vybg(
 
 
 def send_cache_to_object_storage_a2vybg():
-    global llm_ac_cache_a2vybg, llm_config_hash_a2vybg, cached_records_a2vybg
+    global llm_ac_cache_a2vybg
 
     if data_type == "LLM_RESPONSE" and "http" in CACHE_FILE_UPLOAD_LINK_A2VYBG:
         llm_ac_cache_a2vybg[llm_config_hash_a2vybg] = cached_records_a2vybg
         requests.put(CACHE_FILE_UPLOAD_LINK_A2VYBG, json=llm_ac_cache_a2vybg)
 
 
 def save_ac_value_a2vybg(record_id: str, attr_value: Any) -> None:
-    global calculated_attribute_by_record_id_a2vybg, processed_records_a2vybg, progress_size_a2vybg, amount_a2vybg
-    global check_data_type_a2vybg, py_data_types_a2vybg, llm_ac_cache_a2vybg, llm_config_hash_a2vybg, cached_records_a2vybg
-    global CACHE_FILE_UPLOAD_LINK_A2VYBG
+    global processed_records_a2vybg
 
     if not check_data_type_a2vybg(attr_value):
         raise ValueError(
@@ -147,14 +143,13 @@ def process_attribute_calculation_a2vybg(
 
 def check_abort_status_a2vybg() -> bool:
     # function outside the async loop for reading always the freshest value
-    global should_abort_a2vybg
     return should_abort_a2vybg
 
 
 async def process_llm_record_batch_a2vybg(
     record_dict_batch: List[Dict[str, Any]]
 ) -> None:
-    global DEFAULT_USER_PROMPT_A2VYBG, cached_records_a2vybg
+    global should_abort_a2vybg
 
     for record_dict in record_dict_batch:
         if check_abort_status_a2vybg():
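Review note on the hunk above: `should_abort_a2vybg` is the one name that genuinely needs `global` here, because `process_llm_record_batch_a2vybg` assigns to it; `check_abort_status_a2vybg` only reads it, so its declaration was safe to drop. Without the declaration on the writing side, the assignment would silently create a local and the shared flag would never flip. A small sketch of that failure mode, with illustrative names only:

```python
should_abort = False

def broken_abort():
    # BUG: without `global`, this assignment creates a new local
    # variable and leaves the module-level flag untouched.
    should_abort = True

def working_abort():
    global should_abort  # rebinding requires the declaration
    should_abort = True

broken_abort()
print(should_abort)  # False -- the module flag did not change
working_abort()
print(should_abort)  # True
```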
@@ -169,33 +164,32 @@ async def process_llm_record_batch_a2vybg(
 
             save_ac_value_a2vybg(record_dict["id"], attr_value)
         except Exception as e:
-            global should_abort_a2vybg
             should_abort_a2vybg = True
             print(f"Error in record {record_dict['data']['running_id']}: {str(e)}")
             return
 
 
+def make_batches(
+    iterable: List[Any], size: int = 1
+) -> Generator[List[Any], None, None]:
+    length = len(iterable)
+    for ndx in range(0, length, size):
+        yield iterable[ndx : min(ndx + size, length)]
+
+
 async def process_async_llm_calls_a2vybg(
     record_dict_list: List[Dict[str, Any]]
 ) -> None:
-    global amount_a2vybg
-
-    def make_batches(
-        iterable: List[Any], size: int = 1
-    ) -> Generator[List[Any], None, None]:
-        length = len(iterable)
-        for ndx in range(0, length, size):
-            yield iterable[ndx : min(ndx + size, length)]
 
     batch_size = max(amount_a2vybg // int(attribute_calculators.NUM_WORKERS_A2VYBG), 1)
     tasks = [
         process_llm_record_batch_a2vybg(batch)
         for batch in make_batches(record_dict_list, size=batch_size)
     ]
     await asyncio.gather(*tasks)
-    send_cache_to_object_storage_a2vybg()
     if check_abort_status_a2vybg():
         raise ValueError("Encountered error during LLM processing.")
+    send_cache_to_object_storage_a2vybg()
 
 
 if __name__ == "__main__":
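For reference: hoisting `make_batches` to module level avoids re-creating the generator function on every call and makes it independently testable, and moving `send_cache_to_object_storage_a2vybg()` below the abort check means a failed run no longer uploads a partial cache. The fan-out then reduces to slicing the record list into roughly one batch per worker and awaiting them together. A self-contained sketch of that pattern, where the worker count and records are made-up stand-ins for this module's globals:

```python
import asyncio
from typing import Any, Dict, Generator, List

NUM_WORKERS = 4  # stand-in for attribute_calculators.NUM_WORKERS_A2VYBG

def make_batches(
    iterable: List[Any], size: int = 1
) -> Generator[List[Any], None, None]:
    # Yield consecutive slices of at most `size` items.
    length = len(iterable)
    for ndx in range(0, length, size):
        yield iterable[ndx : min(ndx + size, length)]

async def process_batch(batch: List[Dict[str, Any]]) -> None:
    await asyncio.sleep(0)  # placeholder for the per-record LLM calls

async def fan_out(records: List[Dict[str, Any]]) -> None:
    # Mirrors the diff's sizing: roughly one batch per worker, at least 1.
    batch_size = max(len(records) // NUM_WORKERS, 1)
    tasks = [process_batch(b) for b in make_batches(records, size=batch_size)]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(fan_out([{"id": i} for i in range(10)]))
```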