Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
74 commits
Select commit Hold shift + click to select a range
a0fbc23
#1132 raw_segments table definition
gabrielwol Jan 9, 2025
48ad3a8
#1132 raw_segments aggregation
gabrielwol Jan 9, 2025
073a814
#1132 add time periods in addition to hours
gabrielwol Jan 9, 2025
29b8326
#1132 dynamic bins should not exceed 1hr in length
gabrielwol Jan 10, 2025
6d1847e
#1132 reamde describing dynamic binning query
gabrielwol Jan 10, 2025
696de6e
#1132 remove unnecessary join from insert subquery
gabrielwol Jan 10, 2025
8edb8c8
#1132 here dynamic binning: function
gabrielwol Jan 17, 2025
0128961
#1132 update cache_tt_segment to return segment details + Check if ex…
gabrielwol Jan 30, 2025
330668c
#1132 cache_tt_segment procedure->function
gabrielwol Jan 30, 2025
521ecef
#1132 cache_tt_results procedure->function
gabrielwol Jan 30, 2025
154ea44
#1132 select map version func
gabrielwol Jan 30, 2025
ab45c51
#1132 cache_tt_results updates; use map version, fix end_bin bug, sav…
gabrielwol Jan 30, 2025
e51d6ed
#1132 tt_segments table; save map_version, nodes, remove link_dir con…
gabrielwol Jan 30, 2025
71468b6
#1132 update dynamic bin results table; add uri
gabrielwol Jan 30, 2025
dcd9584
#1132 update here_dynamic_bin_avg; use daily avgs -> avg method, use …
gabrielwol Jan 30, 2025
ad3c31e
#1132 apply end_bin fix to congestion network query
gabrielwol Feb 4, 2025
b1befb7
#1132 rename files/functions as per proposed dictionary
gabrielwol Feb 11, 2025
c65985c
#1132 rename files/functions continued
gabrielwol Feb 11, 2025
a59f2fd
#1132 fluff
gabrielwol Feb 11, 2025
b23d611
tsrange -> timerange
gabrielwol Feb 13, 2025
b7a6f70
#1132 congestion ntwrk hrly and period agg updates
gabrielwol Feb 13, 2025
674b0c1
#1132 rename function for consistency
gabrielwol Feb 14, 2025
61fdfdc
#1132 move time_grps to view, combine hourly and period agg
gabrielwol Feb 18, 2025
e9049a4
#1132 rename congestion_network_segment_agg
gabrielwol Feb 18, 2025
4e93259
#1132 fluff and add comments
gabrielwol Mar 3, 2025
82bf16a
#1132 change path
gabrielwol Mar 3, 2025
8bed8c2
#1132 update md with new examples
gabrielwol Mar 3, 2025
401dada
#1132 explore binning extents ipynb
gabrielwol Mar 18, 2025
84b68f2
#1132 comments
gabrielwol Mar 18, 2025
c5e5f91
#1132 update graph title
gabrielwol Mar 24, 2025
53be925
#1132 add cumulative distribution of tt
gabrielwol Mar 26, 2025
0422ab0
#1132 dynamic binning explore
gabrielwol Apr 1, 2025
40f9c99
add trigger_dags_tasks to pull_here_path #1132
gabrielwol Apr 8, 2025
3ea3d8a
#1132 add here_dynamic_binning_agg DAG
gabrielwol Apr 8, 2025
17037fd
#1132 smol dag changes
gabrielwol Apr 9, 2025
b3f8dac
#1132 fix schedule error from rebase
gabrielwol Apr 9, 2025
ec10752
#1132 add variable to test_dags
gabrielwol Apr 9, 2025
57496b5
#1132 remove time_grp, add bin_start, add temp table to reduce constr…
gabrielwol Apr 9, 2025
d1ccac3
#1132 add trigger command for here_dynamic_binning_agg
gabrielwol Apr 9, 2025
98d7b95
#1132 add temp aggregation end date
gabrielwol Apr 11, 2025
9e37d4a
#1132 an overloaded/simplified version of congestion_cache_tt_results…
gabrielwol Jun 3, 2025
79b7cf7
#1132 add project table, fkey, and corridor descriptions
gabrielwol Jun 3, 2025
b3a49f8
#1132 project / corridor demo
gabrielwol Jun 3, 2025
c3501a7
#1132 add node start/end names to corridors
gabrielwol Jun 4, 2025
0254db5
#1132 reconcile some differences between segment and corridor functions
gabrielwol Jun 5, 2025
bbf6779
#1132 account for streets_valid_range_path
gabrielwol Jun 6, 2025
e1e9e38
#1132 fn to calculate hourly overlap
gabrielwol Jun 6, 2025
afd63a8
#1132 new method to calcualte hr; midpoint
gabrielwol Jun 9, 2025
b69fcd7
#1132 new fn to calculate node streets
gabrielwol Jun 9, 2025
aa724ce
#1132 add task timeout
gabrielwol Jun 30, 2025
5c71af7
#1132 adjust constraints
gabrielwol Jun 30, 2025
ecb6585
#1132 add max bin length constraint
gabrielwol Jul 15, 2025
d2dafe6
#1132 add DROP (temp) TABLE IF EXISTS
gabrielwol Jul 15, 2025
3ff60e4
#1132 add task timeout
gabrielwol Jul 16, 2025
bb38800
#1132 monthly agg dag
gabrielwol Jul 30, 2025
d742eb9
#1132 try execution_timeout with timedelta rather than duration
gabrielwol Jul 30, 2025
70e0fc9
#1132 fix monthly cron schedule
gabrielwol Jul 30, 2025
f1679d1
#1132 fix sql operator timeout
gabrielwol Aug 28, 2025
cdb8886
#1132 implementation of bootstrapping method
gabrielwol Aug 28, 2025
0657471
#1132 bootstrapping method - eliminate another cte
gabrielwol Aug 29, 2025
e77503a
#1132 update segment_grouping to work for all map versions
gabrielwol Sep 3, 2025
f50d023
#1132 check day is not empty before aggregating
gabrielwol Sep 3, 2025
57cc194
#1132 perform bootstrapping by group of segment_ids
gabrielwol Sep 3, 2025
2c37631
#1132 much faster over a single segment_id rather than array
gabrielwol Sep 3, 2025
0effb5f
#1132 bootstrap table structure
gabrielwol Sep 3, 2025
a058f7c
#1132 tt->real, hr->smallint
gabrielwol Sep 10, 2025
c6ef208
#1132 fluff
gabrielwol Sep 15, 2025
6dc942f
#1132 separate out insert and select funcionality
gabrielwol Sep 29, 2025
dd3170b
#1132 fix logical date of TriggerDagRunOperator
gabrielwol Sep 29, 2025
7863ef9
#1132 changes to reflect change in hr datatype
gabrielwol Oct 9, 2025
9c63a32
#1132 try adding an analyze on temp table
gabrielwol Oct 9, 2025
191a2c2
#1132 materialize?
gabrielwol Oct 9, 2025
4e14aee
#1132 separate out delete query
gabrielwol Oct 15, 2025
b3c62df
#1132 change cte's to temp tables with indices to speed up congestion…
gabrielwol Oct 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions dags/here_dynamic_binning_agg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
'''
To trigger for past date (range) use CLI:
for i in {0..5}; do
end_date=$(date -I -d "2023-11-02 +$i days")
airflow dags trigger -e "${end_date}" here_dynamic_binning_agg
done

or trigger just one day: airflow dags trigger -e 2023-11-02 here_dynamic_binning_agg
`airflow dags backfill ...` doesn't work because there are no scheduled run dates in that range.
'''

