Skip to content

Commit c34b191

Browse files
Add csv data loader example (#179)
### Changelog <!-- Write a one-sentence summary of the user-impacting change (API, UI/UX, performance, etc) that could appear in a changelog. Write "None" if there is no user-facing change --> None ### Docs <!-- Link to a Docs PR, tracking ticket in Linear, OR write "None" if no documentation changes are needed. --> None ### Description Adds another example data loader for loading from a CSV. It uses an index to keep track of where to start reading from. It logs each column as a different topic and uses a "timestamp_nanos" column for the log time.
1 parent 7a6620b commit c34b191

File tree

11 files changed

+368
-0
lines changed

11 files changed

+368
-0
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
dist
2+
node_modules
3+
rust/Cargo.lock
4+
rust/target
5+
package-lock.json
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
arrowParens: always
2+
printWidth: 100
3+
trailingComma: "all"
4+
tabWidth: 2
5+
semi: true

examples/csv-data-loader/README.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# CSV Data Loader
2+
3+
This is a simple [Foxglove](http://foxglove.dev/) [extension](https://docs.foxglove.dev/docs/visualization/extensions) that loads a CSV file.
4+
The file must have a column called `timestamp_nanos`, with rows sorted in ascending timestamp order, in order to be read.
5+
6+
## Building
7+
8+
Install Rust with [rustup](https://www.rust-lang.org/tools/install), then install wasm32 support:
9+
10+
```
11+
rustup target add wasm32-unknown-unknown
12+
```
13+
14+
Then to build the rust code and generate the extension file:
15+
16+
```
17+
npm install
18+
npm run package
19+
```
20+
21+
These steps will produce a `.foxe` file you can install as an extension from the Foxglove settings page.

examples/csv-data-loader/config.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
module.exports = {
2+
webpack: (config) => {
3+
// Set up Webpack to inline .wasm imports as a base64 URL
4+
config.module.rules.push({
5+
test: /\.wasm$/i,
6+
type: "asset/inline",
7+
});
8+
return config;
9+
},
10+
};
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
// @ts-check

const foxglove = require("@foxglove/eslint-plugin");
const globals = require("globals");
const tseslint = require("typescript-eslint");

// Flat ESLint config: lint this extension's TypeScript sources with the
// shared Foxglove base/react/typescript rule sets.
module.exports = tseslint.config({
  files: ["src/**/*.ts", "src/**/*.tsx"],
  extends: [foxglove.configs.base, foxglove.configs.react, foxglove.configs.typescript],
  languageOptions: {
    // ES2020 + browser globals — the extension code runs inside the app's
    // browser-like environment.
    globals: {
      ...globals.es2020,
      ...globals.browser,
    },
    // Enable type-aware lint rules against this package's tsconfig.
    parserOptions: {
      project: "tsconfig.json",
      tsconfigRootDir: __dirname,
    },
  },
  rules: {
    // Missing React hook dependencies are errors, not warnings.
    "react-hooks/exhaustive-deps": "error",
  },
});
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
{
2+
"name": "csv-data-loader",
3+
"displayName": "CSV Data Loader",
4+
"description": "Data loader for loading CSV files. The file must have a field called 'timestamp_nanos' and be in ascending timestamp.",
5+
"publisher": "Foxglove",
6+
"version": "1.1.0",
7+
"license": "MIT",
8+
"main": "./dist/extension.js",
9+
"scripts": {
10+
"build": "npm run build:wasm && foxglove-extension build",
11+
"build:wasm": "cd rust && cargo build --release --target wasm32-unknown-unknown",
12+
"foxglove:prepublish": "npm run build:wasm && foxglove-extension build --mode production",
13+
"lint:ci": "eslint --report-unused-disable-directives .",
14+
"lint": "eslint --report-unused-disable-directives --fix .",
15+
"local-install": "foxglove-extension install",
16+
"package": "foxglove-extension package",
17+
"pretest": "foxglove-extension pretest"
18+
},
19+
"devDependencies": {
20+
"@foxglove/eslint-plugin": "2.0.0",
21+
"@foxglove/extension": "2.29.0",
22+
"@foxglove/schemas": "1.6.4",
23+
"@types/react": "18.3.12",
24+
"@types/react-dom": "18.3.1",
25+
"@types/wicg-file-system-access": "2023.10.6",
26+
"create-foxglove-extension": "1.0.4",
27+
"eslint": "9.15.0",
28+
"prettier": "3.3.3",
29+
"react": "18.3.1",
30+
"react-dom": "18.3.1",
31+
"typescript": "5.7.2"
32+
}
33+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
[package]
2+
name = "csv-foxglove-data-loader"
3+
version = "0.1.0"
4+
edition = "2024"
5+
6+
[lib]
7+
crate-type = ["cdylib"]
8+
9+
[dependencies]
10+
anyhow = "1.0"
11+
csv = "1.3.1"
12+
foxglove_data_loader = "0.1.0"
13+
serde_json = "1.0.142"
Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
use std::{
2+
collections::{BTreeMap, BTreeSet},
3+
io::{Cursor, Read},
4+
};
5+
6+
use foxglove_data_loader::{
7+
DataLoader, DataLoaderArgs, Initialization, Message, MessageIterator, MessageIteratorArgs,
8+
reader::{self},
9+
};
10+
11+
use anyhow::bail;
12+
use csv::StringRecord;
13+
use serde_json::json;
14+
15+
/// Data loader that exposes each column of a CSV file as its own topic,
/// using the `timestamp_nanos` column as the log time.
#[derive(Default)]
struct CsvDataLoader {
    /// Path of the CSV file (the single path handed to `new`)
    path: String,
    /// Index of timestamp to byte offset
    indexes: BTreeMap<u64, u64>,
    /// The index of the field containing timestamp
    log_time_index: usize,
    /// The keys from the first row of the CSV
    keys: Vec<String>,
}
25+
26+
impl DataLoader for CsvDataLoader {
27+
type MessageIterator = CsvMessageIterator;
28+
type Error = anyhow::Error;
29+
30+
fn new(args: DataLoaderArgs) -> Self {
31+
let DataLoaderArgs { mut paths } = args;
32+
assert_eq!(
33+
paths.len(),
34+
1,
35+
"data loader is configured to only get one file"
36+
);
37+
Self {
38+
path: paths.remove(0),
39+
..Default::default()
40+
}
41+
}
42+
43+
fn initialize(&mut self) -> Result<Initialization, Self::Error> {
44+
let mut reader = csv::ReaderBuilder::new()
45+
.has_headers(true)
46+
.from_reader(reader::open(&self.path));
47+
48+
// Read the headers of the CSV and store them on the loader.
49+
// We will turn each column into a topic so the CSV needs to have a header.
50+
let headers = reader.headers()?;
51+
self.keys = headers.iter().map(String::from).collect();
52+
53+
// Read through the keys and try to find a field called "timestamp_nanos". If this doesn't
54+
// exit then we can't read the file as we have no way of knowing the log time.
55+
let Some(log_time_index) = self.keys.iter().position(|k| k == "timestamp_nanos") else {
56+
bail!("expected csv to contain column called timestamp_nanos")
57+
};
58+
59+
// Store the column index of the timestamp to be used for the log time.
60+
self.log_time_index = log_time_index;
61+
62+
let mut record = StringRecord::new();
63+
let mut position = reader.position().byte();
64+
65+
// Read the entire file to build up an index of timestamps to byte position.
66+
// Later on we'll use this index to make sure we can immediately start reading from the
67+
// correct place. This will take a little bit of time when the file loads for the first
68+
// time, but it will mean playback is snappy later on.
69+
while reader.read_record(&mut record)? {
70+
let timestamp_nanos: u64 = record[log_time_index].parse()?;
71+
self.indexes.insert(timestamp_nanos, position);
72+
position = reader.position().byte();
73+
}
74+
75+
let mut builder = Initialization::builder()
76+
.start_time(
77+
self.indexes
78+
.first_key_value()
79+
.map(|(timestamp, _)| *timestamp)
80+
.unwrap_or(0),
81+
)
82+
.end_time(
83+
self.indexes
84+
.last_key_value()
85+
.map(|(timestamp, _)| *timestamp)
86+
.unwrap_or(0),
87+
);
88+
89+
for (i, key) in self.keys.iter().enumerate() {
90+
// Don't add a channel for the column used for log time
91+
if i == self.log_time_index {
92+
continue;
93+
}
94+
95+
builder
96+
.add_channel_with_id(i as _, &format!("/{key}"))
97+
.expect("channel is free")
98+
.message_encoding("json")
99+
.message_count(self.indexes.len() as _);
100+
}
101+
102+
Ok(builder.build())
103+
}
104+
105+
fn create_iter(
106+
&mut self,
107+
args: MessageIteratorArgs,
108+
) -> Result<Self::MessageIterator, Self::Error> {
109+
let requested_channel_id = args.channels.into_iter().collect();
110+
111+
match self.indexes.range(args.start_time.unwrap_or(0)..).next() {
112+
Some((_, byte_offset)) => {
113+
let reader = reader::open(&self.path);
114+
reader.seek(*byte_offset);
115+
116+
Ok(CsvMessageIterator {
117+
row_to_flush: Default::default(),
118+
log_time_index: self.log_time_index,
119+
requested_channel_id,
120+
reader: csv::Reader::from_reader(Box::new(reader)),
121+
})
122+
}
123+
// If there is no byte offset (we've gone past the last timestamp), return empty iter
124+
None => Ok(CsvMessageIterator {
125+
log_time_index: self.log_time_index,
126+
row_to_flush: Default::default(),
127+
requested_channel_id: Default::default(),
128+
reader: csv::Reader::from_reader(Box::new(Cursor::new([]))),
129+
}),
130+
}
131+
}
132+
}
133+
134+
/// Streams messages out of the CSV: one message per (row, requested column).
struct CsvMessageIterator {
    /// Messages built from the current row that still need to be emitted.
    // NOTE(review): drained with `pop`, so a row's columns come out in
    // reverse column order; they all share the row's log time.
    row_to_flush: Vec<Message>,
    /// Column index holding `timestamp_nanos` (never emitted as a message).
    log_time_index: usize,
    /// Channel ids (== column indexes) the caller asked for; others are skipped.
    requested_channel_id: BTreeSet<u16>,
    /// CSV reader positioned at the first row to emit.
    reader: csv::Reader<Box<dyn Read>>,
}
140+
141+
/// Try and coerce the string into a JSON value.
142+
///
143+
/// Try to convert to a f64, then bool, else finally return a string.
144+
fn to_json_value(value: &str) -> serde_json::Value {
145+
if let Ok(v) = value.parse::<f64>() {
146+
return json!(v);
147+
}
148+
149+
if let Ok(v) = value.parse::<bool>() {
150+
return json!(v);
151+
}
152+
153+
json!(value)
154+
}
155+
156+
impl MessageIterator for CsvMessageIterator {
    type Error = anyhow::Error;

    /// Yield the next message, reading a fresh CSV row whenever the buffered
    /// row has been fully drained.
    ///
    /// Each row fans out into one message per requested column, all sharing
    /// the row's `timestamp_nanos` as log and publish time.
    // NOTE(review): `row_to_flush` is drained with `pop`, so columns of a row
    // are emitted in reverse column order. They carry identical log times, so
    // this is presumably insignificant — confirm against host expectations.
    fn next(&mut self) -> Option<Result<Message, Self::Error>> {
        loop {
            // We emit each column of a row as its own message.
            if let Some(message) = self.row_to_flush.pop() {
                return Some(Ok(message));
            }

            let mut columns = StringRecord::new();

            match self.reader.read_record(&mut columns) {
                Err(e) => {
                    return Some(Err(e.into()));
                }
                // End of file: nothing buffered, nothing left to read.
                Ok(false) => {
                    return None;
                }
                // fall through
                Ok(true) => {}
            }

            // Get the log time for the row. This will need to be on every message.
            let timestamp = match columns[self.log_time_index].parse::<u64>() {
                Ok(t) => t,
                Err(e) => {
                    return Some(Err(e.into()));
                }
            };

            for (index, cell) in columns.iter().enumerate() {
                // Don't emit the timestamp column as a message
                if index == self.log_time_index {
                    continue;
                }

                // Channel ids were assigned from column indexes in initialize().
                let channel_id = index as u16;

                // If this column wasn't requested, skip it
                if !self.requested_channel_id.contains(&channel_id) {
                    continue;
                }

                // Wrap the coerced cell as `{"value": ...}` JSON bytes.
                let data = serde_json::to_vec(&json!({ "value": to_json_value(cell) }))
                    .expect("json will not fail to serialize");

                // Add this message to the row and continue onto the next column
                self.row_to_flush.push(Message {
                    channel_id,
                    log_time: timestamp,
                    publish_time: timestamp,
                    data,
                });
            }
            // If the row produced no messages (no requested columns), loop
            // around and read the next row.
        }
    }
}
214+
215+
// Expose `CsvDataLoader` through the crate's entry-point macro so the host
// application can instantiate it from the compiled WASM module.
foxglove_data_loader::export!(CsvDataLoader);
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
// Webpack is configured to import .wasm files as a base64 URL
// (the `asset/inline` rule in config.ts), so a `*.wasm` import resolves to
// a string containing the encoded module rather than a path on disk.
declare module "*.wasm" {
  const url: string;
  export default url;
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
import { Experimental } from "@foxglove/extension";

// Import the .wasm file as a base64 data URL to be bundled with the extension
import wasmUrl from "../rust/target/wasm32-unknown-unknown/release/csv_foxglove_data_loader.wasm";

/**
 * Extension entry point, called by Foxglove when the extension loads.
 *
 * Registers the bundled WASM data loader to handle local `.csv` files.
 */
export function activate(extensionContext: Experimental.ExtensionContext): void {
  extensionContext.registerDataLoader({
    type: "file",
    wasmUrl,
    supportedFileType: ".csv",
  });
}

0 commit comments

Comments
 (0)