@@ -17,23 +17,15 @@ CREATE MATERIALIZED VIEW mv_customers_r INTO customers AS
17
17
1 ::int8 as _tp_delta
18
18
FROM customers_cdc WHERE raw:payload .op = ' r' SETTINGS seek_to= ' earliest' ;
19
19
CREATE MATERIALIZED VIEW mv_customers_u INTO customers AS
20
- WITH before AS (
21
- SELECT to_time(raw:payload .ts_ms )- 1ms AS _tp_time,
22
- raw:payload .before .id::int AS id,
23
- raw:payload .before .first_name AS first_name,
24
- raw:payload .before .last_name AS last_name,
25
- raw:payload .before .email AS email,
26
- - 1 ::int8 as _tp_delta
27
- FROM customers_cdc WHERE raw:payload .op = ' u' SETTINGS seek_to= ' earliest'
28
- ),after AS (
29
- SELECT to_time(raw:payload .ts_ms ) AS _tp_time,
30
- raw:payload .after .id::int AS id,
31
- raw:payload .after .first_name AS first_name,
32
- raw:payload .after .last_name AS last_name,
33
- raw:payload .after .email AS email,
34
- 1 ::int8 as _tp_delta
35
- FROM customers_cdc WHERE raw:payload .op = ' u' SETTINGS seek_to= ' earliest'
36
- )SELECT * FROM before UNION SELECT * FROM after;
20
+ WITH cdc_changes AS (
21
+ SELECT ts_ms, array_join(changes) AS change, change .1 as val, change .2 AS _tp_delta
22
+ FROM
23
+ (SELECT to_time(raw:payload .ts_ms ) AS ts_ms, raw:payload .before AS before, raw:payload .after AS after, [(before, - 1 ::int8), (after, 1 ::int8)] AS changes
24
+ FROM customers_cdc
25
+ WHERE raw:payload .op = ' u' SETTINGS seek_to = ' earliest' )
26
+ )
27
+ SELECT ts_ms AS _tp_time, val:id::int32 AS id, val:first_name AS first_name, val:last_name AS last_name, val:email AS email, _tp_delta
28
+ FROM cdc_changes
37
29
38
30
CREATE MATERIALIZED VIEW mv_customers_d INTO customers AS
39
31
SELECT to_time(raw:payload .ts_ms ) AS _tp_time,
@@ -51,4 +43,4 @@ CREATE MATERIALIZED VIEW mv_customers_c INTO customers AS
51
43
raw:payload .after .last_name AS last_name,
52
44
raw:payload .after .email AS email,
53
45
1 ::int8 as _tp_delta
54
- FROM customers_cdc WHERE raw:payload .op = ' c' SETTINGS seek_to= ' earliest' ;
46
+ FROM customers_cdc WHERE raw:payload .op = ' c' SETTINGS seek_to= ' earliest' ;
0 commit comments