@@ -90,10 +90,12 @@ async def run_system_task(
90
90
return_type : Type | None = system_task_kicker .__annotations__ .get ("return" )
91
91
return TypeAdapter (return_type ).validate_python (result .return_value )
92
92
93
- async def run_task (self , task_data : TaskData , * args ):
93
+ async def run_task (self , task_data : TaskData , * args , raise_on_error : bool = False ):
94
94
task = await self .get_task (task_data .name ).kiq (* args )
95
95
task_result = await task .wait_result ()
96
- task_error = task_result .error
96
+ if (task_error := task_result .error ) and raise_on_error :
97
+ raise task_error
98
+
97
99
result = TaskResult .from_taskiq (task_data .name , task_result )
98
100
99
101
if metrics_str := "\n " .join (
@@ -228,7 +230,10 @@ async def startup(self) -> list[Coroutine]:
228
230
TaskType .SYSTEM_USER_TASKDATA , TaskType .STARTUP
229
231
):
230
232
exceptions_or_none = await quattro .gather (
231
- * map (lambda td : self .run_task (td , startup_state ), startup_tasks_taskdata ),
233
+ * map (
234
+ lambda td : self .run_task (td , startup_state , raise_on_error = True ),
235
+ startup_tasks_taskdata ,
236
+ ),
232
237
# NOTE: Any propagated failure in here should be handled so shutdown tasks run
233
238
return_exceptions = True ,
234
239
)
0 commit comments