
Real-Time Transit Analytics Platform

Databricks Delta Lake Python License: MIT GitHub Build Status

Enterprise-grade transit analytics platform built on Databricks, demonstrating advanced data engineering capabilities and best practices for modern data pipelines.


📋 Table of Contents

  • Overview
  • Architecture
  • Features
  • Quick Start
  • Project Structure
  • Data Pipelines
  • Data Quality & Governance
  • Analytics & Dashboards
  • Deployment
  • Cost Management


🎯 Overview

The Real-Time Transit Analytics Platform is a production-ready data engineering solution that processes GTFS (General Transit Feed Specification) data to provide real-time insights into public transportation systems. Built using Databricks Asset Bundles (DABs), this project demonstrates industry best practices for:

  • Medallion Architecture: Structured Bronze β†’ Silver β†’ Gold data layers
  • Delta Live Tables (DLT): Declarative ETL with built-in data quality
  • Unity Catalog: Enterprise-grade data governance and security
  • CI/CD Automation: GitHub Actions for deployment and testing
  • Liquid Clustering: Advanced query optimization for multi-dimensional analytics
  • Streaming & Batch: Hybrid processing patterns for different data sources

Key Capabilities

✅ Static GTFS Processing: Complete implementation with 8 Silver tables and 5 Gold aggregates
✅ Data Quality Framework: 20+ DLT expectations with automated monitoring
✅ Security & Governance: Row-level security, column masking, audit logging
✅ Observability: Event log analysis, pipeline health dashboards, alerting
✅ Cost Optimization: Serverless compute, SPOT instances, auto-termination
⚠️ Realtime Pipeline: Proof-of-concept with mock data (demonstration purposes)


πŸ—οΈ Architecture

Medallion Architecture Diagram

┌─────────────────────────────────────────────────────────────────────┐
│                          DATA SOURCES                               │
│  • GTFS Static Schedule (CSV files)                                 │
│  • GTFS-RT Feeds (Protocol Buffers) - POC only                      │
└─────────────────────────┬───────────────────────────────────────────┘
                          │
                          ▼
┌─────────────────────────────────────────────────────────────────────┐
│                    BRONZE LAYER (Raw Data)                          │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │ • Auto Loader with schema evolution                           │  │
│  │ • Immutable source-of-truth storage                           │  │
│  │ • Rescued data column for malformed records                   │  │
│  │ • Partition by ingestion date                                 │  │
│  └───────────────────────────────────────────────────────────────┘  │
│  Tables: bronze_agencies, bronze_routes, bronze_trips,              │
│          bronze_stops, bronze_stop_times, bronze_calendar, etc.     │
└─────────────────────────┬───────────────────────────────────────────┘
                          │
                          ▼
┌─────────────────────────────────────────────────────────────────────┐
│                  SILVER LAYER (Validated Data)                      │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │ • Data quality expectations (20+ rules)                       │  │
│  │ • Type casting and standardization                            │  │
│  │ • Deduplication and null handling                             │  │
│  │ • Change Data Feed (CDF) enabled                              │  │
│  │ • SCD Type 2 for historical tracking                          │  │
│  └───────────────────────────────────────────────────────────────┘  │
│  Tables: silver_agencies, silver_routes, silver_trips,              │
│          silver_stops, silver_stop_times, silver_calendar, etc.     │
└─────────────────────────┬───────────────────────────────────────────┘
                          │
                          ▼
┌─────────────────────────────────────────────────────────────────────┐
│                   GOLD LAYER (Business Metrics)                     │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │ • Pre-aggregated business metrics                             │  │
│  │ • Liquid Clustering for query optimization                    │  │
│  │ • Optimized for BI tool consumption                           │  │
│  │ • Row-level security and column masking                       │  │
│  └───────────────────────────────────────────────────────────────┘  │
│  Tables: gold_route_summary, gold_stop_frequency,                   │
│          gold_service_patterns, gold_trip_statistics, etc.          │
└─────────────────────────┬───────────────────────────────────────────┘
                          │
                          ▼
┌─────────────────────────────────────────────────────────────────────┐
│                    ANALYTICS & DASHBOARDS                           │
│  • SQL Analytics Dashboards                                         │
│  • Real-time monitoring views                                       │
│  • Data quality scorecards                                          │
└─────────────────────────────────────────────────────────────────────┘

Technology Stack

Component      Technology                Purpose
Compute        Databricks Serverless     DLT pipeline execution
Storage        Delta Lake on S3          ACID transactions, time travel
Orchestration  Delta Live Tables         Declarative ETL framework
Governance     Unity Catalog             Metadata management, access control
CI/CD          GitHub Actions            Automated deployment & testing
IaC            Databricks Asset Bundles  Infrastructure as code
Monitoring     DLT Event Logs            Pipeline observability

✨ Features

Data Engineering

  • Delta Live Tables: Declarative pipeline definitions with automatic dependency resolution
  • Auto Loader: Incremental file ingestion with schema evolution
  • Change Data Feed: Track all changes for downstream consumers
  • Liquid Clustering: Multi-dimensional clustering for optimal query performance
  • Z-Ordering: Data skipping for faster queries on high-cardinality columns

Data Quality

  • 20+ DLT Expectations: Automated data quality checks at Silver layer
  • Expectation Types:
    • expect_or_drop: Drop invalid records (e.g., invalid coordinates)
    • expect_or_fail: Fail pipeline on critical violations (e.g., missing IDs)
    • expect: Track violations without blocking (e.g., data freshness)
  • Quality Monitoring: Real-time dashboards showing DQ metrics
  • Automated Alerts: Email notifications on quality threshold breaches

Security & Governance

  • Row-Level Security: Filter data based on user group membership
  • Column Masking: Redact sensitive fields for non-privileged users
  • Audit Logging: Track all data access and modifications
  • Unity Catalog Integration: Centralized metadata and lineage
  • Encryption: At-rest and in-transit encryption for all data

Note: The security features described, including Row-Level Security and Column Masking, have been implemented. The core logic is managed in the notebooks/security/unity_catalog_security.sql notebook.

Observability

  • Pipeline Health Dashboard: Real-time monitoring of DLT pipelines
  • Event Log Analysis: Query pipeline execution history and metrics
  • Data Quality Scorecard: Track expectation pass/fail rates
  • Cost Tracking: Monitor DBU consumption per pipeline
  • Alerting: Configurable alerts for failures and anomalies

🚀 Quick Start

Prerequisites

  • Databricks Workspace: 14-day trial or existing workspace
  • AWS Account: For S3 storage (or Azure/GCP equivalent)
  • Python: 3.9 or higher
  • Databricks CLI: Latest version
  • Git: For version control

Installation

  1. Clone the repository

    git clone https://github.com/Barrese93/Real-Time-Transit-Analytics-Platform.git
    cd Real-Time-Transit-Analytics-Platform
  2. Install Databricks CLI

    pip install databricks-cli

    Note: the databricks bundle commands used later require the newer unified Databricks CLI. If your installed CLI does not recognize bundle subcommands, install the standalone Databricks CLI following Databricks' official documentation rather than the legacy pip package.
  3. Configure authentication

    databricks configure --token

    Enter your workspace URL and personal access token when prompted.

  4. Set up S3 bucket (first-time only)

    Create an S3 bucket for raw data storage:

    # Create bucket (replace with your unique name)
    aws s3 mb s3://your-transit-analytics-bucket
    
    # Create folder structure
    aws s3api put-object --bucket your-transit-analytics-bucket --key gtfs-static/
    aws s3api put-object --bucket your-transit-analytics-bucket --key checkpoints/
  5. Update configuration

    Edit databricks.yml to set your workspace URL and catalog name:

    workspace:
      host: https://your-workspace.cloud.databricks.com
    
    variables:
      catalog_name:
        default: your_catalog_name
      raw_data_bucket:
        default: "your-s3-bucket-name"
  6. Validate the bundle

    databricks bundle validate -t dev
  7. Deploy to development

    databricks bundle deploy -t dev

Running Your First Pipeline

  1. Upload sample GTFS data to S3

    aws s3 cp sample-data/gtfs/ s3://your-bucket/gtfs-static/ --recursive
  2. Trigger static data ingestion job

    databricks jobs run-now --job-id <static-ingestion-job-id>
  3. Start the Silver layer DLT pipeline

    databricks pipelines start-update --pipeline-id <silver-pipeline-id>
  4. Monitor pipeline execution

    databricks pipelines get --pipeline-id <silver-pipeline-id>
  5. View results in Unity Catalog

    Navigate to your Databricks workspace → Data → Catalog → your_catalog → silver schema (or run the quick check sketched below from a notebook).
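
If you prefer to verify from a notebook rather than the UI, a quick row-count check along these lines works; the catalog and schema names are assumptions that should match your databricks.yml variables:

# Hypothetical sanity check: list the Silver tables and print their row counts.
catalog, schema = "your_catalog_name", "silver"   # adjust to your bundle configuration

tables = [row.tableName for row in spark.sql(f"SHOW TABLES IN {catalog}.{schema}").collect()]
for table_name in tables:
    row_count = spark.table(f"{catalog}.{schema}.{table_name}").count()
    print(f"{catalog}.{schema}.{table_name}: {row_count} rows")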


πŸ“ Project Structure

Real-Time-Transit-Analytics-Platform/
│
├── databricks.yml                    # Root bundle configuration
├── .gitignore                        # Git ignore patterns
├── README.md                         # This file
├── AWS_S3_BUCKET_SETUP_GUIDE.md      # S3 setup instructions
│
├── src/                              # Source code
│   ├── pipelines/                    # DLT pipeline definitions
│   │   ├── dlt_static_gtfs_silver.py    # Silver layer (8 tables, 20+ DQ rules)
│   │   ├── dlt_static_gtfs_gold.py      # Gold layer (5 aggregated tables)
│   │   └── dlt_transit_realtime.py      # Realtime POC (mock data)
│   ├── jobs/                         # Databricks jobs
│   │   └── ingest_static_gtfs.py        # Static GTFS ingestion
│   └── utils/                        # Utility modules
│       └── gtfs_parser.py               # GTFS parsing utilities
│
├── resources/                        # Databricks resource definitions
│   ├── pipelines/                    # DLT pipeline YAML configs
│   │   ├── static_gtfs_silver_pipeline.yml
│   │   ├── static_gtfs_gold_pipeline.yml
│   │   └── transit_pipeline.yml
│   └── jobs/                         # Job YAML configs
│       ├── static_ingestion_job.yml
│       ├── pipeline_trigger_job.yml
│       ├── maintenance_job.yml
│       └── monitoring_job.yml
│
├── notebooks/                        # Analysis and maintenance notebooks
│   ├── analytics/                    # Analytics queries and dashboards
│   │   ├── dashboard_definitions.py
│   │   └── sql_views_analytics.sql
│   ├── monitoring/                   # Observability notebooks
│   │   ├── dlt_event_log_analysis.sql
│   │   └── data_quality_monitoring.sql
│   ├── maintenance/                  # Maintenance operations
│   │   └── optimize_gold_tables.py
│   └── validation/                   # End-to-end validation
│       └── end_to_end_validation.sql
│
├── docs/                             # Additional documentation
│   └── GOLD_LAYER_USER_GUIDE.md
│
└── .github/                          # CI/CD workflows
    └── workflows/
        ├── validate.yml              # Bundle validation on PR
        ├── deploy.yml                # Automated deployment
        └── integration-tests.yml     # Scheduled integration tests

🔄 Data Pipelines

Static GTFS Silver Pipeline

Status: ✅ Production-ready
File: src/pipelines/dlt_static_gtfs_silver.py
Tables: 8 Silver tables with comprehensive data quality

Tables Created

Table                  Records   DQ Rules  Description
silver_agencies        ~10       3         Transit agencies with validated contact info
silver_routes          ~100      4         Routes with type validation and color codes
silver_trips           ~5,000    5         Trips with direction and service validation
silver_stops           ~500      6         Stops with coordinate validation
silver_stop_times      ~50,000   7         Stop times with sequence validation
silver_calendar        ~365      4         Service calendars with date validation
silver_calendar_dates  ~100      3         Service exceptions
silver_shapes          ~10,000   5         Route shapes with coordinate validation

Data Quality Rules

# Example expectations from the pipeline, wrapped in an illustrative table definition
import dlt

@dlt.table
@dlt.expect_or_drop("valid_coordinates",
    "stop_lat BETWEEN -90 AND 90 AND stop_lon BETWEEN -180 AND 180")
@dlt.expect_or_fail("required_stop_id", "stop_id IS NOT NULL")
@dlt.expect("recent_data", "ingestion_date >= current_date() - INTERVAL 7 DAYS")
def silver_stops():
    return spark.read.table("bronze_stops")  # assumed upstream table; actual source may differ

Static GTFS Gold Pipeline

Status: ✅ Production-ready
File: src/pipelines/dlt_static_gtfs_gold.py
Tables: 5 Gold aggregated tables with Liquid Clustering

Tables Created

Table                       Clustering Keys           Description
gold_route_summary          [route_id, agency_id]     Route-level metrics and statistics
gold_stop_frequency         [stop_id, route_id]       Stop visit frequency by route
gold_service_patterns       [service_id]              Service pattern analysis
gold_trip_statistics        [route_id, direction_id]  Trip-level aggregations
gold_hourly_service_levels  [route_id, hour_of_day]   Hourly service frequency
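
As a rough declaration-side sketch, a Gold table with these clustering keys can be defined in DLT using the cluster_by argument supported by recent DLT releases. The aggregation logic is deliberately simplified and the source table names are assumptions; the actual definitions live in dlt_static_gtfs_gold.py:

import dlt
from pyspark.sql import functions as F

# Illustrative Gold table using Liquid Clustering via the cluster_by argument.
@dlt.table(
    name="gold_route_summary_example",                 # hypothetical name
    cluster_by=["route_id", "agency_id"],              # clustering keys as listed above
    comment="Simplified route-level metrics example",
)
def gold_route_summary_example():
    trips = spark.read.table("silver.silver_trips")    # adjust to your catalog.schema.table
    routes = spark.read.table("silver.silver_routes")
    return (
        trips.groupBy("route_id")
        .agg(F.countDistinct("trip_id").alias("total_trips"))
        .join(routes.select("route_id", "agency_id", "route_short_name", "route_long_name"),
              "route_id")
    )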

Example Gold Table

-- gold_route_summary structure
SELECT 
    route_id,
    agency_id,
    route_short_name,
    route_long_name,
    total_trips,
    total_stops,
    avg_trip_duration_minutes,
    service_days_per_week,
    first_departure_time,
    last_departure_time,
    peak_frequency_minutes,
    off_peak_frequency_minutes
FROM gold.gold_route_summary
WHERE route_id = 'ROUTE_001';

Realtime Pipeline (POC)

Status: ⚠️ Proof-of-concept with mock data
File: src/pipelines/dlt_transit_realtime.py
Purpose: Technical demonstration

This pipeline demonstrates DLT streaming patterns but uses hardcoded data. See the file header for details on converting to production.
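
For context, a streaming DLT table with stand-in data can be sketched as below. This is not the repository's POC code; it substitutes Spark's rate source for the hardcoded records, and the column names are hypothetical:

import dlt
from pyspark.sql import functions as F

# Illustrative streaming bronze table: the rate source stands in for a GTFS-RT feed.
@dlt.table(
    name="bronze_vehicle_positions_mock",      # hypothetical name
    comment="Mock real-time vehicle positions (demonstration only)",
)
def bronze_vehicle_positions_mock():
    return (
        spark.readStream.format("rate").option("rowsPerSecond", 5).load()
        .withColumn("vehicle_id", F.concat(F.lit("BUS_"), (F.col("value") % 10).cast("string")))
        .withColumnRenamed("timestamp", "event_timestamp")
        .select("vehicle_id", "event_timestamp")
    )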


πŸ›‘οΈ Data Quality & Governance

Data Quality Framework

The platform implements a comprehensive data quality framework using DLT expectations, organized into the categories below (a grouped-expectations sketch follows the list):

Expectation Categories

  1. Structural Validation

    • Required fields (NOT NULL checks)
    • Data type validation
    • Format validation (e.g., email, phone)
  2. Business Rules

    • Coordinate ranges (-90 to 90 for latitude)
    • Enumeration validation (route types, service days)
    • Referential integrity (foreign key checks)
  3. Temporal Validation

    • Date range checks
    • Sequence validation (stop_sequence)
    • Freshness checks (data recency)
  4. Statistical Validation

    • Outlier detection
    • Distribution checks
    • Completeness metrics

Monitoring Data Quality

Access the data quality dashboard:

-- Query DLT event logs for quality metrics
SELECT 
    flow_name,
    expectation_name,
    SUM(passed_records) as passed,
    SUM(failed_records) as failed,
    ROUND(SUM(passed_records) * 100.0 / 
          (SUM(passed_records) + SUM(failed_records)), 2) as pass_rate
FROM event_log_raw
WHERE event_type = 'flow_progress'
GROUP BY flow_name, expectation_name
ORDER BY pass_rate ASC;

Unity Catalog Security

Row-Level Security Example

-- Create row filter function
CREATE FUNCTION is_authorized_agency(agency_id STRING)
RETURNS BOOLEAN
RETURN CASE
    WHEN is_member('agency_a_analysts') AND agency_id = 'AGENCY_A' THEN TRUE
    WHEN is_member('agency_b_analysts') AND agency_id = 'AGENCY_B' THEN TRUE
    WHEN is_member('system_admin') THEN TRUE
    ELSE FALSE
END;

-- Apply to table
ALTER TABLE gold.gold_route_summary
SET ROW FILTER is_authorized_agency ON (agency_id);

Column Masking Example

-- Create masking function
CREATE FUNCTION mask_operator_id(operator_id STRING)
RETURNS STRING
RETURN CASE
    WHEN is_member('operations_admin') THEN operator_id
    ELSE 'REDACTED'
END;

-- Apply to column
ALTER TABLE silver.silver_trips
ALTER COLUMN operator_id SET MASK mask_operator_id;

📊 Analytics & Dashboards

Available Dashboards

  1. Pipeline Health Dashboard

    • Real-time pipeline status
    • Execution duration trends
    • Error rate monitoring
    • DBU consumption tracking
  2. Data Quality Scorecard

    • Expectation pass/fail rates
    • Quality trends over time
    • Top failing rules
    • Data completeness metrics
  3. Transit Operations Dashboard

    • Route performance metrics
    • Stop frequency analysis
    • Service pattern visualization
    • Peak vs off-peak comparisons

Sample Analytics Queries

1. Route Performance Analysis

-- Top 10 busiest routes by trip count
SELECT 
    r.route_short_name,
    r.route_long_name,
    COUNT(DISTINCT t.trip_id) as total_trips,
    COUNT(DISTINCT st.stop_id) as unique_stops,
    MIN(st.arrival_time) as first_service,
    MAX(st.departure_time) as last_service
FROM gold.gold_route_summary r
JOIN silver.silver_trips t ON r.route_id = t.route_id
JOIN silver.silver_stop_times st ON t.trip_id = st.trip_id
GROUP BY r.route_short_name, r.route_long_name
ORDER BY total_trips DESC
LIMIT 10;

2. Service Frequency Analysis

-- Service frequency by hour of day for a specific route
SELECT 
    hour_of_day,
    route_id,
    avg_headway_minutes,
    total_trips,
    CASE 
        WHEN hour_of_day BETWEEN 6 AND 9 THEN 'Morning Peak'
        WHEN hour_of_day BETWEEN 16 AND 19 THEN 'Evening Peak'
        ELSE 'Off-Peak'
    END as service_period
FROM gold.gold_hourly_service_levels
WHERE route_id = 'ROUTE_001'
ORDER BY hour_of_day;

3. Stop Activity Heatmap

-- Identify the busiest stops across all routes
SELECT 
    s.stop_name,
    s.stop_lat,
    s.stop_lon,
    COUNT(DISTINCT st.trip_id) as total_trips,
    COUNT(DISTINCT st.route_id) as routes_served,
    AVG(sf.daily_frequency) as avg_daily_visits
FROM silver.silver_stops s
JOIN silver.silver_stop_times st ON s.stop_id = st.stop_id
JOIN gold.gold_stop_frequency sf ON s.stop_id = sf.stop_id
GROUP BY s.stop_name, s.stop_lat, s.stop_lon
ORDER BY total_trips DESC
LIMIT 20;

4. Service Pattern Comparison

-- Compare weekday vs weekend service patterns
SELECT 
    sp.service_id,
    sp.service_type,
    COUNT(DISTINCT sp.route_id) as routes_count,
    SUM(sp.total_trips) as total_trips,
    AVG(sp.avg_trip_duration_minutes) as avg_duration,
    sp.days_of_week
FROM gold.gold_service_patterns sp
GROUP BY sp.service_id, sp.service_type, sp.days_of_week
ORDER BY total_trips DESC;

5. Route Coverage Analysis

-- Analyze geographic coverage by route
SELECT 
    r.route_id,
    r.route_short_name,
    COUNT(DISTINCT s.stop_id) as total_stops,
    ROUND(MAX(s.stop_lat) - MIN(s.stop_lat), 4) as lat_span,
    ROUND(MAX(s.stop_lon) - MIN(s.stop_lon), 4) as lon_span,
    COUNT(DISTINCT sh.shape_id) as shape_variations
FROM silver.silver_routes r
JOIN silver.silver_trips t ON r.route_id = t.route_id
JOIN silver.silver_stop_times st ON t.trip_id = st.trip_id
JOIN silver.silver_stops s ON st.stop_id = s.stop_id
LEFT JOIN silver.silver_shapes sh ON t.shape_id = sh.shape_id
GROUP BY r.route_id, r.route_short_name
ORDER BY total_stops DESC;

6. Trip Duration Statistics

-- Analyze trip duration patterns by route and direction
SELECT 
    ts.route_id,
    ts.direction_id,
    ts.total_trips,
    ROUND(ts.avg_duration_minutes, 2) as avg_duration,
    ROUND(ts.min_duration_minutes, 2) as min_duration,
    ROUND(ts.max_duration_minutes, 2) as max_duration,
    ROUND(ts.std_duration_minutes, 2) as duration_stddev
FROM gold.gold_trip_statistics ts
WHERE ts.total_trips > 10
ORDER BY ts.route_id, ts.direction_id;

7. Data Quality Monitoring

-- Monitor data quality metrics across all pipelines
SELECT 
    DATE(update_time) as date,
    flow_name,
    COUNT(DISTINCT dataset_name) as tables_processed,
    SUM(num_output_rows) as total_rows,
    SUM(CASE WHEN expectation_failed = true THEN 1 ELSE 0 END) as failed_expectations,
    ROUND(AVG(duration_seconds), 2) as avg_processing_time
FROM event_log_raw
WHERE event_type = 'flow_progress'
  AND update_time >= CURRENT_DATE - INTERVAL 7 DAYS
GROUP BY DATE(update_time), flow_name
ORDER BY date DESC, flow_name;

8. Real-time Service Alerts

-- Identify routes with potential service issues (low frequency)
SELECT 
    hsl.route_id,
    r.route_short_name,
    hsl.hour_of_day,
    hsl.avg_headway_minutes,
    hsl.total_trips,
    CASE 
        WHEN hsl.avg_headway_minutes > 60 THEN 'Low Frequency'
        WHEN hsl.avg_headway_minutes > 30 THEN 'Medium Frequency'
        ELSE 'High Frequency'
    END as service_level
FROM gold.gold_hourly_service_levels hsl
JOIN silver.silver_routes r ON hsl.route_id = r.route_id
WHERE hsl.hour_of_day BETWEEN 7 AND 20  -- Peak hours
  AND hsl.avg_headway_minutes > 30
ORDER BY hsl.avg_headway_minutes DESC;

🚢 Deployment

Deployment Environments

The project supports multiple deployment targets:

  • dev: Development environment (default)
  • prod: Production environment

Manual Deployment

# Deploy to development
databricks bundle deploy -t dev

# Deploy to production
databricks bundle deploy -t prod

CI/CD with GitHub Actions

The project includes three automated workflows:

  1. Validate (.github/workflows/validate.yml)

    • Runs on pull requests
    • Validates bundle configuration
    • Checks Python syntax
  2. Deploy (.github/workflows/deploy.yml)

    • Runs on merge to main
    • Deploys to production
    • Runs smoke tests
  3. Integration Tests (.github/workflows/integration-tests.yml)

    • Scheduled daily
    • Validates end-to-end pipeline
    • Checks data quality metrics

Deployment Best Practices

  1. Always validate before deploying

    databricks bundle validate -t prod
  2. Use separate catalogs for dev/prod

    targets:
      dev:
        variables:
          catalog_name: transit_analytics_dev
      prod:
        variables:
          catalog_name: transit_analytics_prod
  3. Test in dev before promoting to prod

    • Deploy to dev
    • Run validation queries
    • Check data quality metrics
    • Review pipeline logs
    • Then deploy to prod
  4. Monitor deployments

    • Check GitHub Actions logs
    • Verify pipeline creation in workspace
    • Validate table creation in Unity Catalog

💰 Cost Management

Trial Budget Optimization

This project is designed to stay within the $40 Databricks trial credit limit:

Resource                    Configuration               Estimated Cost/Day
DLT Pipelines (Serverless)  2 workers, 15 min runs      $8-12
Interactive Cluster         r5.large, single-node       $2-4 (15 min auto-termination)
Job Clusters                Ephemeral, SPOT instances   $3-5
Storage (S3)                Delta tables + checkpoints  $0.50
Unity Catalog               Metadata storage            Included
Total                                                   ~$15-20/day

Cost Optimization Strategies

  1. Use Serverless for DLT

    • No cluster management overhead
    • Pay only for actual compute time
    • Automatic scaling
  2. Enable Auto-Termination

    clusters:
      - label: "default"
        autotermination_minutes: 15
  3. Use SPOT Instances

    aws_attributes:
      availability: "SPOT"
      spot_bid_price_percent: 100
  4. Optimize Pipeline Schedules

    • Run pipelines during off-peak hours
    • Use incremental processing
    • Batch small updates
  5. Monitor DBU Consumption

    -- Query to track DBU usage (joined to list prices for an estimated cost)
    SELECT 
        date_trunc('day', u.usage_date) as day,
        SUM(u.usage_quantity) as total_dbus,
        SUM(u.usage_quantity * p.pricing.default) as estimated_cost
    FROM system.billing.usage u
    JOIN system.billing.list_prices p
      ON u.sku_name = p.sku_name
     AND u.usage_start_time >= p.price_start_time
     AND (p.price_end_time IS NULL OR u.usage_start_time < p.price_end_time)
    WHERE u.sku_name LIKE '%DLT%'
    GROUP BY day
    ORDER BY day DESC;
