-
Notifications
You must be signed in to change notification settings - Fork 31
[Issue 53][PulsarSpout] Adds DLQ support in PulsarSpout of the Storm adapter #60
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
[Issue 53][PulsarSpout] Adds DLQ support in PulsarSpout of the Storm adapter #60
Conversation
rdhabalia
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. minor comment
| } | ||
| } | ||
|
|
||
| public void negativeAck(Object msgId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can be rename as msg
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was just following the same approach used in the ack and fail methods in the same class, where the method argument is called msgId and has the type Object. But inside the method, there is a check on whether msgId is an instance of the Message class and then is type cast into the Message type and assigned to a variable called msg.
Do you still want me to rename the argument to some other name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rdhabalia Any comments on this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, let's rename it to msg as it's not msgId
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renamed it to msg
| Values values; | ||
| try{ | ||
| values = pulsarSpoutConf.getMessageToValuesMapper().toValues(msg); | ||
| } catch (Exception e){ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this change doesn't seem related to this PR? did we see any issue with the mapper failure? can we create a separate PR with the details if you have come across with such issue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this change is not related to this PR. Let me remove it here and create a separate one for it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change is removed.
rdhabalia
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. let's fix the minor comment.
| } | ||
| } | ||
|
|
||
| public void negativeAck(Object msgId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, let's rename it to msg as it's not msgId
Addressed review comments
Fixes #53
Motivation
The current implementation of the Storm adapter does not utilise the dead letter queue functionality of Pulsar. Often it may be desired that when there is some error while processing a message in the Storm topology, the message be moved into the dead letter queue so that it can be handled separately. But the PulsarSpout currently keeps retrying for a while & finally just drops the message by acking it, which leads to data loss. This PR seeks to allow users of PulsarSpout to opt in to using DLQ queues for message processing failures.
Modifications
negativeAcknowledgeto thePulsarSpoutConsumerinterface & added the implementation for the same in theSpoutConsumerclass.negativeAckFailedMessagesin thePulsarSpoutConfigurationclass along with getter & setters for the same.negativeAckin thePulsarSpoutclass to negatively ack messages and send them to the DLQ.failmethod in thePulsarSpoutclass to negatively acknowledge messages whennegativeAckFailedMessagesis set totruemapToValueAndEmitmethod to throw an exception when mapping to the value fails.4.0.2. This is required for this fix as mentioned in the following PR [fix][client] Make DeadLetterPolicy & KeySharedPolicy serializable pulsar#23718Verifying this change
This change added tests and can be verified as follows:
Tested the cases where
negativeAckFailedMessagesis set totrueandfalsein thePulsarSpoutTesttest class.Does this pull request potentially affect one of the following parts:
If
yeswas chosen, please highlight the changesDocumentation
The corresponding PR in my fork: AnuragReddy2000#2