Skip to content

Commit 345bb9c

Browse files
authored
Bump Flink to latest patch versions, fix logging (#169)
1 parent 0f1350f commit 345bb9c

File tree

12 files changed

+89
-103
lines changed

12 files changed

+89
-103
lines changed

.github/workflows/uber-jar.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ jobs:
5555
run: |
5656
mvn -B org.apache.maven.plugins:maven-dependency-plugin:3.8.1:go-offline de.qaware.maven:go-offline-maven-plugin:1.2.8:resolve-dependencies -P${{ matrix.FLINK_PROFILE }}
5757
mvn -B -f flink-sql-runner/pom.xml org.apache.maven.plugins:maven-resources-plugin:3.3.1:resources -P${{ matrix.FLINK_PROFILE }}
58-
docker pull postgres:17
5958
grep '^FROM' flink-sql-runner/target/Dockerfile | awk '{print $2}' | xargs -n1 docker pull
6059
6160
- name: Get project version and store in env

connectors/postgresql-connector/src/test/java/com/datasqrl/flinkrunner/connector/postgresql/FlinkJdbcTest.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.datasqrl.flinkrunner.stdlib.json.to_jsonb;
2222
import java.io.IOException;
2323
import java.sql.DriverManager;
24+
import lombok.extern.slf4j.Slf4j;
2425
import org.apache.flink.core.memory.DataInputDeserializer;
2526
import org.apache.flink.core.memory.DataOutputSerializer;
2627
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -34,23 +35,26 @@
3435
import org.testcontainers.containers.PostgreSQLContainer;
3536

3637
@ExtendWith(MiniClusterExtension.class)
38+
@Slf4j
3739
public class FlinkJdbcTest {
3840

3941
public static void main(String[] args) throws IOException {
4042
var input =
4143
new DataInputDeserializer(
4244
EncodingUtils.decodeBase64ToBytes(
4345
"ADFjb20uZGF0YXNxcmwuanNvbi5GbGlua0pzb25UeXBlU2VyaWFsaXplclNuYXBzaG90AAAAAQApY29tLmRhdGFzcXJsLmpzb24uRmxpbmtKc29uVHlwZVNlcmlhbGl6ZXI="));
44-
System.out.println(input.readUTF());
45-
System.out.println(input.readInt());
46-
System.out.println(input.readUTF());
46+
47+
log.debug(input.readUTF());
48+
log.debug("{}", input.readInt());
49+
log.debug(input.readUTF());
4750

4851
var output = new DataOutputSerializer(91);
4952
var snapshot = new FlinkJsonTypeSerializerSnapshot();
5053
output.writeUTF(snapshot.getClass().getName());
5154
output.writeInt(snapshot.getCurrentVersion());
5255
snapshot.writeSnapshot(output);
53-
System.out.println(EncodingUtils.encodeBytesToBase64(output.getSharedBuffer()));
56+
57+
log.debug(EncodingUtils.encodeBytesToBase64(output.getSharedBuffer()));
5458
}
5559

5660
@Test

flink-sql-runner/pom.xml

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -156,21 +156,18 @@
156156
<dependency>
157157
<groupId>org.apache.logging.log4j</groupId>
158158
<artifactId>log4j-slf4j-impl</artifactId>
159-
<version>${log4j.version}</version>
160159
<scope>runtime</scope>
161160
</dependency>
162161

163162
<dependency>
164163
<groupId>org.apache.logging.log4j</groupId>
165164
<artifactId>log4j-api</artifactId>
166-
<version>${log4j.version}</version>
167165
<scope>runtime</scope>
168166
</dependency>
169167

170168
<dependency>
171169
<groupId>org.apache.logging.log4j</groupId>
172170
<artifactId>log4j-core</artifactId>
173-
<version>${log4j.version}</version>
174171
<scope>runtime</scope>
175172
</dependency>
176173

@@ -216,6 +213,12 @@
216213
<artifactId>com.nextbreakpoint.flink.client</artifactId>
217214
<version>${nextbreakpoint.flink.client.version}</version>
218215
<scope>test</scope>
216+
<exclusions>
217+
<exclusion>
218+
<groupId>org.apache.logging.log4j</groupId>
219+
<artifactId>log4j-slf4j2-impl</artifactId>
220+
</exclusion>
221+
</exclusions>
219222
</dependency>
220223

221224
<dependency>
@@ -385,7 +388,7 @@
385388
<configuration>
386389
<workingDirectory>${project.basedir}/target/</workingDirectory>
387390
<executable>docker</executable>
388-
<skip>${skipTests}</skip>
391+
<skip>${fast}</skip>
389392
<arguments>
390393
<argument>build</argument>
391394
<argument>--tag</argument>

flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/BaseRunner.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.function.Supplier;
2121
import javax.annotation.Nullable;
2222
import lombok.RequiredArgsConstructor;
23+
import lombok.extern.slf4j.Slf4j;
2324
import org.apache.commons.lang3.StringUtils;
2425
import org.apache.flink.annotation.VisibleForTesting;
2526
import org.apache.flink.api.common.RuntimeExecutionMode;
@@ -30,6 +31,7 @@
3031
import org.apache.flink.util.FileUtils;
3132

3233
@RequiredArgsConstructor
34+
@Slf4j
3335
abstract class BaseRunner {
3436

3537
final RuntimeExecutionMode mode;
@@ -88,7 +90,7 @@ TableResult run(Supplier<SqlExecutor> sqlExecutorSupplier) throws Exception {
8890
Configuration initConfiguration() {
8991
var conf = new Configuration();
9092
if (StringUtils.isNotBlank(configDir)) {
91-
System.out.printf("Loading configuration from %s\n", configDir);
93+
log.info("Loading Flink configuration from '{}'", configDir);
9294
conf = GlobalConfiguration.loadConfiguration(configDir);
9395
}
9496

flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/CliRunner.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@
1818
import java.util.Arrays;
1919
import java.util.concurrent.Callable;
2020
import javax.annotation.Nullable;
21+
import lombok.extern.slf4j.Slf4j;
2122
import org.apache.flink.annotation.VisibleForTesting;
2223
import org.apache.flink.api.common.RuntimeExecutionMode;
2324
import picocli.CommandLine;
2425
import picocli.CommandLine.Command;
2526
import picocli.CommandLine.Option;
2627

28+
@Slf4j
2729
public class CliRunner extends BaseRunner {
2830

2931
@SuppressWarnings("unused")
@@ -88,7 +90,7 @@ public CliRunner(
8890
}
8991

9092
public static void main(String[] args) throws Exception {
91-
System.out.printf("\n\nExecuting flink-sql-runner: %s\n\n", Arrays.toString(args));
93+
log.info("Executing flink-sql-runner: {}", Arrays.toString(args));
9294

9395
var cl = new CommandLine(new SqlRunner());
9496
var resCode = cl.execute(args);
@@ -110,6 +112,6 @@ public static void main(String[] args) throws Exception {
110112
new CliRunner(runner.mode, runner.sqlFile, runner.planFile, runner.configDir, runner.udfPath)
111113
.run();
112114

113-
System.out.println("Finished flink-sql-runner");
115+
log.info("Finished flink-sql-runner execution");
114116
}
115117
}

flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/SqlExecutor.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,6 @@ private void setupUdfPath(String udfPath) {
188188
}
189189
} catch (Exception e) {
190190
log.error("Failed to set up UDF path", e);
191-
e.printStackTrace(System.out);
192191
}
193192
}
194193

flink-sql-runner/src/main/resources/log4j2.properties

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,15 @@
1616

1717
# Set root logger level
1818
rootLogger.level = INFO
19+
rootLogger.appenderRef.console.ref = MainAppender
20+
21+
# Set logging level for org.apache.flink to WARN
22+
logger.flink.name = org.apache.flink
23+
logger.flink.level = WARN
1924

2025
# Define Console appender
26+
appender.console.name = MainAppender
2127
appender.console.type = Console
22-
appender.console.name = Console
2328
appender.console.target = SYSTEM_OUT
2429
appender.console.layout.type = PatternLayout
2530
appender.console.layout.pattern = %d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n
26-
27-
# Assign appender to root logger
28-
rootLogger.appenderRef.console.ref = Console
29-
30-
# Set logging level for org.apache.flink to WARN
31-
logger.flink.name = org.apache.flink
32-
logger.flink.level = WARN

flink-sql-runner/src/main/resources/log4j2.xml

Lines changed: 0 additions & 30 deletions
This file was deleted.

flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/SavepointIT.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.time.ZonedDateTime;
2929
import java.util.ArrayList;
3030
import lombok.SneakyThrows;
31+
import lombok.extern.slf4j.Slf4j;
3132
import org.jdbi.v3.core.Jdbi;
3233
import org.jdbi.v3.core.statement.SqlLogger;
3334
import org.jdbi.v3.core.statement.StatementContext;
@@ -39,6 +40,7 @@
3940
import org.junit.jupiter.api.io.TempDir;
4041
import org.testcontainers.containers.Container;
4142

43+
@Slf4j
4244
public class SavepointIT extends AbstractITSupport {
4345

4446
@TempDir private Path tempDir;
@@ -72,8 +74,8 @@ void givenPlanScript_whenSavepoint_thenResumeFromSavepointSuccessfully() throws
7274
flinkContainer.execInContainer(
7375
"flink", "stop", jobId, "--savepointPath", CONTAINER_TEST_OUT_PATH);
7476

75-
System.out.printf("Flink container STDOUT: %s%n", cmdRes.getStdout());
76-
System.out.printf("Flink container STDERR: %s%n", cmdRes.getStderr());
77+
log.debug("Flink container STDOUT: {}", cmdRes.getStdout());
78+
log.debug("Flink container STDERR: {}", cmdRes.getStderr());
7779

7880
// check if STOPed, make sure no new records are create
7981
untilAssert(
@@ -126,17 +128,19 @@ private TransactionDao connect() {
126128
jdbi.setSqlLogger(
127129
new SqlLogger() {
128130
@Override
129-
public void logAfterExecution(StatementContext context) {
130-
System.out.printf(
131-
"Executed in '%s' with parameters '%s'\n",
132-
context.getParsedSql().getSql(), context.getBinding());
131+
public void logAfterExecution(StatementContext ctx) {
132+
log.info(
133+
"Executed '{}' with parameters '{}'",
134+
ctx.getParsedSql().getSql(),
135+
ctx.getBinding());
133136
}
134137

135138
@Override
136-
public void logException(StatementContext context, SQLException ex) {
137-
System.out.printf(
138-
"Exception while executing '%s' with parameters '%s'\n",
139-
context.getParsedSql().getSql(), context.getBinding(), ex);
139+
public void logException(StatementContext ctx, SQLException ex) {
140+
log.info(
141+
"Exception while executing '{}' with parameters '{}'",
142+
ctx.getParsedSql().getSql(),
143+
ctx.getBinding());
140144
}
141145
});
142146

0 commit comments

Comments
 (0)