import sys
import os
import logging
from pendulum import duration, datetime

from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.models import Variable
from airflow.decorators import dag, task

try:
repo_path = os.path.abspath(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
sys.path.insert(0, repo_path)
from dags.dag_functions import task_fail_slack_alert
from dags.custom_operators import SQLCheckOperatorWithReturnValue
except:
raise ImportError("Cannot import slack alert functions")

LOGGER = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)

doc_md = "This DAG is running off the `1132-here-aggregation-proposal` branch to test dynamic binning aggregation."
DAG_NAME = 'here_dynamic_binning_agg'
DAG_OWNERS = Variable.get('dag_owners', deserialize_json=True).get(DAG_NAME, ["Unknown"])

default_args = {
'owner': ','.join(DAG_OWNERS),
'depends_on_past':False,
'start_date': datetime(2019, 1, 1, tz="America/Toronto"),
'email_on_failure': False,
'email_on_success': False,
'retries': 1,
'retry_delay': duration(minutes=5),
#'on_failure_callback': task_fail_slack_alert
}

@dag(
DAG_NAME,
default_args=default_args,
schedule=None, # triggered by `pull_here_path` DAG
doc_md = doc_md,
tags=["HERE", "aggregation"],
max_active_runs=1,
catchup=False
)

#to add: catchup, one task at a time, depends on past.

