Skip to content

Commit 8c225a6

Browse files
authored
add coinbase sample (#451)
* add coinbase sample * update sql * update readme * update readme * using decimal instead of float
1 parent 84c4aed commit 8c225a6

File tree

5 files changed

+176
-0
lines changed

5 files changed

+176
-0
lines changed

examples/coinbase/Makefile

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
2+
3+
PWD = $(shell pwd)
4+
NAME = test
5+
6+
create:
7+
yq -o=json $(PWD)/pipeline.yaml | curl http://localhost:4195/streams/$(NAME) -X POST -d @-
8+
9+
delete:
10+
curl http://localhost:4195/streams/$(NAME) -X DELETE

examples/coinbase/README.md

+89
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
# Demo for Benthos data pipeline and Coinbase websocket data
2+
3+
4+
5+
This docker compose file demonstrates how to ingest WebSocket data into Proton by using Benthos pipeline.
6+
7+
8+
9+
## Start the stack
10+
11+
Simply run `docker compose up` in this folder. Three docker containers in the stack:
12+
13+
1. ghcr.io/timeplus-io/proton:latest, as the streaming database
14+
2. jeffail/benthos:latest, a [Benthos](https://www.benthos.dev/) service as the data pipeline
15+
3. init container, create the tickers stream when Proton database server is ready
16+
17+
the ddl to create the stream is:
18+
19+
```sql
20+
CREATE STREAM IF NOT EXISTS tickers (
21+
best_ask decimal(10,2),
22+
best_ask_size decimal(10,8),
23+
best_bid decimal(10,2),
24+
best_bid_size decimal(10,8),
25+
high_24h decimal(10,2),
26+
last_size decimal(10,8),
27+
low_24h decimal(10,2),
28+
open_24h decimal(10,2),
29+
price decimal(10,2),
30+
product_id string,
31+
sequence int,
32+
side string,
33+
time datetime,
34+
trade_id int,
35+
type string,
36+
volume_24h decimal(20,8),
37+
volume_30d decimal(20,8)
38+
)
39+
```
40+
41+
## Create a ingest data pipeline
42+
43+
run command `make create` to create following Benthos data pipeline, note you need install `jq` and `curl` to run this command
44+
45+
```
46+
input:
47+
label: coinbase
48+
websocket:
49+
url: wss://ws-feed.exchange.coinbase.com
50+
open_message: '{"type": "subscribe","product_ids": ["ETH-USD","ETH-EUR"],"channels": ["ticker"]}'
51+
open_message_type: text
52+
53+
output:
54+
http_client:
55+
url: http://proton:8123/proton/v1/ingest/streams/tickers
56+
verb: POST
57+
headers:
58+
Content-Type: application/json
59+
batching:
60+
count: 10
61+
period: 1000ms
62+
processors:
63+
- archive:
64+
format: json_array
65+
- mapping: |
66+
root.columns = this.index(0).keys()
67+
root.data = this.map_each( row -> root.columns.map_each( key -> row.get(key)) )
68+
69+
```
70+
71+
this pipeline will read data from coinbase websocket and then send the result to proton ingest api in a batch
72+
73+
74+
## Query you crypto price data with SQL
75+
76+
now you can run following query to get the OHLC of the crypto data:
77+
78+
```sql
79+
SELECT
80+
window_start, product_id, earliest(price) AS open, max(price) AS high, min(price) AS low, latest(price) AS close
81+
FROM
82+
tumble(tickers, 60s)
83+
WHERE
84+
product_id != '' and _tp_time > earliest_ts()
85+
GROUP BY
86+
window_start, product_id
87+
```
88+
89+

examples/coinbase/ddl.sql

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
-- ddl creare stream
2+
CREATE STREAM IF NOT EXISTS tickers (
3+
best_ask decimal(10,2),
4+
best_ask_size decimal(10,8),
5+
best_bid decimal(10,2),
6+
best_bid_size decimal(10,8),
7+
high_24h decimal(10,2),
8+
last_size decimal(10,8),
9+
low_24h decimal(10,2),
10+
open_24h decimal(10,2),
11+
price decimal(10,2),
12+
product_id string,
13+
sequence int,
14+
side string,
15+
time datetime,
16+
trade_id int,
17+
type string,
18+
volume_24h decimal(20,8),
19+
volume_30d decimal(20,8)
20+
)
21+

examples/coinbase/docker-compose.yml

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
version: '3.7'
2+
name: coinbase
3+
services:
4+
proton:
5+
image: ghcr.io/timeplus-io/proton:latest
6+
pull_policy: always
7+
ports:
8+
- "3218:3218" # HTTP Streaming
9+
healthcheck:
10+
test: ["CMD", "curl", "http://localhost:3218/proton/ping"]
11+
interval: 2s
12+
timeout: 10s
13+
retries: 3
14+
start_period: 10s
15+
16+
benthos:
17+
image: jeffail/benthos:latest
18+
pull_policy: always
19+
command: streams
20+
ports:
21+
- "4195:4195"
22+
23+
init-stream:
24+
image: ghcr.io/timeplus-io/proton:latest
25+
command:
26+
- sh
27+
- -c
28+
- |
29+
proton-client -h proton --query "CREATE STREAM IF NOT EXISTS tickers (best_ask decimal(10,2), best_ask_size decimal(10,8), best_bid decimal(10,2), best_bid_size decimal(10,8), high_24h decimal(10,2), last_size decimal(10,8), low_24h decimal(10,2), open_24h decimal(10,2), price decimal(10,2), product_id string, sequence int, side string, time datetime, trade_id int, type string, volume_24h decimal(20,8), volume_30d decimal(20,8))"
30+
depends_on:
31+
proton:
32+
condition: service_healthy
33+
34+

examples/coinbase/pipeline.yaml

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
input:
2+
label: coinbase
3+
websocket:
4+
url: wss://ws-feed.exchange.coinbase.com
5+
open_message: '{"type": "subscribe","product_ids": ["ETH-USD","ETH-EUR"],"channels": ["ticker"]}'
6+
open_message_type: text
7+
8+
output:
9+
http_client:
10+
url: http://proton:8123/proton/v1/ingest/streams/tickers
11+
verb: POST
12+
headers:
13+
Content-Type: application/json
14+
batching:
15+
count: 10
16+
period: 1000ms
17+
processors:
18+
- archive:
19+
format: json_array
20+
- mapping: |
21+
root.columns = this.index(0).keys()
22+
root.data = this.map_each( row -> root.columns.map_each( key -> row.get(key)) )

0 commit comments

Comments
 (0)