Skip to content

Commit 130a75c

Browse files
Multiple schedulers (#156)
* implement multiple schedulers * add dependencies * add a comment * remove debug log * set scheduler in request_metadata * update scheduler info on request_metadata * typing * qa * changes cli * refactor: streamline worker count retrieval and add Plackett-Luce shuffle for load balancing * refactor: update Broker class to use 'schedulers' dictionary instead of 'clients' and remove unused 'scheduler_url' parameter
1 parent 12fd405 commit 130a75c

File tree

6 files changed

+187
-68
lines changed

6 files changed

+187
-68
lines changed

alembic/versions/a4e8be715296_add_deleted_as_new_status.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,13 @@ def upgrade() -> None:
4646
def downgrade() -> None:
4747
# Remove the new status from the enum
4848
# this doesn't work
49-
#op.execute("ALTER TYPE status DELETE VALUE 'deleted'")
50-
op.execute("CREATE TYPE status_old AS ENUM ('accepted','running','failed','successful','dismissed')")
49+
# op.execute("ALTER TYPE status DELETE VALUE 'deleted'")
50+
op.execute(
51+
"CREATE TYPE status_old AS ENUM ('accepted','running','failed','successful','dismissed')"
52+
)
5153
op.execute("DELETE FROM system_requests where status='deleted'")
52-
op.execute("ALTER TABLE system_requests ALTER COLUMN status TYPE status_old USING (status::text::status_old)")
54+
op.execute(
55+
"ALTER TABLE system_requests ALTER COLUMN status TYPE status_old USING (status::text::status_old)"
56+
)
5357
op.execute("DROP TYPE status")
54-
op.execute("ALTER TYPE status_old RENAME TO status")
58+
op.execute("ALTER TYPE status_old RENAME TO status")

cads_broker/database.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -719,6 +719,7 @@ def set_request_status(
719719
error_reason: str | None = None,
720720
resubmit: bool | None = None,
721721
priority: float | None = None,
722+
scheduler: str | None = None,
722723
) -> SystemRequest:
723724
"""Set the status of a request."""
724725
statement = sa.select(SystemRequest).where(SystemRequest.request_uid == request_uid)
@@ -734,6 +735,9 @@ def set_request_status(
734735
if priority is not None:
735736
metadata.update({"priority": priority})
736737
request.request_metadata = metadata
738+
if scheduler is not None:
739+
metadata.update({"scheduler": scheduler})
740+
request.request_metadata = metadata
737741
if status == "successful":
738742
request.finished_at = sa.func.now()
739743
elif status == "failed":

0 commit comments

Comments
 (0)