Skip to content

Commit ee4ee43

Browse files
authored
Merge pull request #4 from nuclearcat/improvements-2
improvements: add scheduler and verify no running workflows
2 parents d02069b + fa6a2e9 commit ee4ee43

File tree

11 files changed

+519
-42
lines changed

11 files changed

+519
-42
lines changed

CHANGELOG.md

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,16 @@
22

33
All notable changes to the KernelCI Staging Control application will be documented in this file.
44

5-
## [Unreleased]
5+
## [0.2.0]
6+
7+
### Added
8+
9+
- **Scheduler**: Added a staging scheduler that runs staging at 00:00, 08:00, and 16:00 UTC
10+
- Runs using fastapi-crons
11+
- Condition: if staging ran less than 1 hour ago, the scheduled run is skipped
12+
- Appears in the dashboard as a user run, under the "virtual" user "scheduler"
13+
14+
## [0.1.0]
615

716
### Added
817
- **Staging Run Cancellation**: Added ability to cancel running staging runs

database.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
DEFAULT_ADMIN_EMAIL,
1313
SETTINGS_KEYS,
1414
)
15+
from scheduler_user import ensure_scheduler_user
1516

1617
SQLALCHEMY_DATABASE_URL = DATABASE_URL
1718

@@ -66,6 +67,13 @@ def run_migrations():
6667

6768
print("Database migration check completed")
6869

70+
# Ensure scheduler user exists for legacy databases migrated without init_db
71+
db = SessionLocal()
72+
try:
73+
ensure_scheduler_user(db)
74+
finally:
75+
db.close()
76+
6977

7078
def init_db():
7179
"""Initialize database and create default admin user"""
@@ -94,6 +102,8 @@ def init_db():
94102
db.commit()
95103
print("Default admin user created")
96104

105+
ensure_scheduler_user(db)
106+
97107
# Create default settings
98108
for setting_name, setting_key in SETTINGS_KEYS.items():
99109
setting = db.query(Settings).filter(Settings.key == setting_key).first()

deployment_manager.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,3 +440,62 @@ async def docker_workaround(self) -> Dict[str, Any]:
440440
)
441441

442442
return {"success": True, "results": results}
443+
444+
async def restart_trigger_service(self) -> Dict[str, Any]:
445+
"""
446+
Restart the trigger service in the pipeline
447+
Returns: {"success": bool, "error": str, "details": dict}
448+
"""
449+
result = {
450+
"success": False,
451+
"error": None,
452+
"details": {"start_time": datetime.utcnow().isoformat()},
453+
}
454+
455+
try:
456+
# Set environment variable for pipeline settings
457+
env = os.environ.copy()
458+
env["SETTINGS"] = PIPELINE_SETTINGS_PATH
459+
460+
# Restart the trigger service using docker-compose
461+
cmd = ["docker-compose"]
462+
463+
# Add compose files if configured
464+
if self.compose_files:
465+
cmd.extend(self.compose_files)
466+
467+
# Add restart command for trigger service
468+
cmd.extend(["restart", "trigger"])
469+
470+
print(f"Restarting trigger service with command: {' '.join(cmd)}")
471+
472+
process = await asyncio.create_subprocess_exec(
473+
*cmd,
474+
stdout=asyncio.subprocess.PIPE,
475+
stderr=asyncio.subprocess.PIPE,
476+
env=env,
477+
cwd=self.pipeline_path,
478+
)
479+
480+
stdout, stderr = await process.communicate()
481+
482+
if process.returncode == 0:
483+
result["success"] = True
484+
result["details"]["stdout"] = stdout.decode("utf-8", errors="ignore")
485+
result["details"]["stderr"] = stderr.decode("utf-8", errors="ignore")
486+
result["details"]["end_time"] = datetime.utcnow().isoformat()
487+
print("Successfully restarted trigger service")
488+
else:
489+
result["error"] = (
490+
f"Failed to restart trigger service: {stderr.decode('utf-8', errors='ignore')}"
491+
)
492+
result["details"]["stdout"] = stdout.decode("utf-8", errors="ignore")
493+
result["details"]["stderr"] = stderr.decode("utf-8", errors="ignore")
494+
print(f"Failed to restart trigger service: {result['error']}")
495+
496+
except Exception as e:
497+
result["error"] = f"Exception restarting trigger service: {str(e)}"
498+
result["details"]["exception"] = str(e)
499+
print(f"Exception restarting trigger service: {e}")
500+
501+
return result

main.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from sqlalchemy.orm import Session
2525
from pydantic import BaseModel
2626
import uvicorn
27+
from fastapi_crons import Crons
2728

2829
from config import (
2930
APP_TITLE,
@@ -63,6 +64,7 @@
6364
validate_single_running_staging,
6465
enforce_single_running_staging,
6566
)
67+
from scheduler import register_cron_jobs
6668

6769

6870
# Pydantic models for API requests
@@ -115,6 +117,8 @@ async def lifespan(app: FastAPI):
115117

116118
# Create app with lifespan
117119
app = FastAPI(title=APP_TITLE, lifespan=lifespan)
120+
crons = Crons(app)
121+
register_cron_jobs(crons)
118122

119123
# Mount static files
120124
app.mount("/static", StaticFiles(directory="templates"), name="static")
@@ -974,5 +978,63 @@ async def get_staging_status(
974978
}
975979

976980