def here_dynamic_binning_agg():
check_not_empty = SQLCheckOperatorWithReturnValue(
task_id="check_not_empty",
sql="SELECT COUNT(*), COUNT(*) FROM here.ta_path WHERE dt = '{{ ds }}'",
conn_id="congestion_bot",
retries=1,
retry_delay=duration(days=1)
)

delete_daily = SQLExecuteQueryOperator(
sql="DELETE FROM gwolofs.congestion_raw_segments WHERE dt = '{{ ds }}'",
task_id='delete_daily',
conn_id='congestion_bot',
autocommit=True,
retries = 2
)

aggregate_daily = SQLExecuteQueryOperator(
sql="SELECT gwolofs.congestion_network_segment_agg('{{ ds }}'::date);",
task_id='aggregate_daily',
conn_id='congestion_bot',
autocommit=True,
retries = 2,
hook_params={"options": "-c statement_timeout=10800000ms"} #3 hours
)

check_not_empty >> delete_daily >> aggregate_daily

here_dynamic_binning_agg()
110 changes: 110 additions & 0 deletions dags/here_dynamic_binning_monthly_agg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import os
import sys
import logging
from pendulum import duration, datetime

from airflow.models import Variable
from airflow.decorators import dag, task
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

try:
repo_path = os.path.abspath(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
sys.path.insert(0, repo_path)
from dags.dag_functions import task_fail_slack_alert
from dags.custom_operators import SQLCheckOperatorWithReturnValue
except:
raise ImportError("Cannot import slack alert functions")

LOGGER = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)

doc_md = "This DAG is running off the `1132-here-aggregation-proposal` branch to test dynamic binning aggregation."
DAG_NAME = 'here_dynamic_binning_monthly_agg'
DAG_OWNERS = Variable.get('dag_owners', deserialize_json=True).get(DAG_NAME, ["Unknown"])

default_args = {
'owner': ','.join(DAG_OWNERS),
'depends_on_past':False,
'start_date': datetime(2019, 1, 1, tz="America/Toronto"),
'retries': 1,
'retry_delay': duration(hours=1)
#'on_failure_callback': task_fail_slack_alert
}

@dag(
DAG_NAME,
default_args=default_args,
schedule='0 16 1 * *', # 4pm, first day of month
template_searchpath=os.path.join(repo_path,'here/traffic/sql/dynamic_bins'),
doc_md = doc_md,
tags=["HERE", "aggregation"],
max_active_runs=1,
catchup=True
)

#to add: catchup, one task at a time, depends on past.

def here_dynamic_binning_monthly_agg():

check_missing_dates = SQLCheckOperatorWithReturnValue(
sql="select-check_missing_days.sql",
task_id="check_missing_dates",
conn_id='congestion_bot',
retries = 0
)

aggregate_monthly = SQLExecuteQueryOperator(
sql=[
"DELETE FROM gwolofs.congestion_segments_monthy_summary WHERE mnth = '{{ ds }}'",
"SELECT gwolofs.congestion_segment_monthly_agg('{{ ds }}')"
],
task_id='aggregate_monthly',
conn_id='congestion_bot',
autocommit=True,
retries = 1
)

create_groups = SQLExecuteQueryOperator(
sql="segment_grouping.sql",
task_id="create_segment_groups",
conn_id='congestion_bot',
retries = 0,
params={"max_group_size": 100}
)

delete_data = SQLExecuteQueryOperator(
sql="DELETE FROM gwolofs.congestion_segments_monthly_bootstrap WHERE mnth = '{{ ds }}' AND n_resamples = 300",
task_id="delete_bootstrap_results",
conn_id='congestion_bot',
retries=0
)

