Skip to content

Commit 05099fc

Browse files
authored
Fix pipe jobs immediately failing due to stale server_job_ids snapshot (#869)
Pipe job IDs submitted during the scheduler label loop weren't in the server_job_ids list (fetched earlier in the same iteration), causing poll_pipes to conclude the scheduler job was gone and mark all tasks FAILED TERMINAL. Append the job ID to server_job_ids on both initial submission and resubmission so _is_scheduler_job_alive sees it immediately.
2 parents a92f456 + a978df8 commit 05099fc

2 files changed

Lines changed: 24 additions & 0 deletions

File tree

arc/job/pipe/pipe_coordinator.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ def submit_pipe_run(self, run_id: str, tasks: List[TaskSpec],
172172
pipe.status = PipeRunState.SUBMITTED
173173
pipe.submitted_at = time.time()
174174
pipe._save_run_metadata()
175+
self.sched.server_job_ids.append(job_id)
175176
logger.info(f'Pipe run {run_id} submitted as job {job_id}.')
176177
else:
177178
logger.warning(f'Pipe run {run_id}: submission returned status={job_status}. '
@@ -234,6 +235,7 @@ def poll_pipes(self) -> None:
234235
pipe.submitted_at = time.time()
235236
pipe._needs_resubmission = False
236237
pipe._save_run_metadata()
238+
self.sched.server_job_ids.append(job_id)
237239
logger.info(f'Pipe run {run_id}: resubmitted as job {job_id}.')
238240
else:
239241
pipe._needs_resubmission = False

arc/job/pipe/pipe_coordinator_test.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ def _make_mock_sched(project_directory):
6767
"""Create a mock Scheduler with the attributes PipeCoordinator needs."""
6868
sched = MagicMock()
6969
sched.project_directory = project_directory
70+
sched.server_job_ids = list()
7071
spc = ARCSpecies(label='H2O', smiles='O')
7172
spc.conformers = [None] * 5
7273
spc.conformer_energies = [None] * 5
@@ -157,6 +158,14 @@ def test_submit_uses_explicit_cluster_software(self):
157158
pipe = self.coord.submit_pipe_run('run_pbs', tasks, cluster_software='pbs')
158159
self.assertEqual(pipe.cluster_software, 'pbs')
159160

161+
def test_submit_adds_job_id_to_server_job_ids(self):
162+
"""Submitted pipe job ID is added to server_job_ids to prevent stale-snapshot race."""
163+
tasks = [_make_spec('t_0')]
164+
with patch.object(PipeRun, 'submit_to_scheduler', return_value=('submitted', '12345[]')):
165+
pipe = self.coord.submit_pipe_run('run_ids', tasks)
166+
self.assertIn('12345[]', self.coord.sched.server_job_ids)
167+
self.assertEqual(pipe.scheduler_job_id, '12345[]')
168+
160169

161170
class TestRegisterFromDir(unittest.TestCase):
162171
"""Tests for PipeCoordinator.register_pipe_run_from_dir()."""
@@ -222,6 +231,19 @@ def test_poll_resets_failure_count_on_success(self):
222231
self.coord.poll_pipes() # succeeds this time
223232
self.assertNotIn('run_flaky', self.coord._pipe_poll_failures)
224233

234+
def test_resubmission_adds_job_id_to_server_job_ids(self):
235+
"""Resubmitted pipe job ID is added to server_job_ids."""
236+
pipe = self.coord.submit_pipe_run('run_resub', [_make_spec('t_resub')])
237+
238+
def fake_reconcile():
239+
pipe._needs_resubmission = True
240+
return {TaskState.PENDING.value: 1}
241+
242+
with patch.object(pipe, 'reconcile', side_effect=fake_reconcile), \
243+
patch.object(pipe, 'submit_to_scheduler', return_value=('submitted', '77777[]')):
244+
self.coord.poll_pipes()
245+
self.assertIn('77777[]', self.coord.sched.server_job_ids)
246+
225247

226248
class TestIngestPipeResults(unittest.TestCase):
227249
"""Tests for PipeCoordinator.ingest_pipe_results()."""

0 commit comments

Comments
 (0)