Skip to content

Commit ea7e678

Browse files
committed
Migrate to Java 17
1 parent 7d1a6de commit ea7e678

File tree

9 files changed

+50
-164
lines changed

9 files changed

+50
-164
lines changed

.github/workflows/ci.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@ jobs:
1414
steps:
1515
- name: Checkout
1616
uses: actions/checkout@v4
17-
# https://github.com/coursier/setup-action
18-
- name: Set up JDK 11
17+
# https://github.com/coursier/setup-action
18+
- name: Set up JDK 17
1919
uses: coursier/setup-action@v1
2020
with:
21-
jvm: adopt:11
21+
jvm: adopt:17
2222
apps: sbtn
2323
- name: Build and Test
2424
run: sbt -v +test

build.sbt

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ val artemisVersion = "2.33.0"
1717
val testContainersVersion = "1.19.8"
1818
val keycloakVersion = "24.0.4"
1919
val sttpVersion = "3.9.0"
20-
val influxdbVersion = "6.10.0"
20+
val influxdbVersion = "7.1.0"
2121
val awsClientVersion = "2.25.32"
2222

2323
libraryDependencies ++= Seq(
@@ -86,8 +86,6 @@ libraryDependencies ++= Seq(
8686
"software.amazon.awssdk" % "sqs" % awsClientVersion,
8787

8888

89-
// Migrated to pekko, but new client 7.0.0 only supports Java 17 (not Java 11)
90-
// https://github.com/influxdata/influxdb-client-java/blob/master/CHANGELOG.md
9189
"com.influxdb" %% "influxdb-client-scala" % influxdbVersion,
9290
"com.influxdb" % "flux-dsl" % influxdbVersion,
9391
"org.influxdb" % "influxdb-java" % "2.23",

src/main/java/actor/DemoMessagesActor.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
package actor;
22

3-
import akka.Done;
4-
import akka.actor.*;
5-
import akka.pattern.Patterns;
3+
4+
import org.apache.pekko.Done;
5+
import org.apache.pekko.actor.*;
6+
import org.apache.pekko.pattern.Patterns;
67

78
import java.time.Duration;
89
import java.util.concurrent.CompletionStage;

src/main/scala/akkahttp/ReverseProxy.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import org.apache.pekko.http.scaladsl.server.Directives.*
1111
import org.apache.pekko.http.scaladsl.server.Route
1212
import org.apache.pekko.http.scaladsl.settings.ServerSettings
1313
import org.apache.pekko.http.scaladsl.{Http, HttpExt}
14-
import org.apache.pekko.pattern.CircuitBreaker
14+
import org.apache.pekko.pattern.{CircuitBreaker, CircuitBreakerOpenException}
1515
import org.apache.pekko.stream.ThrottleMode
1616
import org.apache.pekko.stream.scaladsl.{Sink, Source}
1717
import org.slf4j.{Logger, LoggerFactory}
@@ -168,7 +168,7 @@ object ReverseProxy extends App {
168168
val proxyReq = request.withUri(uri(target)).withHeaders(headers(target))
169169
circuitBreaker.withCircuitBreaker(http.singleRequest(proxyReq))
170170
}.recover {
171-
case _: akka.pattern.CircuitBreakerOpenException => BadGateway(id, "Circuit breaker opened")
171+
case _: CircuitBreakerOpenException => BadGateway(id, "Circuit breaker opened")
172172
case _: TimeoutException => GatewayTimeout(id)
173173
case e => BadGateway(id, e.getMessage)
174174
}

src/main/scala/alpakka/influxdb/InfluxdbReader.scala

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,14 @@ import com.influxdb.query.FluxTable
77
import com.influxdb.query.dsl.Flux
88
import com.influxdb.query.dsl.functions.restriction.Restrictions
99
import org.apache.pekko.actor.ActorSystem
10-
import org.apache.pekko.stream.Supervision
10+
import org.apache.pekko.stream.scaladsl.Sink
11+
import org.apache.pekko.stream.{ActorAttributes, Supervision}
1112
import org.slf4j.{Logger, LoggerFactory}
1213

1314
import java.time.temporal.ChronoUnit
1415
import java.util
15-
import scala.concurrent.ExecutionContextExecutor
1616
import scala.concurrent.duration.DurationInt
17+
import scala.concurrent.{Await, ExecutionContextExecutor}
1718
import scala.util.control.NonFatal
1819

1920
/**
@@ -54,25 +55,23 @@ class InfluxdbReader(baseURL: String, token: String, org: String = "testorg", bu
5455
|> range(start: -interval)
5556
"""
5657

57-
// TODO Activate, when "com.influxdb" %% "influxdb-client-scala" is available for pekko
58-
// def source() = influxdbClientScala
59-
// .getQueryScalaApi()
60-
// .query(query)
58+
def source() = influxdbClientScala
59+
.getQueryScalaApi()
60+
.query(query)
6161

62-
// TODO Activate, when "com.influxdb" %% "influxdb-client-scala" is available for pekko
6362
def getQuerySync(mem: String) = {
64-
// logger.info(s"Query raw for measurements of type: $mem")
65-
// val result = source()
66-
// .filter(fluxRecord => fluxRecord.getMeasurement().equals(mem) )
67-
// .wireTap(fluxRecord => {
68-
// val measurement = fluxRecord.getMeasurement()
69-
// val value = fluxRecord.getValue()
70-
// logger.debug(s"About to process measurement: $measurement with value: $value")
71-
// })
72-
// .withAttributes(ActorAttributes.supervisionStrategy(deciderFlow))
73-
// .runWith(Sink.seq)
74-
//
75-
// Await.result(result, 10.seconds)
63+
logger.info(s"Query raw for measurements of type: $mem")
64+
val result = source()
65+
.filter(fluxRecord => fluxRecord.getMeasurement().equals(mem))
66+
.wireTap(fluxRecord => {
67+
val measurement = fluxRecord.getMeasurement()
68+
val value = fluxRecord.getValue()
69+
logger.debug(s"About to process measurement: $measurement with value: $value")
70+
})
71+
.withAttributes(ActorAttributes.supervisionStrategy(deciderFlow))
72+
.runWith(Sink.seq)
73+
74+
Await.result(result, 10.seconds)
7675
}
7776

7877
def fluxQueryCount(mem: String): Long = {

src/main/scala/sample/stream/TcpEcho.scala

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,42 +14,39 @@ import scala.sys.process.*
1414
import scala.util.{Failure, Success}
1515

1616
/**
17-
* Inspired by:
18-
* https://doc.akka.io/docs/akka/current/stream/stream-io.html?language=scala
19-
*
20-
* Use without parameters to start server and 100 parallel clients.
17+
* TCP echo client server round trip
18+
* Use without parameters to start server and 100 parallel clients
2119
*
2220
* Use parameters `server 127.0.0.1 6000` to start server listening on port 6000
2321
*
24-
* Use parameters `client 127.0.0.1 6000` to start one client connecting to
25-
* server on 127.0.0.1:6000
22+
* Use parameters `client 127.0.0.1 6000` to start one client
2623
*
2724
* Run cmd line client:
2825
* echo -n "Hello World" | nc 127.0.0.1 6000
2926
*
27+
* Doc:
28+
* https://pekko.apache.org/docs/pekko/current/stream/stream-io.html?language=scala
3029
*/
3130
object TcpEcho extends App {
3231
val logger: Logger = LoggerFactory.getLogger(this.getClass)
3332
val systemServer = ActorSystem("TcpEchoServer")
3433
val systemClient = ActorSystem("TcpEchoClient")
3534

36-
var serverBinding: Future[Tcp.ServerBinding] = _
37-
3835
if (args.isEmpty) {
3936
val (host, port) = ("127.0.0.1", 6000)
40-
serverBinding = server(systemServer, host, port)
37+
server(systemServer, host, port)
4138

39+
// Issue: https://github.com/akka/akka/issues/29842
4240
checkResources()
43-
// Issue:
44-
// https://github.com/akka/akka/issues/29842
41+
4542
val maxClients = 100
4643
(1 to maxClients).par.foreach(each => client(each, systemClient, host, port))
4744
} else {
4845
val (host, port) =
4946
if (args.length == 3) (args(1), args(2).toInt)
5047
else ("127.0.0.1", 6000)
5148
if (args(0) == "server") {
52-
serverBinding = server(systemServer, host, port)
49+
server(systemServer, host, port)
5350
} else if (args(0) == "client") {
5451
client(1, systemClient, host, port)
5552
}
@@ -105,12 +102,14 @@ object TcpEcho extends App {
105102

106103
// We want "halfClose behavior" on the client side. Doc:
107104
// https://github.com/akka/akka/issues/22163
108-
val connection: Flow[ByteString, ByteString, Future[Tcp.OutgoingConnection]] = Tcp().outgoingConnection(remoteAddress = InetSocketAddress.createUnresolved(host, port), halfClose = true)
105+
val connection: Flow[ByteString, ByteString, Future[Tcp.OutgoingConnection]] =
106+
Tcp().outgoingConnection(remoteAddress = InetSocketAddress.createUnresolved(host, port), halfClose = true)
109107
val testInput = ('a' to 'z').map(ByteString(_)) ++ Seq(ByteString("BYE"))
108+
logger.info(s"Client: $id sending: ${testInput.length} bytes")
110109

111110
val restartSettings = RestartSettings(1.second, 10.seconds, 0.2).withMaxRestarts(10, 1.minute)
112111
val restartSource = RestartSource.onFailuresWithBackoff(restartSettings) { () => Source(testInput).via(connection) }
113-
val closed = restartSource.runForeach(each => logger.info(s"Client: $id received echo: ${each.utf8String}"))
112+
val closed = restartSource.runForeach(each => logger.info(s"Client: $id received: ${each.utf8String}"))
114113
closed.onComplete(each => logger.info(s"Client: $id closed: $each"))
115114
}
116115

src/main/scala/sample/stream/TcpEchoJava.java

Lines changed: 0 additions & 104 deletions
This file was deleted.

src/test/scala/alpakka/clickhousedb/ClickhousedbIT.java

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -171,21 +171,15 @@ protected DataSource getDataSource(JdbcDatabaseContainer<?> container) {
171171
}
172172

173173
protected void createTable() throws SQLException {
174-
// Since we want to be Java 11 source compatible
175174
// Doc: https://clickhouse.com/docs/en/engines/table-engines
176-
String newLine = System.lineSeparator();
177-
String createStatementTextBlock =
178-
"CREATE TABLE test.my_table"
179-
+ newLine
180-
+ "("
181-
+ newLine
182-
+ "`myfloat_nullable` Nullable(Float32),"
183-
+ newLine
184-
+ "`mystr` String,"
185-
+ newLine
186-
+ "`myint_id` Int32"
187-
+ newLine
188-
+ ") ENGINE = Log";
175+
String createStatementTextBlock = """
176+
CREATE TABLE test.my_table
177+
(
178+
`myfloat_nullable` Nullable(Float32),
179+
`mystr` String,
180+
`myint_id` Int32
181+
) ENGINE = Log
182+
""";
189183

190184
LOGGER.info(createStatementTextBlock);
191185

src/test/scala/alpakka/influxdb/InfluxdbIT.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,7 @@ void testWriteAndRead() {
7474
.collect(Collectors.toList());
7575
assertThat(CompletableFuture.allOf(futList.toArray(new CompletableFuture[futList.size()]))).succeedsWithin(5 * maxClients, TimeUnit.SECONDS);
7676

77-
// TODO Activate, when "com.influxdb" %% "influxdb-client-scala" is available for pekko
78-
//assertThat(influxDBReader.getQuerySync("testMem").length()).isEqualTo(nPoints * maxClients);
77+
assertThat(influxDBReader.getQuerySync("testMem").length()).isEqualTo(nPoints * maxClients);
7978
assertThat(influxDBReader.fluxQueryCount("testMem")).isEqualTo(nPoints * maxClients);
8079
assertThat(new LogFileScanner("logs/application.log").run(1, 2, searchAfterPattern, "ERROR").length()).isZero();
8180
}

0 commit comments

Comments
 (0)