Enterprise-grade transit analytics platform built on Databricks, demonstrating advanced data engineering capabilities and best practices for modern data pipelines.
- Overview
- Architecture
- Features
- Quick Start
- Project Structure
- Data Pipelines
- Data Quality & Governance
- Analytics & Dashboards
- Deployment
- Cost Management
The Real-Time Transit Analytics Platform is a production-ready data engineering solution that processes GTFS (General Transit Feed Specification) data to provide real-time insights into public transportation systems. Built using Databricks Asset Bundles (DABs), this project demonstrates industry best practices for:
- Medallion Architecture: Structured Bronze → Silver → Gold data layers
- Delta Live Tables (DLT): Declarative ETL with built-in data quality
- Unity Catalog: Enterprise-grade data governance and security
- CI/CD Automation: GitHub Actions for deployment and testing
- Liquid Clustering: Advanced query optimization for multi-dimensional analytics
- Streaming & Batch: Hybrid processing patterns for different data sources
- ✅ Static GTFS Processing: Complete implementation with 8 Silver tables and 5 Gold aggregates
- ✅ Data Quality Framework: 20+ DLT expectations with automated monitoring
- ✅ Security & Governance: Row-level security, column masking, audit logging
- ✅ Observability: Event log analysis, pipeline health dashboards, alerting
- ✅ Cost Optimization: Serverless compute, SPOT instances, auto-termination
```
┌──────────────────────────────────────────────────────────────────┐
│                           DATA SOURCES                           │
│  • GTFS Static Schedule (CSV files)                              │
│  • GTFS-RT Feeds (Protocol Buffers) - POC only                   │
└───────────────────────────┬──────────────────────────────────────┘
                            │
                            ▼
┌──────────────────────────────────────────────────────────────────┐
│                      BRONZE LAYER (Raw Data)                     │
│  • Auto Loader with schema evolution                             │
│  • Immutable source-of-truth storage                             │
│  • Rescued data column for malformed records                     │
│  • Partition by ingestion date                                   │
│  Tables: bronze_agencies, bronze_routes, bronze_trips,           │
│          bronze_stops, bronze_stop_times, bronze_calendar, etc.  │
└───────────────────────────┬──────────────────────────────────────┘
                            │
                            ▼
┌──────────────────────────────────────────────────────────────────┐
│                   SILVER LAYER (Validated Data)                  │
│  • Data quality expectations (20+ rules)                         │
│  • Type casting and standardization                              │
│  • Deduplication and null handling                               │
│  • Change Data Feed (CDF) enabled                                │
│  • SCD Type 2 for historical tracking                            │
│  Tables: silver_agencies, silver_routes, silver_trips,           │
│          silver_stops, silver_stop_times, silver_calendar, etc.  │
└───────────────────────────┬──────────────────────────────────────┘
                            │
                            ▼
┌──────────────────────────────────────────────────────────────────┐
│                   GOLD LAYER (Business Metrics)                  │
│  • Pre-aggregated business metrics                               │
│  • Liquid Clustering for query optimization                      │
│  • Optimized for BI tool consumption                             │
│  • Row-level security and column masking                         │
│  Tables: gold_route_summary, gold_stop_frequency,                │
│          gold_service_patterns, gold_trip_statistics, etc.       │
└───────────────────────────┬──────────────────────────────────────┘
                            │
                            ▼
┌──────────────────────────────────────────────────────────────────┐
│                      ANALYTICS & DASHBOARDS                      │
│  • SQL Analytics Dashboards                                      │
│  • Real-time monitoring views                                    │
│  • Data quality scorecards                                       │
└──────────────────────────────────────────────────────────────────┘
```
| Component | Technology | Purpose |
|---|---|---|
| Compute | Databricks Serverless | DLT pipeline execution |
| Storage | Delta Lake on S3 | ACID transactions, time travel |
| Orchestration | Delta Live Tables | Declarative ETL framework |
| Governance | Unity Catalog | Metadata management, access control |
| CI/CD | GitHub Actions | Automated deployment & testing |
| IaC | Databricks Asset Bundles | Infrastructure as code |
| Monitoring | DLT Event Logs | Pipeline observability |
- Delta Live Tables: Declarative pipeline definitions with automatic dependency resolution
- Auto Loader: Incremental file ingestion with schema evolution
- Change Data Feed: Track all changes for downstream consumers
- Liquid Clustering: Multi-dimensional clustering for optimal query performance
- Z-Ordering: Data skipping for faster queries on high-cardinality columns
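As a rough sketch of how these pieces fit together in code (illustrative only; the project's real table definitions live under `src/pipelines/`, and the table name, options, and S3 path below are placeholders), a Bronze table combining Auto Loader ingestion, schema evolution with a rescued-data column, and Change Data Feed might look like this:

```python
# Illustrative sketch only; not the project's actual code.
import dlt
from pyspark.sql import functions as F


@dlt.table(
    name="bronze_stops",  # placeholder table name
    comment="Raw GTFS stops ingested incrementally with Auto Loader",
    table_properties={"delta.enableChangeDataFeed": "true"},  # expose CDF to downstream consumers
)
def bronze_stops():
    # `spark` is provided by the DLT runtime
    return (
        spark.readStream.format("cloudFiles")                    # Auto Loader
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.schemaEvolutionMode", "rescue")      # malformed fields land in _rescued_data
        .option("header", "true")
        .load("s3://your-transit-analytics-bucket/gtfs-static/stops/")  # placeholder path
        .withColumn("ingestion_date", F.current_date())
    )
```

Silver and Gold tables then consume this table via `dlt.read()` / `dlt.read_stream()`, which is how DLT resolves pipeline dependencies automatically.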
- 20+ DLT Expectations: Automated data quality checks at Silver layer
- Expectation Types:
  - `expect_or_drop`: Drop invalid records (e.g., invalid coordinates)
  - `expect_or_fail`: Fail pipeline on critical violations (e.g., missing IDs)
  - `expect`: Track violations without blocking (e.g., data freshness)
- Quality Monitoring: Real-time dashboards showing DQ metrics
- Automated Alerts: Email notifications on quality threshold breaches
- Row-Level Security: Filter data based on user group membership
- Column Masking: Redact sensitive fields for non-privileged users
- Audit Logging: Track all data access and modifications
- Unity Catalog Integration: Centralized metadata and lineage
- Encryption: At-rest and in-transit encryption for all data
Note: The security features described above, including row-level security and column masking, are implemented; the core logic lives in the `notebooks/security/unity_catalog_security.sql` notebook.
- Pipeline Health Dashboard: Real-time monitoring of DLT pipelines
- Event Log Analysis: Query pipeline execution history and metrics
- Data Quality Scorecard: Track expectation pass/fail rates
- Cost Tracking: Monitor DBU consumption per pipeline
- Alerting: Configurable alerts for failures and anomalies
- Databricks Workspace: 14-day trial or existing workspace
- AWS Account: For S3 storage (or Azure/GCP equivalent)
- Python: 3.9 or higher
- Databricks CLI: Latest version
- Git: For version control
- Clone the repository

  ```bash
  git clone https://github.com/Barrese93/Real-Time-Transit-Analytics-Platform.git
  cd Real-Time-Transit-Analytics-Platform
  ```

- Install the Databricks CLI

  ```bash
  pip install databricks-cli
  ```

  Note that the `databricks bundle` commands used below require the newer Databricks CLI (v0.205+); if `bundle` is not recognized after installing via pip, install the CLI by following the official Databricks CLI documentation instead.

- Configure authentication

  ```bash
  databricks configure --token
  ```

  Enter your workspace URL and personal access token when prompted.

- Set up the S3 bucket (first-time only)

  Create an S3 bucket for raw data storage:

  ```bash
  # Create bucket (replace with your unique name)
  aws s3 mb s3://your-transit-analytics-bucket

  # Create folder structure
  aws s3api put-object --bucket your-transit-analytics-bucket --key gtfs-static/
  aws s3api put-object --bucket your-transit-analytics-bucket --key checkpoints/
  ```

- Update configuration

  Edit `databricks.yml` to set your workspace URL and catalog name:

  ```yaml
  workspace:
    host: https://your-workspace.cloud.databricks.com

  variables:
    catalog_name:
      default: your_catalog_name
    raw_data_bucket:
      default: "your-s3-bucket-name"
  ```
- Validate the bundle

  ```bash
  databricks bundle validate -t dev
  ```

- Deploy to development

  ```bash
  databricks bundle deploy -t dev
  ```

- Upload sample GTFS data to S3

  ```bash
  aws s3 cp sample-data/gtfs/ s3://your-bucket/gtfs-static/ --recursive
  ```

- Trigger the static data ingestion job

  ```bash
  databricks jobs run-now --job-id <static-ingestion-job-id>
  ```

- Start the Silver layer DLT pipeline

  ```bash
  databricks pipelines start-update --pipeline-id <silver-pipeline-id>
  ```

- Monitor pipeline execution

  ```bash
  databricks pipelines get --pipeline-id <silver-pipeline-id>
  ```

- View results in Unity Catalog

  Navigate to your Databricks workspace → Data → Catalog → `your_catalog` → `silver` schema.
```
Real-Time-Transit-Analytics-Platform/
│
├── databricks.yml                     # Root bundle configuration
├── .gitignore                         # Git ignore patterns
├── README.md                          # This file
├── AWS_S3_BUCKET_SETUP_GUIDE.md       # S3 setup instructions
│
├── src/                               # Source code
│   ├── pipelines/                     # DLT pipeline definitions
│   │   ├── dlt_static_gtfs_silver.py  # Silver layer (8 tables, 20+ DQ rules)
│   │   ├── dlt_static_gtfs_gold.py    # Gold layer (5 aggregated tables)
│   │   └── dlt_transit_realtime.py    # Realtime POC (mock data)
│   ├── jobs/                          # Databricks jobs
│   │   └── ingest_static_gtfs.py      # Static GTFS ingestion
│   └── utils/                         # Utility modules
│       └── gtfs_parser.py             # GTFS parsing utilities
│
├── resources/                         # Databricks resource definitions
│   ├── pipelines/                     # DLT pipeline YAML configs
│   │   ├── static_gtfs_silver_pipeline.yml
│   │   ├── static_gtfs_gold_pipeline.yml
│   │   └── transit_pipeline.yml
│   └── jobs/                          # Job YAML configs
│       ├── static_ingestion_job.yml
│       ├── pipeline_trigger_job.yml
│       ├── maintenance_job.yml
│       └── monitoring_job.yml
│
├── notebooks/                         # Analysis and maintenance notebooks
│   ├── analytics/                     # Analytics queries and dashboards
│   │   ├── dashboard_definitions.py
│   │   └── sql_views_analytics.sql
│   ├── monitoring/                    # Observability notebooks
│   │   ├── dlt_event_log_analysis.sql
│   │   └── data_quality_monitoring.sql
│   ├── maintenance/                   # Maintenance operations
│   │   └── optimize_gold_tables.py
│   └── validation/                    # End-to-end validation
│       └── end_to_end_validation.sql
│
├── docs/                              # Additional documentation
│   └── GOLD_LAYER_USER_GUIDE.md
│
└── .github/                           # CI/CD workflows
    └── workflows/
        ├── validate.yml               # Bundle validation on PR
        ├── deploy.yml                 # Automated deployment
        └── integration-tests.yml      # Scheduled integration tests
```
- Status: ✅ Production-ready
- File: `src/pipelines/dlt_static_gtfs_silver.py`
- Tables: 8 Silver tables with comprehensive data quality
| Table | Records | DQ Rules | Description |
|---|---|---|---|
| `silver_agencies` | ~10 | 3 | Transit agencies with validated contact info |
| `silver_routes` | ~100 | 4 | Routes with type validation and color codes |
| `silver_trips` | ~5,000 | 5 | Trips with direction and service validation |
| `silver_stops` | ~500 | 6 | Stops with coordinate validation |
| `silver_stop_times` | ~50,000 | 7 | Stop times with sequence validation |
| `silver_calendar` | ~365 | 4 | Service calendars with date validation |
| `silver_calendar_dates` | ~100 | 3 | Service exceptions |
| `silver_shapes` | ~10,000 | 5 | Route shapes with coordinate validation |
```python
# Example expectations from the pipeline
@dlt.expect_or_drop("valid_coordinates",
    "stop_lat BETWEEN -90 AND 90 AND stop_lon BETWEEN -180 AND 180")
@dlt.expect_or_fail("required_stop_id", "stop_id IS NOT NULL")
@dlt.expect("recent_data", "ingestion_date >= current_date() - INTERVAL 7 DAYS")
```

- Status: ✅ Production-ready
- File: `src/pipelines/dlt_static_gtfs_gold.py`
- Tables: 5 Gold aggregated tables with Liquid Clustering
| Table | Clustering Keys | Description |
|---|---|---|
| `gold_route_summary` | `[route_id, agency_id]` | Route-level metrics and statistics |
| `gold_stop_frequency` | `[stop_id, route_id]` | Stop visit frequency by route |
| `gold_service_patterns` | `[service_id]` | Service pattern analysis |
| `gold_trip_statistics` | `[route_id, direction_id]` | Trip-level aggregations |
| `gold_hourly_service_levels` | `[route_id, hour_of_day]` | Hourly service frequency |
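For context on how those clustering keys are declared, here is a simplified, hypothetical sketch (the real aggregations live in `src/pipelines/dlt_static_gtfs_gold.py`; only one metric is computed here, and the sketch assumes the `cluster_by` argument of `@dlt.table` for Liquid Clustering):

```python
# Hypothetical sketch; not the actual Gold pipeline code.
import dlt
from pyspark.sql import functions as F


@dlt.table(
    name="gold_route_summary",
    comment="Route-level metrics, clustered for route/agency lookups",
    cluster_by=["route_id", "agency_id"],   # Liquid Clustering keys
)
def gold_route_summary():
    trips = dlt.read("silver_trips")        # dependencies resolved by DLT
    routes = dlt.read("silver_routes")
    return (
        trips.join(routes, "route_id")      # agency_id comes from routes
        .groupBy("route_id", "agency_id")
        .agg(F.countDistinct("trip_id").alias("total_trips"))
    )
```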
```sql
-- gold_route_summary structure
SELECT
  route_id,
  agency_id,
  route_short_name,
  route_long_name,
  total_trips,
  total_stops,
  avg_trip_duration_minutes,
  service_days_per_week,
  first_departure_time,
  last_departure_time,
  peak_frequency_minutes,
  off_peak_frequency_minutes
FROM gold.gold_route_summary
WHERE route_id = 'ROUTE_001';
```

- Status: ⚠️ Proof of concept (mock data only)
- File: `src/pipelines/dlt_transit_realtime.py`
- Purpose: Technical demonstration

This pipeline demonstrates DLT streaming patterns but uses hardcoded data. See the file header for details on converting it to production.
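As a rough illustration of the streaming pattern the POC exercises (not the file's actual contents; the table name and the synthetic `rate` source below are stand-ins for a real GTFS-RT feed):

```python
# Hypothetical sketch of a DLT streaming table over a mock source.
import dlt
from pyspark.sql import functions as F


@dlt.table(
    name="bronze_vehicle_positions_poc",
    comment="Mock real-time vehicle positions (POC only)",
)
def bronze_vehicle_positions_poc():
    return (
        spark.readStream.format("rate")     # emits (timestamp, value) rows continuously
        .option("rowsPerSecond", 1)
        .load()
        .withColumn("vehicle_id", F.concat(F.lit("BUS_"), (F.col("value") % 10).cast("string")))
        .withColumnRenamed("timestamp", "event_time")
        .select("vehicle_id", "event_time")
    )
```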
The platform implements a comprehensive data quality framework using DLT expectations:
- Structural Validation
  - Required fields (NOT NULL checks)
  - Data type validation
  - Format validation (e.g., email, phone)
- Business Rules
  - Coordinate ranges (-90 to 90 for latitude)
  - Enumeration validation (route types, service days)
  - Referential integrity (foreign key checks)
- Temporal Validation
  - Date range checks
  - Sequence validation (stop_sequence)
  - Freshness checks (data recency)
- Statistical Validation
  - Outlier detection
  - Distribution checks
  - Completeness metrics
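In practice, each category can be expressed as a dictionary of named rules and applied in bulk with DLT's `expect_all*` decorators. A minimal sketch (the rule set and the `silver_stops_example` table below are illustrative, not the pipeline's exact rules):

```python
# Illustrative only; not the pipeline's actual rule set.
import dlt

structural_rules = {"required_stop_id": "stop_id IS NOT NULL"}
business_rules = {
    "valid_latitude": "stop_lat BETWEEN -90 AND 90",
    "valid_longitude": "stop_lon BETWEEN -180 AND 180",
}
temporal_rules = {"recent_data": "ingestion_date >= current_date() - INTERVAL 7 DAYS"}


@dlt.table(name="silver_stops_example")
@dlt.expect_all_or_fail(structural_rules)   # fail the update on structural violations
@dlt.expect_all_or_drop(business_rules)     # drop rows that break business rules
@dlt.expect_all(temporal_rules)             # record freshness violations without blocking
def silver_stops_example():
    return dlt.read("bronze_stops")
```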
Access the data quality dashboard:
```sql
-- Query DLT event logs for quality metrics
SELECT
  flow_name,
  expectation_name,
  SUM(passed_records) as passed,
  SUM(failed_records) as failed,
  ROUND(SUM(passed_records) * 100.0 /
        (SUM(passed_records) + SUM(failed_records)), 2) as pass_rate
FROM event_log_raw
WHERE event_type = 'flow_progress'
GROUP BY flow_name, expectation_name
ORDER BY pass_rate ASC;
```

Row-level security example:

```sql
-- Create row filter function
CREATE FUNCTION is_authorized_agency(agency_id STRING)
RETURNS BOOLEAN
RETURN CASE
  WHEN is_member('agency_a_analysts') AND agency_id = 'AGENCY_A' THEN TRUE
  WHEN is_member('agency_b_analysts') AND agency_id = 'AGENCY_B' THEN TRUE
  WHEN is_member('system_admin') THEN TRUE
  ELSE FALSE
END;

-- Apply to table
ALTER TABLE gold.gold_route_summary
SET ROW FILTER is_authorized_agency ON (agency_id);
```

Column masking example:

```sql
-- Create masking function
CREATE FUNCTION mask_operator_id(operator_id STRING)
RETURNS STRING
RETURN CASE
  WHEN is_member('operations_admin') THEN operator_id
  ELSE 'REDACTED'
END;

-- Apply to column
ALTER TABLE silver.silver_trips
ALTER COLUMN operator_id SET MASK mask_operator_id;
```
- Pipeline Health Dashboard
  - Real-time pipeline status
  - Execution duration trends
  - Error rate monitoring
  - DBU consumption tracking
- Data Quality Scorecard
  - Expectation pass/fail rates
  - Quality trends over time
  - Top failing rules
  - Data completeness metrics
- Transit Operations Dashboard
  - Route performance metrics
  - Stop frequency analysis
  - Service pattern visualization
  - Peak vs off-peak comparisons
```sql
-- Top 10 busiest routes by trip count
SELECT
  r.route_short_name,
  r.route_long_name,
  COUNT(DISTINCT t.trip_id) as total_trips,
  COUNT(DISTINCT st.stop_id) as unique_stops,
  MIN(st.arrival_time) as first_service,
  MAX(st.departure_time) as last_service
FROM gold.gold_route_summary r
JOIN silver.silver_trips t ON r.route_id = t.route_id
JOIN silver.silver_stop_times st ON t.trip_id = st.trip_id
GROUP BY r.route_short_name, r.route_long_name
ORDER BY total_trips DESC
LIMIT 10;
```

```sql
-- Service frequency by hour of day for a specific route
SELECT
  hour_of_day,
  route_id,
  avg_headway_minutes,
  total_trips,
  CASE
    WHEN hour_of_day BETWEEN 6 AND 9 THEN 'Morning Peak'
    WHEN hour_of_day BETWEEN 16 AND 19 THEN 'Evening Peak'
    ELSE 'Off-Peak'
  END as service_period
FROM gold.gold_hourly_service_levels
WHERE route_id = 'ROUTE_001'
ORDER BY hour_of_day;
```

```sql
-- Identify the busiest stops across all routes
SELECT
  s.stop_name,
  s.stop_lat,
  s.stop_lon,
  COUNT(DISTINCT st.trip_id) as total_trips,
  COUNT(DISTINCT st.route_id) as routes_served,
  AVG(sf.daily_frequency) as avg_daily_visits
FROM silver.silver_stops s
JOIN silver.silver_stop_times st ON s.stop_id = st.stop_id
JOIN gold.gold_stop_frequency sf ON s.stop_id = sf.stop_id
GROUP BY s.stop_name, s.stop_lat, s.stop_lon
ORDER BY total_trips DESC
LIMIT 20;
```

```sql
-- Compare weekday vs weekend service patterns
SELECT
  sp.service_id,
  sp.service_type,
  COUNT(DISTINCT sp.route_id) as routes_count,
  SUM(sp.total_trips) as total_trips,
  AVG(sp.avg_trip_duration_minutes) as avg_duration,
  sp.days_of_week
FROM gold.gold_service_patterns sp
GROUP BY sp.service_id, sp.service_type, sp.days_of_week
ORDER BY total_trips DESC;
```

```sql
-- Analyze geographic coverage by route
SELECT
  r.route_id,
  r.route_short_name,
  COUNT(DISTINCT s.stop_id) as total_stops,
  ROUND(MAX(s.stop_lat) - MIN(s.stop_lat), 4) as lat_span,
  ROUND(MAX(s.stop_lon) - MIN(s.stop_lon), 4) as lon_span,
  COUNT(DISTINCT sh.shape_id) as shape_variations
FROM silver.silver_routes r
JOIN silver.silver_trips t ON r.route_id = t.route_id
JOIN silver.silver_stop_times st ON t.trip_id = st.trip_id
JOIN silver.silver_stops s ON st.stop_id = s.stop_id
LEFT JOIN silver.silver_shapes sh ON t.shape_id = sh.shape_id
GROUP BY r.route_id, r.route_short_name
ORDER BY total_stops DESC;
```

```sql
-- Analyze trip duration patterns by route and direction
SELECT
  ts.route_id,
  ts.direction_id,
  ts.total_trips,
  ROUND(ts.avg_duration_minutes, 2) as avg_duration,
  ROUND(ts.min_duration_minutes, 2) as min_duration,
  ROUND(ts.max_duration_minutes, 2) as max_duration,
  ROUND(ts.std_duration_minutes, 2) as duration_stddev
FROM gold.gold_trip_statistics ts
WHERE ts.total_trips > 10
ORDER BY ts.route_id, ts.direction_id;
```

```sql
-- Monitor data quality metrics across all pipelines
SELECT
  DATE(update_time) as date,
  flow_name,
  COUNT(DISTINCT dataset_name) as tables_processed,
  SUM(num_output_rows) as total_rows,
  SUM(CASE WHEN expectation_failed = true THEN 1 ELSE 0 END) as failed_expectations,
  ROUND(AVG(duration_seconds), 2) as avg_processing_time
FROM event_log_raw
WHERE event_type = 'flow_progress'
  AND update_time >= CURRENT_DATE - INTERVAL 7 DAYS
GROUP BY DATE(update_time), flow_name
ORDER BY date DESC, flow_name;
```

```sql
-- Identify routes with potential service issues (low frequency)
SELECT
  hsl.route_id,
  r.route_short_name,
  hsl.hour_of_day,
  hsl.avg_headway_minutes,
  hsl.total_trips,
  CASE
    WHEN hsl.avg_headway_minutes > 60 THEN 'Low Frequency'
    WHEN hsl.avg_headway_minutes > 30 THEN 'Medium Frequency'
    ELSE 'High Frequency'
  END as service_level
FROM gold.gold_hourly_service_levels hsl
JOIN silver.silver_routes r ON hsl.route_id = r.route_id
WHERE hsl.hour_of_day BETWEEN 7 AND 20  -- Peak hours
  AND hsl.avg_headway_minutes > 30
ORDER BY hsl.avg_headway_minutes DESC;
```

The project supports multiple deployment targets:
- `dev`: Development environment (default)
- `prod`: Production environment
```bash
# Deploy to development
databricks bundle deploy -t dev

# Deploy to production
databricks bundle deploy -t prod
```

The project includes three automated workflows:
- Validate (`.github/workflows/validate.yml`)
  - Runs on pull requests
  - Validates bundle configuration
  - Checks Python syntax
- Deploy (`.github/workflows/deploy.yml`)
  - Runs on merge to main
  - Deploys to production
  - Runs smoke tests
- Integration Tests (`.github/workflows/integration-tests.yml`)
  - Scheduled daily
  - Validates end-to-end pipeline
  - Checks data quality metrics
- Always validate before deploying

  ```bash
  databricks bundle validate -t prod
  ```

- Use separate catalogs for dev/prod

  ```yaml
  targets:
    dev:
      variables:
        catalog_name: transit_analytics_dev
    prod:
      variables:
        catalog_name: transit_analytics_prod
  ```

- Test in dev before promoting to prod
  - Deploy to dev
  - Run validation queries
  - Check data quality metrics
  - Review pipeline logs
  - Then deploy to prod

- Monitor deployments
  - Check GitHub Actions logs
  - Verify pipeline creation in the workspace
  - Validate table creation in Unity Catalog
This project is designed to stay within the $40 Databricks trial credit limit:
| Resource | Configuration | Estimated Cost/Day |
|---|---|---|
| DLT Pipelines (Serverless) | 2 workers, 15min runs | $8-12 |
| Interactive Cluster | r5.large, single-node | $2-4 (15min auto-term) |
| Job Clusters | Ephemeral, SPOT instances | $3-5 |
| Storage (S3) | Delta tables + checkpoints | $0.50 |
| Unity Catalog | Metadata storage | Included |
| Total | | ~$15-20/day |
- Use Serverless for DLT
  - No cluster management overhead
  - Pay only for actual compute time
  - Automatic scaling

- Enable Auto-Termination

  ```yaml
  clusters:
    - label: "default"
      autotermination_minutes: 15
  ```

- Use SPOT Instances

  ```yaml
  aws_attributes:
    availability: "SPOT"
    spot_bid_price_percent: 100
  ```

- Optimize Pipeline Schedules
  - Run pipelines during off-peak hours
  - Use incremental processing
  - Batch small updates

- Monitor DBU Consumption

  ```sql
  -- Query to track DBU usage
  SELECT
    date_trunc('day', usage_date) as day,
    SUM(usage_quantity) as total_dbus,
    SUM(usage_quantity * list_price) as estimated_cost
  FROM system.billing.usage
  WHERE sku_name LIKE '%DLT%'
  GROUP BY day
  ORDER BY day DESC;
  ```