diff --git a/pom.xml b/pom.xml index a786d36..50ac245 100644 --- a/pom.xml +++ b/pom.xml @@ -147,6 +147,27 @@ test + + org.apache.commons + commons-exec + 1.4.0 + test + + + + + org.jdbi + jdbi3-core + 3.45.1 + test + + + org.jdbi + jdbi3-sqlobject + 3.45.1 + test + + org.apache.flink flink-connector-kafka @@ -203,6 +224,7 @@ ${flink.version} + central diff --git a/src/test/docker/docker-compose.yml b/src/test/docker/docker-compose.yml index f25309c..b1ac9c0 100644 --- a/src/test/docker/docker-compose.yml +++ b/src/test/docker/docker-compose.yml @@ -22,13 +22,15 @@ networks: services: postgres: - image: postgres:16 + image: postgres:17 environment: POSTGRES_USER: postgres POSTGRES_PASSWORD: postgres POSTGRES_DB: datasqrl ports: - "5432:5432" + volumes: + - ./sqrl/postgres-schema.sql:/docker-entrypoint-initdb.d/database-schema.sql:ro healthcheck: test: ["CMD-SHELL", "pg_isready -U postgres"] interval: 1s @@ -38,18 +40,17 @@ services: - datasqrl_network jobmanager: - user: 1000:1000 image: flink:${flink-base-image} environment: - JDBC_URL=jdbc:postgresql://postgres:5432/datasqrl - JDBC_USERNAME=postgres - JDBC_PASSWORD=postgres - - PROPERTIES_BOOTSTRAP_SERVERS=kafka:9092 - - PROPERTIES_GROUP_ID=mygroupid - | FLINK_PROPERTIES= taskmanager.slot.timeout: 30000ms jobmanager.rpc.address: jobmanager + state.savepoints-storage: filesystem + state.savepoints.dir: file:///tmp/flink-jar-runner command: jobmanager ports: - "8081:8081" # Flink JobManager REST port @@ -62,27 +63,32 @@ services: - flink_conf:/opt/flink/conf - ./:/opt/flink/usrlib/ - ./datasources/:/datasources/ + - /tmp:/tmp healthcheck: test: ["CMD-SHELL", "curl -f http://localhost:8081/ || exit 1"] interval: 1s timeout: 1s retries: 50 + restart: always + deploy: + resources: + limits: + memory: 2g networks: - datasqrl_network taskmanager: - user: 1000:1000 image: flink:${flink-base-image} environment: - JDBC_URL=jdbc:postgresql://postgres:5432/datasqrl - JDBC_USERNAME=postgres - JDBC_PASSWORD=postgres - - PROPERTIES_BOOTSTRAP_SERVERS=kafka:9092 - - PROPERTIES_GROUP_ID=mygroupid - | FLINK_PROPERTIES= taskmanager.slot.timeout: 30000ms jobmanager.rpc.address: jobmanager + state.savepoints-storage: filesystem + state.savepoints.dir: file:///tmp/flink-jar-runner command: taskmanager depends_on: jobmanager: @@ -91,6 +97,12 @@ services: - flink_conf:/opt/flink/conf - ./:/opt/flink/usrlib/ - ./datasources/:/datasources/ + - /tmp:/tmp + restart: always + deploy: + resources: + limits: + memory: 2g networks: - datasqrl_network diff --git a/src/test/java/com/datasqrl/CommandLineUtil.java b/src/test/java/com/datasqrl/CommandLineUtil.java new file mode 100644 index 0000000..a31ea91 --- /dev/null +++ b/src/test/java/com/datasqrl/CommandLineUtil.java @@ -0,0 +1,75 @@ +/* + * Copyright © 2024 DataSQRL (contact@datasqrl.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datasqrl; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.file.Path; +import java.time.Duration; +import lombok.experimental.UtilityClass; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.exec.CommandLine; +import org.apache.commons.exec.DefaultExecutor; +import org.apache.commons.exec.ExecuteException; +import org.apache.commons.exec.ExecuteWatchdog; +import org.apache.commons.exec.PumpStreamHandler; + +@Slf4j +@UtilityClass +public class CommandLineUtil { + + public static String execute(String command) throws ExecuteException { + return execute(Path.of("."), command); + } + + public static String execute(Path workDir, String command) throws ExecuteException { + command = command.replaceAll("\\s+", " ").trim(); + + log.info("Executing command: {}", command); + + var cmdLine = CommandLine.parse(command); + + var output = new ByteArrayOutputStream(); + var streamHandler = new PumpStreamHandler(output); + + var executor = + DefaultExecutor.builder() + .setWorkingDirectory(workDir.toFile()) + .setExecuteStreamHandler(streamHandler) + .get(); + executor.setExitValue(0); + + var watchdog = ExecuteWatchdog.builder().setTimeout(Duration.ofMinutes(5)).get(); + executor.setWatchdog(watchdog); + + try { + var exitValue = + // This call is synchronous and will block until the command completes + executor.execute(cmdLine); + log.info("Installation completed successfully with exit code: {}", exitValue); + + return new String(output.toByteArray()); + } catch (IOException e) { + var result = new String(output.toByteArray()); + log.error("Error while executing command:\n{}\noutput:\n{}", command, result); + if (e instanceof ExecuteException) { + ExecuteException ee = (ExecuteException) e; + throw new ExecuteException(result, ee.getExitValue(), ee); + } + throw new RuntimeException(e); + } + } +} diff --git a/src/test/java/com/datasqrl/FlinkMainIT.java b/src/test/java/com/datasqrl/FlinkMainIT.java index 29c17d2..fcb47da 100644 --- a/src/test/java/com/datasqrl/FlinkMainIT.java +++ b/src/test/java/com/datasqrl/FlinkMainIT.java @@ -15,16 +15,27 @@ */ package com.datasqrl; +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.fail; +import com.google.common.collect.MoreCollectors; import com.nextbreakpoint.flink.client.model.JarRunResponseBody; import com.nextbreakpoint.flink.client.model.JobExceptionsInfoWithHistory; import com.nextbreakpoint.flink.client.model.JobStatus; import com.nextbreakpoint.flink.client.model.TerminationMode; import com.nextbreakpoint.flink.client.model.UploadStatus; import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.time.OffsetDateTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -33,6 +44,17 @@ import lombok.SneakyThrows; import org.apache.flink.shaded.curator5.com.google.common.base.Objects; import org.apache.flink.shaded.curator5.com.google.common.collect.Lists; +import org.awaitility.core.ThrowingRunnable; +import org.jdbi.v3.core.Jdbi; +import org.jdbi.v3.core.statement.SqlLogger; +import org.jdbi.v3.core.statement.StatementContext; +import org.jdbi.v3.sqlobject.SqlObjectPlugin; +import org.jdbi.v3.sqlobject.customizer.Bind; +import org.jdbi.v3.sqlobject.statement.SqlQuery; +import org.jdbi.v3.sqlobject.statement.SqlUpdate; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.CsvSource; @@ -40,6 +62,35 @@ class FlinkMainIT extends AbstractITSupport { + interface TransactionDao { + + @SqlQuery( + "SELECT count(1) FROM public.transactionbymerchant_1 t where t.\"__timestamp\" > :minDate") + int getRowCount(@Bind("minDate") Timestamp minDate); + + @SqlQuery("SELECT count(1) FROM public.transactionbymerchant_1 t") + int getRowCount(); + + @SqlQuery("SELECT double_ta FROM public.transactionbymerchant_1 limit 1") + int getDoubleTA(); + + @SqlQuery( + "SELECT max(transactionbymerchant_1.\"__timestamp\") FROM public.transactionbymerchant_1") + OffsetDateTime getMaxTimestamp(); + + @SqlUpdate("TRUNCATE public.transactionbymerchant_1") + void truncateTable(); + } + + @BeforeEach + @AfterEach + void terminateJobs() throws Exception { + var jobs = client.getJobsOverview(); + jobs.getJobs().stream() + .filter(job -> Objects.equal(job.getState(), JobStatus.RUNNING)) + .forEach(job -> stopJobs(job.getJid())); + } + static Stream sqlScripts() { var scripts = List.of("flink.sql", "test_sql.sql"); var config = List.of(true, false); @@ -82,8 +133,119 @@ void givenPlansScript_whenExecuting_thenSuccess(String filename, boolean config) execute(args.toArray(String[]::new)); } + @Test + void givenPlanScript_whenCheckPoint_thenResumeSuccessfulySuccess() throws Exception { + var transactionDao = connect(); + // clear any old data + transactionDao.truncateTable(); + assertThat(transactionDao.getRowCount()).isZero(); + + // change the compilation plan + updateCompiledPlan(3); + + String planFile = "/opt/flink/usrlib/sqrl/compiled-plan.json"; + var args = new ArrayList(); + args.add("--planfile"); + args.add(planFile); + args.add("--config-dir"); + args.add("/opt/flink/usrlib/config/"); + var jobResponse = execute(args.toArray(String[]::new)); + + untilAssert(() -> assertThat(transactionDao.getRowCount()).isEqualTo(10)); + // test if initial change to compilation plan took effect + assertThat(transactionDao.getDoubleTA()).isEqualTo(3); + + Path savePoint = Path.of("/tmp/flink-jar-runner/" + System.currentTimeMillis()); + + // take a savepoint + CommandLineUtil.execute( + Path.of("."), + "docker exec -t flink-jar-runner-jobmanager-1 bin/flink stop " + + jobResponse.getJobid() + + " --savepointPath " + + savePoint); + + // check if STOPed, make sure no new records are create + untilAssert( + () -> + assertThat( + transactionDao.getRowCount( + Timestamp.valueOf( + ZonedDateTime.now() + .withZoneSameInstant(ZoneId.of("Z")) + .toLocalDateTime()))) + .isEqualTo(0)); + assertThat(savePoint).isNotEmptyDirectory(); + + Path savePointFile; + try (var filePathStream = Files.walk(savePoint)) { + savePointFile = + filePathStream + .filter(Files::isRegularFile) + .filter(path -> "_metadata".equals(path.getFileName().toString())) + .collect(MoreCollectors.onlyElement()); + } + // make sure savepoint was created + assertThat(savePointFile).exists(); + + // change compiled plan again + updateCompiledPlan(5); + + var restoration = + Timestamp.valueOf( + ZonedDateTime.now().withZoneSameInstant(ZoneId.of("Z")).toLocalDateTime()); + + // restart with savepoint + restoreAndExecute(savePointFile.getParent().toString(), args.toArray(String[]::new)); + + untilAssert(() -> assertThat(transactionDao.getRowCount(restoration)).isEqualTo(10)); + assertThat(transactionDao.getDoubleTA()).isEqualTo(5); + } + @SneakyThrows + private void updateCompiledPlan(int newValue) { + var contents = Files.readString(Path.of("src/test/resources/sqrl/compiled-plan.json"), UTF_8); + contents = contents.replace("\"value\" : 2", "\"value\" : " + newValue); + Files.writeString(Path.of("target/test-classes/sqrl/compiled-plan.json"), contents, UTF_8); + } + + public void untilAssert(ThrowingRunnable assertion) { + await() + .atMost(20, SECONDS) + .pollInterval(100, MILLISECONDS) + .ignoreExceptions() + .untilAsserted(assertion); + } + + private TransactionDao connect() { + var jdbi = Jdbi.create("jdbc:postgresql://localhost:5432/datasqrl", "postgres", "postgres"); + jdbi.installPlugin(new SqlObjectPlugin()); + jdbi.setSqlLogger( + new SqlLogger() { + @Override + public void logAfterExecution(StatementContext context) { + System.out.printf( + "Executed in '%s' with parameters '%s'\n", + context.getParsedSql().getSql(), context.getBinding()); + } + + @Override + public void logException(StatementContext context, SQLException ex) { + System.out.printf( + "Exception while executing '%s' with parameters '%s'\n", + context.getParsedSql().getSql(), context.getBinding(), ex); + } + }); + + return jdbi.onDemand(TransactionDao.class); + } + JarRunResponseBody execute(String... arguments) { + return restoreAndExecute(null, arguments); + } + + @SneakyThrows + JarRunResponseBody restoreAndExecute(String savepointPath, String... arguments) { var jarFile = new File("target/flink-jar-runner.uber.jar"); var uploadResponse = client.uploadJar(jarFile); @@ -91,7 +253,7 @@ JarRunResponseBody execute(String... arguments) { assertThat(uploadResponse.getStatus()).isEqualTo(UploadStatus.SUCCESS); // Step 2: Extract jarId from the response - String jarId = + var jarId = uploadResponse.getFilename().substring(uploadResponse.getFilename().lastIndexOf("/") + 1); // Step 3: Submit the job @@ -99,8 +261,8 @@ JarRunResponseBody execute(String... arguments) { client.submitJobFromJar( jarId, null, - null, - null, + savepointPath == null ? null : false, + savepointPath, null, Arrays.stream(arguments).collect(Collectors.joining(",")), null, @@ -108,8 +270,13 @@ JarRunResponseBody execute(String... arguments) { String jobId = jobResponse.getJobid(); assertThat(jobId).isNotNull(); - SECONDS.sleep(10); + SECONDS.sleep(2); + return jobResponse; + } + + @SneakyThrows + private void stopJobs(String jobId) { var status = client.getJobStatusInfo(jobId); if (Objects.equal(status.getStatus(), JobStatus.RUNNING)) { client.cancelJob(jobId, TerminationMode.CANCEL); @@ -117,8 +284,6 @@ JarRunResponseBody execute(String... arguments) { JobExceptionsInfoWithHistory exceptions = client.getJobExceptions(jobId, 5, null); fail(exceptions.toString()); } - - return jobResponse; } @ParameterizedTest(name = "{0}") diff --git a/src/test/resources/sqrl/compiled-plan.json b/src/test/resources/sqrl/compiled-plan.json new file mode 100644 index 0000000..887da8f --- /dev/null +++ b/src/test/resources/sqrl/compiled-plan.json @@ -0,0 +1,256 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 1, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`_transaction_1`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "transactionId", + "dataType" : "BIGINT NOT NULL" + }, { + "name" : "cardNo", + "dataType" : "DOUBLE NOT NULL" + }, { + "name" : "time", + "dataType" : "TIMESTAMP NOT NULL" + }, { + "name" : "amount", + "dataType" : "DOUBLE NOT NULL" + }, { + "name" : "merchantId", + "dataType" : "BIGINT NOT NULL" + } ], + "watermarkSpecs" : [ ], + "primaryKey" : { + "name" : "PK_transactionId_time", + "type" : "PRIMARY_KEY", + "columns" : [ "transactionId", "time" ] + } + }, + "partitionKeys" : [ ], + "options" : { + "connector" : "datagen", + "rows-per-second" : "10", + "fields.merchantId.min" : "1", + "fields.merchantId.max" : "10", + "fields.amount.min" : "1", + "fields.amount.max" : "20" + } + } + } + }, + "outputType" : "ROW<`transactionId` BIGINT NOT NULL, `cardNo` DOUBLE NOT NULL, `time` TIMESTAMP NOT NULL, `amount` DOUBLE NOT NULL, `merchantId` BIGINT NOT NULL>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, _transaction_1]], fields=[transactionId, cardNo, time, amount, merchantId])", + "inputProperties" : [ ] + }, { + "id" : 2, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "BIGINT NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "DOUBLE NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "TIMESTAMP NOT NULL" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`merchantId` BIGINT NOT NULL, `amount` DOUBLE NOT NULL, `time` TIMESTAMP NOT NULL>", + "description" : "Calc(select=[merchantId, amount, time])" + }, { + "id" : 3, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`merchantId` BIGINT NOT NULL, `amount` DOUBLE NOT NULL, `time` TIMESTAMP NOT NULL>", + "description" : "Exchange(distribution=[hash[merchantId]])" + }, { + "id" : 4, + "type" : "stream-exec-group-aggregate_1", + "configuration" : { + "table.exec.mini-batch.enabled" : "false", + "table.exec.mini-batch.size" : "-1" + }, + "grouping" : [ 0 ], + "aggCalls" : [ { + "name" : "max_amount", + "internalName" : "$MAX$1", + "argList" : [ 1 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "DOUBLE NOT NULL" + }, { + "name" : "__timestamp", + "internalName" : "$MAX$1", + "argList" : [ 2 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "TIMESTAMP NOT NULL" + } ], + "aggCallNeedRetractions" : [ false, false ], + "generateUpdateBefore" : false, + "needRetraction" : false, + "state" : [ { + "index" : 0, + "ttl" : "0 ms", + "name" : "groupAggregateState" + } ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`merchantId` BIGINT NOT NULL, `max_amount` DOUBLE NOT NULL, `__timestamp` TIMESTAMP NOT NULL>", + "description" : "GroupAggregate(groupBy=[merchantId], select=[merchantId, MAX(amount) AS max_amount, MAX(time) AS __timestamp])" + }, { + "id" : 5, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "BIGINT NOT NULL" + }, { + "kind" : "LITERAL", + "value" : 2, + "type" : "INT NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "DOUBLE NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "TIMESTAMP NOT NULL" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`merchantId` BIGINT NOT NULL, `double_ta` INT NOT NULL, `max_amount` DOUBLE NOT NULL, `__timestamp` TIMESTAMP NOT NULL>", + "description" : "Calc(select=[merchantId, 2 AS double_ta, max_amount, __timestamp])" + }, { + "id" : 6, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`transactionbymerchant_1`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "merchantId", + "dataType" : "BIGINT NOT NULL" + }, { + "name" : "double_ta", + "dataType" : "INT NOT NULL" + }, { + "name" : "max_amount", + "dataType" : "DOUBLE NOT NULL" + }, { + "name" : "__timestamp", + "dataType" : "TIMESTAMP NOT NULL" + } ], + "watermarkSpecs" : [ ], + "primaryKey" : { + "name" : "PK_merchantId", + "type" : "PRIMARY_KEY", + "columns" : [ "merchantId" ] + } + }, + "partitionKeys" : [ ], + "options" : { + "connector" : "jdbc", + "url" : "jdbc:postgresql://postgres:5432/datasqrl", + "table-name" : "transactionbymerchant_1", + "username" : "postgres", + "password" : "postgres" + } + } + } + }, + "inputChangelogMode" : [ "INSERT", "UPDATE_AFTER" ], + "inputUpsertKey" : [ 0 ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`merchantId` BIGINT NOT NULL, `double_ta` INT NOT NULL, `max_amount` DOUBLE NOT NULL, `__timestamp` TIMESTAMP NOT NULL>", + "description" : "Sink(table=[default_catalog.default_database.transactionbymerchant_1], fields=[merchantId, double_ta, max_amount, __timestamp])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 4, + "target" : 5, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 5, + "target" : 6, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/src/test/resources/sqrl/creditcard-local/transaction.schema.yml b/src/test/resources/sqrl/creditcard-local/transaction.schema.yml new file mode 100644 index 0000000..043837b --- /dev/null +++ b/src/test/resources/sqrl/creditcard-local/transaction.schema.yml @@ -0,0 +1,25 @@ +--- +name: "transaction" +schema_version: "1" +partial_schema: false +columns: +- name: "transactionId" + type: "BIGINT" + tests: + - "not_null" +- name: "cardNo" + type: "DOUBLE" + tests: + - "not_null" +- name: "time" + type: "TIMESTAMP" + tests: + - "not_null" +- name: "amount" + type: "DOUBLE" + tests: + - "not_null" +- name: "merchantId" + type: "BIGINT" + tests: + - "not_null" \ No newline at end of file diff --git a/src/test/resources/sqrl/creditcard-local/transaction.table.json b/src/test/resources/sqrl/creditcard-local/transaction.table.json new file mode 100644 index 0000000..6692efd --- /dev/null +++ b/src/test/resources/sqrl/creditcard-local/transaction.table.json @@ -0,0 +1,14 @@ +{ + "flink" : { + "format" : "flexible-json", + "path" : "s3://sqrl-examples-data-bucket/cc-datasqrl-example/d42a69082/transaction.jsonl", + "source.monitor-interval" : "10 min", + "connector" : "filesystem" + }, + "version" : 1, + "table" : { + "type" : "source", + "primary-key" : ["transactionId", "time"], + "timestamp" : "time" + } +} diff --git a/src/test/resources/sqrl/package.json b/src/test/resources/sqrl/package.json new file mode 100644 index 0000000..aba8650 --- /dev/null +++ b/src/test/resources/sqrl/package.json @@ -0,0 +1,15 @@ +{ + "version": "1", + "enabled-engines": ["vertx", "postgres", "kafka", "flink"], + "profiles": ["datasqrl.profile.default"], + "script": { + "main": "script.sqrl" + }, + "dependencies": [{ + "creditcard-data": { + "name": "creditcard-local", + "version": "1", + "variant": "dev" + } + }] +} diff --git a/src/test/resources/sqrl/postgres-schema.sql b/src/test/resources/sqrl/postgres-schema.sql new file mode 100644 index 0000000..a6536e1 --- /dev/null +++ b/src/test/resources/sqrl/postgres-schema.sql @@ -0,0 +1,4 @@ +CREATE TABLE IF NOT EXISTS transactionbymerchant_1 ("merchantid" BIGINT NOT NULL,"double_ta" DOUBLE PRECISION NOT NULL,"max_amount" DOUBLE PRECISION NOT NULL,"__timestamp" TIMESTAMP + NOT NULL , PRIMARY KEY ("merchantid")); + + diff --git a/src/test/resources/sqrl/script.sqrl b/src/test/resources/sqrl/script.sqrl new file mode 100644 index 0000000..eb0fb30 --- /dev/null +++ b/src/test/resources/sqrl/script.sqrl @@ -0,0 +1,12 @@ +IMPORT creditcard-data.Transaction AS _Transaction; + +_TransactionAggregate := SELECT merchantId, + 1 as total_amount, + max(amount) as max_amount + FROM _Transaction + GROUP BY merchantId; + +TransactionByMerchant := SELECT merchantId, + total_amount * 2 as double_ta, + max_amount + FROM _TransactionAggregate;