diff --git a/pom.xml b/pom.xml
index a786d36..50ac245 100644
--- a/pom.xml
+++ b/pom.xml
@@ -147,6 +147,27 @@
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-exec</artifactId>
+      <version>1.4.0</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.jdbi</groupId>
+      <artifactId>jdbi3-core</artifactId>
+      <version>3.45.1</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.jdbi</groupId>
+      <artifactId>jdbi3-sqlobject</artifactId>
+      <version>3.45.1</version>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-connector-kafka</artifactId>
@@ -203,6 +224,7 @@
${flink.version}
+
central
diff --git a/src/test/docker/docker-compose.yml b/src/test/docker/docker-compose.yml
index f25309c..b1ac9c0 100644
--- a/src/test/docker/docker-compose.yml
+++ b/src/test/docker/docker-compose.yml
@@ -22,13 +22,15 @@ networks:
services:
postgres:
- image: postgres:16
+ image: postgres:17
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: datasqrl
ports:
- "5432:5432"
+ volumes:
+ - ./sqrl/postgres-schema.sql:/docker-entrypoint-initdb.d/database-schema.sql:ro
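+ # init script pre-creates the transactionbymerchant_1 sink table on first container start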
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres"]
interval: 1s
@@ -38,18 +40,17 @@ services:
- datasqrl_network
jobmanager:
- user: 1000:1000
image: flink:${flink-base-image}
environment:
- JDBC_URL=jdbc:postgresql://postgres:5432/datasqrl
- JDBC_USERNAME=postgres
- JDBC_PASSWORD=postgres
- - PROPERTIES_BOOTSTRAP_SERVERS=kafka:9092
- - PROPERTIES_GROUP_ID=mygroupid
- |
FLINK_PROPERTIES=
taskmanager.slot.timeout: 30000ms
jobmanager.rpc.address: jobmanager
+ state.savepoints-storage: filesystem
+ state.savepoints.dir: file:///tmp/flink-jar-runner
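+ # savepoints written under /tmp/flink-jar-runner are visible to the tests through the /tmp bind mount below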
command: jobmanager
ports:
- "8081:8081" # Flink JobManager REST port
@@ -62,27 +63,32 @@ services:
- flink_conf:/opt/flink/conf
- ./:/opt/flink/usrlib/
- ./datasources/:/datasources/
+ - /tmp:/tmp
healthcheck:
test: ["CMD-SHELL", "curl -f http://localhost:8081/ || exit 1"]
interval: 1s
timeout: 1s
retries: 50
+ restart: always
+ deploy:
+ resources:
+ limits:
+ memory: 2g
networks:
- datasqrl_network
taskmanager:
- user: 1000:1000
image: flink:${flink-base-image}
environment:
- JDBC_URL=jdbc:postgresql://postgres:5432/datasqrl
- JDBC_USERNAME=postgres
- JDBC_PASSWORD=postgres
- - PROPERTIES_BOOTSTRAP_SERVERS=kafka:9092
- - PROPERTIES_GROUP_ID=mygroupid
- |
FLINK_PROPERTIES=
taskmanager.slot.timeout: 30000ms
jobmanager.rpc.address: jobmanager
+ state.savepoints-storage: filesystem
+ state.savepoints.dir: file:///tmp/flink-jar-runner
command: taskmanager
depends_on:
jobmanager:
@@ -91,6 +97,12 @@ services:
- flink_conf:/opt/flink/conf
- ./:/opt/flink/usrlib/
- ./datasources/:/datasources/
+ - /tmp:/tmp
+ restart: always
+ deploy:
+ resources:
+ limits:
+ memory: 2g
networks:
- datasqrl_network
diff --git a/src/test/java/com/datasqrl/CommandLineUtil.java b/src/test/java/com/datasqrl/CommandLineUtil.java
new file mode 100644
index 0000000..a31ea91
--- /dev/null
+++ b/src/test/java/com/datasqrl/CommandLineUtil.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright © 2024 DataSQRL (contact@datasqrl.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datasqrl;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.time.Duration;
+import lombok.experimental.UtilityClass;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.exec.CommandLine;
+import org.apache.commons.exec.DefaultExecutor;
+import org.apache.commons.exec.ExecuteException;
+import org.apache.commons.exec.ExecuteWatchdog;
+import org.apache.commons.exec.PumpStreamHandler;
+
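+/**
+ * Runs an external command synchronously with Apache commons-exec, capturing combined
+ * stdout/stderr and aborting via watchdog if the command exceeds five minutes.
+ */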
+@Slf4j
+@UtilityClass
+public class CommandLineUtil {
+
+ public static String execute(String command) throws ExecuteException {
+ return execute(Path.of("."), command);
+ }
+
+ public static String execute(Path workDir, String command) throws ExecuteException {
+ command = command.replaceAll("\\s+", " ").trim();
+
+ log.info("Executing command: {}", command);
+
+ var cmdLine = CommandLine.parse(command);
+
+ var output = new ByteArrayOutputStream();
+ var streamHandler = new PumpStreamHandler(output);
+
+ var executor =
+ DefaultExecutor.builder()
+ .setWorkingDirectory(workDir.toFile())
+ .setExecuteStreamHandler(streamHandler)
+ .get();
+ executor.setExitValue(0);
+
+ var watchdog = ExecuteWatchdog.builder().setTimeout(Duration.ofMinutes(5)).get();
+ executor.setWatchdog(watchdog);
+
+ try {
+ var exitValue =
+ // This call is synchronous and will block until the command completes
+ executor.execute(cmdLine);
+ log.info("Installation completed successfully with exit code: {}", exitValue);
+
+ return new String(output.toByteArray());
+ } catch (IOException e) {
+ var result = new String(output.toByteArray());
+ log.error("Error while executing command:\n{}\noutput:\n{}", command, result);
+ if (e instanceof ExecuteException) {
+ ExecuteException ee = (ExecuteException) e;
+ throw new ExecuteException(result, ee.getExitValue(), ee);
+ }
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/src/test/java/com/datasqrl/FlinkMainIT.java b/src/test/java/com/datasqrl/FlinkMainIT.java
index 29c17d2..fcb47da 100644
--- a/src/test/java/com/datasqrl/FlinkMainIT.java
+++ b/src/test/java/com/datasqrl/FlinkMainIT.java
@@ -15,16 +15,27 @@
*/
package com.datasqrl;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.fail;
+import com.google.common.collect.MoreCollectors;
import com.nextbreakpoint.flink.client.model.JarRunResponseBody;
import com.nextbreakpoint.flink.client.model.JobExceptionsInfoWithHistory;
import com.nextbreakpoint.flink.client.model.JobStatus;
import com.nextbreakpoint.flink.client.model.TerminationMode;
import com.nextbreakpoint.flink.client.model.UploadStatus;
import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.OffsetDateTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -33,6 +44,17 @@
import lombok.SneakyThrows;
import org.apache.flink.shaded.curator5.com.google.common.base.Objects;
import org.apache.flink.shaded.curator5.com.google.common.collect.Lists;
+import org.awaitility.core.ThrowingRunnable;
+import org.jdbi.v3.core.Jdbi;
+import org.jdbi.v3.core.statement.SqlLogger;
+import org.jdbi.v3.core.statement.StatementContext;
+import org.jdbi.v3.sqlobject.SqlObjectPlugin;
+import org.jdbi.v3.sqlobject.customizer.Bind;
+import org.jdbi.v3.sqlobject.statement.SqlQuery;
+import org.jdbi.v3.sqlobject.statement.SqlUpdate;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
@@ -40,6 +62,35 @@
class FlinkMainIT extends AbstractITSupport {
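+ /** JDBI DAO for the Postgres sink table (transactionbymerchant_1) written by the Flink job under test. */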
+ interface TransactionDao {
+
+ @SqlQuery(
+ "SELECT count(1) FROM public.transactionbymerchant_1 t where t.\"__timestamp\" > :minDate")
+ int getRowCount(@Bind("minDate") Timestamp minDate);
+
+ @SqlQuery("SELECT count(1) FROM public.transactionbymerchant_1 t")
+ int getRowCount();
+
+ @SqlQuery("SELECT double_ta FROM public.transactionbymerchant_1 limit 1")
+ int getDoubleTA();
+
+ @SqlQuery(
+ "SELECT max(transactionbymerchant_1.\"__timestamp\") FROM public.transactionbymerchant_1")
+ OffsetDateTime getMaxTimestamp();
+
+ @SqlUpdate("TRUNCATE public.transactionbymerchant_1")
+ void truncateTable();
+ }
+
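+ // cancel any RUNNING jobs before and after each test so test runs stay isolated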
+ @BeforeEach
+ @AfterEach
+ void terminateJobs() throws Exception {
+ var jobs = client.getJobsOverview();
+ jobs.getJobs().stream()
+ .filter(job -> Objects.equal(job.getState(), JobStatus.RUNNING))
+ .forEach(job -> stopJobs(job.getJid()));
+ }
+
static Stream<Arguments> sqlScripts() {
var scripts = List.of("flink.sql", "test_sql.sql");
var config = List.of(true, false);
@@ -82,8 +133,119 @@ void givenPlansScript_whenExecuting_thenSuccess(String filename, boolean config)
execute(args.toArray(String[]::new));
}
+ @Test
+ void givenPlanScript_whenCheckpoint_thenResumesSuccessfully() throws Exception {
+ var transactionDao = connect();
+ // clear any old data
+ transactionDao.truncateTable();
+ assertThat(transactionDao.getRowCount()).isZero();
+
+ // modify the compiled plan so the job writes double_ta = 3
+ updateCompiledPlan(3);
+
+ String planFile = "/opt/flink/usrlib/sqrl/compiled-plan.json";
+ var args = new ArrayList<String>();
+ args.add("--planfile");
+ args.add(planFile);
+ args.add("--config-dir");
+ args.add("/opt/flink/usrlib/config/");
+ var jobResponse = execute(args.toArray(String[]::new));
+
+ untilAssert(() -> assertThat(transactionDao.getRowCount()).isEqualTo(10));
+ // verify that the initial compiled-plan change took effect
+ assertThat(transactionDao.getDoubleTA()).isEqualTo(3);
+
+ Path savePoint = Path.of("/tmp/flink-jar-runner/" + System.currentTimeMillis());
+
+ // take a savepoint
+ CommandLineUtil.execute(
+ Path.of("."),
+ "docker exec -t flink-jar-runner-jobmanager-1 bin/flink stop "
+ + jobResponse.getJobid()
+ + " --savepointPath "
+ + savePoint);
+
+ // verify the job is stopped and that no new records are being created
+ untilAssert(
+ () ->
+ assertThat(
+ transactionDao.getRowCount(
+ Timestamp.valueOf(
+ ZonedDateTime.now()
+ .withZoneSameInstant(ZoneId.of("Z"))
+ .toLocalDateTime())))
+ .isEqualTo(0));
+ assertThat(savePoint).isNotEmptyDirectory();
+
+ Path savePointFile;
+ try (var filePathStream = Files.walk(savePoint)) {
+ savePointFile =
+ filePathStream
+ .filter(Files::isRegularFile)
+ .filter(path -> "_metadata".equals(path.getFileName().toString()))
+ .collect(MoreCollectors.onlyElement());
+ }
+ // make sure savepoint was created
+ assertThat(savePointFile).exists();
+
+ // change compiled plan again
+ updateCompiledPlan(5);
+
+ var restoration =
+ Timestamp.valueOf(
+ ZonedDateTime.now().withZoneSameInstant(ZoneId.of("Z")).toLocalDateTime());
+
+ // restart with savepoint
+ restoreAndExecute(savePointFile.getParent().toString(), args.toArray(String[]::new));
+
+ untilAssert(() -> assertThat(transactionDao.getRowCount(restoration)).isEqualTo(10));
+ assertThat(transactionDao.getDoubleTA()).isEqualTo(5);
+ }
+
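+ // patch the INT literal in the compiled plan (the double_ta column value) so a plan change is visible after restore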
@SneakyThrows
+ private void updateCompiledPlan(int newValue) {
+ var contents = Files.readString(Path.of("src/test/resources/sqrl/compiled-plan.json"), UTF_8);
+ contents = contents.replace("\"value\" : 2", "\"value\" : " + newValue);
+ Files.writeString(Path.of("target/test-classes/sqrl/compiled-plan.json"), contents, UTF_8);
+ }
+
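+ // retry the assertion for up to 20 seconds, ignoring intermediate failures while the streaming job catches up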
+ public void untilAssert(ThrowingRunnable assertion) {
+ await()
+ .atMost(20, SECONDS)
+ .pollInterval(100, MILLISECONDS)
+ .ignoreExceptions()
+ .untilAsserted(assertion);
+ }
+
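+ // connect to the dockerized Postgres through the exposed 5432 port and hand back an on-demand DAO proxy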
+ private TransactionDao connect() {
+ var jdbi = Jdbi.create("jdbc:postgresql://localhost:5432/datasqrl", "postgres", "postgres");
+ jdbi.installPlugin(new SqlObjectPlugin());
+ jdbi.setSqlLogger(
+ new SqlLogger() {
+ @Override
+ public void logAfterExecution(StatementContext context) {
+ System.out.printf(
+ "Executed in '%s' with parameters '%s'\n",
+ context.getParsedSql().getSql(), context.getBinding());
+ }
+
+ @Override
+ public void logException(StatementContext context, SQLException ex) {
+ System.out.printf(
+ "Exception while executing '%s' with parameters '%s': %s\n",
+ context.getParsedSql().getSql(), context.getBinding(), ex);
+ }
+ });
+
+ return jdbi.onDemand(TransactionDao.class);
+ }
+
JarRunResponseBody execute(String... arguments) {
+ return restoreAndExecute(null, arguments);
+ }
+
+ @SneakyThrows
+ JarRunResponseBody restoreAndExecute(String savepointPath, String... arguments) {
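+ // upload the uber jar and submit it through the Flink REST API; a non-null savepointPath resumes the job from that savepoint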
var jarFile = new File("target/flink-jar-runner.uber.jar");
var uploadResponse = client.uploadJar(jarFile);
@@ -91,7 +253,7 @@ JarRunResponseBody execute(String... arguments) {
assertThat(uploadResponse.getStatus()).isEqualTo(UploadStatus.SUCCESS);
// Step 2: Extract jarId from the response
- String jarId =
+ var jarId =
uploadResponse.getFilename().substring(uploadResponse.getFilename().lastIndexOf("/") + 1);
// Step 3: Submit the job
@@ -99,8 +261,8 @@ JarRunResponseBody execute(String... arguments) {
client.submitJobFromJar(
jarId,
null,
- null,
- null,
+ savepointPath == null ? null : false,
+ savepointPath,
null,
Arrays.stream(arguments).collect(Collectors.joining(",")),
null,
@@ -108,8 +270,13 @@ JarRunResponseBody execute(String... arguments) {
String jobId = jobResponse.getJobid();
assertThat(jobId).isNotNull();
- SECONDS.sleep(10);
+ SECONDS.sleep(2);
+ return jobResponse;
+ }
+
+ @SneakyThrows
+ private void stopJobs(String jobId) {
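+ // cancel the job if it is still running; a job in any other state is inspected for exceptions and fails the test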
var status = client.getJobStatusInfo(jobId);
if (Objects.equal(status.getStatus(), JobStatus.RUNNING)) {
client.cancelJob(jobId, TerminationMode.CANCEL);
@@ -117,8 +284,6 @@ JarRunResponseBody execute(String... arguments) {
JobExceptionsInfoWithHistory exceptions = client.getJobExceptions(jobId, 5, null);
fail(exceptions.toString());
}
-
- return jobResponse;
}
@ParameterizedTest(name = "{0}")
diff --git a/src/test/resources/sqrl/compiled-plan.json b/src/test/resources/sqrl/compiled-plan.json
new file mode 100644
index 0000000..887da8f
--- /dev/null
+++ b/src/test/resources/sqrl/compiled-plan.json
@@ -0,0 +1,256 @@
+{
+ "flinkVersion" : "1.19",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`_transaction_1`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "transactionId",
+ "dataType" : "BIGINT NOT NULL"
+ }, {
+ "name" : "cardNo",
+ "dataType" : "DOUBLE NOT NULL"
+ }, {
+ "name" : "time",
+ "dataType" : "TIMESTAMP NOT NULL"
+ }, {
+ "name" : "amount",
+ "dataType" : "DOUBLE NOT NULL"
+ }, {
+ "name" : "merchantId",
+ "dataType" : "BIGINT NOT NULL"
+ } ],
+ "watermarkSpecs" : [ ],
+ "primaryKey" : {
+ "name" : "PK_transactionId_time",
+ "type" : "PRIMARY_KEY",
+ "columns" : [ "transactionId", "time" ]
+ }
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "connector" : "datagen",
+ "rows-per-second" : "10",
+ "fields.merchantId.min" : "1",
+ "fields.merchantId.max" : "10",
+ "fields.amount.min" : "1",
+ "fields.amount.max" : "20"
+ }
+ }
+ }
+ },
+ "outputType" : "ROW<`transactionId` BIGINT NOT NULL, `cardNo` DOUBLE NOT NULL, `time` TIMESTAMP NOT NULL, `amount` DOUBLE NOT NULL, `merchantId` BIGINT NOT NULL>",
+ "description" : "TableSourceScan(table=[[default_catalog, default_database, _transaction_1]], fields=[transactionId, cardNo, time, amount, merchantId])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 2,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 4,
+ "type" : "BIGINT NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "DOUBLE NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "TIMESTAMP NOT NULL"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`merchantId` BIGINT NOT NULL, `amount` DOUBLE NOT NULL, `time` TIMESTAMP NOT NULL>",
+ "description" : "Calc(select=[merchantId, amount, time])"
+ }, {
+ "id" : 3,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`merchantId` BIGINT NOT NULL, `amount` DOUBLE NOT NULL, `time` TIMESTAMP NOT NULL>",
+ "description" : "Exchange(distribution=[hash[merchantId]])"
+ }, {
+ "id" : 4,
+ "type" : "stream-exec-group-aggregate_1",
+ "configuration" : {
+ "table.exec.mini-batch.enabled" : "false",
+ "table.exec.mini-batch.size" : "-1"
+ },
+ "grouping" : [ 0 ],
+ "aggCalls" : [ {
+ "name" : "max_amount",
+ "internalName" : "$MAX$1",
+ "argList" : [ 1 ],
+ "filterArg" : -1,
+ "distinct" : false,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : "DOUBLE NOT NULL"
+ }, {
+ "name" : "__timestamp",
+ "internalName" : "$MAX$1",
+ "argList" : [ 2 ],
+ "filterArg" : -1,
+ "distinct" : false,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : "TIMESTAMP NOT NULL"
+ } ],
+ "aggCallNeedRetractions" : [ false, false ],
+ "generateUpdateBefore" : false,
+ "needRetraction" : false,
+ "state" : [ {
+ "index" : 0,
+ "ttl" : "0 ms",
+ "name" : "groupAggregateState"
+ } ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`merchantId` BIGINT NOT NULL, `max_amount` DOUBLE NOT NULL, `__timestamp` TIMESTAMP NOT NULL>",
+ "description" : "GroupAggregate(groupBy=[merchantId], select=[merchantId, MAX(amount) AS max_amount, MAX(time) AS __timestamp])"
+ }, {
+ "id" : 5,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 2,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "DOUBLE NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "TIMESTAMP NOT NULL"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`merchantId` BIGINT NOT NULL, `double_ta` INT NOT NULL, `max_amount` DOUBLE NOT NULL, `__timestamp` TIMESTAMP NOT NULL>",
+ "description" : "Calc(select=[merchantId, 2 AS double_ta, max_amount, __timestamp])"
+ }, {
+ "id" : 6,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.rowtime-inserter" : "ENABLED",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`transactionbymerchant_1`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "merchantId",
+ "dataType" : "BIGINT NOT NULL"
+ }, {
+ "name" : "double_ta",
+ "dataType" : "INT NOT NULL"
+ }, {
+ "name" : "max_amount",
+ "dataType" : "DOUBLE NOT NULL"
+ }, {
+ "name" : "__timestamp",
+ "dataType" : "TIMESTAMP NOT NULL"
+ } ],
+ "watermarkSpecs" : [ ],
+ "primaryKey" : {
+ "name" : "PK_merchantId",
+ "type" : "PRIMARY_KEY",
+ "columns" : [ "merchantId" ]
+ }
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "connector" : "jdbc",
+ "url" : "jdbc:postgresql://postgres:5432/datasqrl",
+ "table-name" : "transactionbymerchant_1",
+ "username" : "postgres",
+ "password" : "postgres"
+ }
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT", "UPDATE_AFTER" ],
+ "inputUpsertKey" : [ 0 ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`merchantId` BIGINT NOT NULL, `double_ta` INT NOT NULL, `max_amount` DOUBLE NOT NULL, `__timestamp` TIMESTAMP NOT NULL>",
+ "description" : "Sink(table=[default_catalog.default_database.transactionbymerchant_1], fields=[merchantId, double_ta, max_amount, __timestamp])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 3,
+ "target" : 4,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 4,
+ "target" : 5,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 5,
+ "target" : 6,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git a/src/test/resources/sqrl/creditcard-local/transaction.schema.yml b/src/test/resources/sqrl/creditcard-local/transaction.schema.yml
new file mode 100644
index 0000000..043837b
--- /dev/null
+++ b/src/test/resources/sqrl/creditcard-local/transaction.schema.yml
@@ -0,0 +1,25 @@
+---
+name: "transaction"
+schema_version: "1"
+partial_schema: false
+columns:
+- name: "transactionId"
+ type: "BIGINT"
+ tests:
+ - "not_null"
+- name: "cardNo"
+ type: "DOUBLE"
+ tests:
+ - "not_null"
+- name: "time"
+ type: "TIMESTAMP"
+ tests:
+ - "not_null"
+- name: "amount"
+ type: "DOUBLE"
+ tests:
+ - "not_null"
+- name: "merchantId"
+ type: "BIGINT"
+ tests:
+ - "not_null"
\ No newline at end of file
diff --git a/src/test/resources/sqrl/creditcard-local/transaction.table.json b/src/test/resources/sqrl/creditcard-local/transaction.table.json
new file mode 100644
index 0000000..6692efd
--- /dev/null
+++ b/src/test/resources/sqrl/creditcard-local/transaction.table.json
@@ -0,0 +1,14 @@
+{
+ "flink" : {
+ "format" : "flexible-json",
+ "path" : "s3://sqrl-examples-data-bucket/cc-datasqrl-example/d42a69082/transaction.jsonl",
+ "source.monitor-interval" : "10 min",
+ "connector" : "filesystem"
+ },
+ "version" : 1,
+ "table" : {
+ "type" : "source",
+ "primary-key" : ["transactionId", "time"],
+ "timestamp" : "time"
+ }
+}
diff --git a/src/test/resources/sqrl/package.json b/src/test/resources/sqrl/package.json
new file mode 100644
index 0000000..aba8650
--- /dev/null
+++ b/src/test/resources/sqrl/package.json
@@ -0,0 +1,15 @@
+{
+ "version": "1",
+ "enabled-engines": ["vertx", "postgres", "kafka", "flink"],
+ "profiles": ["datasqrl.profile.default"],
+ "script": {
+ "main": "script.sqrl"
+ },
+ "dependencies": [{
+ "creditcard-data": {
+ "name": "creditcard-local",
+ "version": "1",
+ "variant": "dev"
+ }
+ }]
+}
diff --git a/src/test/resources/sqrl/postgres-schema.sql b/src/test/resources/sqrl/postgres-schema.sql
new file mode 100644
index 0000000..a6536e1
--- /dev/null
+++ b/src/test/resources/sqrl/postgres-schema.sql
@@ -0,0 +1,4 @@
+CREATE TABLE IF NOT EXISTS transactionbymerchant_1 (
+  "merchantid" BIGINT NOT NULL, "double_ta" DOUBLE PRECISION NOT NULL, "max_amount" DOUBLE PRECISION NOT NULL,
+  "__timestamp" TIMESTAMP NOT NULL, PRIMARY KEY ("merchantid")
+);
diff --git a/src/test/resources/sqrl/script.sqrl b/src/test/resources/sqrl/script.sqrl
new file mode 100644
index 0000000..eb0fb30
--- /dev/null
+++ b/src/test/resources/sqrl/script.sqrl
@@ -0,0 +1,12 @@
+IMPORT creditcard-data.Transaction AS _Transaction;
+
+_TransactionAggregate := SELECT merchantId,
+ 1 as total_amount,
+ max(amount) as max_amount
+ FROM _Transaction
+ GROUP BY merchantId;
+
+TransactionByMerchant := SELECT merchantId,
+ total_amount * 2 as double_ta,
+ max_amount
+ FROM _TransactionAggregate;