Skip to content

Commit ccb6f3d

Browse files
committed
example to query kafka with SQL and visualize with marimo notebook
1 parent 33283f6 commit ccb6f3d

File tree

3 files changed

+244
-0
lines changed

3 files changed

+244
-0
lines changed

examples/README.md

+2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ This folder lists some examples to run Timeplus Proton for various use cases. Fo
2222

2323
- [jdbc](jdbc): demonstrates how to connect to Proton via JDBC using DBeaver or Metabase.
2424

25+
- [marimo](marimo): demonstrates how to query Kafka data with SQL and visualize the results using Marimo.
26+
2527
- [nginx-access-logs-streaming](nginx-access-logs-streaming) analyzing Nginx access logs using Timeplus Proton
2628

2729
- [nginx-grafana](nginx-grafana) visualizing Nginx access logs using Timeplus and Grafana

examples/marimo/README.md

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# Example to Query Kafka Data with SQL and Visualize Results Using Marimo
2+
👋 This is a live notebook, built with [Timeplus](https://github.com/timeplus-io/proton) and [marimo](https://marimo.io), showing streaming data from GitHub via a public-facing Kafka broker from [Aiven](https://aiven.io).
3+
4+
Simply run the following commands:
5+
```bash
6+
curl https://astral.sh/uv/install.sh | sh
7+
curl https://install.timeplus.com/oss | sh
8+
./proton server&
9+
uvx marimo run --sandbox github.py
10+
```

examples/marimo/github.py

+232
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
# /// script
2+
# requires-python = ">=3.13"
3+
# dependencies = [
4+
# "altair==5.5.0",
5+
# "marimo",
6+
# "polars[pyarrow]==1.26.0",
7+
# "sqlalchemy==2.0.40",
8+
# "sqlglot==26.12.1",
9+
# "timeplus-connect==0.8.16",
10+
# ]
11+
# ///
12+
13+
import marimo
14+
15+
# NOTE(review): presumably the marimo version that generated this file -- confirm.
__generated_with = "0.12.5"
16+
# Notebook application object; every cell below registers itself via `@app.cell`
# and the script entry point calls `app.run()`.
app = marimo.App(width="medium")
17+
18+
19+
@app.cell(hide_code=True)
def _(sqlalchemy):
    # Build the SQLAlchemy engine that the SQL cells hand to `mo.sql`.
    # The URL targets user `default` with an empty password on localhost:8123.
    _url = "timeplus://default:@localhost:8123"
    engine = sqlalchemy.create_engine(_url)
    return (engine,)
23+
24+
25+
@app.cell(hide_code=True)
def _():
    # Connection settings for the Kafka broker this notebook reads from.
    # NOTE(review): the password is committed in source. Judging by the broker
    # host name and the `readonly` username used in the stream definition, it
    # is presumably a deliberately public, read-only credential -- confirm.
    kafka_broker, kafka_pwd = (
        "kafka-public-read-timeplus.a.aivencloud.com:28864",
        "AVNS_MUaDRshCpeePa93AQy_",
    )
    return kafka_broker, kafka_pwd
30+
31+
32+
@app.cell(hide_code=True)
def _(engine, kafka_broker, kafka_pwd, mo):
    # Declare the `github_events` external stream over the Kafka topic of the
    # same name. `IF NOT EXISTS` makes re-running this cell harmless, and
    # `output=False` keeps the DDL result out of the rendered notebook.
    # The broker address and password are interpolated from the settings cell.
    _df = mo.sql(
        f"""
        CREATE EXTERNAL STREAM IF NOT EXISTS github_events(actor string,
        created_at string,
        id string,
        payload string,
        repo string,
        type string
        )
        SETTINGS type='kafka',
        brokers='{kafka_broker}',
        topic='github_events',
        security_protocol='SASL_SSL',
        sasl_mechanism='SCRAM-SHA-256',
        username='readonly',
        password='{kafka_pwd}',
        skip_ssl_cert_check=true,
        data_format='JSONEachRow',
        one_message_per_row=true
        """,
        engine=engine,
        output=False,
    )
    return
58+
59+
60+
@app.cell(hide_code=True)
def _(kafka_broker, mo):
    # Intro banner: explains what the notebook shows, prints the broker address
    # in use, and lists the commands needed to run it locally.
    # The Timeplus link points at github.com (it previously pointed at a
    # third-party mirror host).
    mo.md(
        f"""
        # Live GitHub Events
        👋 This is a live notebook, built with [Timeplus](https://github.com/timeplus-io/proton) and [marimo](https://marimo.io), showing streaming data from GitHub via a public facing Kafka broker from [Aiven](https://aiven.io):
        `{kafka_broker}`

        Simply run the following commands:
        ```bash
        curl https://astral.sh/uv/install.sh | sh
        curl https://install.timeplus.com/oss | sh
        ./proton server&
        uvx marimo run --sandbox github.py
        ```
        """
    )
    return
78+
79+
80+
@app.cell(hide_code=True)
def _(mo):
    # Slider (1-10, step 1) whose value the aggregation queries interpolate
    # into `seek_to='-<value>m'`.
    # NOTE: the name `range` shadows the builtin, but other cells receive the
    # widget under this exact name, so it is kept.
    range = mo.ui.slider(
        start=1,
        stop=10,
        step=1,
        label="View data for X minutes:",
        show_value=True,
    )
    return (range,)
84+
85+
86+
@app.cell
def _(mo):
    # Refresh widget offering a single 2-second interval, selected by default.
    # The event-count query cell takes this widget as an input.
    cntRefresh = mo.ui.refresh(default_interval="2s", options=["2s"])
    return (cntRefresh,)
90+
91+
92+
@app.cell
def _(df_cnt, get_count, mo, set_count):
    # Show the latest event count together with its change relative to the
    # value previously stored in notebook state, then store the new value.
    _latest = df_cnt["cnt"][0]
    _delta = _latest - get_count()
    set_count(_latest)
    _trend = "increase" if _delta >= 0 else "decrease"
    mo.stat(
        _latest,
        label="In the Kafka topic",
        caption=f"{_delta} events",
        direction=_trend,
    )
    return
99+
100+
101+
@app.cell
def _(df_last, mo):
    from datetime import datetime, timezone

    # Render a "Last Event" stat from the first row of the sample query:
    # the event's UTC timestamp plus how many whole minutes ago it happened.
    current_time = datetime.now(timezone.utc)
    if len(df_last) == 0:
        # Indexing row 0 of an empty result would raise, so show a placeholder
        # instead of failing the cell.
        event_time = None
        _headline = "n/a"
        _age = "no recent events"
    else:
        # `created_at` is parsed as a naive "YYYY-MM-DDTHH:MM:SS" string and
        # then tagged as UTC so it can be subtracted from `current_time`.
        event_time = datetime.strptime(
            df_last["created_at"][0], "%Y-%m-%dT%H:%M:%S"
        ).replace(tzinfo=timezone.utc)
        _minutes = int((current_time - event_time).total_seconds() / 60)
        _headline = f"{event_time:%H:%M:%S %m/%d} (UTC)"
        _age = f"{_minutes} mins ago"
    # Keep the stat as the final expression so it remains the cell's output.
    mo.stat(_headline, label="Last Event", caption=_age)
    return current_time, datetime, event_time, timezone
108+
109+
110+
@app.cell
def _(mo):
    # User-facing refresh control (5s / 10s / 30s) that the sample and
    # aggregation query cells take as an input.
    _intervals = ["5s", "10s", "30s"]
    refresh = mo.ui.refresh(options=_intervals, label="Refresh")
    return (refresh,)
114+
115+
116+
@app.cell
def _(chart_repos, chart_types, mo, range, refresh):
    # Page layout: a row of controls stacked above a row holding both charts.
    _controls = mo.hstack([range, refresh])
    _charts = mo.hstack([chart_types, chart_repos], widths=[0, 1])
    mo.vstack([_controls, _charts])
    return
120+
121+
122+
@app.cell(hide_code=True)
def _(chart_types):
    # Turn the point selected on the event-type chart into a SQL WHERE clause.
    # With no selection the clause is a single space, so the query that
    # interpolates `typeWhere` stays valid and unfiltered.
    _picked = (chart_types.selections.get("select_point") or {}).get("type")
    if _picked:
        # The value ends up inside a single-quoted SQL literal, so escape
        # backslashes first and then quotes; otherwise a value containing a
        # quote would break (or alter) the query.
        _literal = str(_picked[0]).replace("\\", "\\\\").replace("'", "\\'")
        typeWhere = f"WHERE type='{_literal}'"
    else:
        typeWhere = " "
    return (typeWhere,)
131+
132+
133+
@app.cell
def _(mo):
    # Section heading above the sample-events table.
    mo.md(r"## Sample Events")
    return
137+
138+
139+
@app.cell
def _(engine, mo, refresh):
    # Sample query: up to 3 events ordered by `_tp_time` descending, with
    # `seek_to='-10s'`.
    # `refresh.value` is only interpolated into a SQL comment; it does not
    # change the query text that matters, but it makes this cell take the
    # refresh widget as an input.
    df_last = mo.sql(
        f"""
        -- {refresh.value}
        SELECT * FROM github_events order by _tp_time desc limit 3 settings seek_to='-10s'
        """,
        engine=engine,
    )
    return (df_last,)
149+
150+
151+
@app.cell(hide_code=True)
def _():
    # Shared third-party imports; the other cells receive `alt`, `mo` and
    # `sqlalchemy` as parameters from this cell's return value.
    import altair as alt
    import marimo as mo
    import sqlalchemy

    return alt, mo, sqlalchemy
157+
158+
159+
@app.cell
def _(alt, df_type, mo):
    # 150x150 arc chart of `cnt` per event `type`, wrapped as a marimo UI
    # element; the WHERE-clause cell reads its `selections`.
    _pie = (
        alt.Chart(df_type, height=150, width=150)
        .mark_arc()
        .encode(theta="cnt", color="type")
    )
    chart_types = mo.ui.altair_chart(_pie, legend_selection=False)
    return (chart_types,)
168+
169+
170+
@app.cell
def _(alt, df_hotrepo, mo):
    # Horizontal bar chart of `cnt` per `repo`, with bars sorted by `cnt`
    # in descending order.
    _by_count_desc = alt.EncodingSortField(field='cnt', order='descending')
    _bars = alt.Chart(df_hotrepo, height=200).mark_bar().encode(
        x='cnt',
        y=alt.Y('repo', sort=_by_count_desc),
    )
    chart_repos = mo.ui.altair_chart(_bars)
    return (chart_repos,)
179+
180+
181+
@app.cell
def _(engine, mo, range, refresh, typeWhere):
    # Top-10 repos (via `top_k`) over the last `range.value` minutes,
    # optionally filtered by the WHERE clause in `typeWhere`; the array
    # result is unpacked into one (repo, cnt) row per entry.
    # `refresh.value` appears only inside a SQL comment, which makes this
    # cell take the refresh widget as an input.
    df_hotrepo = mo.sql(
        f"""
        -- {refresh.value}
        with cte as(SELECT top_k(repo,10,true) as a FROM github_events {typeWhere} limit 1 SETTINGS seek_to='-{range.value}m')
        select a.1 as repo,a.2 as cnt from cte array join a
        """,
        engine=engine,
    )
    return (df_hotrepo,)
192+
193+
194+
@app.cell(hide_code=True)
def _(engine, mo, range, refresh):
    # Top-10 event types (via `top_k`) over the last `range.value` minutes;
    # the array result is unpacked into one (type, cnt) row per entry.
    # `refresh.value` appears only inside a SQL comment, which makes this
    # cell take the refresh widget as an input.
    df_type = mo.sql(
        f"""
        -- {refresh.value}
        with cte as(SELECT top_k(type,10,true) as a FROM github_events limit 1 SETTINGS seek_to='-{range.value}m')
        select a.1 as type,a.2 as cnt from cte array join a
        """,
        engine=engine,
    )
    return (df_type,)
205+
206+
207+
@app.cell
def _(cntRefresh):
    # Output the count-refresh widget with a `display` style override.
    # NOTE(review): presumably meant to keep the widget out of sight while it
    # keeps ticking; confirm how marimo renders a `None` style value.
    _css = {"display": None}
    cntRefresh.style(_css)
    return
211+
212+
213+
@app.cell(hide_code=True)
def cell_cnt(cntRefresh, engine, mo):
    # Count the events in `github_events`.
    # `cntRefresh.value` appears only inside a SQL comment, which makes this
    # cell take the count-refresh widget as an input.
    df_cnt = mo.sql(
        f"""
        -- {cntRefresh.value}
        SELECT count() as cnt FROM github_events
        """,
        engine=engine,
    )
    return (df_cnt,)
223+
224+
225+
@app.cell
def _(mo):
    # Notebook state holding the previously displayed event count, starting
    # at 0; the count stat cell reads and updates it.
    _initial_count = 0
    get_count, set_count = mo.state(_initial_count)
    return get_count, set_count
229+
230+
231+
if __name__ == "__main__":
    # Run the notebook app when this file is executed as a script.
    app.run()

0 commit comments

Comments
 (0)