Skip to content

Commit c0d964f

Browse files
committed
Bump aws version and add query
1 parent db107a1 commit c0d964f

File tree

4 files changed

+36
-19
lines changed

4 files changed

+36
-19
lines changed

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ val testContainersVersion = "1.19.7"
1818
val keycloakVersion = "21.1.2" // stay with 21.x because of Java 11 compatibility
1919
val sttpVersion = "3.9.0"
2020
val influxdbVersion = "6.10.0"
21-
val awsClientVersion = "2.25.21"
21+
val awsClientVersion = "2.25.32"
2222

2323
libraryDependencies ++= Seq(
2424
"org.scala-lang.modules" %% "scala-parallel-collections" % "1.0.4",

src/main/scala/alpakka/dynamodb/DynamoDBEcho.scala

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class DynamoDBEcho(urlWithMappedPort: URI, accessKey: String, secretKey: String,
4040
for {
4141
_ <- createTable()
4242
_ <- writeItems(noOfItems)
43-
result <- readItems()
43+
result <- readItems(noOfItems)
4444
} yield result
4545
}
4646

@@ -53,7 +53,7 @@ class DynamoDBEcho(urlWithMappedPort: URI, accessKey: String, secretKey: String,
5353
.via(DynamoDb.flow(parallelism = 1))
5454

5555
source
56-
.map(descTableResponse => logger.info(s"Successfully created table: ${descTableResponse.table.tableName}"))
56+
.wireTap(descTableResponse => logger.info(s"Successfully created table: ${descTableResponse.table.tableName}"))
5757
.runWith(Sink.ignore)
5858
}
5959

@@ -95,7 +95,7 @@ class DynamoDBEcho(urlWithMappedPort: URI, accessKey: String, secretKey: String,
9595
val request = PutItemRequest.builder().tableName(testTableName).item(Map(
9696
"Id" -> AttributeValue.builder().s(item.toString).build(),
9797
"att1" -> AttributeValue.builder().s(s"att1-$item").build(),
98-
"att2" -> AttributeValue.builder().s(s"att2-$item").build()
98+
"att2" -> AttributeValue.builder().n(s"$item").build()
9999
).asJava).build()
100100
(request, RequestContext(testTableName, requestId))
101101
})
@@ -114,23 +114,34 @@ class DynamoDBEcho(urlWithMappedPort: URI, accessKey: String, secretKey: String,
114114
.runWith(Sink.ignore)
115115
}
116116

117-
private def readItems() = {
118-
logger.info(s"About to read items...")
117+
private def readItems(noOfItems: Int) = {
118+
logger.info(s"About to read 2nd half of all items...")
119+
120+
val filterExpression = "#att2 > :val"
121+
val expressionAttrNames = new java.util.HashMap[String, String]()
122+
expressionAttrNames.put("#att2", "att2")
123+
124+
val expressionAttrValues = new java.util.HashMap[String, AttributeValue]()
125+
expressionAttrValues.put(":val", AttributeValue.builder().n((noOfItems / 2).toString).build())
126+
127+
val scanRequest = ScanRequest.builder()
128+
.tableName(testTableName) // This hangs when no table is available
129+
.filterExpression(filterExpression)
130+
.expressionAttributeNames(expressionAttrNames)
131+
.expressionAttributeValues(expressionAttrValues)
132+
.build()
119133

120-
// This hangs when no table is available
121-
val scanRequest = ScanRequest.builder().tableName(testTableName).build()
122134
val scanPageInFlow: Source[ScanResponse, NotUsed] =
123135
Source
124136
.single(scanRequest)
125137
.via(DynamoDb.flowPaginated())
126138

127-
scanPageInFlow.map { (response: ScanResponse) => {
128-
val count = response.scannedCount()
129-
logger.info(s"Successfully read $count items")
130-
response.items().forEach(item => logger.info(s"Item: $item"))
131-
response.items().size()
132-
}
133-
139+
scanPageInFlow.map { scanResponse =>
140+
val count = scanResponse.scannedCount()
141+
val resultCount = scanResponse.items().size()
142+
logger.info(s"Successfully read $resultCount/$count items")
143+
scanResponse.items().forEach(item => logger.info(s"Item: $item"))
144+
resultCount
134145
}.runWith(Sink.head)
135146
}
136147

src/test/scala/alpakka/dynamodb/DynamoDBEchoIT.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,12 @@
2020

2121
/**
2222
* Setup/run {@link alpakka.dynamodb.DynamoDBEcho} on localStack container
23+
*
24+
* Running this example against AWS:
25+
* Looks as if there is a way to delete a DB instance via the SDK:
26+
* https://docs.aws.amazon.com/code-library/latest/ug/rds_example_rds_DeleteDBInstance_section.html
27+
* However, getting the `dbInstanceIdentifier` via SDK is not straightforward
28+
* Therefore, we only run against localStack for now in order to avoid dangling resources
2329
* <p>
2430
* Doc:
2531
* https://pekko.apache.org/docs/pekko-connectors/current/dynamodb.html#aws-dynamodb
@@ -45,10 +51,10 @@ public static void beforeAll() {
4551
@Test
4652
public void testLocal() throws ExecutionException, InterruptedException {
4753
DynamoDBEcho dynamoDBEcho = new DynamoDBEcho(localStack.getEndpointOverride(DYNAMODB), localStack.getAccessKey(), localStack.getSecretKey(), localStack.getRegion());
48-
int noOfItems = 10;
54+
int noOfItemsEven = 10;
4955

50-
CompletionStage<Object> result = FutureConverters.asJava(dynamoDBEcho.run(noOfItems));
51-
assertThat(result.toCompletableFuture().get()).isEqualTo(noOfItems);
56+
CompletionStage<Object> result = FutureConverters.asJava(dynamoDBEcho.run(noOfItemsEven));
57+
assertThat(result.toCompletableFuture().get()).isEqualTo(noOfItemsEven / 2);
5258
}
5359
}
5460

src/test/scala/alpakka/kinesis/KinesisEchoIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828

2929
/**
3030
* Setup/run {@link alpakka.kinesis.KinesisEcho} on localStack container
31-
* Additionally use the classic sync AWS KinesisClient to create/delete streams
31+
* Use the classic sync AWS KinesisClient to create/delete streams
3232
* <p>
3333
* Doc:
3434
* https://testcontainers.com/modules/localstack

0 commit comments

Comments
 (0)