-
Notifications
You must be signed in to change notification settings - Fork 10
cancel a running job #212
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
cancel a running job #212
Conversation
WalkthroughA new Prefect flow, Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant PrefectFlow
participant AirbyteServerBlock
participant AirbyteConnectionBlock
participant AirbyteAPI
Client->>PrefectFlow: Trigger deployment_schedule_flow_v4(payload)
PrefectFlow->>PrefectFlow: Check payload["task_slug"]
alt task_slug == "airbyte-cancel"
PrefectFlow->>AirbyteServerBlock: Load Airbyte server block
PrefectFlow->>AirbyteConnectionBlock: Load Airbyte connection block
PrefectFlow->>AirbyteAPI: Call cancel_job(connection, job_id)
AirbyteAPI-->>PrefectFlow: Return cancellation result
PrefectFlow->>Client: Log and return result
else other task_slug
PrefectFlow->>...: (Other task flows)
end
Possibly related PRs
📜 Recent review detailsConfiguration used: CodeRabbit UI ⛔ Files ignored due to path filters (1)
📒 Files selected for processing (1)
✅ Files skipped from review due to trivial changes (1)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
proxy/prefect_flows.py
(3 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
proxy/prefect_flows.py (2)
tests/test_service.py (1)
load
(75-76)proxy/helpers.py (2)
info
(32-45)error
(47-60)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: check (3.10)
🔇 Additional comments (2)
proxy/prefect_flows.py (2)
18-18
: LGTM! Import addition is correct.The import of
cancel_connection_job
is properly placed and necessary for the new cancellation functionality.
342-343
: LGTM! Integration follows established patterns.The conditional branch for "airbyte-cancel" is correctly implemented and consistent with the existing code structure for other Airbyte operations.
proxy/prefect_flows.py
Outdated
# task config for cancelling a running airbyte job | ||
# { | ||
# type AIRBYTECONNECTION, | ||
# slug: "airbyte-cancel" | ||
# airbyte_server_block: str | ||
# connection_id: str | ||
# job_id: str | ||
# } | ||
@flow | ||
def run_airbyte_cancel_job(payload: dict): | ||
"""cancel a running airbyte job""" | ||
try: | ||
airbyte_server_block = payload["airbyte_server_block"] | ||
serverblock = AirbyteServer.load(airbyte_server_block) | ||
connection_block = AirbyteConnection( | ||
airbyte_server=serverblock, | ||
connection_id=payload["connection_id"], | ||
) | ||
result = cancel_connection_job(connection_block, payload["job_id"]) | ||
logger.info("airbyte job cancel result=") | ||
logger.info(result) | ||
return result | ||
except Exception as error: # skipcq PYL-W0703 | ||
logger.error(str(error)) # "Job cancel failed." | ||
raise | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add input validation for required payload fields.
The flow implementation follows the established pattern but lacks input validation for required fields. Consider adding validation to prevent KeyError exceptions if required fields are missing from the payload.
@flow
def run_airbyte_cancel_job(payload: dict):
"""cancel a running airbyte job"""
try:
+ # Validate required fields
+ required_fields = ["airbyte_server_block", "connection_id", "job_id"]
+ for field in required_fields:
+ if field not in payload:
+ raise ValueError(f"Missing required field: {field}")
+
airbyte_server_block = payload["airbyte_server_block"]
serverblock = AirbyteServer.load(airbyte_server_block)
connection_block = AirbyteConnection(
airbyte_server=serverblock,
connection_id=payload["connection_id"],
)
result = cancel_connection_job(connection_block, payload["job_id"])
logger.info("airbyte job cancel result=")
logger.info(result)
return result
except Exception as error: # skipcq PYL-W0703
logger.error(str(error)) # "Job cancel failed."
raise
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
# task config for cancelling a running airbyte job | |
# { | |
# type AIRBYTECONNECTION, | |
# slug: "airbyte-cancel" | |
# airbyte_server_block: str | |
# connection_id: str | |
# job_id: str | |
# } | |
@flow | |
def run_airbyte_cancel_job(payload: dict): | |
"""cancel a running airbyte job""" | |
try: | |
airbyte_server_block = payload["airbyte_server_block"] | |
serverblock = AirbyteServer.load(airbyte_server_block) | |
connection_block = AirbyteConnection( | |
airbyte_server=serverblock, | |
connection_id=payload["connection_id"], | |
) | |
result = cancel_connection_job(connection_block, payload["job_id"]) | |
logger.info("airbyte job cancel result=") | |
logger.info(result) | |
return result | |
except Exception as error: # skipcq PYL-W0703 | |
logger.error(str(error)) # "Job cancel failed." | |
raise | |
@flow | |
def run_airbyte_cancel_job(payload: dict): | |
"""cancel a running airbyte job""" | |
try: | |
# Validate required fields | |
required_fields = ["airbyte_server_block", "connection_id", "job_id"] | |
for field in required_fields: | |
if field not in payload: | |
raise ValueError(f"Missing required field: {field}") | |
airbyte_server_block = payload["airbyte_server_block"] | |
serverblock = AirbyteServer.load(airbyte_server_block) | |
connection_block = AirbyteConnection( | |
airbyte_server=serverblock, | |
connection_id=payload["connection_id"], | |
) | |
result = cancel_connection_job(connection_block, payload["job_id"]) | |
logger.info("airbyte job cancel result=") | |
logger.info(result) | |
return result | |
except Exception as error: # skipcq PYL-W0703 | |
logger.error(str(error)) # "Job cancel failed." | |
raise |
🤖 Prompt for AI Agents
In proxy/prefect_flows.py around lines 96 to 121, the run_airbyte_cancel_job
flow lacks validation for required keys in the payload dictionary, which can
cause KeyError exceptions. Add explicit checks at the start of the function to
verify that "airbyte_server_block", "connection_id", and "job_id" keys exist in
the payload. If any are missing, raise a clear exception or handle the error
gracefully before proceeding with the rest of the flow logic.
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## main #212 +/- ##
=======================================
Coverage 76.90% 76.90%
=======================================
Files 5 5
Lines 1429 1429
=======================================
Hits 1099 1099
Misses 330 330 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Summary by CodeRabbit