981+
@app.get("/api/staging/check-workflow")
982+
async def check_workflow_status(
983+
current_user: User = Depends(get_current_user),
984+
db: Session = Depends(get_db),
985+
):
986+
"""Check if GitHub workflow is already running"""
987+
# Only allow admin/maintainer to check workflow status
988+
if current_user.role not in [UserRole.ADMIN, UserRole.MAINTAINER]:
989+
raise HTTPException(
990+
status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
991+
)
992+
993+
# Check if GitHub token is configured
994+
github_token = get_setting(GITHUB_TOKEN)
995+
if not github_token:
996+
return {
997+
"can_trigger": False,
998+
"reason": "GitHub token not configured",
999+
"running_workflows": [],
1000+
}
1001+
1002+
try:
1003+
from github_integration import GitHubWorkflowManager
1004+
1005+
github_manager = GitHubWorkflowManager(github_token)
1006+
running_workflows = await github_manager.get_running_workflows()
1007+
1008+
if running_workflows:
1009+
return {
1010+
"can_trigger": False,
1011+
"reason": f"GitHub workflow is already running ({len(running_workflows)} active)",
1012+
"running_workflows": running_workflows,
1013+
}
1014+
1015+
# Also check if there's a running staging in database
1016+
running_staging = validate_single_running_staging(db)
1017+
if running_staging:
1018+
return {
1019+
"can_trigger": False,
1020+
"reason": f"Staging run #{running_staging.id} is already running",
1021+
"running_workflows": [],
1022+
}
1023+
1024+
return {
1025+
"can_trigger": True,
1026+
"reason": "Ready to trigger new staging run",
1027+
"running_workflows": [],
1028+
}
1029+
1030+
except Exception as e:
1031+
print(f"Error checking workflow status: {e}")
1032+
return {
1033+
"can_trigger": False,
1034+
"reason": f"Error checking workflow status: {str(e)}",
1035+
"running_workflows": [],
1036+
}
1037+
1038+
9771039
if __name__ == "__main__":
9781040
uvicorn.run(app, host=HOST, port=PORT)

models.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ class StagingStepType(str, enum.Enum):
3232
KERNEL_TREE_UPDATE = "kernel_tree_update"
3333
API_PIPELINE_UPDATE = "api_pipeline_update"
3434
MONITORING_SETUP = "monitoring_setup"
35+
TRIGGER_RESTART = "trigger_restart"
3536

3637

3738
class StagingRunStatus(str, enum.Enum):

orchestrator.py

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,11 +137,15 @@ async def initialize_staging_steps(self, staging_run: StagingRun, db: Session):
137137

138138
steps.extend(
139139
[
140-
{"type": StagingStepType.KERNEL_TREE_UPDATE, "order": order_counter},
141140
{
142141
"type": StagingStepType.API_PIPELINE_UPDATE,
142+
"order": order_counter,
143+
},
144+
{
145+
"type": StagingStepType.KERNEL_TREE_UPDATE,
143146
"order": order_counter + 1,
144147
},
148+
{"type": StagingStepType.TRIGGER_RESTART, "order": order_counter + 2},
145149
]
146150
)
147151

@@ -221,6 +225,8 @@ async def process_step(
221225
await self.process_kernel_tree_step(staging_run, step, db)
222226
elif step.step_type == StagingStepType.API_PIPELINE_UPDATE:
223227
await self.process_api_pipeline_step(staging_run, step, db)
228+
elif step.step_type == StagingStepType.TRIGGER_RESTART:
229+
await self.process_trigger_restart_step(staging_run, step, db)
224230

225231
async def process_github_workflow_step(
226232
self, staging_run: StagingRun, step: StagingRunStep, db: Session
@@ -441,6 +447,36 @@ async def process_api_pipeline_step(
441447
step.end_time = datetime.utcnow()
442448
db.commit()
443449

450+
async def process_trigger_restart_step(
451+
self, staging_run: StagingRun, step: StagingRunStep, db: Session
452+
):
453+
"""Process trigger restart step - restart the trigger service in pipeline"""
454+
if step.status == StagingStepStatus.PENDING:
455+
step.status = StagingStepStatus.RUNNING
456+
step.start_time = datetime.utcnow()
457+
staging_run.current_step = "trigger_restart"
458+
db.commit()
459+
460+
try:
461+
# Restart the trigger service in the pipeline
462+
result = await self.deployment_manager.restart_trigger_service()
463+
464+
step.end_time = datetime.utcnow()
465+
if result["success"]:
466+
step.status = StagingStepStatus.COMPLETED
467+
step.details = json.dumps(result)
468+
else:
469+
step.status = StagingStepStatus.FAILED
470+
step.error_message = result.get("error", "Unknown error")
471+
472+
db.commit()
473+
474+
except Exception as e:
475+
step.status = StagingStepStatus.FAILED
476+
step.error_message = str(e)
477+
step.end_time = datetime.utcnow()
478+
db.commit()
479+
444480
async def complete_staging_run(
445481
self, staging_run: StagingRun, db: Session, success: bool
446482
):
@@ -508,6 +544,8 @@ async def recover_stuck_steps(self, staging_run: StagingRun, db: Session):
508544
timeout_minutes = 15 # Git operations
509545
elif step.step_type == StagingStepType.SELF_UPDATE:
510546
timeout_minutes = 10 # Quick git pull
547+
elif step.step_type == StagingStepType.TRIGGER_RESTART:
548+
timeout_minutes = 5 # Quick docker restart
511549

512550
if running_duration.total_seconds() > (timeout_minutes * 60):
513551
print(
@@ -576,6 +614,7 @@ async def startup_recovery(self):
576614
StagingStepType.API_PIPELINE_UPDATE,
577615
StagingStepType.KERNEL_TREE_UPDATE,
578616
StagingStepType.SELF_UPDATE,
617+
StagingStepType.TRIGGER_RESTART,
579618
]:
580619
print(
581620
f"Recovering stuck local step: {step.step_type.value}"

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,4 @@ passlib[bcrypt]==1.7.4
88
aiofiles==23.2.1
99
httpx==0.25.2
1010
toml
11+
fastapi-crons==2.0.1

0 commit comments

Comments
 (0)