-
Notifications
You must be signed in to change notification settings - Fork 8
(DRAFT) #1132 HERE dynamic binning aggregation #1165
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 57 commits
a0fbc23
48ad3a8
073a814
29b8326
6d1847e
696de6e
8edb8c8
0128961
330668c
521ecef
154ea44
ab45c51
e51d6ed
71468b6
dcd9584
ad3c31e
b1befb7
c65985c
a59f2fd
b23d611
b7a6f70
674b0c1
61fdfdc
e9049a4
4e93259
82bf16a
8bed8c2
401dada
84b68f2
c5e5f91
53be925
0422ab0
40f9c99
3ea3d8a
17037fd
b3f8dac
ec10752
57496b5
d1ccac3
98d7b95
9e37d4a
79b7cf7
b3a49f8
c3501a7
0254db5
bbf6779
e1e9e38
afd63a8
b69fcd7
aa724ce
5c71af7
ecb6585
d2dafe6
3ff60e4
bb38800
d742eb9
70e0fc9
f1679d1
cdb8886
0657471
e77503a
f50d023
57cc194
2c37631
0effb5f
a058f7c
c6ef208
6dc942f
dd3170b
7863ef9
9c63a32
191a2c2
4e14aee
b3c62df
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,73 @@ | ||
| ''' | ||
| To trigger for past date (range) use CLI: | ||
| for i in {0..5}; do | ||
| end_date=$(date -I -d "2023-11-02 +$i days") | ||
| airflow dags trigger -e "${end_date}" here_dynamic_binning_agg | ||
| done | ||
|
|
||
| or trigger just one day: airflow dags trigger -e 2023-11-02 here_dynamic_binning_agg | ||
| `airflow dags backfill ...` doesn't work because there are no scheduled run dates in that range. | ||
| ''' | ||
|
|
||
| import sys | ||
| import os | ||
| import logging | ||
| from datetime import timedelta | ||
| from pendulum import duration, datetime | ||
|
|
||
| from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator | ||
| from airflow.models import Variable | ||
| from airflow.decorators import dag, task | ||
|
|
||
| try: | ||
| repo_path = os.path.abspath(os.path.dirname(os.path.dirname(os.path.realpath(__file__)))) | ||
| sys.path.insert(0, repo_path) | ||
| from dags.dag_functions import task_fail_slack_alert | ||
| except: | ||
| raise ImportError("Cannot import slack alert functions") | ||
|
|
||
| LOGGER = logging.getLogger(__name__) | ||
| logging.basicConfig(level=logging.DEBUG) | ||
|
|
||
| doc_md = "This DAG is running off the `1132-here-aggregation-proposal` branch to test dynamic binning aggregation." | ||
| DAG_NAME = 'here_dynamic_binning_agg' | ||
| DAG_OWNERS = Variable.get('dag_owners', deserialize_json=True).get(DAG_NAME, ["Unknown"]) | ||
|
|
||
| default_args = { | ||
| 'owner': ','.join(DAG_OWNERS), | ||
| 'depends_on_past':False, | ||
| 'start_date': datetime(2019, 1, 1, tz="America/Toronto"), | ||
| #aggregation doesn't work on 24_4 yet (no congestion.network_links_24_4) | ||
| #'end_date': datetime(2025, 3, 17, tz="America/Toronto"), | ||
| 'email_on_failure': False, | ||
| 'email_on_success': False, | ||
| 'retries': 1, | ||
| 'retry_delay': duration(minutes=5), | ||
| #'on_failure_callback': task_fail_slack_alert | ||
| } | ||
|
|
||
| @dag( | ||
| DAG_NAME, | ||
| default_args=default_args, | ||
| schedule=None, # triggered by `pull_here_path` DAG | ||
| doc_md = doc_md, | ||
| tags=["HERE", "aggregation"], | ||
| max_active_runs=1, | ||
| catchup=False | ||
| ) | ||
|
|
||
| #to add: catchup, one task at a time, depends on past. | ||
|
|
||
| def here_dynamic_binning_agg(): | ||
| aggregate_daily = SQLExecuteQueryOperator( | ||
| sql=["DELETE FROM gwolofs.congestion_raw_segments WHERE dt = '{{ ds }}'", | ||
| "SELECT gwolofs.congestion_network_segment_agg('{{ ds }}'::date);"], | ||
| task_id='aggregate_daily', | ||
| conn_id='congestion_bot', | ||
| autocommit=True, | ||
| retries = 1, | ||
| execution_timeout=timedelta(hours=1) | ||
| ) | ||
| aggregate_daily | ||
|
|
||
| here_dynamic_binning_agg() |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,73 @@ | ||
| import os | ||
| import sys | ||
| import logging | ||
| from datetime import timedelta | ||
| from pendulum import duration, datetime | ||
|
|
||
| from airflow.models import Variable | ||
| from airflow.decorators import dag | ||
| from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator | ||
|
|
||
| try: | ||
| repo_path = os.path.abspath(os.path.dirname(os.path.dirname(os.path.realpath(__file__)))) | ||
| sys.path.insert(0, repo_path) | ||
| from dags.dag_functions import task_fail_slack_alert | ||
| from dags.custom_operators import SQLCheckOperatorWithReturnValue | ||
| except: | ||
| raise ImportError("Cannot import slack alert functions") | ||
|
|
||
| LOGGER = logging.getLogger(__name__) | ||
| logging.basicConfig(level=logging.DEBUG) | ||
|
|
||
| doc_md = "This DAG is running off the `1132-here-aggregation-proposal` branch to test dynamic binning aggregation." | ||
| DAG_NAME = 'here_dynamic_binning_monthly_agg' | ||
| DAG_OWNERS = Variable.get('dag_owners', deserialize_json=True).get(DAG_NAME, ["Unknown"]) | ||
|
|
||
| default_args = { | ||
| 'owner': ','.join(DAG_OWNERS), | ||
| 'depends_on_past':False, | ||
| 'start_date': datetime(2019, 1, 1, tz="America/Toronto"), | ||
| 'email_on_failure': False, | ||
| 'email_on_success': False, | ||
| 'retries': 1, | ||
| 'retry_delay': duration(hours=1), | ||
| 'on_failure_callback': task_fail_slack_alert | ||
| } | ||
|
|
||
| @dag( | ||
| DAG_NAME, | ||
| default_args=default_args, | ||
| schedule='0 16 1 * *', # 4pm, first day of month | ||
| template_searchpath=os.path.join(repo_path,'here/traffic/sql/dynamic_bins'), | ||
| doc_md = doc_md, | ||
| tags=["HERE", "aggregation"], | ||
| max_active_runs=1, | ||
| catchup=True | ||
| ) | ||
|
|
||
| #to add: catchup, one task at a time, depends on past. | ||
|
|
||
| def here_dynamic_binning_monthly_agg(): | ||
|
|
||
| check_missing_dates = SQLCheckOperatorWithReturnValue( | ||
| sql="select-check_missing_days.sql", | ||
| task_id="check_missing_dates", | ||
| conn_id='congestion_bot', | ||
| retries = 1, | ||
| execution_timeout=timedelta(minutes=10) | ||
| ) | ||
|
|
||
| aggregate_monthly = SQLExecuteQueryOperator( | ||
| sql=[ | ||
| "DELETE FROM gwolofs.congestion_segments_monthy_summary WHERE mnth = '{{ ds }}'", | ||
| "SELECT gwolofs.congestion_segment_monthly_agg('{{ ds }}')" | ||
| ], | ||
| task_id='aggregate_monthly', | ||
| conn_id='congestion_bot', | ||
| autocommit=True, | ||
| retries = 1, | ||
| execution_timeout=timedelta(hours=1) | ||
| ) | ||
| check_missing_dates >> aggregate_monthly | ||
|
|
||
| here_dynamic_binning_monthly_agg() |
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
| --test: 35 projects, 1 day = 47s | ||
| SELECT gwolofs.congestion_cache_tt_results_daily( | ||
| node_start := congestion_corridors.node_start, | ||
| node_end := congestion_corridors.node_end, | ||
| start_date := dates.dt::date | ||
| ) | ||
| FROM gwolofs.congestion_corridors | ||
| JOIN gwolofs.congestion_projects USING (project_id), | ||
| generate_series('2025-01-01', '2025-02-28', '1 day'::interval) AS dates(dt) | ||
|
Check failure on line 9 in here/traffic/sql/dynamic_bins/corridor_agg.sql
|
||
| WHERE | ||
| congestion_projects.description IN ( | ||
| 'Avenue Road cycleway installation', | ||
| 'bluetooth_corridors', | ||
| 'scrutinized-cycleway-corridors' | ||
| ) | ||
| AND corridor_id NOT IN ( | ||
| SELECT DISTINCT corridor_id FROM gwolofs.congestion_raw_corridors WHERE dt >= '2025-01-01' AND dt < '2025-02-28' | ||
|
Check notice on line 17 in here/traffic/sql/dynamic_bins/corridor_agg.sql
|
||
| ) | ||
| AND map_version = '23_4'; | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,40 @@ | ||
| -- Table: gwolofs.congestion_corridors | ||
|
|
||
| -- DROP TABLE IF EXISTS gwolofs.congestion_corridors; | ||
|
|
||
| CREATE TABLE IF NOT EXISTS gwolofs.congestion_corridors | ||
| ( | ||
| link_dirs text [] COLLATE pg_catalog."default", | ||
| lengths numeric [], | ||
| geom geometry, | ||
| total_length numeric, | ||
| corridor_id smallint NOT NULL DEFAULT nextval('congestion_corridors_uid_seq'::regclass), | ||
| node_start bigint NOT NULL, | ||
| node_end bigint NOT NULL, | ||
| map_version text COLLATE pg_catalog."default" NOT NULL, | ||
| corridor_streets text COLLATE pg_catalog."default", | ||
| corridor_start text COLLATE pg_catalog."default", | ||
| corridor_end text COLLATE pg_catalog."default", | ||
| project_id integer, | ||
| CONSTRAINT congestion_corridors_pkey PRIMARY KEY (node_start, node_end, map_version), | ||
| CONSTRAINT corridor_pkey UNIQUE NULLS NOT DISTINCT (corridor_id), | ||
| CONSTRAINT project_id_fk FOREIGN KEY (project_id) | ||
| REFERENCES gwolofs.congestion_projects (project_id) MATCH SIMPLE | ||
| ON UPDATE NO ACTION | ||
| ON DELETE NO ACTION | ||
| NOT VALID | ||
| ) | ||
|
|
||
| TABLESPACE pg_default; | ||
|
|
||
| ALTER TABLE IF EXISTS gwolofs.congestion_corridors | ||
| OWNER TO gwolofs; | ||
|
|
||
| REVOKE ALL ON TABLE gwolofs.congestion_corridors FROM bdit_humans; | ||
|
|
||
| GRANT SELECT ON TABLE gwolofs.congestion_corridors TO bdit_humans; | ||
|
|
||
| GRANT ALL ON TABLE gwolofs.congestion_corridors TO gwolofs; | ||
|
|
||
| COMMENT ON TABLE gwolofs.congestion_corridors IS | ||
| 'Stores cached travel time corridors to reduce routing time.'; | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,22 @@ | ||
| -- Table: gwolofs.congestion_projects | ||
|
|
||
| -- DROP TABLE IF EXISTS gwolofs.congestion_projects; | ||
|
|
||
| CREATE TABLE IF NOT EXISTS gwolofs.congestion_projects | ||
| ( | ||
| project_id integer NOT NULL DEFAULT nextval('congestion_projects_project_id_seq'::regclass), | ||
| description text COLLATE pg_catalog."default" NOT NULL, | ||
| CONSTRAINT congestion_projects_pkey PRIMARY KEY (project_id), | ||
| CONSTRAINT unique_prj_description UNIQUE NULLS NOT DISTINCT (description) | ||
| ) | ||
|
|
||
| TABLESPACE pg_default; | ||
|
|
||
| ALTER TABLE IF EXISTS gwolofs.congestion_projects | ||
| OWNER TO gwolofs; | ||
|
|
||
| REVOKE ALL ON TABLE gwolofs.congestion_projects FROM bdit_humans; | ||
|
|
||
| GRANT SELECT ON TABLE gwolofs.congestion_projects TO bdit_humans; | ||
|
|
||
| GRANT ALL ON TABLE gwolofs.congestion_projects TO gwolofs; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,80 @@ | ||
| -- Table: gwolofs.congestion_raw_corridors | ||
|
|
||
| -- DROP TABLE IF EXISTS gwolofs.congestion_raw_corridors; | ||
|
|
||
| CREATE TABLE IF NOT EXISTS gwolofs.congestion_raw_corridors | ||
| ( | ||
| corridor_id smallint, | ||
| time_grp timerange NOT NULL, | ||
| bin_range tsrange NOT NULL, | ||
| tt numeric, | ||
| num_obs integer, | ||
| uri_string text COLLATE pg_catalog."default", | ||
| dt date, | ||
| hr timestamp without time zone, | ||
|
||
| CONSTRAINT congestion_raw_corridors_pkey PRIMARY KEY (corridor_id, bin_range, time_grp), | ||
| CONSTRAINT corridor_fkey FOREIGN KEY (corridor_id) | ||
| REFERENCES gwolofs.congestion_corridors (corridor_id) MATCH SIMPLE | ||
| ON UPDATE NO ACTION | ||
| ON DELETE CASCADE | ||
| NOT VALID | ||
| ) | ||
|
|
||
| TABLESPACE pg_default; | ||
|
|
||
| ALTER TABLE IF EXISTS gwolofs.congestion_raw_corridors | ||
| OWNER TO gwolofs; | ||
|
|
||
| REVOKE ALL ON TABLE gwolofs.congestion_raw_corridors FROM bdit_humans; | ||
|
|
||
| GRANT SELECT ON TABLE gwolofs.congestion_raw_corridors TO bdit_humans; | ||
|
|
||
| GRANT ALL ON TABLE gwolofs.congestion_raw_corridors TO gwolofs; | ||
|
|
||
| -- Index: congestion_raw_corridors_dt_idx | ||
|
|
||
| -- DROP INDEX IF EXISTS gwolofs.congestion_raw_corridors_dt_idx; | ||
|
|
||
| CREATE INDEX IF NOT EXISTS congestion_raw_corridors_dt_idx | ||
| ON gwolofs.congestion_raw_corridors USING brin | ||
| (dt) | ||
| TABLESPACE pg_default; | ||
| -- Index: congestion_raw_corridors_uri_string | ||
|
|
||
| -- DROP INDEX IF EXISTS gwolofs.congestion_raw_corridors_uri_string; | ||
|
|
||
| CREATE INDEX IF NOT EXISTS congestion_raw_corridors_uri_string | ||
| ON gwolofs.congestion_raw_corridors USING btree | ||
| (uri_string COLLATE pg_catalog."default" ASC NULLS LAST) | ||
| WITH (deduplicate_items = TRUE) | ||
| TABLESPACE pg_default; | ||
| -- Index: dynamic_binning_results_time_grp_corridor_id_idx | ||
|
|
||
| -- DROP INDEX IF EXISTS gwolofs.dynamic_binning_results_time_grp_corridor_id_idx; | ||
|
|
||
| CREATE INDEX IF NOT EXISTS dynamic_binning_results_time_grp_corridor_id_idx | ||
| ON gwolofs.congestion_raw_corridors USING btree | ||
| (time_grp ASC NULLS LAST, corridor_id ASC NULLS LAST, dt ASC NULLS LAST) | ||
| WITH (deduplicate_items = TRUE) | ||
| TABLESPACE pg_default; | ||
|
|
||
| COMMENT ON TABLE gwolofs.congestion_raw_corridors IS | ||
| 'Stores dynamic binning results for custom corridor based travel time requests.'; | ||
|
|
||
| COMMENT ON TABLE gwolofs.congestion_raw_corridors | ||
| IS 'Stores dynamic binning results from standard HERE congestion network travel time aggregations.'; | ||
|
Check failure on line 65 in here/traffic/sql/dynamic_bins/create-table-congestion_raw_corridors.sql
|
||
|
|
||
| COMMENT ON COLUMN gwolofs.congestion_raw_corridors.bin_range | ||
| IS 'Bin range. An exclusion constraint on a temp table prevents overlapping ranges during insert.'; | ||
|
Check failure on line 68 in here/traffic/sql/dynamic_bins/create-table-congestion_raw_corridors.sql
|
||
|
|
||
| COMMENT ON COLUMN gwolofs.congestion_raw_corridors.tt | ||
| IS 'Travel time in seconds.'; | ||
|
|
||
| COMMENT ON COLUMN gwolofs.congestion_raw_corridors.num_obs | ||
| IS 'The sum of the sample size from here.ta_path.'; | ||
|
|
||
| COMMENT ON COLUMN gwolofs.congestion_raw_corridors.dt | ||
| IS 'The date of aggregation for the record. Records may not overlap dates.'; | ||
|
|
||
| COMMENT ON COLUMN gwolofs.congestion_raw_corridors.hr | ||
| IS 'The hour the majority of the record occured in. Ties are rounded up.'; | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The precision on these is wild. I'd suggest
reals here too to save space without losing any meaningful precision.