Skip to content

Commit ea4ed73

Browse files
committed
Bump Kafka version and switch to EmbeddedKafka
1 parent 743c013 commit ea4ed73

File tree

6 files changed

+25
-52
lines changed

6 files changed

+25
-52
lines changed

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ val pekkoHTTPVersion = "1.1.0-M1"
1111
val pekkoConnectorVersion = "1.0.2"
1212
val pekkoConnectorKafkaVersion = "1.0.0"
1313

14-
val kafkaVersion = "3.6.1"
14+
val kafkaVersion = "3.7.0"
1515
val activemqVersion = "5.18.4" // We are stuck with 5.x
1616
val artemisVersion = "2.35.0"
1717
val testContainersVersion = "1.19.8"

docker/docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ services:
6161
- runtime-net
6262

6363
broker:
64-
image: confluentinc/cp-kafka
64+
image: confluentinc/cp-kafka:7.7.0
6565
hostname: broker
6666
depends_on:
6767
- zookeeper

src/main/scala/alpakka/env/KafkaServerTestcontainers.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import org.testcontainers.utility.DockerImageName
77
/**
88
* Uses testcontainers.org to run the
99
* latest Kafka-Version from Confluent
10+
* See also Kafka broker from: /docker/docker-compose.yml
1011
*
1112
* Alternative: [[KafkaServerEmbedded]]
1213
*
@@ -17,10 +18,7 @@ import org.testcontainers.utility.DockerImageName
1718
*/
1819
class KafkaServerTestcontainers {
1920
val logger: Logger = LoggerFactory.getLogger(this.getClass)
20-
// Pin cp-kafka version for now, because 'latest' does not work on github actions anymore
21-
// https://hub.docker.com/r/confluentinc/cp-kafka
22-
// https://docs.confluent.io/platform/current/installation/versions-interoperability.html#cp-and-apache-kafka-compatibility
23-
val kafkaVersion = "7.5.3"
21+
val kafkaVersion = "7.7.0"
2422
val imageName = s"confluentinc/cp-kafka:$kafkaVersion"
2523
val originalPort = 9093
2624
var mappedPort = 1111

src/main/scala/alpakka/kafka/DeleteTopicUtil.scala

Lines changed: 0 additions & 25 deletions
This file was deleted.

src/test/scala/alpakka/tcp_to_websockets/AlpakkaTrophySpec.scala

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
package alpakka.tcp_to_websockets
22

3-
import alpakka.env.{KafkaServerTestcontainers, WebsocketServer}
3+
import alpakka.env.WebsocketServer
44
import alpakka.tcp_to_websockets.hl7mllp.{Hl7Tcp2Kafka, Hl7TcpClient}
55
import alpakka.tcp_to_websockets.websockets.{Kafka2SSE, Kafka2Websocket}
6+
import io.github.embeddedkafka.EmbeddedKafka
67
import org.scalatest.matchers.should.Matchers
78
import org.scalatest.wordspec.AsyncWordSpec
89
import org.scalatest.{BeforeAndAfterEachTestData, TestData}
@@ -17,16 +18,13 @@ import util.LogFileScanner
1718
*
1819
* Remarks:
1920
* - The test focus is on log file scanning to check for processed messages and ERRORs
20-
* - This setup restarts Kafka for each test, so they can run independently. The downside
21-
* of this is that we have to deal with a new mapped port on each restart.
22-
* A setup with one Kafka start for all tests is here:
23-
* https://doc.akka.io/docs/alpakka-kafka/current/testing-testcontainers.html
21+
* - This test restarts Kafka for each test, so they can run independently.
2422
* - Since the shutdown of producers/consumers takes a long time, there are WARN msgs in the log
2523
*/
2624
final class AlpakkaTrophySpec extends AsyncWordSpec with Matchers with BeforeAndAfterEachTestData {
2725
val logger: Logger = LoggerFactory.getLogger(this.getClass)
2826

29-
val kafkaContainer: KafkaServerTestcontainers = KafkaServerTestcontainers()
27+
private var bootstrapServer: String = _
3028
var mappedPortKafka: Int = _
3129

3230
var websocketServer: WebsocketServer = _
@@ -39,7 +37,8 @@ final class AlpakkaTrophySpec extends AsyncWordSpec with Matchers with BeforeAnd
3937
val numberOfMessages = 10
4038
Hl7TcpClient(numberOfMessages)
4139

42-
new LogFileScanner().run(10, 10, "Starting test: Happy path should find all processed messages in WebsocketServer log", "ERROR").length should equal(0)
40+
// With EmbeddedKafka there is one ERROR due to port binding at the start
41+
new LogFileScanner().run(10, 10, "Starting test: Happy path should find all processed messages in WebsocketServer log", "ERROR").length should equal(1)
4342
// 10 + 1 Initial message
4443
new LogFileScanner().run(10, 10, "Starting test: Happy path should find all processed messages in WebsocketServer log", "WebsocketServer received:").length should equal(numberOfMessages + 1)
4544
}
@@ -103,24 +102,24 @@ final class AlpakkaTrophySpec extends AsyncWordSpec with Matchers with BeforeAnd
103102

104103
// Stopping after half of the msg are processed
105104
Thread.sleep(5000)
106-
logger.info("Re-starting Kafka container...")
107-
kafkaContainer.stop()
108-
kafkaContainer.run()
109-
val newMappedPortKafka = kafkaContainer.mappedPort
110-
logger.info(s"Re-started Kafka on new mapped port: $newMappedPortKafka")
105+
logger.info("Re-starting Kafka...")
106+
EmbeddedKafka.stop()
107+
mappedPortKafka = EmbeddedKafka.start().config.kafkaPort
108+
bootstrapServer = s"localhost:$mappedPortKafka"
109+
logger.info(s"Re-started Kafka on mapped port: $mappedPortKafka")
111110

112111
// Now we need to restart the components sending/receiving to/from Kafka as well,
113112
// to connect to the new mapped port
114113
hl7Tcp2Kafka.stop()
115-
hl7Tcp2Kafka = Hl7Tcp2Kafka(newMappedPortKafka)
114+
hl7Tcp2Kafka = Hl7Tcp2Kafka(mappedPortKafka)
116115
hl7Tcp2Kafka.run()
117116

118117
kafka2Websocket.stop()
119-
kafka2Websocket = Kafka2Websocket(newMappedPortKafka)
118+
kafka2Websocket = Kafka2Websocket(mappedPortKafka)
120119
kafka2Websocket.run()
121120

122121
kafka2SSE.stop()
123-
kafka2SSE = Kafka2SSE(newMappedPortKafka)
122+
kafka2SSE = Kafka2SSE(mappedPortKafka)
124123
kafka2SSE.run()
125124

126125
// 10 + 1 Initial message
@@ -132,9 +131,10 @@ final class AlpakkaTrophySpec extends AsyncWordSpec with Matchers with BeforeAnd
132131
// Write start indicator for the LogFileScanner
133132
logger.info(s"Starting test: ${testData.name}")
134133

135-
logger.info("Starting Kafka container...")
136-
kafkaContainer.run()
137-
mappedPortKafka = kafkaContainer.mappedPort
134+
logger.info("Starting Kafka...")
135+
mappedPortKafka = EmbeddedKafka.start().config.kafkaPort
136+
bootstrapServer = s"localhost:$mappedPortKafka"
137+
138138
logger.info(s"Running Kafka on mapped port: $mappedPortKafka")
139139

140140
// Start other components
@@ -152,8 +152,8 @@ final class AlpakkaTrophySpec extends AsyncWordSpec with Matchers with BeforeAnd
152152
}
153153

154154
override protected def afterEach(testData: TestData): Unit = {
155-
logger.info("Stopping Kafka container...")
156-
kafkaContainer.stop()
155+
logger.info("Stopping Kafka...")
156+
EmbeddedKafka.stop()
157157
logger.info("Stopping other components...")
158158
websocketServer.stop()
159159
hl7Tcp2Kafka.stop()

src/test/scala/util/LogFileScanner.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ class LogFileScanner(localLogFilePath: String = "logs/application.log") {
2121
def run(scanDelaySeconds: Int = 0, scanForSeconds: Int = 5, searchAfterPattern: String, pattern: String): List[String] = {
2222
val path: Path = fs.getPath(localLogFilePath)
2323
val pollingInterval = 250.millis
24-
val maxLineSize: Int = 24 * 1024
24+
val maxLineSize: Int = 100 * 1024
2525

2626
// Wait for the components to produce log messages
2727
Thread.sleep(scanDelaySeconds * 1000)

0 commit comments

Comments
 (0)