File tree Expand file tree Collapse file tree 4 files changed +53
-0
lines changed Expand file tree Collapse file tree 4 files changed +53
-0
lines changed Original file line number Diff line number Diff line change 28
28
get_flow_runs_by_name ,
29
29
set_deployment_schedule ,
30
30
get_deployment ,
31
+ get_deployment_scheduled_flow_runs ,
31
32
get_flow_run ,
32
33
retry_flow_run ,
33
34
create_secret_block ,
@@ -489,6 +490,21 @@ def put_dataflow_v1(deployment_id, payload: DeploymentUpdate2):
489
490
return {"success" : 1 }
490
491
491
492
493
+ @app .get ("/proxy/v1/deployments/get_scheduled_flow_runs" )
494
+ def get_dataflow_scheduled_flow_runs (deployment_id : str ):
495
+ """fetch scheduled flow-runs for a deployment"""
496
+ if not isinstance (deployment_id , str ):
497
+ raise TypeError ("deployment_id must be a string" )
498
+ try :
499
+ res = get_deployment_scheduled_flow_runs (deployment_id )
500
+ except Exception as error :
501
+ logger .exception (error )
502
+ raise HTTPException (
503
+ status_code = 400 , detail = "failed to fetch scheduled flow-runs for deployment"
504
+ ) from error
505
+ return {"flow_runs" : res }
506
+
507
+
492
508
@app .post ("/proxy/flow_run/" )
493
509
async def get_flowrun (payload : FlowRunRequest ):
494
510
"""look up a flow run by name and return id if found"""
Original file line number Diff line number Diff line change @@ -725,6 +725,19 @@ def get_deployment(deployment_id: str) -> dict:
725
725
return res
726
726
727
727
728
+ def get_deployment_scheduled_flow_runs (deployment_id : str ) -> dict :
729
+ """fetch scheduled flow-runs for a deployment"""
730
+ if not isinstance (deployment_id , str ):
731
+ raise TypeError ("deployment_id must be a string" )
732
+ res = prefect_post (
733
+ "deployments/get_scheduled_flow_runs" ,
734
+ {
735
+ "deployment_ids" : [deployment_id ],
736
+ },
737
+ )
738
+ return res
739
+
740
+
728
741
def update_flow_run_final_state (flow_run : dict ) -> dict :
729
742
"""
730
743
fetch tasks of the flow_run & checks for the custom flow_run state
Original file line number Diff line number Diff line change 35
35
post_run_dbtcore_flow_v1 ,
36
36
post_dataflow_v1 ,
37
37
put_dataflow_v1 ,
38
+ get_dataflow_scheduled_flow_runs ,
38
39
get_long_running_flows ,
39
40
patch_dbt_cloud_creds ,
40
41
get_dbt_cloud_creds ,
@@ -588,6 +589,15 @@ def test_put_dataflow_v1_success():
588
589
assert result == {"success" : 1 }
589
590
590
591
592
+ def test_get_dataflow_scheduled_flow_runs ():
593
+ with patch (
594
+ "proxy.main.get_deployment_scheduled_flow_runs"
595
+ ) as mock_get_deployment_scheduled_flow_runs :
596
+ mock_get_deployment_scheduled_flow_runs .return_value = [{"id" : "flow_run_id" }]
597
+ result = get_dataflow_scheduled_flow_runs ("deployment-id" )
598
+ assert result == {"flow_runs" : [{"id" : "flow_run_id" }]}
599
+
600
+
591
601
def test_get_flow_run_by_id_badparams ():
592
602
with pytest .raises (TypeError ) as excinfo :
593
603
get_flow_run_by_id (123 )
Original file line number Diff line number Diff line change 52
52
post_deployment_v1 ,
53
53
put_deployment_v1 ,
54
54
get_deployment ,
55
+ get_deployment_scheduled_flow_runs ,
55
56
CronSchedule ,
56
57
post_deployment_flow_run ,
57
58
create_secret_block ,
@@ -1091,6 +1092,19 @@ def test_get_deployment(mock_get: Mock):
1091
1092
assert response == "retval"
1092
1093
1093
1094
1095
+ @patch ("proxy.service.prefect_post" )
1096
+ def test_get_deployment_scheduled_flow_runs (mock_post : Mock ):
1097
+ mock_post .return_value = [{"id" : "flow_run_id" }]
1098
+ response = get_deployment_scheduled_flow_runs ("deployment-id" )
1099
+ mock_post .assert_called_once_with (
1100
+ "deployments/get_scheduled_flow_runs" ,
1101
+ {
1102
+ "deployment_ids" : ["deployment-id" ],
1103
+ },
1104
+ )
1105
+ assert response == [{"id" : "flow_run_id" }]
1106
+
1107
+
1094
1108
def test_get_flow_runs_by_deployment_id_type_error ():
1095
1109
with pytest .raises (TypeError ):
1096
1110
get_flow_runs_by_deployment_id (123 , 10 , "" )
You can’t perform that action at this time.
0 commit comments