Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 0 additions & 31 deletions pipeline/TrafficImage.py

This file was deleted.

137 changes: 86 additions & 51 deletions pipeline/api.py
Original file line number Diff line number Diff line change
@@ -1,51 +1,86 @@
import requests


class APIClient:
def __init__(self, url="https://api.data.gov.sg/v1/transport/traffic-images"):
self.url = url
self.timestamp = None
self.api_status = "Unverified"
self.camera_id_array = []

# Get API response
response = requests.get(self.url)
response_json = response.json()
self.metadata = response_json

# Get and set API status
self.api_status = self.metadata["api_info"]["status"]

# Get and set timestamp
self.timestamp = self.metadata["items"][0]["timestamp"]

print(f"The API status is: {self.api_status}")
print(f"The API was called at: {self.timestamp}")

for item in self.metadata["items"]:
for camera in item["cameras"]:
self.camera_id_array.append(camera["camera_id"])

def extract_image(self, camera_id):
# Loop through the items and cameras to find the correct camera_id
for item in self.metadata["items"]:
for camera in item["cameras"]:
if camera["camera_id"] == str(camera_id):
return camera[
"image"
] # Return the image URL if the camera ID matches
# If camera ID is not found
raise RuntimeError(f"Camera ID {camera_id} not found.")

def extract_latlon(self, camera_id):
for item in self.metadata["items"]:
for camera in item["cameras"]:
if camera["camera_id"] == str(camera_id):
longitude = camera["location"]["longitude"]
latitude = camera["location"]["latitude"]
return (
longitude,
latitude,
) # Return both longitude and latitude as a tuple
# If camera ID is not found
raise RuntimeError(f"Camera ID {camera_id} not found.")
#
# Flowmotion
# Pipeline
# Traffic Images API Client
#

import asyncio
from pathlib import Path

import httpx

from data import Camera, Location, TrafficImage


class TrafficImageAPI:
"""Data.gov.sg Traffic Images API Client."""

API_URL = "https://api.data.gov.sg/v1/transport/traffic-images"

def __init__(self, api_url: str = API_URL):
self.api_url = api_url
self._sync = httpx.Client()
self._async = httpx.AsyncClient()

def get_cameras(self) -> list[Camera]:
"""Get Traffic Camera metadata from traffic images API endpoint.

Returns:
Parsed traffic camera metadata.
"""
# fetch traffic-images api endpoint
response = self._sync.get(self.API_URL)
response.raise_for_status()
meta = response.json()
return parse_cameras(meta)

def get_traffic_images(
self, cameras: list[Camera], image_dir: Path
) -> list[TrafficImage]:
"""Save Traffic Camera images from given Cameras into image_dir.
Creates image_dir if it does not already exist.

Args:
cameras:
List of traffic cameras to retrieve traffic images from.
image_dir:
Path the image directory to write retrieved images.
Returns:
List of retrieve Traffic Images.
"""
# ensure image directory exists
image_dir.mkdir(parents=True, exist_ok=True)

async def fetch(camera: Camera) -> TrafficImage:
response = await self._async.get(camera.image_url)
# write image bytes to image file on disk
image_path = image_dir / f"{camera.id}.jpg"
with open(image_path, "wb") as f:
for chunk in response.iter_bytes():
f.write(chunk)

return TrafficImage.from_camera(camera, image_path)

async def fetch_all() -> list[TrafficImage]:
# perform all image fetches asynchronously
return await asyncio.gather(*[fetch(camera) for camera in cameras])

return asyncio.run(fetch_all())


def parse_cameras(meta: dict) -> list[Camera]:
meta = meta["items"][0]
retrieved_on = meta["timestamp"]
return [
Camera(
id=c["camera_id"],
retrieved_on=retrieved_on,
captured_on=c["timestamp"],
image_url=c["image"],
location=Location(
longitude=c["location"]["longitude"],
latitude=c["location"]["latitude"],
),
)
for c in meta["cameras"]
]
37 changes: 37 additions & 0 deletions pipeline/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#
# Flowmotion
# Test Fixtures
#

from datetime import datetime

import pytest

from data import Camera, Location, TrafficImage
from model import Model


@pytest.fixture
def camera() -> Camera:
return Camera(
id="1001",
image_url="https://images.data.gov.sg/api/traffic/1001.jpg",
captured_on=datetime(2024, 9, 27, 8, 30, 0),
retrieved_on=datetime(2024, 9, 27, 8, 31, 0),
location=Location(longitude=103.851959, latitude=1.290270),
)


# Fixture for TrafficImage instance
@pytest.fixture
def traffic_image():
return TrafficImage(
image="some_image_url",
processed=True,
congestion_rating=0.5,
camera_id="camera_123",
longitude=103.851959,
latitude=1.290270,
processed_on=datetime(2024, 9, 27, 8, 30, 0),
model_id=Model.MODEL_ID,
)
120 changes: 112 additions & 8 deletions pipeline/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,16 @@
#

import json
import math
from datetime import datetime
from pathlib import Path
from types import NotImplementedType
from typing import Optional, cast

