Skip to content

Commit 44efddc

Browse files
committed
GH-230: Expose kplBackPressureThreshold property
Fixes: #230
1 parent 4683e99 commit 44efddc

File tree

3 files changed

+33
-9
lines changed

3 files changed

+33
-9
lines changed

spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,11 @@ recordMetadataChannel::
359359
The bean name of a MessageChannel to which successful send results should be sent.
360360
Works only for async mode.
361361

362+
kplBackPressureThreshold::
363+
Maximum records in flight for handling backpressure.
364+
No backpressure by default.
365+
When backpressure handling is enabled and number of records in flight exceeds the threshold, a `KplBackpressureException` would be thrown.
366+
362367
[[kinesis-error-channels]]
363368
== Error Channels
364369

spring-cloud-stream-binder-kinesis/src/main/java/org/springframework/cloud/stream/binder/kinesis/KinesisMessageChannelBinder.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2023 the original author or authors.
2+
* Copyright 2017-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -224,18 +224,19 @@ protected MessageHandler createProducerMessageHandler(ProducerDestination destin
224224
? m.getHeaders().get(BinderHeaders.PARTITION_HEADER)
225225
: m.getPayload().hashCode());
226226
AbstractAwsMessageHandler<?> messageHandler;
227+
KinesisProducerProperties kinesisProducerProperties = producerProperties.getExtension();
227228
if (this.configurationProperties.isKplKclEnabled()) {
228-
messageHandler = createKplMessageHandler(destination, partitionKeyExpression,
229-
producerProperties.getExtension().isEmbedHeaders() && !producerProperties.isUseNativeEncoding());
229+
messageHandler = createKplMessageHandler(destination, partitionKeyExpression, kinesisProducerProperties,
230+
kinesisProducerProperties.isEmbedHeaders() && !producerProperties.isUseNativeEncoding());
230231
}
231232
else {
232233
messageHandler = createKinesisMessageHandler(destination, partitionKeyExpression,
233-
producerProperties.getExtension().isEmbedHeaders() && !producerProperties.isUseNativeEncoding());
234+
kinesisProducerProperties.isEmbedHeaders() && !producerProperties.isUseNativeEncoding());
234235
}
235-
messageHandler.setAsync(!producerProperties.getExtension().isSync());
236-
messageHandler.setSendTimeout(producerProperties.getExtension().getSendTimeout());
236+
messageHandler.setAsync(!kinesisProducerProperties.isSync());
237+
messageHandler.setSendTimeout(kinesisProducerProperties.getSendTimeout());
237238
messageHandler.setBeanFactory(getBeanFactory());
238-
String recordMetadataChannel = producerProperties.getExtension().getRecordMetadataChannel();
239+
String recordMetadataChannel = kinesisProducerProperties.getRecordMetadataChannel();
239240
if (StringUtils.hasText(recordMetadataChannel)) {
240241
messageHandler.setOutputChannelName(recordMetadataChannel);
241242
}
@@ -279,11 +280,13 @@ private AbstractAwsMessageHandler<?> createKinesisMessageHandler(ProducerDestina
279280
}
280281

281282
private AbstractAwsMessageHandler<?> createKplMessageHandler(ProducerDestination destination,
282-
FunctionExpression<Message<?>> partitionKeyExpression, boolean embedHeaders) {
283+
FunctionExpression<Message<?>> partitionKeyExpression, KinesisProducerProperties kinesisProducerProperties,
284+
boolean embedHeaders) {
283285

284286
KplMessageHandler messageHandler = new KplMessageHandler(new KinesisProducer(this.kinesisProducerConfiguration));
285287
messageHandler.setStream(destination.getName());
286288
messageHandler.setPartitionKeyExpression(partitionKeyExpression);
289+
messageHandler.setBackPressureThreshold(kinesisProducerProperties.getKplBackPressureThreshold());
287290
if (embedHeaders) {
288291
messageHandler.setEmbeddedHeadersMapper(this.embeddedHeadersMapper);
289292
}

spring-cloud-stream-binder-kinesis/src/main/java/org/springframework/cloud/stream/binder/kinesis/properties/KinesisProducerProperties.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2024 the original author or authors.
2+
* Copyright 2017-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -46,6 +46,14 @@ public class KinesisProducerProperties {
4646
*/
4747
private String recordMetadataChannel;
4848

49+
/**
50+
* Maximum records in flight for handling backpressure.
51+
* No backpressure by default.
52+
* When backpressure handling is enabled and number of records in flight exceeds the threshold, a
53+
* 'KplBackpressureException' would be thrown.
54+
*/
55+
private long kplBackPressureThreshold;
56+
4957
public void setSync(boolean sync) {
5058
this.sync = sync;
5159
}
@@ -78,4 +86,12 @@ public void setRecordMetadataChannel(String recordMetadataChannel) {
7886
this.recordMetadataChannel = recordMetadataChannel;
7987
}
8088

89+
public long getKplBackPressureThreshold() {
90+
return this.kplBackPressureThreshold;
91+
}
92+
93+
public void setKplBackPressureThreshold(long kplBackPressureThreshold) {
94+
this.kplBackPressureThreshold = kplBackPressureThreshold;
95+
}
96+
8197
}

0 commit comments

Comments
 (0)