Skip to content

Commit cfe60bf

Browse files
committed
fix(core): add subscription of generic topics
GH-0
1 parent 6b1e76a commit cfe60bf

File tree

10 files changed

+48
-20
lines changed

10 files changed

+48
-20
lines changed

README.md

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
</a>
2424
</p>
2525

26-
2726
## Overview
2827

2928
A Kafka Client for Loopback4 built on top of [KafkaJS](https://kafka.js.org/).
@@ -97,7 +96,13 @@ export class TestStream implements IStreamDefinition {
9796

9897
### Consumer
9998

100-
A Consumer is a [`loopback extension`](https://loopback.io/doc/en/lb4/Extension-point-and-extensions.html) that is used by the [`KafkaConsumerService`](./src/services/kafka-consumer.service.ts) to initialize consumers. It must implement the `IConsumer` interface and should be using the `@consumer()` decorator. If you want the consumers to start at the start of your application, you should pass the `initObservers` config to the Component configuration.
99+
A Consumer is a [`loopback extension`](https://loopback.io/doc/en/lb4/Extension-point-and-extensions.html) that is used by the [`KafkaConsumerService`](./src/services/kafka-consumer.service.ts) to initialize consumers. It must implement one of the `IConsumer`, `ISharedConsumer` or `IGenericConsumer` interfaces and should be using the `asConsumer` binding template. If you want the consumers to start at the start of your application, you should pass the `initObservers` config to the Component configuration.
100+
101+
- `IConsumer` - simple consumer for 1 event in a stream
102+
- `ISharedConsumer` - consumer that consumes data for multiple events in a stream (defined with an array of events)
103+
- `IGenericConsumer` - consumer that consumes data for all events in a stream/topic (defined without any event). By default it is not triggered for an event if a more specific consumer is bound for that event. This behaviour can be changed using the `alwaysRunGenericConsumer` option in consumer configuration.
104+
105+
You can bind any consumer related configuration using the `KafkaClientBindings.ConsumerConfiguration` key. It accepts all the options of KafkaJS, along with an additional option - `alwaysRunGenericConsumer` - this option runs any generic consumer if available always, even if more specific consumers are bound by the client(only the specific consumer would run if this option is false or not provided).
101106

102107
##### Example
103108

@@ -112,13 +117,14 @@ this.configure(KafkaConnectorComponentBindings.COMPONENT).to({
112117

113118
```ts
114119
// start.consumer.ts
120+
// use @genericConsumer for a generic consumer
115121
@consumer<TestStream, Events.start>()
116122
export class StartConsumer implements IConsumer<TestStream, Events.start> {
117123
constructor(
118124
@inject('test.handler.start')
119125
public handler: StreamHandler<TestStream, Events.start>,
120126
) {}
121-
topic: Topics.First = Topics.First;
127+
topic = Topics.First;
122128
event: Events.start = Events.start;
123129
// you can write the handler as a method
124130
handler(payload: StartEvent) {
@@ -149,7 +155,7 @@ export class StartConsumer implements IConsumer<TestStream, Events.start> {
149155
@eventHandler<TestStream>(Events.Start)
150156
public handler: StreamHandler<TestStream, Events.start>,
151157
) {}
152-
topic: Topics.First = Topics.First;
158+
topic = Topics.First;
153159
event: Events.start = Events.start;
154160
}
155161
```

package-lock.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/__tests__/acceptance/application.test.ts

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -95,16 +95,17 @@ describe('end-to-end', () => {
9595

9696
it('should consume from a generic consumer without events for a single topic', async () => {
9797
const producerInstance = producerApp.getSync<Producer<TestStream>>(
98-
producerKey(Topics.First),
98+
producerKey(Topics.Generic),
9999
);
100100
const close = {
101101
closeTime: new Date(),
102102
};
103103
await producerInstance.send(Events.close, [close]);
104104
sinon.assert.called(genericHandler);
105-
expect(genericHandler.getCalls()[0].args[0]).to.be.deepEqual(
106-
JSON.parse(JSON.stringify(close)),
107-
);
105+
expect(genericHandler.getCalls()[0].args[0]).to.be.deepEqual({
106+
data: JSON.parse(JSON.stringify(close)),
107+
event: Events.close,
108+
});
108109
});
109110

110111
it('should not handle an unspecified events', async () => {
@@ -118,7 +119,7 @@ describe('end-to-end', () => {
118119
};
119120
await producerInstance.connect();
120121
producerInstance.send({
121-
topic: Topics.First,
122+
topic: Topics.Generic,
122123
messages: [payload],
123124
});
124125

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
11
import {inject, injectable} from '@loopback/core';
22
import {asConsumer} from '../../../../keys';
3-
import {TestStream} from '../stream';
4-
import {IGenericConsumer, StreamHandler} from '../../../../types';
3+
import {GenericStream} from '../stream';
4+
import {GenericStreamHandler, IGenericConsumer} from '../../../../types';
55
import {Topics} from '../topics.enum';
6-
import {Events} from '../events.enum';
76

87
@injectable(asConsumer)
9-
export class GenericConsumer implements IGenericConsumer<TestStream> {
8+
export class GenericConsumer implements IGenericConsumer<GenericStream> {
109
constructor(
1110
@inject('eventHandler.generic')
12-
public handler: StreamHandler<TestStream, Events>,
11+
public handler: GenericStreamHandler<GenericStream>,
1312
) {}
14-
topic: Topics.First = Topics.First;
13+
topic: Topics.Generic = Topics.Generic;
1514
}

src/__tests__/acceptance/fixtures/stream.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,10 @@ export interface TestStream extends IStreamDefinition {
1313
[Events.close]: {};
1414
};
1515
}
16+
17+
export interface GenericStream extends IStreamDefinition {
18+
topic: Topics.Generic;
19+
messages: {
20+
[Events.close]: {};
21+
};
22+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
export enum Topics {
22
First = 'first',
33
Second = 'second',
4+
Generic = 'generic',
45
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
import {Constructor, injectable} from '@loopback/core';
2+
import {asConsumer} from '../keys';
3+
import {IGenericConsumer, IStreamDefinition} from '../types';
4+
5+
export function genericConsumer<T extends IStreamDefinition>() {
6+
return injectable(asConsumer) as (
7+
target: Constructor<IGenericConsumer<T>>,
8+
) => void;
9+
}

src/decorators/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
export * from './producer.decorator';
22
export * from './handler.decorator';
33
export * from './consumer.decorator';
4+
export * from './generic-consumer.decorator';

src/services/kafka-consumer.service.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ export class KafkaConsumerService<T extends IStreamDefinition> {
4141

4242
const {consumerMap, genericConsumerMap} = await this.buildConsumerMaps();
4343
const topics: string[] = Array.from(consumerMap.keys());
44-
44+
topics.push(...Array.from(genericConsumerMap.keys()));
4545
await kafkaConsumerClient.subscribe({
4646
topics,
4747
});
@@ -70,7 +70,7 @@ export class KafkaConsumerService<T extends IStreamDefinition> {
7070
(!consumer || this.configuration.alwaysRunGenericConsumer) &&
7171
genericConsumer
7272
) {
73-
await genericConsumer.handler(message.data);
73+
await genericConsumer.handler(message);
7474
}
7575
} else {
7676
this.logger.warn(

src/types.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ export interface ISharedConsumer<Stream extends IStreamDefinition> {
4646

4747
export interface IGenericConsumer<Stream extends IStreamDefinition> {
4848
topic: TopicForStream<Stream>;
49-
handler: StreamHandler<Stream, EventsInStream<Stream>>;
49+
handler: GenericStreamHandler<Stream>;
5050
}
5151

5252
export function isGenericConsumer<
@@ -89,3 +89,7 @@ export type StreamHandler<
8989
Stream extends IStreamDefinition,
9090
K extends EventsInStream<Stream>,
9191
> = (payload: Stream['messages'][K]) => Promise<void>;
92+
93+
export type GenericStreamHandler<Stream extends IStreamDefinition> = (
94+
payload: Stream['messages'],
95+
) => Promise<void>;

0 commit comments

Comments
 (0)