-
Notifications
You must be signed in to change notification settings - Fork 12
Add support for a matrix of Airflow versions #4
Conversation
Co-authored-by: Tatiana Al-Chueyr <tatiana.alchueyr@gmail.com>
@@ -141,18 +163,81 @@ def _get_api_client(): | |||
) | |||
|
|||
|
|||
def _get_launch_task_key(current_task_key: TaskInstanceKey, group_id: str): | |||
if group_id: | |||
def get_task_group_legacy(operator: BaseOperator) -> TaskGroup: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could probably benefit by having unit tests for this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We agreed to address this as part of a separate ticket: #14
Codecov ReportPatch coverage:
Additional details and impacted files@@ Coverage Diff @@
## main #4 +/- ##
===========================================
- Coverage 89.60% 78.31% -11.29%
===========================================
Files 5 5
Lines 327 415 +88
Branches 31 47 +16
===========================================
+ Hits 293 325 +32
- Misses 24 73 +49
- Partials 10 17 +7
Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here. ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the great work, @pankajkoti !
We will improve the coverage in a separate PR. This is the main problem left.
The only change I was unsure about is keeping example DAGs inside tests - but we can move them outside if we all agree with this afterwards in a separate PR.
While testing the Databricks workflow and notebook operators across various versions of Airflow, it was observed that quite a few attributes are not available as per their current usage available in the latest Airflow version. To support these limitations with older Airflow versions, the PR adds the following fixes: 1. `DatabricksNotebookOperator`: The `task_group` attribute is not available with the `BaseOperator`. For that, we fetch the task group from the `TaskGroupContext` 2. `DatabricksWorkflowTaskGroup`: Like in `DatabricksNotebookOperator`, we need to fetch the task group from the `TaskGroupContext` and additionally, the `dag` attribute is not available with the `TaskGroup` base class which we need to fetch from the `DagContext` 3. `plugin.py` module: The `airflow_flask_app` utility is unavailable. So, we use the `current_app` from the flask context to get the Airflow flask instance. Additionally, the task_group is not available in the operator. So, we have added a custom implementation to fetch the task group for the legacy Airflow version with also fixing the way how we construct URLs for the views. 4. `conftest.py`: The `EmptyOperator` used here is unavailable. So we make use of the `DummyOperator` here 5. `dag.test()` is unavailable. So, we introduce local utilities to leverage the `test_dag` utility for us to run example DAGs without the scheduler Additionally, in order to support running the example DAGs i.e. running concurrent Databricks jobs with the same notebooks across multiple Airflow and Python versions we have introduced the following changes: 1. Provider unique `job_cluster_key` to the Databricks jobs 2. Add the Airflow (`2.2.4`, `2.3`, `2.4`, `2.5`) and Python (`3.8`, `3.9`) versions for the matrix in `ci.yml` Closes: #3
While testing the Databricks workflow and notebook operators across various versions of Airflow, it was observed that quite a few attributes are not available as per their current usage available in the latest Airflow version.
To support these limitations with older Airflow versions, the PR adds the following fixes:
DatabricksNotebookOperator
: Thetask_group
attribute is not available with theBaseOperator
. For that, we fetch the task group from theTaskGroupContext
DatabricksWorkflowTaskGroup
: Like inDatabricksNotebookOperator
, we need to fetch the task group from theTaskGroupContext
and additionally, thedag
attribute is not available with theTaskGroup
base class which we need to fetch from theDagContext
plugin.py
module: Theairflow_flask_app
utility is unavailable. So, we use thecurrent_app
from the flask context to get the Airflow flask instance. Additionally, the task_group is not available in the operator. So, we have added a custom implementation to fetch the task group for the legacy Airflow version with also fixing the way how we construct URLs for the views.conftest.py
: TheEmptyOperator
used here is unavailable. So we make use of theDummyOperator
heredag.test()
is unavailable. So, we introduce local utilities to leverage thetest_dag
utility for us to run example DAGs without the schedulerAdditionally, in order to support running the example DAGs i.e. running concurrent Databricks jobs with the same notebooks across multiple Airflow and Python versions we have introduced the following changes:
job_cluster_key
to the Databricks jobs2.2.4
,2.3
,2.4
,2.5
) and Python (3.8
,3.9
) versions for the matrix inci.yml
closes: #3