Skip to content

Commit 2c92416

Browse files
authored
Allow delete messages option after acknowledge (#12)
* Allow delete message after acknowledge * Update Readme * Create node package * Revert name of package * Change option name to deleteMessagesAfterAck * Update Readme
1 parent 79d8a39 commit 2c92416

File tree

3 files changed

+9
-0
lines changed

3 files changed

+9
-0
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ async function bootstrap() {
9797
block: 5000,
9898
consumer: 'users-1',
9999
consumerGroup: 'users',
100+
deleteMessagesAfterAck: true, // optional: delete message from stream
100101
},
101102
// optional. See our example main.ts file for more details...
102103
// serialization: {},

lib/interfaces.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ interface RedisStreamOptionsXreadGroup {
1515
block?: number;
1616
consumerGroup: string;
1717
consumer: string;
18+
deleteMessagesAfterAck?: boolean;
1819
}
1920

2021
export type RedisStreamOptions = RedisStreamOptionsXreadGroup;

lib/redis.server.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,13 @@ export class RedisStreamStrategy
153153
inboundContext.getMessageId(),
154154
);
155155

156+
if (true === this.options?.streams?.deleteMessagesAfterAck) {
157+
await this.client.xdel(
158+
inboundContext.getStream(),
159+
inboundContext.getMessageId(),
160+
);
161+
}
162+
156163
return true;
157164
} catch (error) {
158165
this.logger.error(error);

0 commit comments

Comments
 (0)