Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .github/workflows/integration-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,22 +71,22 @@ jobs:
distribution: 'adopt'
java-version: 17

- name: install org.apache.pulsar.tests:integration:jar:tests:4.0.1
- name: install org.apache.pulsar.tests:integration:jar:tests:4.0.2
if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
run: |
cd ~
git clone --depth 50 --single-branch --branch v4.0.1 https://github.com/apache/pulsar
git clone --depth 50 --single-branch --branch v4.0.2 https://github.com/apache/pulsar
cd pulsar
mvn -B -ntp -f tests/pom.xml -pl org.apache.pulsar.tests:tests-parent,org.apache.pulsar.tests:integration install

- name: build apachepulsar/pulsar-test-latest-version:latest
if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
run: |
cd ~/pulsar
docker pull apachepulsar/pulsar-all:4.0.1
docker pull apachepulsar/pulsar:4.0.1
docker tag apachepulsar/pulsar-all:4.0.1 apachepulsar/pulsar-all:4.0.1-$(git rev-parse --short=7 HEAD)
docker tag apachepulsar/pulsar:4.0.1 apachepulsar/pulsar:4.0.1-$(git rev-parse --short=7 HEAD)
docker pull apachepulsar/pulsar-all:4.0.2
docker pull apachepulsar/pulsar:4.0.2
docker tag apachepulsar/pulsar-all:4.0.2 apachepulsar/pulsar-all:4.0.2-$(git rev-parse --short=7 HEAD)
docker tag apachepulsar/pulsar:4.0.2 apachepulsar/pulsar:4.0.2-$(git rev-parse --short=7 HEAD)
mvn -B -ntp -f tests/docker-images/pom.xml install -pl org.apache.pulsar.tests:latest-version-image -am -Pdocker,-main -DskipTests

- name: run integration tests
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ mvn install
In order to build this repository the linked Pulsar release must be released to Maven Central
other wise you have to build it locally.

For instance if this code depends on Pulsar 4.0.1 you have to build Pulsar 4.0.1 locally
For instance if this code depends on Pulsar 4.0.2 you have to build Pulsar 4.0.2 locally

```
git clone https://github.com/apache/pulsar
git checkout v4.0.1
git checkout v4.0.2
mvn clean install -DskipTests
```

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
</issueManagement>

<properties>
<pulsar.version>4.0.1</pulsar.version>
<pulsar.version>4.0.2</pulsar.version>
<kafka-client.version>2.7.2</kafka-client.version>
<storm.version>2.0.0</storm.version>
<kafka_0_8.version>0.8.1.1</kafka_0_8.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,21 @@ public void ack(Object msgId) {
}
}

public void negativeAck(Object msg) {
if (msg instanceof Message) {
Message<?> pulsarMsg = (Message<?>) msg;
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Received negative ack for message {}", spoutId, pulsarMsg.getMessageId());
}
consumer.negativeAcknowledge(pulsarMsg);
pendingMessageRetries.remove(pulsarMsg.getMessageId());
// we should also remove message from failedMessages but it will be
// eventually removed while emitting next
// tuple
--pendingAcks;
}
}

