Skip to content

Commit d73906d

Browse files
committed
Bump versions
1 parent ea4ed73 commit d73906d

File tree

4 files changed

+33
-35
lines changed

build.sbt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@ val pekkoConnectorVersion = "1.0.2"
1212
val pekkoConnectorKafkaVersion = "1.0.0"
1313

1414
val kafkaVersion = "3.7.0"
15-
val activemqVersion = "5.18.4" // We are stuck with 5.x
16-
val artemisVersion = "2.35.0"
17-
val testContainersVersion = "1.19.8"
15+
val activemqVersion = "5.18.5" // We are stuck with 5.x
16+
val artemisVersion = "2.36.0"
17+
val testContainersVersion = "1.20.1"
1818
val keycloakVersion = "24.0.4"
1919
val sttpVersion = "3.9.0"
2020
val influxdbVersion = "7.1.0"
@@ -90,7 +90,7 @@ libraryDependencies ++= Seq(
9090

9191
"com.influxdb" %% "influxdb-client-scala" % influxdbVersion,
9292
"com.influxdb" % "flux-dsl" % influxdbVersion,
93-
"org.influxdb" % "influxdb-java" % "2.23",
93+
"org.influxdb" % "influxdb-java" % "2.24",
9494

9595
"ca.uhn.hapi" % "hapi-base" % "2.3",
9696
"ca.uhn.hapi" % "hapi-structures-v23" % "2.3",
@@ -111,7 +111,7 @@ libraryDependencies ++= Seq(
111111
"io.projectreactor" % "reactor-core" % "3.5.4",
112112
"io.reactivex.rxjava3" % "rxjava" % "3.1.6",
113113

114-
"com.github.blemale" %% "scaffeine" % "5.2.1",
114+
"com.github.blemale" %% "scaffeine" % "5.3.0",
115115
"ch.qos.logback" % "logback-classic" % "1.4.12",
116116

117117
"org.testcontainers" % "testcontainers" % testContainersVersion,

src/main/resources/broker.xml

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,16 @@ Apache Artemis minimal broker config
3131
</security-settings>
3232

3333
<!-- TODO sync with broker_docker.xml -->
34-
<queues>
35-
<queue name="jms.queue.exampleQueue">
36-
<address>jms.queue.exampleQueue</address>
37-
<durable>true</durable>
38-
</queue>
39-
</queues>
40-
34+
<addresses>
35+
<address name="jms.queue.exampleQueue">
36+
<multicast>
37+
<!-- pre-configured shared durable subscription queue -->
38+
<queue name="jms.queue.exampleQueue" max-consumers="10">
39+
<durable>true</durable>
40+
</queue>
41+
</multicast>
42+
</address>
43+
</addresses>
4144

4245
<address-settings>
4346
<address-setting match="jms.queue.exampleQueue">

src/main/scala/sample/stream_shared_state/LocalFileCacheCaffeine.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,14 @@ import scala.util.control.NonFatal
2222

2323
/**
2424
* Use case:
25-
* - Process a stream of messages with reoccurring TRACE_ID
26-
* - For the first TRACE_ID: download .zip file from FileServer, add path to cache and store file
25+
* - Process a stream of (random gen) msgs with reoccurring TRACE_ID
26+
* - For the first TRACE_ID: download .zip file from FileServer, add path to local cache and store file
2727
* - For subsequent TRACE_IDs: try to fetch path from local cache to avoid duplicate downloads per TRACE_ID
2828
* - On downstream error: the path needs to be kept longer in the cache
2929
* - On restart: populate cache from local filesystem
3030
*
3131
* Before running this class: start [[alpakka.env.FileServer]] to simulate non idempotent responses
32-
* Monitor localFileCache dir with cmd: watch ls -ltr
32+
* Monitor `localFileCache` dir with cmd: watch ls -ltr
3333
*
3434
* Doc:
3535
* - Caffeine: https://github.com/ben-manes/caffeine
@@ -50,8 +50,8 @@ object LocalFileCacheCaffeine {
5050
}
5151

5252
val scaleFactor = 1 // Raise to widen range of IDs and thus have more traffic
53-
val evictionTime: FiniteDuration = 5.minutes // Lower eg to 5.seconds to see cache and file system deletes
54-
val evictionTimeOnError: FiniteDuration = 10.minutes
53+
val cacheEvictionTime: FiniteDuration = 5.minutes // Lower eg to 5.seconds to see cache and file system deletes
54+
val cacheEvictionTimeOnError: FiniteDuration = 10.minutes
5555
val localFileCache: Path = Paths.get(System.getProperty("java.io.tmpdir")).resolve("localFileCache")
5656

5757
logger.info(s"Starting with localFileCache dir: $localFileCache")
@@ -70,7 +70,7 @@ object LocalFileCacheCaffeine {
7070
val cache: Cache[Int, Path] =
7171
Scaffeine()
7272
.recordStats()
73-
.expireAfter[Int, Path]((_, _) => evictionTime, (_, _, _) => evictionTimeOnError, (_, _, _) => evictionTime)
73+
.expireAfter[Int, Path]((_, _) => cacheEvictionTime, (_, _, _) => cacheEvictionTimeOnError, (_, _, _) => cacheEvictionTime)
7474
.maximumSize(1000)
7575
.removalListener((key, value, cause) => deleteFromFileStore(key, value, cause))
7676
.build[Int, Path]()

src/test/scala/alpakka/influxdb/InfluxdbIT.java

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package alpakka.influxdb;
22

3-
import org.apache.pekko.Done;
43
import org.apache.pekko.actor.ActorSystem;
5-
import org.junit.Ignore;
64
import org.junit.jupiter.api.*;
75
import org.slf4j.Logger;
86
import org.slf4j.LoggerFactory;
@@ -13,12 +11,9 @@
1311
import util.LogFileScanner;
1412

1513
import java.io.IOException;
16-
import java.util.List;
14+
import java.time.Duration;
1715
import java.util.concurrent.CompletableFuture;
18-
import java.util.concurrent.CompletionStage;
1916
import java.util.concurrent.ExecutionException;
20-
import java.util.concurrent.TimeUnit;
21-
import java.util.stream.Collectors;
2217
import java.util.stream.IntStream;
2318

2419
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
@@ -46,8 +41,8 @@ public static void setupBeforeClass() throws IOException, InterruptedException {
4641
// Doc: https://docs.influxdata.com/influxdb/v2.1/reference/release-notes/influxdb/
4742
LOGGER.info("InfluxDB container listening on port: {}. Running: {} ", influxDBContainer.getMappedPort(INFLUXDB_PORT), influxDBContainer.isRunning());
4843
Container.ExecResult result = influxDBContainer.execInContainer("influx", "setup", "-b", "testbucket", "-f", "-o", "testorg", "-t", "abcdefgh", "-u", "admin", "-p", "adminadmin");
49-
LOGGER.info("Result exit code: " + result.getExitCode());
50-
LOGGER.info("Result stdout: " + result.getStdout());
44+
LOGGER.info("Result exit code: {}", result.getExitCode());
45+
LOGGER.info("Result stdout: {}", result.getStdout());
5146
browserClient();
5247
}
5348

@@ -67,12 +62,15 @@ public void setupBeforeTest(TestInfo testInfo) {
6762
@Order(1)
6863
void testWriteAndRead() {
6964
int maxClients = 5;
70-
int nPoints = 1000;
65+
int nPoints = 100;
7166

72-
List<CompletionStage<Done>> futList = IntStream.rangeClosed(1, maxClients).boxed().parallel()
73-
.map(i -> influxDBWriter.writeTestPoints(nPoints, "sensor" + i))
74-
.collect(Collectors.toList());
75-
assertThat(CompletableFuture.allOf(futList.toArray(new CompletableFuture[futList.size()]))).succeedsWithin(5 * maxClients, TimeUnit.SECONDS);
67+
assertThat(
68+
CompletableFuture.allOf(
69+
IntStream.rangeClosed(1, maxClients)
70+
.mapToObj(i -> influxDBWriter.writeTestPoints(nPoints, "sensor" + i))
71+
.toArray(CompletableFuture[]::new)
72+
)
73+
).succeedsWithin(Duration.ofSeconds(10 * maxClients));
7674

7775
assertThat(influxDBReader.getQuerySync("testMem").length()).isEqualTo(nPoints * maxClients);
7876
assertThat(influxDBReader.fluxQueryCount("testMem")).isEqualTo(nPoints * maxClients);
@@ -81,13 +79,10 @@ void testWriteAndRead() {
8179

8280
@Test
8381
@Order(2)
84-
@Ignore
8582
void testWriteAndReadLineProtocol() throws ExecutionException, InterruptedException {
8683
int nPoints = 10;
8784
influxDBWriter.writeTestPointsFromLineProtocolSync();
88-
// TODO Activate, when "com.influxdb" %% "influxdb-client-scala" is available for pekko
89-
//assertThat(influxDBReader.getQuerySync("testMemLP").length()).isEqualTo(nPoints);
90-
assert (true);
85+
assertThat(influxDBReader.getQuerySync("testMemLP").length()).isEqualTo(nPoints);
9186
}
9287

9388
@Test

0 commit comments

Comments (0)