1- CREATE TEMPORARY FUNCTION IF NOT EXISTS ` timestamptostring` AS ' com.datasqrl.time.TimestampToString' LANGUAGE JAVA;
2-
3- CREATE TEMPORARY FUNCTION IF NOT EXISTS ` endofmonth` AS ' com.datasqrl.time.EndOfMonth' LANGUAGE JAVA;
4-
5- CREATE TEMPORARY FUNCTION IF NOT EXISTS ` timestamptoepochmilli` AS ' com.datasqrl.time.TimestampToEpochMilli' LANGUAGE JAVA;
6-
7- CREATE TEMPORARY FUNCTION IF NOT EXISTS ` endofweek` AS ' com.datasqrl.time.EndOfWeek' LANGUAGE JAVA;
8-
9- CREATE TEMPORARY FUNCTION IF NOT EXISTS ` parsetimestamp` AS ' com.datasqrl.time.ParseTimestamp' LANGUAGE JAVA;
10-
11- CREATE TEMPORARY FUNCTION IF NOT EXISTS ` epochmillitotimestamp` AS ' com.datasqrl.time.EpochMilliToTimestamp' LANGUAGE JAVA;
12-
13- CREATE TEMPORARY FUNCTION IF NOT EXISTS ` endofminute` AS ' com.datasqrl.time.EndOfMinute' LANGUAGE JAVA;
14-
15- CREATE TEMPORARY FUNCTION IF NOT EXISTS ` timestamptoepoch` AS ' com.datasqrl.time.TimestampToEpoch' LANGUAGE JAVA;
16-
17- CREATE TEMPORARY FUNCTION IF NOT EXISTS ` endofsecond` AS ' com.datasqrl.time.EndOfSecond' LANGUAGE JAVA;
18-
19- CREATE TEMPORARY FUNCTION IF NOT EXISTS ` epochtotimestamp` AS ' com.datasqrl.time.EpochToTimestamp' LANGUAGE JAVA;
20-
21- CREATE TEMPORARY FUNCTION IF NOT EXISTS ` atzone` AS ' com.datasqrl.time.AtZone' LANGUAGE JAVA;
22-
23- CREATE TEMPORARY FUNCTION IF NOT EXISTS ` endofday` AS ' com.datasqrl.time.EndOfDay' LANGUAGE JAVA;
24-
25- CREATE TEMPORARY FUNCTION IF NOT EXISTS ` endofhour` AS ' com.datasqrl.time.EndOfHour' LANGUAGE JAVA;
26-
27- CREATE TEMPORARY FUNCTION IF NOT EXISTS ` endofyear` AS ' com.datasqrl.time.EndOfYear' LANGUAGE JAVA;
1+ -- Updated Flink SQL using datagen connector and BIGINT for timestamps, with full pipeline and view updates
282
293CREATE TEMPORARY TABLE ` transaction_1` (
304 ` transactionId` BIGINT NOT NULL ,
315 ` cardNo` DOUBLE NOT NULL ,
32- ` time` TIMESTAMP (3 ) WITH LOCAL TIME ZONE NOT NULL ,
6+ ` time` TIMESTAMP_LTZ (3 ) NOT NULL ,
337 ` amount` DOUBLE NOT NULL ,
348 ` merchantId` BIGINT NOT NULL ,
359 PRIMARY KEY (` transactionId` , ` time` ) NOT ENFORCED,
36- WATERMARK FOR ` time` AS ` time` - INTERVAL ' 1.0 ' SECOND
10+ WATERMARK FOR ` time` AS ` time` - INTERVAL ' 1' SECOND
3711) WITH (
38- ' format' = ' flexible-json' ,
39- ' path' = ' file:///datasources/transaction.jsonl' ,
40- ' source.monitor-interval' = ' 1 min' ,
41- ' connector' = ' filesystem'
12+ ' connector' = ' datagen' ,
13+ ' rows-per-second' = ' 10' ,
14+ ' fields.transactionId.kind' = ' sequence' ,
15+ ' fields.transactionId.start' = ' 1' ,
16+ ' fields.transactionId.end' = ' 1000000' ,
17+ ' fields.cardNo.min' = ' 1000' ,
18+ ' fields.cardNo.max' = ' 9999' ,
19+ ' fields.amount.min' = ' 1' ,
20+ ' fields.amount.max' = ' 5000' ,
21+ ' fields.merchantId.min' = ' 1' ,
22+ ' fields.merchantId.max' = ' 100'
4223);
4324
4425CREATE TEMPORARY TABLE ` cardassignment_1` (
4526 ` customerId` BIGINT NOT NULL ,
4627 ` cardNo` DOUBLE NOT NULL ,
47- ` timestamp` TIMESTAMP (3 ) WITH LOCAL TIME ZONE NOT NULL ,
48- ` cardType` VARCHAR (2147483647 ) CHARACTER SET ` UTF-16LE ` NOT NULL ,
28+ ` timestamp` TIMESTAMP_LTZ (3 ) NOT NULL ,
29+ ` cardType` VARCHAR (100 ) NOT NULL ,
4930 PRIMARY KEY (` customerId` , ` cardNo` , ` timestamp` ) NOT ENFORCED,
50- WATERMARK FOR ` timestamp` AS ` timestamp` - INTERVAL ' 1.0 ' SECOND
31+ WATERMARK FOR ` timestamp` AS ` timestamp` - INTERVAL ' 1' SECOND
5132) WITH (
52- ' format' = ' flexible-json' ,
53- ' path' = ' file:///datasources/cardAssignment.jsonl' ,
54- ' source.monitor-interval' = ' 1 min' ,
55- ' connector' = ' filesystem'
33+ ' connector' = ' datagen' ,
34+ ' rows-per-second' = ' 5' ,
35+ ' fields.customerId.kind' = ' sequence' ,
36+ ' fields.customerId.start' = ' 1' ,
37+ ' fields.customerId.end' = ' 10000' ,
38+ ' fields.cardNo.min' = ' 1000' ,
39+ ' fields.cardNo.max' = ' 9999' ,
40+ ' fields.cardType.length' = ' 10'
5641);
5742
5843CREATE TEMPORARY TABLE ` merchant_1` (
5944 ` merchantId` BIGINT NOT NULL ,
60- ` name` VARCHAR (2147483647 ) CHARACTER SET ` UTF-16LE ` NOT NULL ,
61- ` category` VARCHAR (2147483647 ) CHARACTER SET ` UTF-16LE ` NOT NULL ,
62- ` updatedTime` TIMESTAMP (3 ) WITH LOCAL TIME ZONE NOT NULL ,
45+ ` name` VARCHAR (100 ) NOT NULL ,
46+ ` category` VARCHAR (50 ) NOT NULL ,
47+ ` updatedTime` TIMESTAMP_LTZ (3 ) NOT NULL ,
6348 PRIMARY KEY (` merchantId` , ` updatedTime` ) NOT ENFORCED,
64- WATERMARK FOR ` updatedTime` AS ` updatedTime` - INTERVAL ' 1.0 ' SECOND
49+ WATERMARK FOR ` updatedTime` AS ` updatedTime` - INTERVAL ' 1' SECOND
6550) WITH (
66- ' format' = ' flexible-json' ,
67- ' path' = ' file:///datasources/merchant.jsonl' ,
68- ' source.monitor-interval' = ' 1 min' ,
69- ' connector' = ' filesystem'
51+ ' connector' = ' datagen' ,
52+ ' rows-per-second' = ' 2' ,
53+ ' fields.merchantId.kind' = ' sequence' ,
54+ ' fields.merchantId.start' = ' 1' ,
55+ ' fields.merchantId.end' = ' 100' ,
56+ ' fields.name.length' = ' 20' ,
57+ ' fields.category.length' = ' 10'
7058);
7159
7260CREATE TEMPORARY TABLE ` _spendingbyday_1` (
@@ -76,7 +64,7 @@ CREATE TEMPORARY TABLE `_spendingbyday_1` (
7664 PRIMARY KEY (` customerid` , ` timeDay` ) NOT ENFORCED
7765) WITH (
7866 ' password' = ' ${JDBC_PASSWORD}' ,
79- ' connector' = ' jdbc-sqrl ' ,
67+ ' connector' = ' jdbc' ,
8068 ' driver' = ' org.postgresql.Driver' ,
8169 ' table-name' = ' _spendingbyday_1' ,
8270 ' url' = ' ${JDBC_URL}' ,
@@ -94,7 +82,7 @@ CREATE TEMPORARY TABLE `customertransaction_1` (
9482 PRIMARY KEY (` transactionId` , ` time` ) NOT ENFORCED
9583) WITH (
9684 ' password' = ' ${JDBC_PASSWORD}' ,
97- ' connector' = ' jdbc-sqrl ' ,
85+ ' connector' = ' jdbc' ,
9886 ' driver' = ' org.postgresql.Driver' ,
9987 ' table-name' = ' customertransaction_1' ,
10088 ' url' = ' ${JDBC_URL}' ,
@@ -109,7 +97,7 @@ CREATE TEMPORARY TABLE `spendingbycategory_1` (
10997 PRIMARY KEY (` customerid` , ` timeWeek` , ` category` ) NOT ENFORCED
11098) WITH (
11199 ' password' = ' ${JDBC_PASSWORD}' ,
112- ' connector' = ' jdbc-sqrl ' ,
100+ ' connector' = ' jdbc' ,
113101 ' driver' = ' org.postgresql.Driver' ,
114102 ' table-name' = ' spendingbycategory_1' ,
115103 ' url' = ' ${JDBC_URL}' ,
@@ -132,7 +120,7 @@ WHERE `_rownum` = 1;
132120
133121CREATE VIEW ` table$3`
134122AS
135- SELECT ` $cor0` .` customerId` AS ` customerid` , ENDOFDAY( ` $cor0` .` time` ) AS ` timeDay` , ` $cor0` .` amount` , ` $cor0` .` transactionId` , ` $cor0` .` time`
123+ SELECT ` $cor0` .` customerId` AS ` customerid` , CAST(FLOOR( ` $cor0` .` time` TO DAY) + INTERVAL ' 1 ' DAY AS TIMESTAMP_LTZ( 3 ) ) AS ` timeDay` , ` $cor0` .` amount` , ` $cor0` .` transactionId` , ` $cor0` .` time`
136124FROM (SELECT *
137125 FROM ` transaction_1` AS ` $cor1`
138126 INNER JOIN ` table$2` FOR SYSTEM_TIME AS OF ` $cor1` .` time` AS ` t2` ON ` $cor1` .` cardNo` = ` t2` .` cardNo` ) AS ` $cor0`
@@ -182,11 +170,21 @@ WHERE `_rownum` = 1;
182170
183171CREATE VIEW ` table$10`
184172AS
185- SELECT ` $cor6` .` customerId` AS ` customerid` , ENDOFWEEK(` $cor6` .` time` ) AS ` timeWeek` , ` t0` .` category` , ` $cor6` .` amount` , ` $cor6` .` transactionId` , ` $cor6` .` time`
186- FROM (SELECT *
173+ SELECT
174+ ` $cor6` .` customerId` AS ` customerid` ,
175+ CAST(FLOOR(` $cor6` .` time` TO DAY) + INTERVAL ' 1' DAY * (7 - EXTRACT(DOW FROM ` $cor6` .` time` )) AS TIMESTAMP_LTZ(3 )) AS ` timeWeek` ,
176+ ` t0` .` category` ,
177+ ` $cor6` .` amount` ,
178+ ` $cor6` .` transactionId` ,
179+ ` $cor6` .` time`
180+ FROM (
181+ SELECT *
187182 FROM ` transaction_1` AS ` $cor7`
188- INNER JOIN ` table$9` FOR SYSTEM_TIME AS OF ` $cor7` .` time` AS ` t2` ON ` $cor7` .` cardNo` = ` t2` .` cardNo` ) AS ` $cor6`
189- INNER JOIN ` table$8` FOR SYSTEM_TIME AS OF ` $cor6` .` time` AS ` t0` ON ` $cor6` .` merchantId` = ` t0` .` merchantId` ;
183+ INNER JOIN ` table$9` FOR SYSTEM_TIME AS OF ` $cor7` .` time` AS ` t2`
184+ ON ` $cor7` .` cardNo` = ` t2` .` cardNo`
185+ ) AS ` $cor6`
186+ INNER JOIN ` table$8` FOR SYSTEM_TIME AS OF ` $cor6` .` time` AS ` t0`
187+ ON ` $cor6` .` merchantId` = ` t0` .` merchantId` ;
190188
191189CREATE VIEW ` table$11`
192190AS
@@ -200,12 +198,11 @@ INSERT INTO `_spendingbyday_1`
200198 FROM ` table$4` )
201199;
202200INSERT INTO ` customertransaction_1`
203- (SELECT *
201+ (SELECT *
204202 FROM ` table$7` )
205- ;
206- INSERT INTO ` spendingbycategory_1`
207- (SELECT *
203+ ;
204+ INSERT INTO ` spendingbycategory_1`
205+ (SELECT *
208206 FROM ` table$11` )
209- ;
210- END;
211-
207+ ;
208+ END;
0 commit comments