diff --git a/dags/miovision_open_data.py b/dags/miovision_open_data.py new file mode 100644 index 000000000..cac8d2c12 --- /dev/null +++ b/dags/miovision_open_data.py @@ -0,0 +1,107 @@ +r"""### Monthly Miovision Open Data DAG +Pipeline to run monthly Miovision aggregations for Open Data. +""" +import sys +import os + +from airflow.decorators import dag, task +from datetime import timedelta +from airflow.models import Variable +from airflow.providers.postgres.operators.postgres import PostgresOperator +from airflow.macros import ds_format + +import logging +import pendulum + +try: + repo_path = os.path.abspath(os.path.dirname(os.path.dirname(os.path.realpath(__file__)))) + sys.path.insert(0, repo_path) + from dags.dag_functions import task_fail_slack_alert, send_slack_msg + from dags.custom_operators import SQLCheckOperatorWithReturnValue +except: + raise ImportError("Cannot import DAG helper functions.") + +LOGGER = logging.getLogger(__name__) +logging.basicConfig(level=logging.DEBUG) + +DAG_NAME = 'miovision_open_data' +DAG_OWNERS = Variable.get('dag_owners', deserialize_json=True).get(DAG_NAME, ["Unknown"]) + +default_args = { + 'owner': ','.join(DAG_OWNERS), + 'depends_on_past':False, + #set earlier start_date + catchup when ready? + 'start_date': pendulum.datetime(2024, 1, 1, tz="America/Toronto"), + 'email_on_failure': False, + 'email_on_success': False, + 'retries': 0, + 'retry_delay': timedelta(minutes=5), + 'on_failure_callback': task_fail_slack_alert +} + +@dag( + dag_id=DAG_NAME, + default_args=default_args, + schedule='0 10 3 * *', # 10am, 3rd day of each month + catchup=True, + max_active_runs=1, + tags=["miovision", "open_data"], + doc_md=__doc__ +) +def miovision_open_data_dag(): + + #considered whether it should have an external task sensor + #for the first of the month. Decided it should run later + #to give time for anomalous_range updates if any. + + check_data_availability = SQLCheckOperatorWithReturnValue( + task_id="check_data_availability", + sql="""WITH daily_volumes AS ( + SELECT dt::date, COALESCE(SUM(daily_volume), 0) AS daily_volume + FROM generate_series('{{ macros.ds_format(ds, '%Y-%m-%d', '%Y-%m-01') }}'::date, + '{{ macros.ds_format(ds, '%Y-%m-%d', '%Y-%m-01') }}'::date + '1 month'::interval - '1 day'::interval, + '1 day'::interval) AS dates(dt) + LEFT JOIN miovision_api.volumes_daily_unfiltered USING (dt) + GROUP BY dt + ORDER BY dt + ) + + SELECT NOT(COUNT(*) > 0), 'Missing dates: ' || string_agg(dt::text, ', ') + FROM daily_volumes + WHERE daily_volume = 0""", + conn_id="miovision_api_bot" + ) + + refresh_monthly_open_data = PostgresOperator( + task_id='refresh_monthly_open_data', + sql="SELECT gwolofs.insert_miovision_open_data_monthly_summary('{{ macros.ds_format(ds, '%Y-%m-%d', '%Y-%m-01') }}'::date)", + postgres_conn_id='miovision_api_bot', + autocommit=True + ) + + refresh_15min_open_data = PostgresOperator( + task_id='refresh_15min_open_data', + sql="SELECT gwolofs.insert_miovision_15min_open_data('{{ macros.ds_format(ds, '%Y-%m-%d', '%Y-%m-01') }}'::date)", + postgres_conn_id='miovision_api_bot', + autocommit=True + ) + + @task( + retries=0, + trigger_rule='all_success', + doc_md="""A status message to report DAG success.""" + ) + def status_message(ds = None, **context): + mnth = ds_format(ds, '%Y-%m-%d', '%Y-%m-01') + send_slack_msg( + context=context, + msg=f":meow_miovision: :open_data_to: DAG ran successfully for {mnth} :white_check_mark:" + ) + + ( + check_data_availability >> + [refresh_monthly_open_data, refresh_15min_open_data] >> + status_message() + ) + +miovision_open_data_dag() diff --git a/volumes/miovision/sql/open_data/create-function-insert_miovision_open_data_15min.sql b/volumes/miovision/sql/open_data/create-function-insert_miovision_open_data_15min.sql new file mode 100644 index 000000000..57021e8f4 --- /dev/null +++ b/volumes/miovision/sql/open_data/create-function-insert_miovision_open_data_15min.sql @@ -0,0 +1,159 @@ +--Review decisions: +--Classification Grouping and naming +--Include/Exclude bicycles? +--Include/Exclude buses/streetcars? +--Decision to not include manual anomalous_range 'valid_caveat' notes: SELECT +--Including entry/exit information to satisfy ATR related DRs. +-->> providing exit leg and direction as extra columns rather +-->> than extra rows to reduce potential for double counting. + +--DROP FUNCTION gwolofs.insert_miovision_15min_open_data; + +CREATE OR REPLACE FUNCTION gwolofs.insert_miovision_15min_open_data( + _date date, + integer [] DEFAULT ARRAY[]::integer [] +) +RETURNS void +LANGUAGE 'plpgsql' +COST 100 +VOLATILE PARALLEL UNSAFE +AS $BODY$ + + DECLARE + target_intersections integer [] = miovision_api.get_intersections_uids(intersections); + n_deleted int; + n_inserted int; + _month date = date_trunc('month', _date); + + BEGIN + + WITH deleted AS ( + DELETE FROM gwolofs.miovision_15min_open_data + WHERE + datetime_15min >= _month + AND datetime_15min < _month + interval '1 month' + AND intersection_uid = ANY(target_intersections) + RETURNING * + ) + + SELECT COUNT(*) INTO n_deleted + FROM deleted; + + RAISE NOTICE 'Deleted % rows from gwolofs.miovision_15min_open_data for month %.', n_deleted, _month; + + CREATE TEMP TABLE miovision_movement_map_new AS ( + SELECT + entries.movement_uid, + entries.leg_old AS leg, + entries.dir AS entry_dir, + mov.movement_name AS movement, + --assign exits for peds, bike entry only movements + COALESCE(exits.leg_new, entries.leg_old) AS exit_leg, + COALESCE(exits.dir, entries.dir) AS exit_dir + FROM miovision_api.movement_map AS entries + JOIN miovision_api.movements AS mov USING (movement_uid) + LEFT JOIN miovision_api.movement_map AS exits ON + exits.leg_old = entries.leg_old + AND exits.movement_uid = entries.movement_uid + AND exits.leg_new = substr(exits.dir, 1, 1) --eg. E leg going East is an exit + WHERE entries.leg_new <> substr(entries.dir, 1, 1) --eg. E leg going West is an entry + ); + + WITH inserted AS ( + INSERT INTO gwolofs.miovision_15min_open_data ( + intersection_uid, intersection_long_name, datetime_15min, classification_type, + entry_leg, entry_dir, movement, exit_leg, exit_dir, volume_15min + ) + SELECT + v15.intersection_uid, + i.api_name AS intersection_long_name, + v15.datetime_bin AS datetime_15min, + CASE + WHEN cl.classification = 'Light' THEN 'Light Auto' + WHEN cl.classification IN ( + 'SingleUnitTruck', 'ArticulatedTruck', 'MotorizedVehicle', 'Bus' + ) THEN 'Truck/Bus' + ELSE cl.classification -- 'Bicycle', 'Pedestrian' + END AS classification_type, + v15.leg AS entry_leg, + mm.entry_dir, + mm.movement, + mm.exit_leg, + mm.exit_dir, + --assign exits for peds, bike entry only movements + SUM(v15.volume) AS volume_15min + --exclude notes (manual text field) + --array_agg(ar.notes ORDER BY ar.range_start, ar.uid) FILTER (WHERE ar.uid IS NOT NULL) AS anomalous_range_caveats + FROM miovision_api.volumes_15min_mvt AS v15 + JOIN miovision_api.classifications AS cl USING (classification_uid) + JOIN miovision_api.intersections AS i USING (intersection_uid) + -- TMC to ATR crossover table + JOIN miovision_movement_map_new AS mm USING (movement_uid, leg) + --anti-join anomalous_ranges. See HAVING clause. + LEFT JOIN miovision_api.anomalous_ranges AS ar ON + ( + ar.intersection_uid = v15.intersection_uid + OR ar.intersection_uid IS NULL + ) AND ( + ar.classification_uid = v15.classification_uid + OR ar.classification_uid IS NULL + ) + AND v15.datetime_bin >= ar.range_start + AND ( + v15.datetime_bin < ar.range_end + OR ar.range_end IS NULL + ) + AND ar.problem_level IN ('do-not-use'::text, 'questionable'::text) + WHERE + v15.datetime_bin >= _month + AND v15.datetime_bin < _month + interval '1 month' + AND v15.intersection_uid = ANY(target_intersections) + GROUP BY + v15.intersection_uid, + i.api_name, + v15.datetime_bin, + classification_type, + v15.leg, + mm.entry_dir, + mm.movement, + mm.exit_leg, + mm.exit_dir + HAVING + NOT array_agg(ar.problem_level) && ARRAY['do-not-use'::text, 'questionable'::text] + AND SUM(v15.volume) > 0 --confirm + ORDER BY + v15.intersection_uid, + classification_type, + v15.datetime_bin, + v15.leg + RETURNING * + ) + + SELECT COUNT(*) INTO n_inserted + FROM inserted; + + RAISE NOTICE 'Inserted % rows into gwolofs.miovision_15min_open_data for month %.', n_inserted, _month; + +END; +$BODY$; + +ALTER FUNCTION gwolofs.insert_miovision_15min_open_data(date, integer []) +OWNER TO gwolofs; + +GRANT EXECUTE ON FUNCTION gwolofs.insert_miovision_15min_open_data(date, integer []) +TO miovision_admins; + +GRANT EXECUTE ON FUNCTION gwolofs.insert_miovision_15min_open_data(date, integer []) +TO miovision_api_bot; + +REVOKE ALL ON FUNCTION gwolofs.insert_miovision_15min_open_data(date, integer []) +FROM public; + +COMMENT ON FUNCTION gwolofs.insert_miovision_15min_open_data(date, integer []) +IS 'Function for first deleting then inserting monthly 15 +minute open data volumes into gwolofs.miovision_15min_open_data. +Contains an optional intersection parameter in case one just one +intersection needs to be refreshed.'; + +--testing, around 50 minutes for 1 month (5M rows) +SELECT gwolofs.insert_miovision_15min_open_data('2024-02-01'::date); \ No newline at end of file diff --git a/volumes/miovision/sql/open_data/create-function-insert_miovision_open_data_monthly_summary.sql b/volumes/miovision/sql/open_data/create-function-insert_miovision_open_data_monthly_summary.sql new file mode 100644 index 000000000..bf64131aa --- /dev/null +++ b/volumes/miovision/sql/open_data/create-function-insert_miovision_open_data_monthly_summary.sql @@ -0,0 +1,204 @@ +--DROP FUNCTION gwolofs.insert_miovision_open_data_monthly_summary; + +CREATE OR REPLACE FUNCTION gwolofs.insert_miovision_open_data_monthly_summary( + _date date, + intersections integer [] DEFAULT ARRAY[]::integer [] +) +RETURNS void +LANGUAGE 'plpgsql' +COST 100 +VOLATILE PARALLEL UNSAFE +AS $BODY$ + + DECLARE + target_intersections integer [] = miovision_api.get_intersections_uids(intersections); + n_deleted int; + n_inserted int; + _month date = date_trunc('month', _date); + + BEGIN + + WITH deleted AS ( + DELETE FROM gwolofs.miovision_open_data_monthly_summary + WHERE + mnth = _month + AND intersection_uid = ANY(target_intersections) + RETURNING * + ) + + SELECT COUNT(*) INTO n_deleted + FROM deleted; + + RAISE NOTICE 'Deleted % rows from gwolofs.miovision_15min_open_data for month %.', n_deleted, _month; + + WITH daily_volumes AS ( + SELECT + vd.dt, + vd.intersection_uid, + CASE + WHEN cl.classification = 'Light' THEN 'Light Auto' + WHEN cl.classification IN ('SingleUnitTruck', 'ArticulatedTruck', 'MotorizedVehicle', 'Bus') THEN 'Truck/Bus' + ELSE cl.classification -- 'Bicycle', 'Pedestrian' + END AS classification_type, + --daily volume with long gaps imputted + SUM(coalesce(daily_volume,0) + coalesce(avg_historical_gap_vol,0)) AS total_vol + --omits anomalous_ranges + FROM miovision_api.volumes_daily AS vd + JOIN miovision_api.classifications AS cl USING (classification_uid) + WHERE + vd.isodow <= 5 + AND vd.holiday IS false + AND vd.dt >= _month + AND vd.dt < _month + interval '1 month' + AND vd.intersection_uid = ANY(target_intersections) + --AND classification_uid NOT IN (2,7,10) --exclude bikes due to reliability? + GROUP BY + vd.dt, + vd.intersection_uid, + vd.intersection_uid, + classification_type + ), + + v15 AS ( + --15 minute volumes grouped by classification_type + SELECT + v.intersection_uid, + v.datetime_bin, + CASE + WHEN cl.classification = 'Light' THEN 'Light Auto' + WHEN cl.classification IN ('SingleUnitTruck', 'ArticulatedTruck', 'MotorizedVehicle', 'Bus') THEN 'Truck/Bus' + ELSE cl.classification -- 'Bicycle', 'Pedestrian' + END AS classification_type, + SUM(v.volume) AS vol_15min + FROM miovision_api.volumes_15min_mvt AS v + JOIN miovision_api.classifications AS cl USING (classification_uid) + --anti join holidays + LEFT JOIN ref.holiday AS hol ON hol.dt = v.datetime_bin::date + --anti join anomalous ranges. See HAVING clause. + --NOTE: this method is omitting the whole classification_type if + --one classification is missing. May be undesired. + LEFT JOIN miovision_api.anomalous_ranges ar ON + ( + ar.intersection_uid = v.intersection_uid + OR ar.intersection_uid IS NULL + ) AND ( + ar.classification_uid = v.classification_uid + OR ar.classification_uid IS NULL + ) + AND v.datetime_bin >= ar.range_start + AND ( + v.datetime_bin < ar.range_end + OR ar.range_end IS NULL + ) + WHERE + v.datetime_bin >= _month + AND v.datetime_bin < _month + interval '1 month' + AND v.intersection_uid = ANY(target_intersections) + AND hol.holiday IS NULL + AND date_part('isodow', v.datetime_bin) <= 5 --weekday + --AND classification_uid NOT IN (2,7,10) --exclude bikes due to reliability? + GROUP BY + v.intersection_uid, + classification_type, + v.datetime_bin + ), + + hourly_data AS ( + --find rolling 1 hour volume + SELECT + intersection_uid, + classification_type, + datetime_bin, + datetime_bin::date AS dt, + CASE WHEN date_part('hour', datetime_bin) < 12 THEN 'AM' ELSE 'PM' END AS am_pm, + vol_15min, + SUM(vol_15min) OVER ( + PARTITION BY intersection_uid, classification_type + ORDER BY datetime_bin + RANGE BETWEEN '45 minutes' PRECEDING AND CURRENT ROW + ) AS hr_vol + FROM v15 + ), + + highest_daily_volume AS ( + --find highest volume each day + SELECT + intersection_uid, + classification_type, + am_pm, + dt, + MAX(hr_vol) AS max_hr_volume + FROM hourly_data + GROUP BY + intersection_uid, + classification_type, + am_pm, + dt + ), + + inserted AS ( + INSERT INTO gwolofs.miovision_open_data_monthly_summary ( + intersection_uid, intersection_long_name, mnth, avg_daily_vol_auto, avg_daily_vol_truckbus, avg_daily_vol_ped, avg_daily_vol_bike, + avg_am_peak_hour_vol_auto, avg_am_peak_hour_vol_truckbus, avg_am_peak_hour_vol_ped, avg_am_peak_hour_vol_bike, + avg_pm_peak_hour_vol_auto, avg_pm_peak_hour_vol_truckbus, avg_pm_peak_hour_vol_ped, avg_pm_peak_hour_vol_bike + ) + SELECT + coalesce(dv.intersection_uid, hv.intersection_uid) AS intersection_uid, + i.api_name AS intersection_long_name, + date_trunc('month', coalesce(dv.dt, hv.dt)) AS mnth, + ROUND(AVG(dv.total_vol) FILTER (WHERE dv.classification_type = 'Light Auto'), 0) AS avg_daily_vol_auto, + ROUND(AVG(dv.total_vol) FILTER (WHERE dv.classification_type = 'Truck/Bus'), 0) AS avg_daily_vol_truckbus, + ROUND(AVG(dv.total_vol) FILTER (WHERE dv.classification_type = 'Pedestrians'), 0) AS avg_daily_vol_ped, + ROUND(AVG(dv.total_vol) FILTER (WHERE dv.classification_type = 'Cyclists'), 0) AS avg_daily_vol_bike, + ROUND(AVG(hv.max_hr_volume) FILTER (WHERE hv.classification_type = 'Light Auto' AND hv.am_pm = 'AM'), 0) AS avg_am_peak_hour_vol_auto, + ROUND(AVG(hv.max_hr_volume) FILTER (WHERE hv.classification_type = 'Truck/Bus' AND hv.am_pm = 'AM'), 0) AS avg_am_peak_hour_vol_truckbus, + ROUND(AVG(hv.max_hr_volume) FILTER (WHERE hv.classification_type = 'Pedestrians' AND hv.am_pm = 'AM'), 0) AS avg_am_peak_hour_vol_ped, + ROUND(AVG(hv.max_hr_volume) FILTER (WHERE hv.classification_type = 'Cyclists' AND hv.am_pm = 'AM'), 0) AS avg_am_peak_hour_vol_bike, + ROUND(AVG(hv.max_hr_volume) FILTER (WHERE hv.classification_type = 'Light Auto' AND hv.am_pm = 'PM'), 0) AS avg_pm_peak_hour_vol_auto, + ROUND(AVG(hv.max_hr_volume) FILTER (WHERE hv.classification_type = 'Truck/Bus' AND hv.am_pm = 'PM'), 0) AS avg_pm_peak_hour_vol_truckbus, + ROUND(AVG(hv.max_hr_volume) FILTER (WHERE hv.classification_type = 'Pedestrians' AND hv.am_pm = 'PM'), 0) AS avg_pm_peak_hour_vol_ped, + ROUND(AVG(hv.max_hr_volume) FILTER (WHERE hv.classification_type = 'Cyclists' AND hv.am_pm = 'PM'), 0) AS avg_pm_peak_hour_vol_bike + --array_agg(DISTINCT ar.notes) FILTER (WHERE ar.uid IS NOT NULL) AS notes + FROM daily_volumes AS dv + FULL JOIN highest_daily_volume AS hv USING (intersection_uid, dt, classification_type) + LEFT JOIN miovision_api.intersections AS i ON + i.intersection_uid = coalesce(dv.intersection_uid, hv.intersection_uid) + GROUP BY + coalesce(dv.intersection_uid, hv.intersection_uid), + i.api_name, + date_trunc('month', coalesce(dv.dt, hv.dt)) + ORDER BY + coalesce(dv.intersection_uid, hv.intersection_uid), + date_trunc('month', coalesce(dv.dt, hv.dt)) + RETURNING * + ) + + SELECT COUNT(*) INTO n_inserted + FROM inserted; + + RAISE NOTICE 'Inserted % rows into gwolofs.miovision_open_data_monthly_summary for month %.', n_inserted, _month; + +END; +$BODY$; + +ALTER FUNCTION gwolofs.insert_miovision_open_data_monthly_summary(date, integer []) +OWNER TO gwolofs; + +GRANT EXECUTE ON FUNCTION gwolofs.insert_miovision_open_data_monthly_summary(date, integer []) +TO miovision_admins; + +GRANT EXECUTE ON FUNCTION gwolofs.insert_miovision_open_data_monthly_summary(date, integer []) +TO miovision_api_bot; + +REVOKE ALL ON FUNCTION gwolofs.insert_miovision_open_data_monthly_summary(date, integer []) +FROM public; + +COMMENT ON FUNCTION gwolofs.insert_miovision_open_data_monthly_summary(date, integer []) +IS 'Function for first deleting then inserting monthly summary miovision +open data into gwolofs.miovision_open_data_monthly_summary. +Contains an optional intersection parameter in case one just one +intersection needs to be refreshed.'; + +--testing, indexes work +--~50s for 1 day, ~40 minutes for 1 month (5M rows) +SELECT gwolofs.insert_miovision_open_data_monthly_summary('2024-02-01'::date); \ No newline at end of file diff --git a/volumes/miovision/sql/open_data/create-table-miovision_open_data_15min.sql b/volumes/miovision/sql/open_data/create-table-miovision_open_data_15min.sql new file mode 100644 index 000000000..aeca73db1 --- /dev/null +++ b/volumes/miovision/sql/open_data/create-table-miovision_open_data_15min.sql @@ -0,0 +1,42 @@ +-- DROP TABLE IF EXISTS gwolofs.miovision_15min_open_data; + +CREATE TABLE IF NOT EXISTS gwolofs.miovision_15min_open_data +( + intersection_uid integer NOT NULL, + intersection_long_name text COLLATE pg_catalog."default", + datetime_15min timestamp without time zone NOT NULL, + classification_type text COLLATE pg_catalog."default" NOT NULL, + entry_leg text COLLATE pg_catalog."default" NOT NULL, + entry_dir text COLLATE pg_catalog."default", + movement text COLLATE pg_catalog."default" NOT NULL, + exit_leg text COLLATE pg_catalog."default", + exit_dir text COLLATE pg_catalog."default", + volume_15min smallint, + CONSTRAINT miovision_open_data_15min_pkey PRIMARY KEY ( + intersection_uid, datetime_15min, classification_type, entry_leg, movement + ) +) + +TABLESPACE pg_default; + +CREATE INDEX miovision_15min_od_dt_idx ON +gwolofs.miovision_15min_open_data USING brin (datetime_15min); + +ALTER TABLE IF EXISTS gwolofs.miovision_15min_open_data +OWNER TO gwolofs; + +REVOKE ALL ON TABLE gwolofs.miovision_15min_open_data FROM bdit_humans; + +GRANT SELECT ON TABLE gwolofs.miovision_15min_open_data TO bdit_humans; + +GRANT ALL ON TABLE gwolofs.miovision_15min_open_data TO gwolofs; + +GRANT ALL ON TABLE gwolofs.miovision_15min_open_data TO miovision_admins; + +GRANT SELECT, INSERT, DELETE ON TABLE gwolofs.miovision_15min_open_data +TO miovision_api_bot; + +COMMENT ON TABLE gwolofs.miovision_15min_open_data +IS 'Table to store Miovision 15min open data. Updated monthly. +Schema is a blend of TMC and ATR style data to cover different +types of data requests.'; \ No newline at end of file diff --git a/volumes/miovision/sql/open_data/create-table-miovision_open_data_monthly_summary.sql b/volumes/miovision/sql/open_data/create-table-miovision_open_data_monthly_summary.sql new file mode 100644 index 000000000..331d1dc44 --- /dev/null +++ b/volumes/miovision/sql/open_data/create-table-miovision_open_data_monthly_summary.sql @@ -0,0 +1,46 @@ +-- DROP TABLE gwolofs.miovision_open_data_monthly_summary; + +CREATE TABLE gwolofs.miovision_open_data_monthly_summary ( + intersection_uid integer, + intersection_long_name text, + mnth date, + avg_daily_vol_auto numeric, + avg_daily_vol_truckbus numeric, + avg_daily_vol_ped numeric, + avg_daily_vol_bike numeric, + avg_am_peak_hour_vol_auto numeric, + avg_am_peak_hour_vol_truckbus numeric, + avg_am_peak_hour_vol_ped numeric, + avg_am_peak_hour_vol_bike numeric, + avg_pm_peak_hour_vol_auto numeric, + avg_pm_peak_hour_vol_truckbus numeric, + avg_pm_peak_hour_vol_ped numeric, + avg_pm_peak_hour_vol_bike numeric, + CONSTRAINT miovision_open_data_monthly_summary_pkey PRIMARY KEY ( + intersection_uid, intersection_long_name, mnth + ) +) + +TABLESPACE pg_default; + +CREATE INDEX miovision_monthly_od_dt_idx ON +gwolofs.miovision_open_data_monthly_summary USING brin (mnth); + +ALTER TABLE IF EXISTS gwolofs.miovision_open_data_monthly_summary +OWNER TO gwolofs; + +REVOKE ALL ON TABLE gwolofs.miovision_open_data_monthly_summary FROM bdit_humans; + +GRANT SELECT ON TABLE gwolofs.miovision_open_data_monthly_summary TO bdit_humans; + +GRANT ALL ON TABLE gwolofs.miovision_open_data_monthly_summary TO gwolofs; + +GRANT ALL ON TABLE gwolofs.miovision_open_data_monthly_summary TO miovision_admins; + +GRANT SELECT, INSERT, DELETE ON TABLE gwolofs.miovision_open_data_monthly_summary +TO miovision_api_bot; + +COMMENT ON TABLE gwolofs.miovision_open_data_monthly_summary +IS 'Table to store Miovision monthly summary open data. +Contains an approachable monthly-intersection summary of +avg daily and avg peak AM/PM hour volumes by mode.'; \ No newline at end of file diff --git a/volumes/miovision/sql/open_data/create-view-miovision_wide_tmc.sql b/volumes/miovision/sql/open_data/create-view-miovision_wide_tmc.sql new file mode 100644 index 000000000..0886322f4 --- /dev/null +++ b/volumes/miovision/sql/open_data/create-view-miovision_wide_tmc.sql @@ -0,0 +1,356 @@ +--Would prefer to omit this, unless we *really* want it for +--comparison with existing short term TMC open data. + +--include u-turns? +--bikes +--motorized vehicles/streetcars? + +CREATE OR REPLACE VIEW gwolofs.miovision_open_data_wide_15min AS ( + + SELECT + v.intersection_uid, + v.datetime_bin, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.movement_uid = 3 + AND v.classification_uid = ANY(ARRAY[1]::int []) + AND v.leg = 'S' + ), 0) AS sb_car_r, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.movement_uid = 1 + AND v.classification_uid = ANY(ARRAY[1]::int []) + AND v.leg = 'S' + ), 0) AS sb_car_t, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.movement_uid = 2 + AND v.classification_uid = ANY(ARRAY[1]::int []) + AND v.leg = 'S' + ), 0) AS sb_car_l, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.movement_uid = 3 + AND v.classification_uid = ANY(ARRAY[1]::int []) + AND v.leg = 'N' + ), 0) AS nb_car_r, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.movement_uid = 1 + AND v.classification_uid = ANY(ARRAY[1]::int []) + AND v.leg = 'N' + ), 0) AS nb_car_t, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.movement_uid = 2 + AND v.classification_uid = ANY(ARRAY[1]::int []) + AND v.leg = 'N' + ), 0) AS nb_car_l, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.movement_uid = 3 + AND v.classification_uid = ANY(ARRAY[1]::int []) + AND v.leg = 'W' + ), 0) AS wb_car_r, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.movement_uid = 1 + AND v.classification_uid = ANY(ARRAY[1]::int []) + AND v.leg = 'W' + ), 0) AS wb_car_t, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.movement_uid = 2 + AND v.classification_uid = ANY(ARRAY[1]::int []) + AND v.leg = 'W' + ), 0) AS wb_car_l, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.movement_uid = 3 + AND v.classification_uid = ANY(ARRAY[1]::int []) + AND v.leg = 'E' + ), 0) AS eb_car_r, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.movement_uid = 1 + AND v.classification_uid = ANY(ARRAY[1]::int []) + AND v.leg = 'E' + ), 0) AS eb_car_t, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.movement_uid = 2 + AND v.classification_uid = ANY(ARRAY[1]::int []) + AND v.leg = 'E' + ), 0) AS eb_car_l, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.movement_uid = 3 + AND v.classification_uid = ANY(ARRAY[4, 5, 9]::int []) + AND v.leg = 'S' + ), 0) AS sb_truck_r, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.movement_uid = 1 + AND v.classification_uid = ANY(ARRAY[4, 5, 9]::int []) + AND v.leg = 'S' + ), 0) AS sb_truck_t, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.movement_uid = 2 + AND v.classification_uid = ANY(ARRAY[4, 5, 9]::int []) + AND v.leg = 'S' + ), 0) AS sb_truck_l, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.movement_uid = 3 + AND v.classification_uid = ANY(ARRAY[4, 5, 9]::int []) + AND v.leg = 'N' + ), 0) AS nb_truck_r, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.movement_uid = 1 + AND v.classification_uid = ANY(ARRAY[4, 5, 9]::int []) + AND v.leg = 'N' + ), 0) AS nb_truck_t, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.movement_uid = 2 + AND v.classification_uid = ANY(ARRAY[4, 5, 9]::int []) + AND v.leg = 'N' + ), 0) AS nb_truck_l, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.movement_uid = 3 + AND v.classification_uid = ANY(ARRAY[4, 5, 9]::int []) + AND v.leg = 'W' + ), 0) AS wb_truck_r, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.movement_uid = 1 + AND v.classification_uid = ANY(ARRAY[4, 5, 9]::int []) + AND v.leg = 'W' + ), 0) AS wb_truck_t, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.movement_uid = 2 + AND v.classification_uid = ANY(ARRAY[4, 5, 9]::int []) + AND v.leg = 'W' + ), 0) AS wb_truck_l, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.movement_uid = 3 + AND v.classification_uid = ANY(ARRAY[4, 5, 9]::int []) + AND v.leg = 'E' + ), 0) AS eb_truck_r, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.movement_uid = 1 + AND v.classification_uid = ANY(ARRAY[4, 5, 9]::int []) + AND v.leg = 'E' + ), 0) AS eb_truck_t, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.movement_uid = 2 + AND v.classification_uid = ANY(ARRAY[4, 5, 9]::int []) + AND v.leg = 'E' + ), 0) AS eb_truck_l, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.movement_uid = 3 + AND v.classification_uid = ANY(ARRAY[3]::int []) + AND v.leg = 'S' + ), 0) AS sb_bus_r, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.movement_uid = 1 + AND v.classification_uid = ANY(ARRAY[3]::int []) + AND v.leg = 'S' + ), 0) AS sb_bus_t, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.movement_uid = 2 + AND v.classification_uid = ANY(ARRAY[3]::int []) + AND v.leg = 'S' + ), 0) AS sb_bus_l, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.movement_uid = 3 + AND v.classification_uid = ANY(ARRAY[3]::int []) + AND v.leg = 'N' + ), 0) AS nb_bus_r, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.movement_uid = 1 + AND v.classification_uid = ANY(ARRAY[3]::int []) + AND v.leg = 'N' + ), 0) AS nb_bus_t, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.movement_uid = 2 + AND v.classification_uid = ANY(ARRAY[3]::int []) + AND v.leg = 'N' + ), 0) AS nb_bus_l, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.movement_uid = 3 + AND v.classification_uid = ANY(ARRAY[3]::int []) + AND v.leg = 'W' + ), 0) AS wb_bus_r, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.movement_uid = 1 + AND v.classification_uid = ANY(ARRAY[3]::int []) + AND v.leg = 'W' + ), 0) AS wb_bus_t, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.movement_uid = 2 + AND v.classification_uid = ANY(ARRAY[3]::int []) + AND v.leg = 'W' + ), 0) AS wb_bus_l, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.movement_uid = 3 + AND v.classification_uid = ANY(ARRAY[3]::int []) + AND v.leg = 'E' + ), 0) AS eb_bus_r, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.movement_uid = 1 + AND v.classification_uid = ANY(ARRAY[3]::int []) + AND v.leg = 'E' + ), 0) AS eb_bus_t, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.movement_uid = 2 + AND v.classification_uid = ANY(ARRAY[3]::int []) + AND v.leg = 'E' + ), 0) AS eb_bus_l, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.classification_uid = 6 + AND v.leg = 'N' + ), 0) AS nx_peds, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.classification_uid = 6 + AND v.leg = 'S' + ), 0) AS sx_peds, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.classification_uid = 6 + AND v.leg = 'E' + ), 0) AS ex_peds, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.classification_uid = 6 + AND v.leg = 'W' + ), 0) AS wx_peds, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.classification_uid = 2 + AND v.leg = 'N' + ), 0) AS nx_bike, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.classification_uid = 2 + AND v.leg = 'S' + ), 0) AS sx_bike, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.classification_uid = 2 + AND v.leg = 'E' + ), 0) AS ex_bike, + COALESCE(SUM(v.volume) FILTER ( + WHERE + v.classification_uid = 2 + AND v.leg = 'W' + ), 0) AS wx_bike + FROM miovision_api.volumes_15min_mvt AS v + --JOIN miovision_api.classifications AS c USING (classification_uid) + --JOIN miovision_api.movements AS m USING (movement_uid) + WHERE + v.datetime_bin >= '2024-02-01'::date + AND v.datetime_bin < '2024-03-01'::date + --AND intersection_uid = 1 + --AND v.classification_uid NOT IN (2,10) --exclude bikes due to reliability + GROUP BY + v.intersection_uid, + v.datetime_bin + ORDER BY + v.intersection_uid, + v.datetime_bin +); + +SELECT * FROM gwolofs.miovision_open_data_wide_15min LIMIT 10000; --noqa: L044 + +/* query used for query development! + +SELECT col_name FROM ( +SELECT + 'COALESCE(SUM(v.volume) FILTER ( + WHERE + v.movement_uid = ' + || movement_uid::text || + ' AND v.classification_uid = ANY( + ARRAY[' || string_agg(classification_uid::text, ',') || ']::int [])' || + ' + AND v.leg = ''' || leg || ''' +), 0) AS ' || + dir || '_' || + CASE classification WHEN 'Light' THEN 'car' WHEN 'Bus' THEN 'bus' + WHEN 'SingleUnitTruck' THEN 'truck' WHEN 'ArticulatedTruck' THEN 'truck' + WHEN 'MotorizedVehicle' THEN 'truck' END || '_' || + CASE movement_name WHEN 'left' THEN 'l' + WHEN 'thru' THEN 't' WHEN 'right' THEN 'r' END || ',' AS col_name +FROM miovision_api.movements +CROSS JOIN miovision_api.classifications +CROSS JOIN (VALUES + ('sb', 'S', 1), ('nb', 'N', 2), ('wb', 'W', 3), ('eb', 'E', 4) +) AS directions(dir, leg, dir_order) +WHERE + classification_uid NOT IN (2,6,8,10) --bikes, peds, workvans (dne) + --AND movement_uid NOT IN (7,8) --entrances, exits + AND movement_uid IN (1,2,3) +GROUP BY + CASE classification WHEN 'Light' THEN 1 WHEN 'Bus' THEN 3 WHEN 'SingleUnitTruck' THEN 2 + WHEN 'ArticulatedTruck' THEN 2 WHEN 'MotorizedVehicle' THEN 2 END, + CASE classification WHEN 'Light' THEN 'car' WHEN 'Bus' THEN 'bus' + WHEN 'SingleUnitTruck' THEN 'truck' WHEN 'ArticulatedTruck' THEN 'truck' + WHEN 'MotorizedVehicle' THEN 'truck' END, + dir_order, + dir, + leg, + movement_uid +ORDER BY + CASE classification WHEN 'Light' THEN 1 WHEN 'Bus' THEN 3 WHEN 'SingleUnitTruck' THEN 2 + WHEN 'ArticulatedTruck' THEN 2 WHEN 'MotorizedVehicle' THEN 2 END, + dir_order, + CASE movement_name WHEN 'left' THEN 3 WHEN 'thru' THEN 2 WHEN 'right' THEN 1 END +) AS vehs + +UNION ALL + +SELECT col_name FROM ( + SELECT + 'COALESCE(SUM(v.volume) FILTER ( + WHERE + v.classification_uid = ' + || classification_uid::text || + ' + AND v.leg = ''' || leg || ''' + ), 0) AS ' || + dir || '_' || + CASE classification WHEN 'Bicycle' THEN 'bike' + WHEN 'Pedestrian' THEN 'peds' END || ',' AS col_name + FROM miovision_api.classifications + CROSS JOIN (VALUES + ('sx', 'S', 2), ('nx', 'N', 1), ('wx', 'W', 4), ('ex', 'E', 3) + ) AS directions(dir, leg, dir_order) + WHERE + classification_uid IN (2,6) --bikes, peds + ORDER BY + CASE classification WHEN 'Pedestrian' THEN 1 WHEN 'bike' THEN 2 END, + dir_order +) AS bikes_n_peds +*/ \ No newline at end of file