An open-source tool for streaming Connected TV (CTV) data analysis, insights, and automated optimizations using temporal knowledge graphs. Built with integration support for Prebid Server Video and Google Ad Manager 360.
This project applies the temporal knowledge graph concepts from the paper "Zep: A Temporal Knowledge Graph Architecture for Agent Memory" to CTV advertising optimization:
- Temporal Knowledge Graph: Uses Neo4j with Graphiti-inspired architecture to capture CTV events over time
- Hierarchical Memory Structure: Implements episode, semantic entity, and community subgraphs for CTV data
- Bi-Temporal Modeling: Tracks both event occurrence time and system recording time
- Efficient Retrieval: Combines semantic similarity, keyword search, and graph traversal for insights
- Real-time Event Processing: Stream processing of CTV bid requests, impressions, viewability, and completion events
- Temporal Relationship Mapping: Track relationships between devices, campaigns, creatives, and content over time
- Community Detection: Automatically discover clusters of related entities for audience insights
- Performance Trend Analysis: Historical performance analysis with temporal context
- Rule-based Optimization: Configurable rules for bid adjustments, creative pausing, and frequency capping
- Machine Learning Insights: Predictive models for viewability and completion rate optimization
- Real-time Alerts: Automated alerts for performance anomalies and optimization opportunities
- Prebid Server Video: Direct integration for bid price adjustments and placement optimization
- Google Ad Manager 360: Campaign management and reporting integration
- Streaming Infrastructure: Kafka-based event streaming with Redis caching
- Docker and Docker Compose
- 8GB+ RAM recommended
- Google Ad Manager 360 credentials (optional)
- Prebid Server access (optional)
- Clone the repository
git clone https://github.com/your-org/ctv-temporal-analytics.git
cd ctv-temporal-analytics
- Setup environment
cp .env.example .env
# Edit .env with your configuration
- Start the infrastructure
make setup-dev
- Run the application
make run
- Analytics Dashboard: http://localhost:3000 (Grafana)
- Kafka UI: http://localhost:8080
- Neo4j Browser: http://localhost:7474
- API Documentation: http://localhost:8000/docs
┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│ CTV Events      │───▶│ Kafka Topics     │───▶│ Analytics       │
│                 │    │                  │    │ Engine          │
│ • Bid Requests  │    │ • ctv_impressions│    │                 │
│ • Impressions   │    │ • ctv_viewability│    │ • Event         │
│ • Viewability   │    │ • ctv_completions│    │   Processing    │
│ • Completions   │    │ • ctv_clicks     │    │ • Entity        │
│ • Clicks        │    │                  │    │   Extraction    │
└─────────────────┘    └──────────────────┘    │ • Optimization  │
                                               └─────────────────┘
                                                        │
                                                        ▼
┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│ Optimization    │◀───│Temporal Knowledge│───▶│ Dashboards      │
│ Actions         │    │ Graph            │    │ & Reports       │
│                 │    │                  │    │                 │
│ • Bid Adjusts   │    │ Episode Subgraph │    │ • Performance   │
│ • Creative Pause│    │ Entity Subgraph  │    │   Analytics     │
│ • Frequency Cap │    │ Community Graph  │    │ • Trend Analysis│
└─────────────────┘    └──────────────────┘    │ • Optimization  │
                                               │   Insights      │
                                               └─────────────────┘
database:
  neo4j:
    uri: bolt://localhost:7687
    user: neo4j
    password: your_password
streaming:
  kafka:
    servers: localhost:9092
    consumer_group: ctv-analytics-group
integrations:
  gam:
    credentials_path: /app/credentials/gam-credentials.json
    network_code: your_network_code
  prebid:
    server_url: http://localhost:8080
optimization:
  rules:
    enabled: true
    evaluation_interval: 60
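Once the YAML has been parsed, the nested settings could be read into typed objects so that missing or misspelled keys fail early. The sketch below assumes the file has already been loaded into a plain dict; the names `Neo4jSettings`, `RuleSettings`, and `load_settings` are hypothetical and not part of the project, and the unit of `evaluation_interval` is assumed to be seconds.

```python
from dataclasses import dataclass
from typing import Any, Mapping


@dataclass(frozen=True)
class Neo4jSettings:
    uri: str
    user: str
    password: str


@dataclass(frozen=True)
class RuleSettings:
    enabled: bool
    evaluation_interval: int  # assumed to be seconds; the config does not state the unit


def load_settings(raw: Mapping[str, Any]) -> tuple[Neo4jSettings, RuleSettings]:
    """Validate the parsed config and return typed settings objects."""
    # Unknown or missing keys raise TypeError/KeyError here instead of later at runtime.
    neo4j = Neo4jSettings(**raw["database"]["neo4j"])
    rules = RuleSettings(**raw["optimization"]["rules"])
    if rules.evaluation_interval <= 0:
        raise ValueError("evaluation_interval must be positive")
    return neo4j, rules


# Hypothetical parsed config mirroring the keys shown above.
parsed = {
    "database": {
        "neo4j": {
            "uri": "bolt://localhost:7687",
            "user": "neo4j",
            "password": "your_password",
        }
    },
    "optimization": {"rules": {"enabled": True, "evaluation_interval": 60}},
}
neo4j_settings, rule_settings = load_settings(parsed)
```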
Add custom optimization rules:
from datetime import timedelta

from src.optimization import OptimizationRule

# Low viewability optimization
rule = OptimizationRule(
    rule_id="low_viewability_optimization",
    condition="avg_viewability < 0.6",
    action="adjust_bid_price",
    confidence_threshold=0.6,
    temporal_window=timedelta(hours=4)
)
# `engine` is the running analytics engine instance
engine.optimization_engine.add_optimization_rule(rule)
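The README does not show how a `condition` string is interpreted. One way it might be evaluated without resorting to `eval()` is sketched below, assuming conditions take the simple form `<metric> <operator> <number>`. The function `evaluate_condition` and the operator table are hypothetical and are not the project's actual implementation.

```python
import operator
from typing import Callable, Mapping

# Comparison operators supported by this sketch. Two-character symbols come
# first so that "<=" is not mistaken for "<".
_OPERATORS: dict[str, Callable[[float, float], bool]] = {
    "<=": operator.le,
    ">=": operator.ge,
    "==": operator.eq,
    "<": operator.lt,
    ">": operator.gt,
}


def evaluate_condition(condition: str, metrics: Mapping[str, float]) -> bool:
    """Evaluate a '<metric> <op> <number>' condition against current metrics."""
    for symbol, compare in _OPERATORS.items():
        if symbol in condition:
            name, _, threshold = condition.partition(symbol)
            return compare(metrics[name.strip()], float(threshold))
    raise ValueError(f"unsupported condition: {condition!r}")


# The rule above would fire for a campaign averaging 45% viewability.
should_fire = evaluate_condition("avg_viewability < 0.6", {"avg_viewability": 0.45})
```

This sketch only handles numeric thresholds; a condition that compares two named values would need a small expression parser instead.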
import asyncio
from datetime import timedelta

# Get campaign insights.
# Run this inside an async function (or an asyncio-aware REPL such as
# `python -m asyncio`); `engine` is the running analytics engine instance.
insights = await engine.get_campaign_insights(
    campaign_id="campaign_123",
    time_window=timedelta(days=7)
)
print(f"Campaign CTR: {insights['metrics']['ctr']:.3f}")
print(f"Avg Viewability: {insights['metrics']['avg_viewability']:.2f}")
# Query device behavior over time
temporal_data = await engine.graph_engine.temporal_query(
    entity_id="device_456",
    time_window=timedelta(hours=24)
)
# Analyze viewing patterns
viewing_events = [
    event for event in temporal_data
    if event['event_type'] == 'impression'
]
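As a follow-up to the filter above, impressions could be bucketed by hour of day to surface viewing patterns. This is a minimal sketch that assumes each returned event is a dict with a `timestamp` key holding a `datetime`; that shape is an assumption, and `impressions_per_hour` is a hypothetical helper.

```python
from collections import Counter
from datetime import datetime


def impressions_per_hour(events: list[dict]) -> Counter:
    """Count impression events per hour of day (0-23)."""
    return Counter(
        event["timestamp"].hour
        for event in events
        if event["event_type"] == "impression"
    )


# Hypothetical sample shaped like the temporal_query() results above.
sample = [
    {"event_type": "impression", "timestamp": datetime(2024, 1, 1, 20, 5)},
    {"event_type": "impression", "timestamp": datetime(2024, 1, 1, 20, 40)},
    {"event_type": "completion", "timestamp": datetime(2024, 1, 1, 21, 0)},
]
hourly = impressions_per_hour(sample)
```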
from datetime import datetime

# CTVEvent and EventType come from the project's source package
# (their import path is not shown here).

# Send custom events to the analytics engine
event = CTVEvent(
    event_id="custom_001",
    event_type=EventType.IMPRESSION,
    timestamp=datetime.now(),
    device_id="device_789",
    campaign_id="campaign_456",
    creative_id="creative_123",
    placement_id="placement_789",
    viewability_score=0.85
)
await engine.data_streamer.process_event(event)
- Episode: Raw event data with temporal context
- Device: CTV devices with viewing behavior
- Campaign: Advertising campaigns with performance metrics
- Creative: Ad creatives with engagement data
- Content: Video content with context metadata
- Placement: Ad placements with performance history
- INVOLVES: Links episodes to entities with temporal context
- TARGETS: Campaign targeting relationships
- DISPLAYS: Creative display relationships
- VIEWED_ON: Content viewing relationships
- timestamp: Event occurrence time
- recorded_at: System recording time (bi-temporal)
- first_seen: Entity first appearance
- last_updated: Entity last modification
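Keeping `timestamp` and `recorded_at` separate makes it possible to ask what the system believed at a given moment, even after late or corrected data arrives. The sketch below illustrates such an "as of" lookup in plain Python; the `Fact` class and `as_of` function are hypothetical illustrations and do not reflect how the project queries Neo4j.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass(frozen=True)
class Fact:
    entity_id: str
    value: float
    timestamp: datetime    # when the event occurred
    recorded_at: datetime  # when the system stored it


def as_of(facts: list[Fact], event_time: datetime, known_at: datetime) -> Optional[Fact]:
    """Latest fact that occurred by event_time and was recorded by known_at."""
    visible = [
        f for f in facts
        if f.timestamp <= event_time and f.recorded_at <= known_at
    ]
    # Prefer the most recent event; among equals, the most recently recorded.
    return max(visible, key=lambda f: (f.timestamp, f.recorded_at), default=None)


# A viewability score for 10:00 that is corrected two hours later.
facts = [
    Fact("device_456", 0.80, datetime(2024, 1, 1, 10, 0), datetime(2024, 1, 1, 10, 1)),
    Fact("device_456", 0.65, datetime(2024, 1, 1, 10, 0), datetime(2024, 1, 1, 12, 0)),
]
before_fix = as_of(facts, datetime(2024, 1, 1, 11, 0), known_at=datetime(2024, 1, 1, 11, 0))
after_fix = as_of(facts, datetime(2024, 1, 1, 11, 0), known_at=datetime(2024, 1, 1, 13, 0))
```

Here `before_fix` carries the original score and `after_fix` carries the correction, although both describe the same event time.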
Predicts video ad viewability based on:
- Device characteristics
- Content context
- Historical performance
- Temporal patterns
Optimizes for video completion using:
- Creative performance history
- Audience behavior patterns
- Content genre correlations
- Time-of-day effects
Dynamic bid pricing based on:
- Real-time performance metrics
- Historical conversion data
- Competitive landscape
- Temporal performance patterns
make run-dev
kubectl apply -f k8s/
# Scale analytics pods
kubectl scale deployment ctv-analytics-engine --replicas=5
# Scale Kafka partitions (the partition count can only be increased)
kafka-topics --bootstrap-server localhost:9092 --alter --topic ctv_impressions --partitions 12
- Event processing rates
- Optimization success rates
- System performance metrics
- Business KPIs (CTR, viewability, completion rates)
- High error rates
- Processing delays
- Optimization anomalies
- System resource issues
# Application health
curl http://localhost:8000/health
# Database connectivity
make neo4j-shell
# Streaming status
make kafka-topics
make test-local
make test
# Generate test data at scale
docker-compose up data-generator
# Monitor performance
make logs
ctv-temporal-analytics/
├── src/
│   ├── main.py              # Application entry point
│   ├── temporal_graph.py    # Temporal knowledge graph engine
│   ├── streaming.py         # Event streaming components
│   ├── optimization.py      # Optimization engine
│   ├── integrations/        # External service integrations
│   └── analytics/           # Analytics and reporting
├── config/                  # Configuration files
├── scripts/                 # Setup and utility scripts
├── tests/                   # Test suite
├── k8s/                     # Kubernetes manifests
└── docker-compose.yml       # Development environment
- Define optimization rule:
rule = OptimizationRule(
    rule_id="custom_optimization",
    condition="custom_metric < threshold",
    action="custom_action",
    confidence_threshold=0.8,
    temporal_window=timedelta(hours=2)
)
- Implement action handler:
async def handle_custom_action(self, event: CTVEvent):
    # Custom optimization logic
    pass
- Register with engine:
engine.optimization_engine.add_optimization_rule(rule)
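The steps above do not say how a rule's `action` string is connected to its handler. One common pattern is a registry that maps action names to coroutines, sketched below. The registry, the `register_action` decorator, and `dispatch` are all hypothetical, and events are represented as plain dicts rather than `CTVEvent` objects to keep the example self-contained.

```python
import asyncio
from typing import Awaitable, Callable

Handler = Callable[[dict], Awaitable[str]]

# Hypothetical registry mapping a rule's `action` string to a coroutine.
ACTION_HANDLERS: dict[str, Handler] = {}


def register_action(name: str) -> Callable[[Handler], Handler]:
    """Decorator that registers a handler under an action name."""
    def decorator(func: Handler) -> Handler:
        ACTION_HANDLERS[name] = func
        return func
    return decorator


@register_action("custom_action")
async def handle_custom_action(event: dict) -> str:
    # Placeholder for custom optimization logic.
    return f"handled {event['event_id']}"


async def dispatch(action: str, event: dict) -> str:
    """Look up the handler for an action and await it."""
    try:
        handler = ACTION_HANDLERS[action]
    except KeyError:
        raise ValueError(f"no handler registered for {action!r}") from None
    return await handler(event)


result = asyncio.run(dispatch("custom_action", {"event_id": "custom_001"}))
```

Failing loudly on an unregistered action helps catch a typo in a rule's `action` field before it silently does nothing.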
- GET /api/campaigns/{id}/insights: Get campaign insights
- GET /api/devices/{id}/behavior: Get device behavior analysis
- GET /api/performance/trends: Get performance trends
- POST /api/optimization/rules: Add optimization rule
- GET /api/optimization/history: Get optimization history
- PUT /api/optimization/rules/{id}: Update optimization rule
- WebSocket /ws/events: Real-time event stream
- WebSocket /ws/optimizations: Real-time optimization updates
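A small client for the campaign insights endpoint might look like the sketch below, using only the standard library. It assumes the API is served at `http://localhost:8000` (the host listed for the API documentation) and that the endpoint returns a JSON object; the response shape, authentication, and any query parameters are not documented here, so `insights_url` and `fetch_insights` are hypothetical helpers.

```python
import json
from urllib.parse import quote
from urllib.request import urlopen

BASE_URL = "http://localhost:8000"  # assumed API host; adjust for your deployment


def insights_url(campaign_id: str, base_url: str = BASE_URL) -> str:
    """Build the URL for GET /api/campaigns/{id}/insights."""
    # Percent-encode the ID so characters such as "/" cannot alter the path.
    return f"{base_url}/api/campaigns/{quote(campaign_id, safe='')}/insights"


def fetch_insights(campaign_id: str, timeout: float = 5.0) -> dict:
    """Fetch campaign insights; assumes the endpoint returns a JSON object."""
    with urlopen(insights_url(campaign_id), timeout=timeout) as response:
        return json.load(response)


url = insights_url("campaign_123")
```

Calling `fetch_insights("campaign_123")` requires the application to be running locally.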
- Fork the