Commit a903428

add sample SQL to detect idle stream ingestion (#862)
* add demo for the broken stream
* update readme
1 parent 270404c commit a903428

3 files changed: +123 -0 lines changed

Lines changed: 15 additions & 0 deletions
@@ -0,0 +1,15 @@
start:
	docker run -d \
		--name proton \
		--pull always \
		-p 3218:3218 \
		-p 8463:8463 \
		d.timeplus.com/timeplus-io/proton:latest

stop:
	docker stop proton

cli:
	docker exec -it proton proton client
Lines changed: 66 additions & 0 deletions
@@ -0,0 +1,66 @@
# Demo for broken stream monitoring using global aggregation

This demo shows how to use a new streaming SQL feature to monitor a broken stream. Traditionally, with watermarks, stream processing generally stops producing results when no new events arrive to push the watermark forward.

In this demo, we show how to use the new feature `EMIT PERIODIC time_interval REPEAT` to re-emit the existing result, which helps monitor the stream's status.
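
For contrast, a windowed aggregation such as the sketch below depends on the watermark: a window is only finalized once a later event moves the watermark past the end of the window, so the query goes silent when ingestion stops. The stream name `device_reader` is the one created in step 2 of this demo, and the 2-second window size is an arbitrary choice for illustration.

```sql
-- Sketch: tumbling-window count that relies on the watermark.
-- Assumes the `device_reader` stream from step 2 already exists.
-- If no new events arrive, no further windows are closed and nothing is emitted.
SELECT window_start, count(*) AS count
FROM tumble(device_reader, 2s)
GROUP BY window_start
```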

Here are the steps:

1. Create a simulated data source using a random stream.

```sql
CREATE RANDOM STREAM device
(
  `sensor` string DEFAULT ['A', 'B', 'C'][(int_div(rand(), 1000000000) % 3) + 1],
  `temperature` float DEFAULT round(rand_normal(10, 0.5), 2)
)
SETTINGS eps = 1
```

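To check that the random stream produces rows in the expected shape, a quick preview like the following may help. This is a minimal sketch that assumes a streaming query with `LIMIT` ends on its own after returning that many rows; with `eps = 1` it should take a few seconds to complete.

```sql
-- Preview a few generated rows from the random stream.
-- Assumption: the query stops once 5 rows have been returned.
SELECT sensor, temperature
FROM device
LIMIT 5
```
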
2. Create a target stream that simulates data ingested into Proton, and use a materialized view (MV) to read data from the random stream into it.

```sql
CREATE STREAM device_reader
(
  `sensor` string,
  `temperature` float32
);

CREATE MATERIALIZED VIEW mv_device_reader INTO device_reader
AS
SELECT
  *
FROM
  device;
```

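Before setting up monitoring, it can be useful to confirm that the materialized view is actually writing into `device_reader`. A minimal sketch, assuming Proton's `table()` function, which runs a bounded query over the data already stored instead of a continuous streaming query:

```sql
-- One-off snapshot of how many rows have been ingested so far.
-- Run it twice a few seconds apart; the number should grow while the MV is running.
SELECT count(*) AS ingested_rows
FROM table(device_reader)
```
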
3. Now you can monitor the stream using the following query.

```sql
WITH accumulate_count AS (
  SELECT count(*) AS count FROM device_reader EMIT PERIODIC 2s REPEAT
)
SELECT count, lag(count) AS previous_count FROM accumulate_count
```

`accumulate_count` emits the total count observed so far every 2 seconds. Even when there are no new events, the query keeps emitting the existing aggregation result.
The outer query then returns the current count and the previously observed count.

4. You can create an alert that fires when the accumulated count does not change, which means something is wrong and the stream might be broken.

```sql
WITH accumulate_count AS (
  SELECT count(*) AS count FROM device_reader EMIT PERIODIC 2s REPEAT
)
SELECT count, lag(count) AS previous_count FROM accumulate_count
WHERE count = previous_count
```

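A variation on the same idea reports how many events arrived in each 2-second interval instead of only flagging a stall. This is a sketch built from the same `count` and `lag` calls used above; the `new_events` column name is made up for illustration, and a value of 0 corresponds to the alert condition.

```sql
-- Sketch: show the per-interval delta alongside the running total.
-- `new_events` is a hypothetical column name; 0 means no ingestion in that interval.
WITH accumulate_count AS (
  SELECT count(*) AS count FROM device_reader EMIT PERIODIC 2s REPEAT
)
SELECT count, count - lag(count) AS new_events FROM accumulate_count
```
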
5. Now drop the MV; the previous query should emit the alert.

```sql
DROP VIEW mv_device_reader
```

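To confirm that the alert stops once ingestion resumes, the materialized view can be recreated with the same statement as in step 2. The teardown statements at the end are an assumption about how you might clean up after the demo; they are not part of the original steps.

```sql
-- Recreate the MV (same definition as step 2) to resume ingestion.
CREATE MATERIALIZED VIEW mv_device_reader INTO device_reader
AS
SELECT
  *
FROM
  device;

-- Optional teardown once you are done with the demo (assumed, not from the original steps).
DROP VIEW mv_device_reader;
DROP STREAM device_reader;
DROP STREAM device;
```
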
Lines changed: 42 additions & 0 deletions
@@ -0,0 +1,42 @@
-- simulation of an IoT data source
CREATE RANDOM STREAM device
(
  `sensor` string DEFAULT ['A', 'B', 'C'][(int_div(rand(), 1000000000) % 3) + 1],
  `temperature` float DEFAULT round(rand_normal(10, 0.5), 2)
)
SETTINGS eps = 1;

-- target stream
CREATE STREAM device_reader
(
  `sensor` string,
  `temperature` float32
);

-- simulate ingesting data from the source into Proton
CREATE MATERIALIZED VIEW mv_device_reader INTO device_reader
AS
SELECT
  *
FROM
  device;

-- continuously monitor the total observed count
SELECT count(*) FROM device_reader EMIT PERIODIC 2s REPEAT;

-- use lag to detect changes in the count
WITH accumulate_count AS (
  SELECT count(*) AS count FROM device_reader EMIT PERIODIC 2s REPEAT
)
SELECT count, lag(count) AS previous_count FROM accumulate_count;

-- alert on a broken stream
WITH accumulate_count AS (
  SELECT count(*) AS count FROM device_reader EMIT PERIODIC 2s REPEAT
)
SELECT count, lag(count) AS previous_count FROM accumulate_count
WHERE count = previous_count;

-- drop the MV to simulate a broken stream
DROP VIEW mv_device_reader;