@task
def expand_groups(**context):
return context["ti"].xcom_pull(task_ids="create_segment_groups")[0][0]

@task(retries=0, max_active_tis_per_dag=1)
def bootstrap_agg(segments, ds):
print(f"segments: {segments}")
postgres_cred = PostgresHook("congestion_bot")
query="""SELECT *
FROM UNNEST(%s::bigint[]) AS unnested(segment_id),
LATERAL (
SELECT gwolofs.congestion_segment_bootstrap(
mnth := %s::date,
segment_id := segment_id,
n_resamples := 300)
) AS lat"""
with postgres_cred.get_conn() as conn:
with conn.cursor() as cur:
cur.execute(query, (segments, ds))
conn.commit()

expand = expand_groups()

check_missing_dates >> aggregate_monthly >> create_groups >> delete_data
delete_data >> expand
bootstrap_agg.expand(segments=expand)

here_dynamic_binning_monthly_agg()
22 changes: 19 additions & 3 deletions dags/pull_here_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
import pendulum
from datetime import timedelta

from airflow.decorators import task, dag
from airflow.decorators import task, dag, task_group
from airflow.hooks.base import BaseHook
from airflow.models import Variable
from airflow.macros import ds_add, ds_format
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

try:
repo_path = os.path.abspath(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
Expand Down Expand Up @@ -45,7 +46,7 @@

@dag(dag_id = dag_name,
default_args=default_args,
schedule='30 10 * * *' ,
schedule='0 17 * * * ',
catchup=False,
doc_md = doc_md,
tags=["HERE", "data_pull"]
Expand Down Expand Up @@ -85,6 +86,21 @@ def get_download_link(request_id: str, access_token: str):
def load_data()->str:
return '''curl $DOWNLOAD_URL | gunzip | psql -h $HOST -U $LOGIN -d bigdata -c "\\COPY here.ta_path_view FROM STDIN WITH (FORMAT csv, HEADER TRUE);" '''

load_data()
# Create a task group for triggering the DAGs
@task_group
def trigger_dags_tasks():
# Define TriggerDagRunOperator for each DAG to trigger
trigger_operators = []
DAGS_TO_TRIGGER = Variable.get('here_path_dag_triggers', deserialize_json=True)
for dag_id in DAGS_TO_TRIGGER:
trigger_operator = TriggerDagRunOperator(
task_id=f'trigger_{dag_id}',
trigger_dag_id=dag_id,
logical_date='{{macros.ds_add(ds, -1)}}',
reset_dag_run=True # Clear existing dag if already exists (for backfilling), old runs will not be in the logs
)
trigger_operators.append(trigger_operator)

load_data() >> trigger_dags_tasks()

pull_here_path()
557 changes: 557 additions & 0 deletions here/traffic/here_dynamic_binning_explore.ipynb

Large diffs are not rendered by default.

22 changes: 22 additions & 0 deletions here/traffic/sql/dynamic_bins/corridor_agg.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
--test: 35 projects, 1 day = 47s
SELECT
gwolofs.congestion_cache_tt_results_daily(
node_start := congestion_corridors.node_start,
node_end := congestion_corridors.node_end,
start_date := dates.dt::date
)
FROM gwolofs.congestion_corridors
JOIN gwolofs.congestion_projects USING (project_id),
generate_series('2025-01-01', '2025-02-28', '1 day'::interval) AS dates (dt)
WHERE
congestion_projects.description IN (
'Avenue Road cycleway installation',
'bluetooth_corridors',
'scrutinized-cycleway-corridors'
)
AND corridor_id NOT IN (

Check failure on line 17 in here/traffic/sql/dynamic_bins/corridor_agg.sql

View workflow job for this annotation

GitHub Actions / SQLFluff Lint

SQLFluff

RF02: Unqualified reference 'corridor_id' found in select with more than one referenced table/view.
SELECT DISTINCT corridor_id

Check failure on line 18 in here/traffic/sql/dynamic_bins/corridor_agg.sql

View workflow job for this annotation

GitHub Actions / SQLFluff Lint

SQLFluff

RF02: Unqualified reference 'corridor_id' found in select with more than one referenced table/view.
FROM gwolofs.congestion_raw_corridors
WHERE dt >= '2025-01-01' AND dt < '2025-02-28'

Check failure on line 20 in here/traffic/sql/dynamic_bins/corridor_agg.sql

View workflow job for this annotation

GitHub Actions / SQLFluff Lint

SQLFluff

RF02: Unqualified reference 'dt' found in select with more than one referenced table/view.

Check failure on line 20 in here/traffic/sql/dynamic_bins/corridor_agg.sql

View workflow job for this annotation

GitHub Actions / SQLFluff Lint

SQLFluff

RF02: Unqualified reference 'dt' found in select with more than one referenced table/view.
)
AND map_version = '23_4';

Check failure on line 22 in here/traffic/sql/dynamic_bins/corridor_agg.sql

View workflow job for this annotation

GitHub Actions / SQLFluff Lint

SQLFluff

RF02: Unqualified reference 'map_version' found in select with more than one referenced table/view.
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
--DROP FUNCTION gwolofs.congestion_segment_bootstrap(date,bigint,integer);

CREATE OR REPLACE FUNCTION gwolofs.congestion_segment_bootstrap(
mnth date,
segment_id bigint,
n_resamples int
)
RETURNS void
LANGUAGE SQL
COST 100
VOLATILE PARALLEL SAFE
AS $BODY$

SELECT setseed(('0.'||replace(mnth::text, '-', ''))::numeric);

WITH raw_obs AS (
SELECT
--segment_id and mnth don't need to be in group by until end
EXTRACT('isodow' FROM dt) IN (1, 2, 3, 4, 5) AS is_wkdy,
hr,
ARRAY_AGG(tt::real) AS tt_array,
AVG(tt::real) AS avg_tt,
COUNT(*) AS n
FROM gwolofs.congestion_raw_segments
WHERE -- same params as the above aggregation
dt >= congestion_segment_bootstrap.mnth
AND dt < congestion_segment_bootstrap.mnth + interval '1 month'
AND segment_id = congestion_segment_bootstrap.segment_id
GROUP BY
segment_id,
is_wkdy,
hr
),

random_selections AS (
SELECT
raw_obs.is_wkdy,
raw_obs.hr,
raw_obs.avg_tt,
raw_obs.n,
sample_group.group_id,
--get a random observation from the array of tts
AVG(raw_obs.tt_array[ceiling(random() * raw_obs.n)]) AS rnd_avg_tt
FROM raw_obs
CROSS JOIN generate_series(1, n)
-- 200 resamples (could be any number)
CROSS JOIN generate_series(1, congestion_segment_bootstrap.n_resamples) AS sample_group(group_id)
GROUP BY
raw_obs.is_wkdy,
raw_obs.hr,
raw_obs.avg_tt,
raw_obs.n,
sample_group.group_id
)

INSERT INTO gwolofs.congestion_segments_monthly_bootstrap (
segment_id, mnth, is_wkdy, hr, avg_tt, n, n_resamples, ci_lower, ci_upper
)
SELECT
congestion_segment_bootstrap.segment_id,
congestion_segment_bootstrap.mnth,
is_wkdy,
hr,
avg_tt::real,
n,
n_resamples,
percentile_disc(0.025) WITHIN GROUP (ORDER BY rnd_avg_tt)::real AS ci_lower,
percentile_disc(0.975) WITHIN GROUP (ORDER BY rnd_avg_tt)::real AS ci_upper
FROM random_selections
GROUP BY
is_wkdy,
hr,
avg_tt,
n;

$BODY$;

GRANT EXECUTE ON FUNCTION gwolofs.congestion_segment_bootstrap(
date, bigint, integer
) TO congestion_bot;

Check failure on line 80 in here/traffic/sql/dynamic_bins/create-function-congestion_segment_bootstrap.sql

View workflow job for this annotation

GitHub Actions / SQLFluff Lint

SQLFluff

CP02: Unquoted identifiers must be consistently upper case.

/*Usage example: (works best one segment at a time with Lateral)
SELECT *
FROM UNNEST('{1,2,3,4,5,6,7,8,9}'::bigint[]) AS unnested(segment_id)
LATERAL (
SELECT gwolofs.congestion_segment_bootstrap(
mnth := '2025-06-01'::date,
segment_ids := segment_id,
n_resamples := 300)
)
*/
Loading
Loading