Skip to content

Commit 9d7585e

Browse files
committed
(feat!): use dynamic tables
1 parent 71ee57e commit 9d7585e

File tree

8 files changed

+89
-29
lines changed

8 files changed

+89
-29
lines changed

.envrc.example

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,14 @@ export TODOAPP_USER=<snowflake todoapp user>
88
export TODOAPP_USER_PWD=<snowflake todoapp user password>
99
export TODOAPP_USER_ROLE=todoapp_user
1010
export TODOAPP_DATABASE=<snowflake todoapp database>
11+
# table to hold redpanda topic records
12+
export TODO_LIST_TABLE=TODO_LIST
13+
# table that will be used by Streamlit dashboard
14+
export TODOS_TABLE=TODOS
1115
# update to different warehouse, this is default for trial accounts
1216
export SNOWSQL_WAREHOUSE=COMPUTE_WH
1317
# update the schema to use if you are on a different schema, demo code is set to work with this by default, changing this need to update ACL accordingly
14-
export SNOWSQL_SCHEMA="public"
18+
export SNOWSQL_SCHEMA=public
1519
## Redpanda
1620
export RPK_BROKERS=localhost:19092
1721
export COMPOSE_PROJECT_NAME=grpc-todo-app

README.md

Lines changed: 53 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -212,14 +212,17 @@ export TODOAPP_USER_RSA_PUBLIC_KEY=$(awk 'NR > 2 {print last} {last=$0}' ORS=''
212212
213213
> **IMPORTANT**: The commands in this section will be executed as Snowflake Account Admin user
214214
215-
Create Database `$TODOAPP_DATABASE`,
215+
Create Database `$TODOAPP_DATABASE` and `$TODO_LIST_TABLE`,
216216
217217
```shell
218218
snowsql -c admin \
219-
--query "CREATE DATABASE IF NOT EXISTS $TODOAPP_DATABASE;"
219+
--warehouse "$SNOWSQL_WAREHOUSE" \
220+
--variable db_name=$TODOAPP_DATABASE \
221+
--variable todo_list_table="$TODO_LIST_TABLE" \
222+
--filename etc/snowflake/core.sql
220223
```
221224
222-
Create the `$TODOAPP_USER` user and provide the requried **GRANTs**,
225+
Create the `$TODOAPP_USER` user and provide the required **GRANTs**,
223226
224227
```shell
225228
snowsql -c admin \
@@ -229,6 +232,7 @@ snowsql -c admin \
229232
--variable todo_user_role=$TODOAPP_USER_ROLE \
230233
--variable todoapp_user=$TODOAPP_USER \
231234
--variable todo_pub_key=$TODOAPP_USER_RSA_PUBLIC_KEY \
235+
--variable todos_table_name=$TODOS_TABLE \
232236
--filename "$DEMO_HOME/etc/snowflake/acl.sql"
233237
```
234238
@@ -248,6 +252,18 @@ snowsql -u $TODOAPP_USER \
248252
--query "SELECT CURRENT_DATE;"
249253
```
250254
255+
Create a Stream to capture the changes of `$TODO_LIST_TABLE`,
256+
257+
```shell
258+
snowsql -u "$TODOAPP_USER" \
259+
--private-key-path="$DEMO_HOME/keys/rsa_key.p8" \
260+
--warehouse "$SNOWSQL_WAREHOUSE" \
261+
--dbname "$TODOAPP_DATABASE" \
262+
--variable stream_name="$TODO_LIST_TABLE"_stream \
263+
--variable todo_list_table=$TODO_LIST_TABLE \
264+
--filename etc/snowflake/stream.sql
265+
```
266+
251267
> **NOTE**: First time login will ask for a password change.
252268
253269
## Kafka Connect API
@@ -330,6 +346,7 @@ http --body "$KAFKA_CONNECT_URI/connectors?expand=status"
330346
```shell
331347
snowsql -u $TODOAPP_USER \
332348
--private-key-path="$DEMO_HOME/keys/rsa_key.p8" \
349+
--warehouse "$SNOWSQL_WAREHOUSE" \
333350
--dbname $TODOAPP_DATABASE \
334351
--query "SHOW TERSE TABLES LIKE '%todo%';"
335352
```
@@ -386,46 +403,50 @@ snowsql -u $TODOAPP_USER \
386403
--query "SELECT count(*) from TODO_LIST;"
387404
```
388405
389-
### Visualizing the Data
390-
391-
You can now use the synchronized data to build a dashboard using [Streamlit](https://streamlit.io). The demo builds a simple dashboard that shows task by category along with a tabular representation of the tasks data.
392-
393-
#### Create the Stored Procedure
394-
395-
Stored Procedure to extract the data the table synchronized with Redpanda topic to any custom table.
406+
Verify the inserted records are captured by the stream
396407
397408
```shell
398409
snowsql -u $TODOAPP_USER \
399410
--private-key-path="$DEMO_HOME/keys/rsa_key.p8" \
400411
--warehouse $SNOWSQL_WAREHOUSE \
401412
--dbname $TODOAPP_DATABASE \
402-
--filename "$DEMO_HOME/etc/snowflake/todos_sp.sql"
413+
--query "SELECT * FROM "$TODO_LIST_TABLE"_STREAM"
403414
```
404415
405-
#### Prepare Data for Dashboard
416+
You should see all the inserted records on the stream.
417+
418+
### Visualizing the Data
419+
420+
You can now use the synchronized data to build a dashboard using [Streamlit](https://streamlit.io). The demo builds a simple dashboard that shows task by category along with a tabular representation of the tasks data.
406421
407-
Extract the data from `TODO_LIST` table and load the same on to `todos` which will be used by Streamlit dashboard,
422+
#### Create Dynamic Table
423+
424+
The demo will use Snowflake [Dynamic Table](https://docs.snowflake.com/en/user-guide/dynamic-tables-about) to transform the raw records on `$TODO_LIST_TABLE` table to a structure usable for the Streamlit dashboard,
408425
409426
```shell
410427
snowsql -u $TODOAPP_USER \
411428
--private-key-path="$DEMO_HOME/keys/rsa_key.p8" \
412429
--warehouse $SNOWSQL_WAREHOUSE \
413-
--dbname "$TODOAPP_DATABASE" \
414-
--query "CALL todos('todo_list','todos')"
430+
--dbname $TODOAPP_DATABASE \
431+
--variable todo_warehouse=$SNOWSQL_WAREHOUSE \
432+
--variable topic_name=$TOPICS \
433+
--variable table_name=$TODOS_TABLE \
434+
--filename "$DEMO_HOME/etc/snowflake/dynamic_table.sql"
415435
```
416436
417-
Query the `todos` table,
437+
#### Prepare Data for Dashboard
438+
439+
Query the `$TODOS_TABLE` table,
418440
419441
```shell
420442
snowsql -u $TODOAPP_USER \
421443
--private-key-path="$DEMO_HOME/keys/rsa_key.p8" \
422444
--warehouse $SNOWSQL_WAREHOUSE \
423445
--dbname $TODOAPP_DATABASE \
424-
--query "SELECT * FROM TODOS;"
446+
--query "SELECT * FROM $TODOS_TABLE"
425447
```
426448
427-
> **NOTE**: You need to refresh the data on the `TODOS` table on every new task added to th Redpanda topic.
428-
> **WIP**: Leveraging streams and dynamic tables
449+
> **NOTE**: `$TODOS_TABLE` gets refreshed within a minute of data updates on `$TODO_LIST_TABLE`
429450
430451
#### Use predefined conda environment
431452
@@ -467,14 +488,25 @@ snowsql -u $TODOAPP_USER \
467488
--private-key-path="$DEMO_HOME/keys/rsa_key.p8" \
468489
--warehouse $SNOWSQL_WAREHOUSE \
469490
--dbname $TODOAPP_DATABASE \
491+
--variable todo_list_table=$TODO_LIST_TABLE \
492+
--variable todos_table_name=$TODOS_TABLE \
470493
--filename "$DEMO_HOME/etc/snowflake/cleanup.sql"
471494
```
472495
496+
### Drop User
497+
498+
```shell
499+
snowsql -c admin \
500+
--warehouse $SNOWSQL_WAREHOUSE \
501+
--query "DROP USER IF EXISTS $TODOAPP_USER;"
502+
```
503+
473504
### Drop Database
474505
475506
```shell
476-
snowsql -c admin --warehouse $SNOWSQL_WAREHOUSE \
477-
--query "DROP DATABASE IF EXISTS $SNOWSQL_DATABASE;"
507+
snowsql -c admin \
508+
--warehouse $SNOWSQL_WAREHOUSE \
509+
--query "DROP DATABASE IF EXISTS $TODOAPP_DATABASE;"
478510
```
479511
480512
## References

etc/config/todolist-snow-sink.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
"connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
55
"tasks.max": "8",
66
"topics": "${TOPICS}",
7-
"snowflake.topic2table.map": "${TOPICS}:TODO_LIST",
7+
"snowflake.topic2table.map": "${TOPICS}:${TODO_LIST_TABLE}",
88
"buffer.count.records": "10",
99
"buffer.flush.time": "10",
1010
"buffer.size.bytes": "500",

etc/snowflake/acl.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ GRANT INSERT,DELETE,SELECT,UPDATE,TRUNCATE ON FUTURE TABLES IN SCHEMA &db_name.P
3636
-- Ability to create stage(s)
3737
GRANT CREATE STAGE ON SCHEMA &db_name.PUBLIC TO ROLE &todo_user_role;
3838

39-
-- GRANT OWNERSHIP ON PROCEDURE TODOS(string,string) TO ROLE &todo_user_role;
39+
-- GRANT OWNERSHIP ON DYNAMIC TABLE &todos_table_name TO ROLE &todo_user_role;
4040

4141
-- Create Streamlit apps in the PUBLIC schema
4242
GRANT CREATE STREAMLIT ON SCHEMA &db_name.PUBLIC TO ROLE &todo_user_role;

etc/snowflake/cleanup.sql

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
-- Drop table
2-
DROP TABLE IF EXISTS todo_list;
3-
DROP TABLE IF EXISTS todos;
2+
DROP TABLE IF EXISTS &todo_list_table;
43

5-
-- Stored Procedure
6-
DROP PROCEDURE IF EXISTS TODOS(string,string);
4+
-- Dynamic Table
5+
DROP DYNAMIC TABLE IF EXISTS &todos_table_name;
76

87
-- Pipe
98
DROP PIPE IF EXISTS SNOWFLAKE_KAFKA_CONNECTOR_TODOLIST_PIPE_TODO_LIST_0;

etc/snowflake/core.sql

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
-- Create Database
2+
CREATE DATABASE IF NOT EXISTS &db_name;
3+
4+
-- Create table to hold the records from Redpanda todo_list topic
5+
CREATE TABLE IF NOT EXISTS "&db_name".PUBLIC."&todo_list_table" (
6+
RECORD_METADATA VARIANT,
7+
RECORD_CONTENT VARIANT
8+
);
9+
10+
-- Enable Change Tracking
11+
ALTER TABLE"&db_name".PUBLIC."&todo_list_table" SET CHANGE_TRACKING = TRUE;

etc/snowflake/dashboard_data.sql

Lines changed: 0 additions & 1 deletion
This file was deleted.

etc/snowflake/dynamic_table.sql

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
-- Create dynamic table to capture the records from todo_list_table
2+
CREATE OR REPLACE DYNAMIC TABLE &table_name
3+
TARGET_LAG = '1minute'
4+
WAREHOUSE = '&todo_warehouse'
5+
AS
6+
select
7+
record_metadata:key::string as key,
8+
record_content:title::string as title,
9+
record_content:description::text as description,
10+
record_content:category::string as category,
11+
record_content:status::boolean as status,
12+
record_metadata:CreateTime::bigint as tz
13+
from todo_list
14+
where record_metadata:topic = '&topic_name'
15+
ORDER BY key

0 commit comments

Comments
 (0)