@Override
public void fail(Object msgId) {
if (msgId instanceof Message) {
Expand All @@ -183,8 +198,13 @@ public void fail(Object msgId) {
--pendingAcks;
messagesFailed++;
} else {
LOG.warn("[{}] Number of retries limit reached, dropping the message {}", spoutId, id);
ack(msg);
if(pulsarSpoutConf.shouldNegativeAckFailedMessages()){
LOG.warn("[{}] Number of retries limit reached, negative acking the message {}", spoutId, id);
negativeAck(msg);
} else {
LOG.warn("[{}] Number of retries limit reached, dropping the message {}", spoutId, id);
ack(msg);
}
}
}

Expand Down Expand Up @@ -447,6 +467,11 @@ public void acknowledgeAsync(Message<?> msg) {
consumer.acknowledgeAsync(msg);
}

@Override
public void negativeAcknowledge(Message<?> msg) {
consumer.negativeAcknowledge(msg);
}

@Override
public void close() throws PulsarClientException {
consumer.close();
Expand Down Expand Up @@ -477,6 +502,11 @@ public void acknowledgeAsync(Message<?> msg) {
// No-op
}

@Override
public void negativeAcknowledge(Message<?> msg) {
// No-op
}

@Override
public void close() throws PulsarClientException {
try {
Expand All @@ -491,4 +521,4 @@ public void unsubscribe() throws PulsarClientException {
// No-op
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,11 @@ public class PulsarSpoutConfiguration extends PulsarStormConfiguration {
private boolean autoUnsubscribe = false;
private boolean durableSubscription = true;
// read position if non-durable subscription is enabled : default oldest message available in topic
private MessageId nonDurableSubscriptionReadPosition = MessageId.earliest;
private MessageId nonDurableSubscriptionReadPosition = MessageId.earliest;
private boolean negativeAckFailedMessages = false;




/**
* @return the subscription name for the consumer in the spout
*/
Expand Down Expand Up @@ -192,4 +194,21 @@ public MessageId getNonDurableSubscriptionReadPosition() {
public void setNonDurableSubscriptionReadPosition(MessageId nonDurableSubscriptionReadPosition) {
this.nonDurableSubscriptionReadPosition = nonDurableSubscriptionReadPosition;
}

/**
*
* @return whether the consumer will negative ack the failed messages
*/
public boolean shouldNegativeAckFailedMessages(){
return this.negativeAckFailedMessages;
}

/**
* Sets whether the consumer will negative ack the failed messages. <i>(default: false)</i>
*
* @param negativeAckFailedMessages
*/
public void setNegativeAckFailedMessages(boolean negativeAckFailedMessages){
this.negativeAckFailedMessages = negativeAckFailedMessages;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ public interface PulsarSpoutConsumer {
*/
void acknowledgeAsync(Message<?> msg);

/**
* Negative ack the message.
*
* @param msg
*/
void negativeAcknowledge(Message<?> msg);

/**
* unsubscribe the consumer
* @throws PulsarClientException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,7 @@
*/
package org.apache.pulsar.storm;

import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

Expand All @@ -38,13 +29,11 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.storm.PulsarSpout.SpoutConsumer;
import org.apache.storm.spout.SpoutOutputCollector;
Expand Down Expand Up @@ -98,6 +87,71 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
verify(consumer, atLeast(1)).receive(anyInt(), any());
}

@Test
public void testFailedMessageNegativeAck() throws Exception {
testFailedMessageRetryExhausted(true);
}

@Test
public void testFailedMessageAckAndDrop() throws Exception {
testFailedMessageRetryExhausted(false);
}


public void testFailedMessageRetryExhausted(boolean negativeAckFailedMessages) throws Exception {

PulsarSpoutConfiguration conf = new PulsarSpoutConfiguration();
conf.setServiceUrl("http://localhost:8080");
conf.setSubscriptionName("sub1");
conf.setTopic("persistent://prop/ns1/topic1");
conf.setSubscriptionType(SubscriptionType.Exclusive);
conf.setMaxFailedRetries(1);
conf.setNegativeAckFailedMessages(negativeAckFailedMessages);
conf.setMessageToValuesMapper(new MessageToValuesMapper() {
@Override
public Values toValues(Message<byte[]> msg) {
return null;
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}

});

DeadLetterPolicy deadLetterPolicy = DeadLetterPolicy.builder()
.maxRedeliverCount(1)
.deadLetterTopic("persistent://prop/ns1/dl-topic-1")
.build();
ConsumerConfigurationData<byte[]> consumerConfig = new ConsumerConfigurationData<>();
consumerConfig.setDeadLetterPolicy(deadLetterPolicy);

ClientConfigurationData clientConfigurationData = spy(new ClientBuilderImpl()).getClientConfigurationData().clone();
PulsarSpout spout = spy(new PulsarSpout(conf, clientConfigurationData, consumerConfig));

Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(),
new byte[0], Schema.BYTES, new MessageMetadata());
Consumer<byte[]> consumer = mock(Consumer.class);
SpoutConsumer spoutConsumer = new SpoutConsumer(consumer);
doNothing().when(consumer).negativeAcknowledge(msg);

Field consField = PulsarSpout.class.getDeclaredField("consumer");
consField.setAccessible(true);
consField.set(spout, spoutConsumer);

spout.fail(msg);
spout.fail(msg);

if(negativeAckFailedMessages){
verify(consumer, atLeast(1)).negativeAcknowledge(msg);
verify(consumer, never()).acknowledgeAsync(msg);
} else {
verify(consumer, never()).negativeAcknowledge(msg);
verify(consumer, atLeast(1)).acknowledgeAsync(msg);
}

}

@Test
public void testPulsarTuple() throws Exception {
testPulsarSpout(true);
Expand Down