Skip to content

Commit a926edd

Browse files
zeejersZach Sherbondy
andauthored
xadd-maxlen-support (#22)
Co-authored-by: Zach Sherbondy <zachsherbondy@Zachs-MBP.localdomain>
1 parent 45d1873 commit a926edd

File tree

3 files changed

+28
-3
lines changed

3 files changed

+28
-3
lines changed

lib/interfaces.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,11 @@ interface RedisStreamOptionsXreadGroup {
1818
deleteMessagesAfterAck?: boolean;
1919
}
2020

21-
export type RedisStreamOptions = RedisStreamOptionsXreadGroup;
21+
interface RedisStreamOptionsXadd {
22+
maxLen?: number;
23+
}
24+
25+
export type RedisStreamOptions = RedisStreamOptionsXreadGroup & RedisStreamOptionsXadd;
2226

2327
// [id, [key, value, key, value]]
2428
export type RawStreamMessage = [id: string, payload: string[]];

lib/redis.client.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { RequestsMap } from './requests-map';
77
import { deserialize, generateCorrelationId, serialize } from './streams.utils';
88
import { RedisStreamContext } from './stream.context';
99
import { firstValueFrom, share } from 'rxjs';
10+
import { RedisValue } from 'ioredis';
1011

1112
@Injectable()
1213
export class RedisStreamClient extends ClientProxy {
@@ -104,9 +105,16 @@ export class RedisStreamClient extends ClientProxy {
104105
try {
105106
if (!this.client) throw new Error('Redis client instance not found.');
106107

108+
const commandArgs: RedisValue[] = [];
109+
if(this.options.streams?.maxLen){
110+
commandArgs.push("MAXLEN")
111+
commandArgs.push("~")
112+
commandArgs.push(this.options.streams.maxLen.toString())
113+
}
114+
commandArgs.push("*")
107115
let response = await this.client.xadd(
108116
stream,
109-
'*',
117+
...commandArgs,
110118
...serializedPayloadArray,
111119
);
112120
return response;

lib/redis.server.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import { CONNECT_EVENT, ERROR_EVENT } from '@nestjs/microservices/constants';
1212
import { deserialize, serialize } from './streams.utils';
1313
import { RedisStreamContext } from './stream.context';
1414
import { Observable } from 'rxjs';
15+
import { RedisValue } from 'ioredis';
1516

1617
export class RedisStreamStrategy
1718
extends Server
@@ -148,7 +149,19 @@ export class RedisStreamStrategy
148149

149150
if (!this.client) throw new Error('Redis client instance not found.');
150151

151-
await this.client.xadd(responseObj.stream, '*', ...serializedEntries);
152+
const commandArgs: RedisValue[] = [];
153+
if(this.options.streams?.maxLen){
154+
commandArgs.push("MAXLEN")
155+
commandArgs.push("~")
156+
commandArgs.push(this.options.streams.maxLen.toString())
157+
}
158+
commandArgs.push("*")
159+
160+
await this.client.xadd(
161+
responseObj.stream,
162+
...commandArgs,
163+
...serializedEntries,
164+
);
152165
}),
153166
);
154167

0 commit comments

Comments
 (0)