Kafka · Dataproc · Databricks · Snowflake · dbt · FastAPI · LangChain · Chroma · React/TypeScript · Power BI · GCP · Terraform · CI/CD
Project Status: Production-ready.
- Full CI/CD + IaC + DevSecOps pipeline implemented
- Dashboard live; Databricks–Snowflake connector and job orchestration finalized
- dbt semantic layer (SVI definitions, climate questions, metric dictionary, vulnerability tables)
- AI analytics layer with multi-mode RAG service (district risk overview, metric explanation, survey question analysis, district comparison)
- FastAPI microservices + status/health endpoints
- Chroma vector database with 5K+ embedded documents
- React/TypeScript UI for natural-language district equity insights
## Contents

- Project Overview
- Purpose
- Architecture Overview
- Quick Start (Local Simulation)
- Databricks Integration
- Dataproc (Batch SVI Ingestion)
- Terraform Infrastructure-as-Code (IaC)
- RAG Service – Semantic Q&A for SVI + School Climate
- What RAG Adds
- Why This Matters
- CI (Continuous Integration)
- CD (Continuous Delivery — Manual Only)
- Security (DevSecOps)
- Roadmap
- License
- Author
## Project Overview

This project is a modern, end-to-end data platform that unifies streaming, batch, semantic modeling, and AI-assisted analytics. It demonstrates how Kafka, Databricks, dbt, and Snowflake integrate in a Medallion Architecture (Bronze → Silver → Gold) to power equity-focused insights across NYC School Climate and Social Vulnerability Index (SVI) data.
On top of the ELT pipeline, the project adds a lightweight Retrieval-Augmented Generation (RAG) service using FastAPI, LangChain, Chroma, and React/TypeScript. This semantic layer enables district leaders to ask natural-language questions—“What are the top risk indicators for District 29?”—and receive grounded, contextual answers backed by dbt-validated Gold tables.
Built as part of the Data Engineering Modern Toolkit initiative, the system showcases real-world engineering practices across cloud infrastructure, CI/CD, DevSecOps, semantic modeling, and AI-driven data experiences.
## Purpose

Many organizations still rely on siloed spreadsheets, manual data merges, and ad-hoc reporting, making equity analysis slow, inconsistent, and difficult to scale. This project demonstrates how to modernize these workflows using a unified, cloud-native data and AI architecture:
- Streaming ingestion (Kafka → GCS Bronze)
- Batch ingestion (REST API → GCS Bronze via Dataproc)
- Distributed compute (Databricks Spark Structured Streaming + Dataproc PySpark)
- Automated SQL transformations & semantic modeling (dbt)
- Cloud warehousing (Snowflake Gold layer)
- Cross-platform orchestration (Databricks Jobs + GitHub Actions)
- Enterprise-ready monitoring & visualization (Power BI)
- AI-assisted analytics (FastAPI + LangChain + Chroma RAG service)
The result is a scalable, reproducible, and secure ELT platform with a semantic, AI-driven analytics layer, suitable for production-grade data engineering environments and real-world decision-making.
## Architecture Overview

```text
┌──────────────────────────┐        ┌──────────────────────────┐
│          Kafka           │        │         REST API         │
│  (Real-time Streaming)   │        │     (Batch Ingestion)    │
└──────────────┬───────────┘        └──────────────┬───────────┘
               │                                   │
               └─────────────────┬─────────────────┘
                                 ▼
                           Bronze (Raw)
                        GCS Landing Zone
                                 ▼
        ┌────────────────────────┴───────────────────────┐
        ▼                                                ▼
┌────────────────────────┐          ┌──────────────────────────────┐
│  Databricks (Spark     │          │       Dataproc (Batch)       │
│  Structured Streaming  │          │  SVI ingestion + large-scale │
│  + Batch ETL)          │          │      PySpark transforms      │
└───────────┬────────────┘          └──────────────┬───────────────┘
            └────────────────┬─────────────────────┘
                             ▼
                      Silver (Cleaned)
               Delta / Parquet stored in GCS
                             ▼
                   dbt → Snowflake (Gold)
             (Semantic Models + Metrics Layer)
                             ▼
   ┌──────────────────┬──────────────────┬──────────────────┐
   ▼                  ▼                  ▼                  ▼
Power BI          FastAPI RAG        APIs / Apps      Other Consumers
Dashboard         Service (LLM)
(District         (Semantic Q&A
Equity KPIs)      on Gold Layer)
```
The same flow as a Mermaid diagram:

```mermaid
flowchart TB
    subgraph Sources
        K["Kafka<br/>(Real-time Streaming)"]
        R["REST API<br/>(Batch Ingestion)"]
    end
    K --> B["Bronze (Raw)<br/>GCS Landing Zone"]
    R --> B
    subgraph Compute
        D["Databricks<br/>Spark Structured Streaming<br/>+ Batch ETL"]
        P["Dataproc<br/>Batch SVI Ingestion<br/>+ PySpark Transforms"]
    end
    B --> D
    B --> P
    D --> S["Silver (Cleaned)<br/>Delta/Parquet on GCS"]
    P --> S
    S --> G["dbt -> Snowflake (Gold)<br/>Semantic Models + Metrics"]
    subgraph Consumers
        PB["Power BI Dashboard<br/>Equity KPIs"]
        RAG["FastAPI RAG Service<br/>(LLM Semantic Q&A)"]
        OC["Other Consumers / APIs<br/>Downstream Pipelines"]
    end
    G --> PB
    G --> RAG
    G --> OC
```
- Bronze – Unprocessed, schema-flexible raw data
- Silver – Cleaned, normalized, typed Delta/Parquet
- Gold – dbt-modeled analytical tables powering dashboards
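To make the layer boundaries concrete, here is a toy, dependency-free sketch of the Bronze → Silver contract: Bronze keeps raw records as-is, while Silver enforces types and drops malformed rows. The real transforms run in Spark and dbt; the field names here are hypothetical.

```python
# Illustrative only: the real Bronze → Silver cleaning runs in Spark/dbt.
# Field names ("district", "response_rate", "survey_date") are hypothetical.
from datetime import date
from typing import Optional

def bronze_to_silver(raw: dict) -> Optional[dict]:
    """Normalize a raw survey record: enforce types, drop bad rows."""
    try:
        return {
            "district": int(raw["district"]),
            "response_rate": round(float(raw["response_rate"]), 4),
            "survey_date": date.fromisoformat(raw["survey_date"]),
        }
    except (KeyError, ValueError):
        return None  # malformed records never reach Silver

records = [
    {"district": "29", "response_rate": "0.42", "survey_date": "2024-05-01"},
    {"district": "??", "response_rate": "0.10", "survey_date": "2024-05-01"},
]
silver = [r for r in (bronze_to_silver(x) for x in records) if r]
print(len(silver))  # 1 clean row survives
```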
A detailed architecture diagram can be found in `/diagrams/`.
Tech stack:

- Languages – Python, SQL
- Streaming – Kafka (Confluent)
- Compute – Databricks (Spark Structured Streaming)
- Storage – GCS (Bronze/Silver)
- Warehouse – Snowflake
- Transformations – dbt
- Orchestration – Databricks Jobs, GitHub Actions
- API / Services – FastAPI (RAG microservice)
- AI / RAG – LangChain, OpenAI API (LLM & embeddings), Chroma (vector DB)
- Frontend – React, TypeScript, Vite
- Visualization – Power BI
- DevOps – Terraform (IaC), Makefile, pre-commit, detect-secrets
Repository structure:

```text
rt-sch-cli-equity-pipeline/
│
├── README.md
├── SECURITY.md
├── .gitignore
├── .env.example
├── Makefile
├── pyproject.toml
├── pre-commit-config.yaml
├── LICENSE
│
├── infra/                  # IaC for GCP, Snowflake, Databricks
│   └── terraform/
│       ├── main.tf
│       ├── providers.tf
│       ├── variables.tf
│       ├── gcs.tf
│       ├── snowflake.tf
│       ├── databricks.tf
│       ├── dataproc.tf
│       ├── gcp_snowflake_integration.tf
│       ├── terraform.tfvars.example
│       └── terraform.dev.tfvars (ignored)
│
├── dbt/                    # dbt semantic modeling (Bronze → Silver → Gold)
│   ├── models/
│   │   ├── bronze/
│   │   ├── silver/
│   │   └── gold/
│   ├── seeds/
│   ├── tests/
│   └── macros/
│
├── dataproc/               # Batch SVI ingestion jobs (PySpark)
│   ├── jobs/
│   │   └── load_svi_to_snowflake.py
│   ├── kafka_streaming.py
│   └── config/
│
├── rag_service/            # FastAPI RAG microservice
│   ├── main.py
│   ├── api.py
│   ├── prompts.py
│   ├── langchain_chain.py
│   ├── embeddings.py
│   ├── ingest.py
│   ├── vector_store.py
│   └── config.py
│
├── rag-ui/                 # React/TypeScript RAG frontend
│   ├── public/
│   ├── src/
│   ├── vite.config.ts
│   └── .env.local (ignored)
│
├── kafka/                  # Local Kafka producer for mock streaming
│   ├── kafka_producer.py
│   └── config/
│
├── rt_databricks/          # Mirror of Databricks repo – structured streaming + batch transforms
│
├── scripts/                # Utility + integration scripts
│   ├── gcp/
│   ├── snowflake/
│   └── utilities/
│
├── powerbi/
├── diagrams/
└── screenshots/
```
> No real credentials are committed. `.env` and `*.tfvars` files are gitignored.
## Quick Start (Local Simulation)

```bash
# Create and activate a virtual environment
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

# Copy example configs (real values stay local)
cp infra/terraform/terraform.tfvars.example infra/terraform/terraform.dev.tfvars
cp config/secrets.example.yml config/secrets.local.yml

# Fetch SVI data and start the mock stream
python scripts/gcp/fetch_svi_to_gcs.py
python scripts/streaming/mock_stream.py
```

## Databricks Integration

Databricks powers the real-time streaming and large-scale batch side:
- Kafka → Bronze streaming pipelines via Spark Structured Streaming
- Bronze → Silver cleaning using notebook-driven transformations
- Silver → Snowflake Gold sync via Snowflake connector
- Databricks Secret Scopes for secure GCP + Snowflake integration
- Configurable job cluster defined via Terraform
- Orchestration via Databricks Jobs (auto-paused)
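The Kafka → Bronze step above follows a micro-batch pattern. A toy, dependency-free sketch of that pattern is below; the real pipeline uses Spark Structured Streaming on Databricks, and the message fields here are hypothetical.

```python
# Toy sketch of the Kafka → Bronze micro-batch pattern. The real pipeline
# uses Spark Structured Streaming; message fields here are hypothetical.
import json

def process_micro_batch(messages: list, bronze_sink: list) -> int:
    """Land each raw Kafka message in the Bronze sink as-is (schema-flexible)."""
    for msg in messages:
        bronze_sink.append({"raw": json.loads(msg), "source": "kafka"})
    return len(messages)

bronze = []
batch = ['{"district": 29, "metric": "attendance", "value": 0.91}']
n = process_micro_batch(batch, bronze)
print(n, bronze[0]["raw"]["district"])  # 1 29
```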
## Dataproc (Batch SVI Ingestion)

Dataproc handles the large-scale batch ingestion of CDC Social Vulnerability Index (SVI) data:
- PySpark job to ingest raw SVI files → GCS Bronze
- Transform + normalize tract-level features → GCS Silver
- Load into Snowflake external and internal tables (SVI semantic layer)
- Provisioned and wired via Terraform (optional/feature-flagged)
This keeps batch SVI processing decoupled from the Databricks streaming workloads while still landing in the same medallion architecture.
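The normalize step in this job includes assigning each tract a vulnerability bucket. A minimal sketch of that bucketing logic is below; the real job runs as PySpark on Dataproc, and the cutoff values here are assumptions, not the project's actual thresholds.

```python
# Hypothetical sketch of SVI bucketing during the Silver transform.
# The real job is PySpark on Dataproc; these cutoffs are assumptions.
def svi_bucket(rpl_score: float) -> str:
    """Map a CDC SVI percentile rank (0–1) to a coarse vulnerability bucket."""
    if not 0.0 <= rpl_score <= 1.0:
        raise ValueError("RPL scores are percentile ranks in [0, 1]")
    if rpl_score >= 0.75:
        return "High"
    if rpl_score >= 0.50:
        return "Moderate"
    return "Low"

print(svi_bucket(0.82))  # High
```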
## Terraform Infrastructure-as-Code (IaC)

Terraform (in `infra/terraform/`) provisions the entire data platform:
- GCS Bronze/Silver/Gold buckets
- Snowflake GCS service account
- IAM bindings for integration
- Optional Dataproc cluster (feature-flagged)
- Snowflake warehouse: `PIPELINE_WH`
- Snowflake database: `SCHOOL_CLIMATE`
- Schemas: `BRONZE`, `SILVER`, `GOLD`, `DBT_DYLAN`
- Roles: `PIPELINE_ROLE`, `BI_ROLE`
- Grants: USAGE / ALL PRIVILEGES / SELECT via the classic Snowflake provider
- Storage Integration + External Stage for Bronze
- Databricks job definition for Bronze → Silver transformations
- Databricks job cluster (Spark runtime, node specs)
- Variable files: `terraform.dev.tfvars` (ignored) and `terraform.tfvars.example`
- Feature flags: `enable_databricks_job`, `enable_dataproc_cluster`
```bash
cd infra/terraform
set -a && source ../../.env && set +a
terraform fmt
terraform init -backend=false
terraform validate
terraform plan -var-file="terraform.dev.tfvars"
```

## RAG Service – Semantic Q&A for SVI + School Climate

This project includes a lightweight Retrieval-Augmented Generation (RAG) service that sits on top of the GOLD semantic layer and SVI tract data to provide leadership-friendly, natural-language answers to equity questions.
It supports multiple modes:
- `district_risk_overview` – high-level risk + equity overview for a district
- `explain_metric` – deep dive on what a metric means and why it matters
- `explain_question` – deep dive on a climate survey question
- `compare_districts` – equity-focused comparison of two districts (API-only for now)
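Request validation for these modes can be sketched as follows; this is an illustration of the contract, not the service's actual code, and `validate_query` is a hypothetical helper.

```python
# Sketch of mode validation for /api/rag/query; the real service may differ,
# and validate_query is a hypothetical helper.
VALID_MODES = {
    "district_risk_overview",
    "explain_metric",
    "explain_question",
    "compare_districts",
}

def validate_query(mode: str, district_id: int, other_district_id=None) -> None:
    if mode not in VALID_MODES:
        raise ValueError(f"unknown mode: {mode}")
    if mode == "compare_districts" and other_district_id is None:
        raise ValueError("compare_districts requires other_district_id")

validate_query("district_risk_overview", 29)   # OK
validate_query("compare_districts", 29, 30)    # OK
```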
The RAG service reads from:
- `GOLD.DIM_CLIMATE_QUESTION` – semantic question dimension (group, domain, short/full text, response scale)
- `GOLD.DIM_CLIMATE_METRIC_DEFINITION` – metric definitions (label, group, definition, formula, source table, grain)
- `GOLD.DIM_SVI_DEFINITION` – SVI semantic layer (overall SVI, theme names/descriptions, bucket logic)
- `BRONZE_GOLD.GOLD_CLIMATE_VULNERABILITY` – tract-level SVI scores: `SVI_OVERALL_SCORE`, `SVI_OVERALL_BUCKET`, `RPL_THEME1`–`RPL_THEME4`
- `GOLD.SCHOOL_CLIMATE_SNAPSHOT` (optional) – district-level climate metrics: `PARENT_RESPONSE_RATE`, `TEACHER_RESPONSE_RATE`, `STUDENT_RESPONSE_RATE`, `DISTRICT_NUMBER`, `DBN`
If `SCHOOL_CLIMATE_SNAPSHOT` is missing or inaccessible, the service degrades gracefully and responds without numeric metrics.
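The graceful-degradation behavior can be sketched like this; `fetch_snapshot_metrics` is a hypothetical stand-in for the real Snowflake query.

```python
# Sketch of the graceful-degradation pattern: if the optional snapshot table
# can't be read, answer without numeric metrics. fetch_snapshot_metrics is a
# hypothetical stand-in for a Snowflake query.
def fetch_snapshot_metrics(district_id: int) -> list:
    raise RuntimeError("SCHOOL_CLIMATE_SNAPSHOT unavailable")  # simulate outage

def build_context(district_id: int) -> dict:
    try:
        metrics = fetch_snapshot_metrics(district_id)
    except Exception:
        metrics = []  # degrade gracefully: no numeric metrics
    return {"district_id": district_id, "metrics": metrics}

print(build_context(29))  # {'district_id': 29, 'metrics': []}
```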
The semantic layer is managed via dbt seeds:
- `dbt/seeds/svi/dim_svi_definition.csv` → `GOLD.DIM_SVI_DEFINITION`
- `dbt/seeds/climate/dim_climate_metric_definition.csv` → `GOLD.DIM_CLIMATE_METRIC_DEFINITION`
- `dbt/seeds/climate/dim_climate_question.csv` → `GOLD.DIM_CLIMATE_QUESTION`
To (re)seed:
```bash
cd dbt
dbt seed --select dim_svi_definition dim_climate_metric_definition dim_climate_question \
  --profiles-dir ../.dbt
```

```text
GOLD.DIM_CLIMATE_QUESTION
GOLD.DIM_CLIMATE_METRIC_DEFINITION
GOLD.DIM_SVI_DEFINITION
BRONZE_GOLD.GOLD_CLIMATE_VULNERABILITY
        │
        ▼
rag_service.ingest
(builds text corpus + embeddings)
        │
        ▼
Chroma vector store (data/chroma_index)
        │
        ▼
LangChain retriever + LLM
        │
        ▼
FastAPI (/api/rag/query, /api/status) → React UI (rag-ui)
```
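The corpus-building step in `rag_service.ingest` can be approximated as follows. Column names mirror the Gold tables listed above, but the exact document format is an assumption and `metric_row_to_document` is a hypothetical helper.

```python
# Sketch of flattening semantic-layer rows into a text corpus for embedding.
# Column names mirror the Gold tables; the exact document format is an
# assumption, and metric_row_to_document is a hypothetical helper.
def metric_row_to_document(row: dict) -> dict:
    text = (
        f"Metric: {row['LABEL']}\n"
        f"Definition: {row['DEFINITION']}\n"
        f"Formula: {row['FORMULA']}\n"
        f"Grain: {row['GRAIN']}"
    )
    return {"id": f"metric::{row['METRIC_NAME']}", "text": text}

row = {
    "METRIC_NAME": "PARENT_RESPONSE_RATE",
    "LABEL": "Average Parent Response Rate",
    "DEFINITION": "Share of parents who completed the climate survey.",
    "FORMULA": "responses / eligible_parents",
    "GRAIN": "district-year",
}
doc = metric_row_to_document(row)
print(doc["id"])  # metric::PARENT_RESPONSE_RATE
```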
The RAG service supports both an offline/dev mode and a real model mode via environment flags in `.env`:

```bash
# Embeddings
USE_FAKE_EMBEDDINGS=true   # or false for real embeddings via OpenAI

# LLM
USE_FAKE_LLM=true          # or false for real LLM completions
```
- Fake embeddings (`USE_FAKE_EMBEDDINGS=true`)
  - Uses a deterministic local `FakeEmbeddings` class
  - No OpenAI embedding calls
  - Good for testing ingestion, retrieval, and UI wiring
- Fake LLM (`USE_FAKE_LLM=true`)
  - Uses `FakeChatLLM`, which returns canned but structured text
  - No OpenAI completion calls
  - Perfect for offline dev / demos without any API usage
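A minimal sketch of what a deterministic `FakeEmbeddings` class can look like is below; the project's actual implementation may differ. Hashing keeps the vectors stable across runs with zero API calls.

```python
# Minimal sketch of a deterministic FakeEmbeddings class; the project's
# actual implementation may differ. Hashing makes vectors reproducible.
import hashlib
import math

class FakeEmbeddings:
    def __init__(self, dim: int = 8):
        self.dim = dim

    def embed_query(self, text: str) -> list:
        digest = hashlib.sha256(text.encode()).digest()
        vec = [b / 255.0 for b in digest[: self.dim]]
        norm = math.sqrt(sum(v * v for v in vec)) or 1.0
        return [v / norm for v in vec]  # unit-length, deterministic

emb = FakeEmbeddings()
assert emb.embed_query("district 29") == emb.embed_query("district 29")
print(len(emb.embed_query("district 29")))  # 8
```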
When you’re ready to use real models:

1. Set billing limits in OpenAI (e.g., soft: $3, hard: $10).
2. Flip the flags in `.env`:

   ```bash
   USE_FAKE_EMBEDDINGS=false
   USE_FAKE_LLM=false
   ```

3. Rebuild embeddings once:

   ```bash
   python -m rag_service.ingest
   ```

4. Restart the backend:

   ```bash
   uvicorn rag_service.main:app --host 0.0.0.0 --port 8000 --reload
   ```
From the project root:

```bash
# Load env vars (including USE_FAKE_* and SNOWFLAKE_*)
set -a
source .env
set +a

# Start the API
uvicorn rag_service.main:app --host 0.0.0.0 --port 8000 --reload
```

Key endpoints:
- `GET /api/status` – returns the backend mode:

  ```json
  {
    "use_fake_embeddings": true,
    "use_fake_llm": true,
    "embedding_model": "text-embedding-3-large",
    "llm_model": "gpt-4.1-mini"
  }
  ```

- `POST /api/rag/query` – body:

  ```jsonc
  {
    "question": "Provide a district-level risk and equity overview for District 29 using SVI and climate data.",
    "district_id": 29,
    "other_district_id": 30,  // optional, for compare_districts
    "year": 2024,
    "mode": "district_risk_overview"  // one of: "district_risk_overview", "explain_metric", "explain_question", "compare_districts"
  }
  ```

  Response:

  ```json
  {
    "answer": "High-level narrative...",
    "high_level_bullets": ["Theme: ... – Metric: ... – Explanation: ...", "..."],
    "metrics": [
      {
        "metric_name": "Average Parent Response Rate",
        "value": 0.42,
        "year": null,
        "source": "SCHOOL_CLIMATE.GOLD.SCHOOL_CLIMATE_SNAPSHOT"
      }
    ],
    "citations": [
      {
        "id": "svi_tract::36081000100",
        "source_type": "svi_tract",
        "source_id": "36081000100"
      }
    ]
  }
  ```
The frontend lives in `rag-ui/` and talks to the RAG backend via Vite env vars in `rag-ui/.env.local`:

```bash
VITE_API_BASE_URL=http://localhost:8000
VITE_FAKE_MODE=true   # purely for UI labeling; backend is authoritative
```
To run the UI:
```bash
cd rag-ui
npm install   # first time
npm run dev
```

Then visit http://localhost:5173.
UI features:

- Mode selector: District risk overview · Explain a metric · Explain a question
- District + year filters
- “Sample question” buttons per mode
- “Ask” + “Clear” controls
- Status badges:
  - BACKEND: REAL/FAKE, based on `/api/status`
  - FRONTEND FAKE FLAG, based on `VITE_FAKE_MODE`
## What RAG Adds

- Semantic retrieval over climate + SVI domain knowledge
- District-specific metrics pulled live from Snowflake
- Human-readable equity explanations
- Risk indicators
- Metric interpretation
- Survey question analysis
- Optional district comparisons
- Multiple modes:
  - `district_risk_overview`
  - `explain_metric`
  - `explain_question`
  - `compare_districts`
## Why This Matters

RAG transforms the pipeline from a traditional ETL/ELT system into a decision-support tool:
- Leadership can ask complex equity questions in plain English
- Responses remain grounded in real district data
- dbt ensures all definitions and metrics are consistent and validated
- The semantic index makes unstructured domain context instantly searchable
## CI (Continuous Integration)

Located at `.github/workflows/ci.yml`.
Runs on every push + PR:

- whitespace cleanup
- EOF fixes
- YAML validation
- detect-secrets scan
- `black` formatting
- `ruff` & `flake8` linting
- `pytest` (unit + integration)
- `dbt deps` + `dbt compile` (using a dummy CI profile; no Snowflake calls made)
- `terraform fmt -check`, `terraform init -backend=false`, `terraform validate`
All CI checks run without secrets.
## CD (Continuous Delivery — Manual Only)

Located at `.github/workflows/cd.yml`.
A manual `workflow_dispatch` that supports:

- Running dbt against Snowflake
- Running dbt against Databricks
- Optional `terraform apply`
- Per-environment runs (`dev` or `prod`)
- Credentials loaded from GitHub Secrets (never in Git)
This ensures deployments are explicit, safe, and auditable.
## Security (DevSecOps)

See SECURITY.md for the full policy.
Key features:
- No credentials committed — `.env`, `*.tfvars`, and service accounts are gitignored
- `detect-secrets` guards the repo against accidental exposure
- Terraform providers pinned to prevent supply-chain drift
- CI/CD workflows segregated (CI = validate only, CD = manual apply)
- Principle-of-least-privilege Snowflake & GCP roles
## Roadmap

- [ ] Add detailed table-level lineage diagram (Bronze → Silver → Gold, SVI + Climate models)
- [ ] Add automated integration test suite (end-to-end tests hitting dev Snowflake / GCS)
- [ ] Add Databricks Jobs API orchestration (trigger + monitor jobs via REST/SDK)
- [ ] Add Docker local environment for reproducible dev + CI
- [ ] Add Power BI refresh automation (triggered after successful ELT runs)
- [ ] Integrate SVI dashboard and merge SVI data with School Climate data for cross-referenced equity analysis
## License

This project is released under the MIT License. You are free to use, modify, and distribute this project for personal or commercial purposes. See the LICENSE file for full details.
## Author

Developed by Dylan Picart at Partnership With Children.

Data Engineer · Analytics Engineer · AI/ML Practitioner
- 🌐 Portfolio: https://www.dylanpicart.com
- 💼 LinkedIn: https://linkedin.com/in/dylankpicart