from pydantic import BaseModel

from timetools import datetime_sgt


class Location(BaseModel):
"""Geolocation consisting of longitude and latitude."""
Expand All @@ -17,14 +23,6 @@ class Location(BaseModel):
latitude: float


class Rating(BaseModel):
"""Traffic Congestion rating performed by a model"""

rated_on: datetime
model_id: str
value: float


class Camera(BaseModel):
"""Traffic Camera capturing traffic images."""

Expand All @@ -35,13 +33,119 @@ class Camera(BaseModel):
location: Location


class TrafficImage:
"""Traffic Image to be rated for congestion.

Attributes:
image: URL that retrieves Traffic camera image.
processed: Whether this TrafficImage instance has been processed.
congestion_rating: 0-1 Congestion Rating
camera_id: ID of the camera that captured this image
longitude: Longitude of the camera that captured this image
latitude: Latitude of the camera that captured this image
processed_on: Timestamp when this TrafficImage instance has been processed.
model_id: Unique ID to identify the version of the model the performed the rating.
"""

def __init__(
self,
image,
camera_id,
longitude,
latitude,
processed=False,
congestion_rating=None,
processed_on: Optional[datetime] = None,
model_id: Optional[str] = None,
):
self.image = image
self.processed = processed
self.congestion_rating = congestion_rating
self.camera_id = camera_id
self.longitude = longitude
self.latitude = latitude
self.processed_on = processed_on
self.model_id = model_id

@classmethod
def from_camera(cls, camera: Camera, image_path: Path) -> "TrafficImage":
"""Create TrafficImage from camera & its image path.

Args:
camera:
Camera model to create TrafficImage from.
image_path:
Path to the retrieved captured image from the Camera.
Returns:
Constructed TrafficImage
"""
return cls(
image=str(image_path),
camera_id=camera.id,
longitude=camera.location.longitude,
latitude=camera.location.latitude,
)

def set_processed(self, congestion_rating: float, model_id: str):
self.processed = True
self.congestion_rating = congestion_rating
self.processed_on = datetime_sgt()
self.model_id = model_id


class Rating(BaseModel):
"""Traffic Congestion rating performed by a model"""

rated_on: datetime
model_id: str
value: float

@classmethod
def from_traffic_image(cls, image: TrafficImage) -> "Rating":
if (
image.processed_on is None
or image.congestion_rating is None
or image.model_id is None
):
__import__("pprint").pprint(image.__dict__)
raise ValueError(
"Invalid TrafficImage: Either 'processed_on' or 'congestion_rating' or 'model_id' is None."
)

return cls(
rated_on=image.processed_on,
value=image.congestion_rating,
model_id=image.model_id,
)

def equal(self, other: object) -> bool | NotImplementedType:
if not isinstance(other, Rating):
return NotImplemented
other = cast(Rating, other)
return (
self.model_id == other.model_id
and math.isclose(self.value, other.value)
and self.rated_on == other.rated_on
)


class Congestion(BaseModel):
"""Traffic Congestion data."""

camera: Camera
rating: Rating
updated_on: datetime

@classmethod
def from_traffic_image(
cls, image: TrafficImage, camera: Camera, updated_on: datetime
) -> "Congestion":
return cls(
camera=camera,
rating=Rating.from_traffic_image(image),
updated_on=updated_on,
)


def to_json_dict(model: BaseModel):
"""Convert given pydantic model into the its JSON dict representation"""
Expand Down
2 changes: 1 addition & 1 deletion pipeline/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def __init__(self) -> None:
Uses Google Application Default credentials with authenticate DB requests.
See https://firebase.google.com/docs/admin/setup#initialize-sdk.
"""
app = firebase_admin.initialize_app()
app = firebase_admin.initialize_app(options={"projectId": "flowmotion-4e268"})
self._db = firestore.client(app)

def insert(self, table: str, data: BaseModel) -> str:
Expand Down
10 changes: 7 additions & 3 deletions pipeline/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@
from google.cloud import storage
from ultralytics import YOLO

from TrafficImage import TrafficImage
from data import TrafficImage


class Model:
# uniquely identifies versions of the model.
# NOTE: update this if the model changed in any way.
MODEL_ID = "yolo_detect_segment_v1"

def __init__(
self,
project_id: str = "flowmotion-4e268",
Expand Down Expand Up @@ -58,7 +62,7 @@ def predict(self, images: list[TrafficImage]):
car_results = self.car_model(img)
if len(car_results) == 0 or len(car_results[0].boxes) == 0:
print("No cars detected")
image.set_processed(0.0)
image.set_processed(0.0, Model.MODEL_ID)
continue

# Access car bounding boxes (x, y, w, h)
Expand Down Expand Up @@ -95,4 +99,4 @@ def predict(self, images: list[TrafficImage]):
print(f"Congestion rating: {congestion_rating}")

# Set the predicted congestion rating
image.set_processed(congestion_rating)
image.set_processed(congestion_rating, Model.MODEL_ID)
Loading