Skip to content

Conversation

tomek82
Copy link
Contributor

@tomek82 tomek82 commented May 6, 2025

When provisioning a stream consumer or producer the passed in binder properties get overwritten with HeaderMode.none.
The issue arises when using multiple destinations (i.e. a default function router for two streams) and not using multiplexing where in such case only the first stream will be able to parse/write the headers.

@tomek82 tomek82 marked this pull request as draft May 6, 2025 21:52
tomek82 added 2 commits May 6, 2025 14:54
Signed-off-by: Tomek Bielecki <tomasz.bielecki@gmail.com>
Signed-off-by: Tomek Bielecki <tomasz.bielecki@gmail.com>
@tomek82 tomek82 force-pushed the fix-properties-for-multiple-consumer-setup branch from e14bdb5 to a7bce8f Compare May 6, 2025 21:55
@artembilan
Copy link
Member

Did you investigate why it had been done like that ? Sorry, not on the laptop to dig into the code .

@tomek82 tomek82 marked this pull request as ready for review May 6, 2025 22:00
@tomek82
Copy link
Contributor Author

tomek82 commented May 6, 2025

This was introduced in #185 but I'm not sure why.

If you look here a7bce8f#diff-d536075c01df04e70b25e9933ff91ff520209321602ccb436a6ffc41f1c0b560L91 the if condition is always false.

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, find some simple review on line.

And sorry that my unreasonable change caused some problems on your side.

Thank you!

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I remember now why I did that.
You see, there is an embedded headers functionality in the Spring Cloud Stream Core.
The logic is like this in the AbstractMessageChannelBinder:

		if (HeaderMode.embeddedHeaders.equals(properties.getHeaderMode())) {
			bindingTarget.addInterceptor(0, this.embeddedHeadersChannelInterceptor);
		}

According to the problem described in the mentioned #185, we cannot rely on that for tracing headers. Therefore, the logic of embedded headers has been moved to the respective Kinesis channel adapter implementation.

I would love to ignore that HeaderMode from Spring Cloud Stream Core, but that's not what is possible from this project perspective.

Maybe for now you can workaround in your project that spring.cloud.stream.default.<producer|consumer>.headerMode=<value> to the spring.cloud.stream.kinesis.default.<producer|consumer>.embedHeaders=true|false?

I think we can raise a GH issue against Spring Cloud Stream Core to consider to deprecate embeddedHeaders option in favor of custom one in the respective Binder implementation.

@artembilan
Copy link
Member

The issue arises when using multiple destinations (i.e. a default function router for two streams)

Would you mind demonstrating your use-case to understand problem better?

@tomek82
Copy link
Contributor Author

tomek82 commented May 7, 2025

My setup is like so:

spring.cloud.function.definition=functionRouter
spring.cloud.function.routing-expression=headers['test_header']
spring.cloud.stream.function.routing.enabled=true
spring.cloud.stream.bindings.functionRouter-in-0.destination=test-stream1,test-stream2
spring.cloud.stream.bindings.functionRouter-in-0.consumer.header-mode=embeddedHeaders
spring.cloud.stream.kinesis.bindings.functionRouter-in-0.consumer.embed-headers=true

When using multiplexing spring.cloud.stream.bindings.functionRouter-in-0.consumer.multiplex=true this works fine. However this means the consumer is shared (therefore same thread) between streams which is not ideal if the throughput varies a lot between streams.

When multiplexing is not enabled the test-stream2 will not be parsing the payloads and decoding the headers because at the time the consumer gets created the header mode is reset to none by the provisioning code for test-stream1.

Adding

spring.cloud.stream.default.consumer.headerMode=embeddedHeaders
spring.cloud.stream.kinesis.default.consumer.embedHeaders=true

doesn't change anything here.

Now I'm not advocating for removing the defaults, my issue is with overriding them with none.

@artembilan
Copy link
Member

I see. I'm not familiar with that functionRouter, but looks like we cannot mitigate with existing logic, unless you can stay with multiplex=true for time being.

As a fix I suggest to rely only kinesis.bindings.functionRouter-in-0.consumer.embed-headers=true and fully ignore value of the consumer.headerMode and override it to none if embed-headers == true .
Make KinesisConsumerProperties.embedHeaders and KinesisProducerProperties.embedHeaders as true by default.
And in that KinesisStreamProvisioner have a logic like:

		KinesisProducerProperties kinesisProducerProperties = properties.getExtension();
		if (kinesisProducerProperties.isEmbedHeaders()) {
			properties.setHeaderMode(HeaderMode.none);
		}

instead of what we have so far.
Even if we override that consumer|producer.headerMode, we just don't deal with it and with the fact that embed-headers == true we do embedding on the Kinesis Binder level, not in upstream Spring Cloud Stream Core logic.

provisioning streams.

Signed-off-by: Tomek Bielecki <Tomek.Bielecki@aus.com>
@tomek82
Copy link
Contributor Author

tomek82 commented May 8, 2025

Sorry I was not being clear, functionRouter is the RoutingFunction from Spring Cloud Function project.

I've implemented your suggestions and also added handling of useNativeDecoding and useNativeEncoding and modified tests to work with new defaults. Or perhaps it would be better to handle those settings during provisioning? Basically header embedding needs to be ignored when native encoding/decoding is used.

@artembilan artembilan added this to the 4.0.5 milestone May 9, 2025
@artembilan artembilan added the bug label May 9, 2025
@artembilan artembilan merged commit bc81986 into spring-cloud:main May 9, 2025
2 checks passed
@artembilan
Copy link
Member

@tomek82 ,

thank you for contribution; looking forward for more!

@tomek82 tomek82 deleted the fix-properties-for-multiple-consumer-setup branch May 9, 2025 22:57
@Moritz-rt
Copy link

Thank you for the fix and your work!

Unfortunately the change of the default value is a breaking change and should not be released as a patch version. This change broke our api test as the message format was changed.
Fix for us was to adapt the config:
spring.cloud.stream.kinesis.bindings.processKinesisMessage-out-0.producer:
embedHeaders: false
headerMode: headers

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants