@@ -112,57 +112,61 @@ def get_replacement(node_id: str, node: dict, subgraph_id: SubGraphId) -> dict:
112
112
}
113
113
}
114
114
115
- for sjob_id , subjob , subjob_dependencies in splitter .split_streaming (
116
- process_graph = process ["process_graph" ], get_replacement = get_replacement , main_subgraph_id = main_subgraph_id
117
- ):
118
- subjobs [sjob_id ] = subjob
119
- dependencies [sjob_id ] = subjob_dependencies
120
- try :
121
- # TODO: how to error handle this? job creation? Fail whole partitioned job or try to finish what is possible?
122
- con = self ._backends .get_connection (subjob .backend_id )
123
- with con .authenticated_from_request (request = flask .request ), con .override (
124
- default_timeout = CONNECTION_TIMEOUT_JOB_START
125
- ):
126
- with TimingLogger (title = f"Create batch job { pjob_id = } :{ sjob_id } on { con .id = } " , logger = _log .info ):
127
- job = con .create_job (
128
- process_graph = subjob .process_graph ,
129
- title = f"Crossbackend job { pjob_id } :{ sjob_id } " ,
130
- plan = metadata .get ("plan" ),
131
- budget = metadata .get ("budget" ),
132
- additional = job_options ,
133
- )
134
- _log .info (f"Created { pjob_id } :{ sjob_id } on backend { con .id } as batch job { job .job_id } " )
135
- batch_jobs [sjob_id ] = job
136
- title = f"Partitioned job { pjob_id = } { sjob_id = } "
137
- self ._db .insert_sjob (
138
- user_id = user_id ,
139
- pjob_id = pjob_id ,
140
- sjob_id = sjob_id ,
141
- subjob = subjob ,
142
- title = title ,
143
- status = STATUS_CREATED ,
144
- )
145
- self ._db .set_backend_job_id (
146
- user_id = user_id , pjob_id = pjob_id , sjob_id = sjob_id , job_id = job .job_id
147
- )
148
- create_stats [STATUS_CREATED ] += 1
149
- except Exception as exc :
150
- _log .error (f"Creation of { pjob_id } :{ sjob_id } failed" , exc_info = True )
151
- msg = f"Create failed: { exc } "
152
- self ._db .set_sjob_status (
153
- user_id = user_id , pjob_id = pjob_id , sjob_id = sjob_id , status = STATUS_ERROR , message = msg
154
- )
155
- create_stats [STATUS_ERROR ] += 1
115
+ try :
116
+ for sjob_id , subjob , subjob_dependencies in splitter .split_streaming (
117
+ process_graph = process ["process_graph" ],
118
+ get_replacement = get_replacement ,
119
+ main_subgraph_id = main_subgraph_id ,
120
+ ):
121
+ subjobs [sjob_id ] = subjob
122
+ dependencies [sjob_id ] = subjob_dependencies
123
+ try :
124
+ title = f"Partitioned job { pjob_id = } { sjob_id = } "
125
+ self ._db .insert_sjob (user_id = user_id , pjob_id = pjob_id , sjob_id = sjob_id , subjob = subjob , title = title )
126
+
127
+ # TODO: how to error handle this? job creation? Fail whole partitioned job or try to finish what is possible?
128
+ con = self ._backends .get_connection (subjob .backend_id )
129
+ with con .authenticated_from_request (request = flask .request ), con .override (
130
+ default_timeout = CONNECTION_TIMEOUT_JOB_START
131
+ ):
132
+ with TimingLogger (
133
+ title = f"Create batch job { pjob_id = } :{ sjob_id } on { con .id = } " , logger = _log .info
134
+ ):
135
+ job = con .create_job (
136
+ process_graph = subjob .process_graph ,
137
+ title = f"Crossbackend job { pjob_id } :{ sjob_id } " ,
138
+ plan = metadata .get ("plan" ),
139
+ budget = metadata .get ("budget" ),
140
+ additional = job_options ,
141
+ )
142
+ _log .info (f"Created { pjob_id } :{ sjob_id } on backend { con .id } as batch job { job .job_id } " )
143
+ batch_jobs [sjob_id ] = job
144
+ self ._db .set_sjob_status (
145
+ user_id = user_id , pjob_id = pjob_id , sjob_id = sjob_id , status = STATUS_CREATED
146
+ )
147
+ self ._db .set_backend_job_id (
148
+ user_id = user_id , pjob_id = pjob_id , sjob_id = sjob_id , job_id = job .job_id
149
+ )
150
+ create_stats [STATUS_CREATED ] += 1
151
+ except Exception as exc :
152
+ _log .error (f"Creation of { pjob_id } :{ sjob_id } failed" , exc_info = True )
153
+ msg = f"Create failed: { exc } "
154
+ self ._db .set_sjob_status (
155
+ user_id = user_id , pjob_id = pjob_id , sjob_id = sjob_id , status = STATUS_ERROR , message = msg
156
+ )
157
+ create_stats [STATUS_ERROR ] += 1
156
158
157
- # TODO: this is currently unused, don't bother building it at all?
158
- partitioned_job = PartitionedJob (
159
- process = process , metadata = metadata , job_options = job_options , subjobs = subjobs , dependencies = dependencies
160
- )
159
+ # TODO: this is currently unused, don't bother building it at all?
160
+ partitioned_job = PartitionedJob (
161
+ process = process , metadata = metadata , job_options = job_options , subjobs = subjobs , dependencies = dependencies
162
+ )
161
163
162
- pjob_status = STATUS_CREATED if create_stats [STATUS_CREATED ] > 0 else STATUS_ERROR
163
- self ._db .set_pjob_status (
164
- user_id = user_id , pjob_id = pjob_id , status = pjob_status , message = repr (create_stats ), progress = 0
165
- )
164
+ pjob_status = STATUS_CREATED if create_stats [STATUS_CREATED ] > 0 else STATUS_ERROR
165
+ self ._db .set_pjob_status (
166
+ user_id = user_id , pjob_id = pjob_id , status = pjob_status , message = repr (create_stats ), progress = 0
167
+ )
168
+ except Exception as exc :
169
+ self ._db .set_pjob_status (user_id = user_id , pjob_id = pjob_id , status = STATUS_ERROR , message = str (exc ))
166
170
167
171
return pjob_id
168
172
0 commit comments