-
Notifications
You must be signed in to change notification settings - Fork 96
KinesisStreamProvisioner is overwriting headerMode property #234
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KinesisStreamProvisioner is overwriting headerMode property #234
Conversation
Signed-off-by: Tomek Bielecki <tomasz.bielecki@gmail.com>
Signed-off-by: Tomek Bielecki <tomasz.bielecki@gmail.com>
e14bdb5
to
a7bce8f
Compare
Did you investigate why it had been done like that ? Sorry, not on the laptop to dig into the code . |
This was introduced in #185 but I'm not sure why. If you look here a7bce8f#diff-d536075c01df04e70b25e9933ff91ff520209321602ccb436a6ffc41f1c0b560L91 the if condition is always false. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, find some simple review on line.
And sorry that my unreasonable change caused some problems on your side.
Thank you!
...a/org/springframework/cloud/stream/binder/kinesis/provisioning/KinesisStreamProvisioner.java
Show resolved
Hide resolved
...a/org/springframework/cloud/stream/binder/kinesis/provisioning/KinesisStreamProvisioner.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, I remember now why I did that.
You see, there is an embedded headers functionality in the Spring Cloud Stream Core.
The logic is like this in the AbstractMessageChannelBinder
:
if (HeaderMode.embeddedHeaders.equals(properties.getHeaderMode())) {
bindingTarget.addInterceptor(0, this.embeddedHeadersChannelInterceptor);
}
According to the problem described in the mentioned #185, we cannot rely on that for tracing headers. Therefore, the logic of embedded headers has been moved to the respective Kinesis channel adapter implementation.
I would love to ignore that HeaderMode
from Spring Cloud Stream Core, but that's not what is possible from this project perspective.
Maybe for now you can workaround in your project that spring.cloud.stream.default.<producer|consumer>.headerMode=<value>
to the spring.cloud.stream.kinesis.default.<producer|consumer>.embedHeaders=true|false
?
I think we can raise a GH issue against Spring Cloud Stream Core to consider to deprecate embeddedHeaders
option in favor of custom one in the respective Binder implementation.
Would you mind demonstrating your use-case to understand problem better? |
My setup is like so:
When using multiplexing When multiplexing is not enabled the test-stream2 will not be parsing the payloads and decoding the headers because at the time the consumer gets created the header mode is reset to none by the provisioning code for test-stream1. Adding
doesn't change anything here. Now I'm not advocating for removing the defaults, my issue is with overriding them with none. |
I see. I'm not familiar with that As a fix I suggest to rely only
instead of what we have so far. |
provisioning streams. Signed-off-by: Tomek Bielecki <Tomek.Bielecki@aus.com>
Sorry I was not being clear, I've implemented your suggestions and also added handling of |
@tomek82 , thank you for contribution; looking forward for more! |
Thank you for the fix and your work! Unfortunately the change of the default value is a breaking change and should not be released as a patch version. This change broke our api test as the message format was changed. |
When provisioning a stream consumer or producer the passed in binder properties get overwritten with HeaderMode.none.
The issue arises when using multiple destinations (i.e. a default function router for two streams) and not using multiplexing where in such case only the first stream will be able to parse/write the headers.