This folder contains the Python client library for interacting with the Rio controller API. It provides both REST API and WebSocket streaming clients, along with example Jupyter notebooks.
Two client implementations are available:
- RioClient: Direct HTTP client using requests (simple, fast)
  - Uses legacy routes (/api/control/*)
  - Synchronous responses (no polling)
  - Fast (10-50 ms per action)
- RioThingClient: WoT-compliant client using LabThings ThingClient (standard, auto-generated)
  - Uses WoT routes (/flow/, /heater/, etc.)
  - Auto-generated from Thing Descriptions
  - Async action polling (50-150 ms per action, but WoT-compliant)
Both clients provide the same interface, so you can switch between them easily. Use RioThingClient for WoT compliance, or RioClient for maximum speed.
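Because both clients expose the same interface, calling code can be written once against a structural type and handed either client. The sketch below is illustrative only: `FlowControl`, `ramp_flow`, and `FakeClient` are hypothetical names that are not part of the library, and only the `get_flow_state()` / `set_flow()` method names and the `flow_actuals_ul_hr` key come from this README.

```python
from typing import Any, Dict, List, Protocol


class FlowControl(Protocol):
    """Hypothetical structural type covering two methods named in this README."""

    def get_flow_state(self) -> Dict[str, Any]: ...

    def set_flow(self, index: int, flow_ul_hr: float) -> Any: ...


def ramp_flow(client: FlowControl, index: int, targets_ul_hr: List[float]) -> List[float]:
    """Apply each target in turn and return the flow reported after each step."""
    readings = []
    for target in targets_ul_hr:
        client.set_flow(index, target)
        state = client.get_flow_state()
        readings.append(state["flow_actuals_ul_hr"][index])
    return readings


class FakeClient:
    """In-memory stand-in so the sketch runs without a Rio controller."""

    def __init__(self, n_channels: int = 4) -> None:
        self._flows = [0.0] * n_channels

    def get_flow_state(self) -> Dict[str, Any]:
        return {"flow_actuals_ul_hr": list(self._flows)}

    def set_flow(self, index: int, flow_ul_hr: float) -> None:
        self._flows[index] = flow_ul_hr


# With the real library, a RioClient or RioThingClient would be passed instead.
print(ramp_flow(FakeClient(), 0, [50.0, 100.0]))
```

Writing helpers this way means switching clients should only change the line that constructs the client, assuming both really do match on the methods you call.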
- api_client.py: Python client library (RioClient for REST, RioStreamClient for WebSocket)
- notebooks/: Example Jupyter notebooks
  - tutorial.ipynb: Step-by-step learning notebook
  - interactive_control.ipynb: Interactive UI with ipywidgets
On your computer (for accessing Pi API):
The client library requires:
pip install requests websocket-client

For Jupyter notebooks, also install:
pip install ipywidgets matplotlib pandas numpy jupyter

Note: The client library code is in software/api/client/api_client.py. You can either:
- Import directly if software/ is in your Python path
- Copy api_client.py to your project
- Add software/ to PYTHONPATH
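For the path-based options, a small helper can keep the setup idempotent so that re-running a notebook cell does not add duplicate entries. This is a minimal sketch; `add_software_to_path` is a hypothetical helper, not part of the library, and the repository path is a placeholder.

```python
import sys
from pathlib import Path


def add_software_to_path(repo_root: str) -> str:
    """Put <repo_root>/software at the front of sys.path once and return it."""
    software_dir = str(Path(repo_root).expanduser().resolve() / "software")
    if software_dir not in sys.path:
        sys.path.insert(0, software_dir)
    return software_dir


# Placeholder path: adjust to wherever rio-controller is checked out.
added = add_software_to_path("/path/to/rio-controller")
print(added in sys.path)
```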
Connecting to Pi API from your computer:
import sys
# Add client library to path (adjust path to your rio-controller repository)
sys.path.insert(0, '/path/to/rio-controller/software')
from api.client import RioClient, RioStreamClient
# REST API client (replace with your Pi's IP or hostname)
PI_ADDRESS = "raspberrypi.local" # or "192.168.1.100"
API_URL = f"http://{PI_ADDRESS}:8000"
client = RioClient(base_url=API_URL)
# Check health
health = client.health()
print(f"Status: {health['status']}")
# Get flow state
state = client.get_flow_state()
print(f"Flow channel 0: {state['flow_actuals_ul_hr'][0]} ul/hr")
# Set flow rate
client.set_flow(0, 100.0) # Set channel 0 to 100 ul/hr
# WebSocket streaming
stream = RioStreamClient(base_url=API_URL)
stream.subscribe(["flow"], channels={"flow": [0, 1]})
for msg in stream.iter_messages(timeout=10.0):
    print(f"{msg['topic']} channel {msg['channel']}: {msg['value']}")

ROI recording (WoT / RioThingClient):
from api.client import RioThingClient
thing_client = RioThingClient(base_url=API_URL)
result = thing_client.record_roi_frames(frames=50)
print(result)

For local development (API on same machine):
client = RioClient(base_url="http://localhost:8000")

The clients support context managers for automatic cleanup:
with RioClient(base_url="http://localhost:8000") as client:
    state = client.get_flow_state()
# Session automatically closed on exit

with RioStreamClient(base_url="http://localhost:8000") as stream:
    stream.subscribe(["flow"])
    for msg in stream.iter_messages(timeout=5.0):
        print(msg)
# WebSocket automatically closed on exit

The client raises custom exceptions:
- RioAPIError: Base exception for all API errors
- RioConnectionError: Connection failures (network, timeout)
- RioHTTPError: HTTP errors (4xx, 5xx) with status code and response details
- RioWebSocketError: WebSocket connection failures
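Since every exception derives from RioAPIError, a single `except RioAPIError` clause can catch them all. The classes below are stand-ins that only mirror the hierarchy described above so the sketch runs without the library. The real definitions live in api_client.py and may differ; the `RioHTTPError` constructor signature and the `describe` helper are assumptions.

```python
class RioAPIError(Exception):
    """Stand-in for the base exception described above."""


class RioConnectionError(RioAPIError):
    """Stand-in for connection failures."""


class RioHTTPError(RioAPIError):
    """Stand-in carrying a status code and response details (assumed constructor)."""

    def __init__(self, status_code: int, response: str) -> None:
        super().__init__(f"HTTP {status_code}: {response}")
        self.status_code = status_code
        self.response = response


def describe(error: RioAPIError) -> str:
    """Map an error to a short message, checking the most specific types first."""
    if isinstance(error, RioHTTPError):
        return f"server rejected request ({error.status_code})"
    if isinstance(error, RioConnectionError):
        return "could not reach the API"
    return "unexpected API error"


for exc in (RioHTTPError(422, "bad index"), RioConnectionError("timeout"), RioAPIError("other")):
    try:
        raise exc
    except RioAPIError as caught:  # one clause catches every subclass
        print(describe(caught))
```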
Example:
from api.client import RioClient, RioConnectionError, RioHTTPError
try:
    client = RioClient(base_url="http://192.168.1.100:8000")
    client.set_flow(0, 100.0)
except RioConnectionError as e:
    print(f"Connection failed: {e}")
except RioHTTPError as e:
    print(f"HTTP {e.status_code}: {e.response}")

The client automatically retries transient failures (timeouts, connection errors) up to 3 times by default. Configure with:
client = RioClient(base_url="...", max_retries=5)

Note: All methods use legacy /api/control/* routes for backward compatibility. WoT routes are available at /flow/, /heater/, etc. (see API docs).
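To illustrate what retrying transient failures means in practice, here is a generic retry loop with exponential backoff. It is a sketch of the general pattern, not the library's implementation: `call_with_retries`, its backoff policy, and the reading of `max_retries` as "retries after the first attempt" are all assumptions.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retries(
    func: Callable[[], T],
    max_retries: int = 3,
    transient: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    base_delay_s: float = 0.0,
) -> T:
    """Call func, retrying up to max_retries times after a transient error."""
    attempt = 0
    while True:
        try:
            return func()
        except transient:
            if attempt >= max_retries:
                raise  # retries exhausted: surface the last error
            time.sleep(base_delay_s * (2 ** attempt))  # exponential backoff
            attempt += 1


calls = {"n": 0}


def flaky() -> str:
    """Fail twice with a transient error, then succeed."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


print(call_with_retries(flaky), "after", calls["n"], "calls")
```

Only errors listed in `transient` are retried; anything else propagates immediately.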
System:
- health() - Get API health status
- capabilities() - Get available modules
Channel Configuration:
- get_channels() - Get channel metadata
- set_channels(config) - Update channel metadata
Flow/Pressure:
- get_flow_state() - Get current state
- set_flow(index, flow_ul_hr) - Set flow rate
- set_pressure(index, pressure_mbar) - Set pressure
- set_flow_mode(index, mode_ui) - Set control mode
- set_flow_pi_consts(index, p, i) - Set PI constants
Heater:
- get_heater_state() - Get heater states
- set_heater_temp(index, temp_c) - Set temperature
- set_heater_pid(index, enabled) - Enable/disable PID
- set_heater_stir(index, enabled) - Enable/disable stirrer
Camera:
- get_camera_snapshot() - Get JPEG snapshot
- set_camera_resolution(...) - Set resolution
- set_camera_roi(x, y, w, h) - Set ROI
- clear_camera_roi() - Clear ROI
Strobe:
- set_strobe_enable(enabled) - Enable/disable
- set_strobe_hold(hold) - Enable/disable hold mode
- set_strobe_timing(period_ns, wait_ns) - Set timing
Droplet Detection:
- droplet_start() - Start detection
- droplet_stop() - Stop detection
- droplet_status() - Get status
- droplet_histogram() - Get histogram
- droplet_statistics() - Get statistics
Data Capture:
- capture_start(topics, channels, path) - Start CSV capture
- capture_stop() - Stop capture
- capture_status() - Get capture status
For WoT-compliant access, you can use LabThings ThingClient:
from labthings_fastapi.client import ThingClient
# Connect to FlowThing
flow = ThingClient("http://192.168.1.100:8000/flow/")
# Get state (property)
state = flow.state
# Set flow (action)
flow.set_flow(index=0, flow_ul_hr=100.0)

See LabThings documentation for more details on ThingClient usage.
The WebSocket client uses a thread-safe queue for message delivery:
stream = RioStreamClient(base_url="http://localhost:8000")
stream.subscribe(["flow", "pressure"], channels={"flow": [0, 1]})
# Messages are queued in background thread
for msg in stream.iter_messages(timeout=10.0):
    print(f"{msg['topic']}: {msg['value']}")

Messages are dictionaries with:
- topic: "flow", "pressure", or "heater"
- channel: Channel index (0-3)
- timestamp: Unix timestamp (float)
- value: Sensor value (float)
- unit: Unit string (e.g., "ul_hr", "mbar", "celsius")
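The queue-based delivery described above can be pictured with the standard library alone: a background thread puts message dictionaries on a `queue.Queue`, and the consumer iterates until no message arrives within the timeout. This is a sketch of the pattern, not the client's actual code. `produce` and `iter_queued` are hypothetical names, and ending iteration on timeout is an assumption about how `iter_messages(timeout=...)` behaves.

```python
import queue
import threading
import time
from typing import Dict, Iterator, List


def produce(out: "queue.Queue[Dict]", samples: List[float]) -> None:
    """Background producer: stands in for the WebSocket receive loop."""
    for value in samples:
        out.put({
            "topic": "flow",
            "channel": 0,
            "timestamp": time.time(),
            "value": value,
            "unit": "ul_hr",
        })


def iter_queued(source: "queue.Queue[Dict]", timeout: float) -> Iterator[Dict]:
    """Yield messages until none arrives within `timeout` seconds."""
    while True:
        try:
            yield source.get(timeout=timeout)
        except queue.Empty:
            return


messages: "queue.Queue[Dict]" = queue.Queue()
worker = threading.Thread(target=produce, args=(messages, [98.5, 99.2, 100.1]), daemon=True)
worker.start()

for msg in iter_queued(messages, timeout=0.5):
    print(f"{msg['topic']} channel {msg['channel']}: {msg['value']} {msg['unit']}")
```

`queue.Queue` handles the locking internally, which is why the consumer loop needs no explicit synchronization.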
The client automatically connects when iter_messages() is called. Close explicitly:
stream.close()  # Close WebSocket connection

Or use a context manager:
with RioStreamClient() as stream:
    # Connection automatically closed on exit
    pass

tutorial.ipynb is a step-by-step learning notebook covering:
- System health and capabilities
- Channel configuration
- Flow/pressure control
- Heater control
- Camera snapshots
- WebSocket streaming
- Data visualization
- On-demand data capture
Best for: Learning the API, understanding concepts, exploring features.
interactive_control.ipynb provides an interactive UI with ipywidgets for practical control:
- Flow/pressure sliders with real-time updates
- Heater controls with PID/stirrer toggles
- Camera snapshot capture
- Strobe timing controls
- Emergency stop button
Best for: Daily use, quick adjustments, real-time monitoring.
Set the API base URL via environment variable:
export RIO_API_URL="http://192.168.1.100:8000"

Or in Python:
import os
os.environ["RIO_API_URL"] = "http://192.168.1.100:8000"

Or pass directly:
client = RioClient(base_url="http://192.168.1.100:8000")

"Failed to connect":
- Check that the API server is running
- Verify the IP address and port
- Check firewall/network settings
- Try client.health() to test the connection
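The address and port checks above can be scripted without the client library by testing whether a TCP connection opens at all. The sketch below reads `RIO_API_URL` and falls back to `http://localhost:8000`. `api_endpoint` and `is_reachable` are hypothetical helpers, and a successful connection only shows that something is listening on that port, not that the API itself is healthy.

```python
import os
import socket
from urllib.parse import urlparse


def api_endpoint(default: str = "http://localhost:8000") -> tuple:
    """Return (host, port) from RIO_API_URL, falling back to `default`."""
    url = urlparse(os.environ.get("RIO_API_URL", default))
    port = url.port or (443 if url.scheme == "https" else 80)
    return url.hostname, port


def is_reachable(host: str, port: int, timeout: float = 2.0) -> bool:
    """True if a TCP connection to host:port can be opened within `timeout`."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


host, port = api_endpoint()
print(f"{host}:{port} reachable: {is_reachable(host, port, timeout=1.0)}")
```

If this reports the port as unreachable, the problem is in the network path or the server process rather than in the Python client.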
"No module named 'api.client'":
- Make sure you're running from the repo root or have software/ in your Python path
- In Jupyter, the notebook should handle path setup automatically
- If not, add manually: sys.path.insert(0, '/path/to/rio-controller/software')
No messages received:
- Check that you've called subscribe() before iter_messages()
- Verify the API server supports WebSocket (check /api/streams/aggregate)
- Check network connectivity
- Look for errors in the notebook output
See the Jupyter notebooks in notebooks/ for complete examples:
- tutorial.ipynb: Comprehensive tutorial with explanations
- interactive_control.ipynb: Practical control interface
For complete API documentation, see:
- API server README: ../README.md
- OpenAPI/Swagger docs: http://<API_BASE_URL>/docs (when server is running)
This file was AI-generated and may contain errors. Please verify against the source code and runtime behavior.
- Date: 2025-01-XX
- Maintenance: If you change the client API, methods, or behavior, update this document.