From cfe60bfc7c9fbc7be2387fba64aaec0c364a79c7 Mon Sep 17 00:00:00 2001
From: akshatdubeysf
Date: Thu, 26 Dec 2024 20:48:46 +0530
Subject: [PATCH 1/2] fix(core): add subscription of generic topics
GH-0
---
README.md | 14 ++++++++++----
package-lock.json | 4 ++--
src/__tests__/acceptance/application.test.ts | 11 ++++++-----
.../consumer/generic-consumer.extension.ts | 11 +++++------
src/__tests__/acceptance/fixtures/stream.ts | 7 +++++++
src/__tests__/acceptance/fixtures/topics.enum.ts | 1 +
src/decorators/generic-consumer.decorator.ts | 9 +++++++++
src/decorators/index.ts | 1 +
src/services/kafka-consumer.service.ts | 4 ++--
src/types.ts | 6 +++++-
10 files changed, 48 insertions(+), 20 deletions(-)
create mode 100644 src/decorators/generic-consumer.decorator.ts
diff --git a/README.md b/README.md
index 6e97699..b0950d1 100644
--- a/README.md
+++ b/README.md
@@ -23,7 +23,6 @@
-
## Overview
A Kafka Client for Loopback4 built on top of [KafkaJS](https://kafka.js.org/).
@@ -97,7 +96,13 @@ export class TestStream implements IStreamDefinition {
### Consumer
-A Consumer is a [`loopback extension`](https://loopback.io/doc/en/lb4/Extension-point-and-extensions.html) that is used by the [`KafkaConsumerService`](./src/services/kafka-consumer.service.ts) to initialize consumers. It must implement the `IConsumer` interface and should be using the `@consumer()` decorator. If you want the consumers to start at the start of your application, you should pass the `initObservers` config to the Component configuration.
+A Consumer is a [`loopback extension`](https://loopback.io/doc/en/lb4/Extension-point-and-extensions.html) that is used by the [`KafkaConsumerService`](./src/services/kafka-consumer.service.ts) to initialize consumers. It must implement one of the `IConsumer`, `ISharedConsumer` or `IGenericConsumer` interfaces and should be using the `asConsumer` binding template. If you want the consumers to start at the start of your application, you should pass the `initObservers` config to the Component configuration.
+
+- `IConsumer` - simple consumer for 1 event in a stream
+- `ISharedConsumer` - consumer that consumes data for multiple events in a stream (defined with an array of events)
+- `IGenericConsumer` - consumer that consumes data for all events in a stream/topic (defined without any event). By default it is not triggered for an event if a more specific consumer is bound for that event. This behaviour can be changed using the `alwaysRunGenericConsumer` option in consumer configuration.
+
+You can bind any consumer related configuration using the `KafkaClientBindings.ConsumerConfiguration` key. It accepts all the options of KafkaJS, along with an additional option - `alwaysRunGenericConsumer` - this option runs any generic consumer if available always, even if more specific consumers are bound by the client(only the specific consumer would run if this option is false or not provided).
##### Example
@@ -112,13 +117,14 @@ this.configure(KafkaConnectorComponentBindings.COMPONENT).to({
```ts
// start.consumer.ts
+// use @genericConsumer for a generic consumer
@consumer()
export class StartConsumer implements IConsumer {
constructor(
@inject('test.handler.start')
public handler: StreamHandler,
) {}
- topic: Topics.First = Topics.First;
+ topic = Topics.First;
event: Events.start = Events.start;
// you can write the handler as a method
handler(payload: StartEvent) {
@@ -149,7 +155,7 @@ export class StartConsumer implements IConsumer {
@eventHandler(Events.Start)
public handler: StreamHandler,
) {}
- topic: Topics.First = Topics.First;
+ topic = Topics.First;
event: Events.start = Events.start;
}
```
diff --git a/package-lock.json b/package-lock.json
index c3b5261..fa076c3 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -1,12 +1,12 @@
{
"name": "loopback4-kafka-client",
- "version": "3.0.5",
+ "version": "3.0.6",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "loopback4-kafka-client",
- "version": "3.0.5",
+ "version": "3.0.6",
"license": "MIT",
"dependencies": {
"@loopback/boot": "^7.0.9",
diff --git a/src/__tests__/acceptance/application.test.ts b/src/__tests__/acceptance/application.test.ts
index 9520993..0294b63 100644
--- a/src/__tests__/acceptance/application.test.ts
+++ b/src/__tests__/acceptance/application.test.ts
@@ -95,16 +95,17 @@ describe('end-to-end', () => {
it('should consume from a generic consumer without events for a single topic', async () => {
const producerInstance = producerApp.getSync>(
- producerKey(Topics.First),
+ producerKey(Topics.Generic),
);
const close = {
closeTime: new Date(),
};
await producerInstance.send(Events.close, [close]);
sinon.assert.called(genericHandler);
- expect(genericHandler.getCalls()[0].args[0]).to.be.deepEqual(
- JSON.parse(JSON.stringify(close)),
- );
+ expect(genericHandler.getCalls()[0].args[0]).to.be.deepEqual({
+ data: JSON.parse(JSON.stringify(close)),
+ event: Events.close,
+ });
});
it('should not handle an unspecified events', async () => {
@@ -118,7 +119,7 @@ describe('end-to-end', () => {
};
await producerInstance.connect();
producerInstance.send({
- topic: Topics.First,
+ topic: Topics.Generic,
messages: [payload],
});
diff --git a/src/__tests__/acceptance/fixtures/consumer/generic-consumer.extension.ts b/src/__tests__/acceptance/fixtures/consumer/generic-consumer.extension.ts
index 9343021..315242a 100644
--- a/src/__tests__/acceptance/fixtures/consumer/generic-consumer.extension.ts
+++ b/src/__tests__/acceptance/fixtures/consumer/generic-consumer.extension.ts
@@ -1,15 +1,14 @@
import {inject, injectable} from '@loopback/core';
import {asConsumer} from '../../../../keys';
-import {TestStream} from '../stream';
-import {IGenericConsumer, StreamHandler} from '../../../../types';
+import {GenericStream} from '../stream';
+import {GenericStreamHandler, IGenericConsumer} from '../../../../types';
import {Topics} from '../topics.enum';
-import {Events} from '../events.enum';
@injectable(asConsumer)
-export class GenericConsumer implements IGenericConsumer {
+export class GenericConsumer implements IGenericConsumer {
constructor(
@inject('eventHandler.generic')
- public handler: StreamHandler,
+ public handler: GenericStreamHandler,
) {}
- topic: Topics.First = Topics.First;
+ topic: Topics.Generic = Topics.Generic;
}
diff --git a/src/__tests__/acceptance/fixtures/stream.ts b/src/__tests__/acceptance/fixtures/stream.ts
index 9b2e1da..ae3a68b 100644
--- a/src/__tests__/acceptance/fixtures/stream.ts
+++ b/src/__tests__/acceptance/fixtures/stream.ts
@@ -13,3 +13,10 @@ export interface TestStream extends IStreamDefinition {
[Events.close]: {};
};
}
+
+export interface GenericStream extends IStreamDefinition {
+ topic: Topics.Generic;
+ messages: {
+ [Events.close]: {};
+ };
+}
diff --git a/src/__tests__/acceptance/fixtures/topics.enum.ts b/src/__tests__/acceptance/fixtures/topics.enum.ts
index 3a2c41a..412df88 100644
--- a/src/__tests__/acceptance/fixtures/topics.enum.ts
+++ b/src/__tests__/acceptance/fixtures/topics.enum.ts
@@ -1,4 +1,5 @@
export enum Topics {
First = 'first',
Second = 'second',
+ Generic = 'generic',
}
diff --git a/src/decorators/generic-consumer.decorator.ts b/src/decorators/generic-consumer.decorator.ts
new file mode 100644
index 0000000..2ae426e
--- /dev/null
+++ b/src/decorators/generic-consumer.decorator.ts
@@ -0,0 +1,9 @@
+import {Constructor, injectable} from '@loopback/core';
+import {asConsumer} from '../keys';
+import {IGenericConsumer, IStreamDefinition} from '../types';
+
+export function genericConsumer() {
+ return injectable(asConsumer) as (
+ target: Constructor>,
+ ) => void;
+}
diff --git a/src/decorators/index.ts b/src/decorators/index.ts
index 10c3343..9c156ef 100644
--- a/src/decorators/index.ts
+++ b/src/decorators/index.ts
@@ -1,3 +1,4 @@
export * from './producer.decorator';
export * from './handler.decorator';
export * from './consumer.decorator';
+export * from './generic-consumer.decorator';
diff --git a/src/services/kafka-consumer.service.ts b/src/services/kafka-consumer.service.ts
index b2d872a..5342853 100644
--- a/src/services/kafka-consumer.service.ts
+++ b/src/services/kafka-consumer.service.ts
@@ -41,7 +41,7 @@ export class KafkaConsumerService {
const {consumerMap, genericConsumerMap} = await this.buildConsumerMaps();
const topics: string[] = Array.from(consumerMap.keys());
-
+ topics.push(...Array.from(genericConsumerMap.keys()));
await kafkaConsumerClient.subscribe({
topics,
});
@@ -70,7 +70,7 @@ export class KafkaConsumerService {
(!consumer || this.configuration.alwaysRunGenericConsumer) &&
genericConsumer
) {
- await genericConsumer.handler(message.data);
+ await genericConsumer.handler(message);
}
} else {
this.logger.warn(
diff --git a/src/types.ts b/src/types.ts
index 020968f..47cffeb 100644
--- a/src/types.ts
+++ b/src/types.ts
@@ -46,7 +46,7 @@ export interface ISharedConsumer {
export interface IGenericConsumer {
topic: TopicForStream;
- handler: StreamHandler>;
+ handler: GenericStreamHandler;
}
export function isGenericConsumer<
@@ -89,3 +89,7 @@ export type StreamHandler<
Stream extends IStreamDefinition,
K extends EventsInStream,
> = (payload: Stream['messages'][K]) => Promise;
+
+export type GenericStreamHandler = (
+ payload: Stream['messages'],
+) => Promise;
From 5076645cbd3bd48c9772df75b15446a06ebfbe48 Mon Sep 17 00:00:00 2001
From: akshatdubeysf
Date: Fri, 27 Dec 2024 18:08:23 +0530
Subject: [PATCH 2/2] fix(providers): add provider for generic producer
GH-0
---
README.md | 1 +
src/__tests__/acceptance/application.test.ts | 13 +++++
.../producer/generic-producer.service.ts | 19 +++++++
.../fixtures/producer/producer-app.ts | 3 ++
src/__tests__/acceptance/fixtures/stream.ts | 2 +-
src/component.ts | 24 ++++++++-
src/decorators/generic-producer.decorator.ts | 6 +++
src/decorators/index.ts | 1 +
src/keys.ts | 10 ++++
...generic-kafka-producer-factory.provider.ts | 49 +++++++++++++++++++
src/providers/index.ts | 1 +
src/types.ts | 9 ++++
12 files changed, 135 insertions(+), 3 deletions(-)
create mode 100644 src/__tests__/acceptance/fixtures/producer/generic-producer.service.ts
create mode 100644 src/decorators/generic-producer.decorator.ts
create mode 100644 src/providers/generic-kafka-producer-factory.provider.ts
diff --git a/README.md b/README.md
index b0950d1..34b5508 100644
--- a/README.md
+++ b/README.md
@@ -164,6 +164,7 @@ export class StartConsumer implements IConsumer {
A Producer is a loopback service for producing message for a particular topic, you can inject a producer using the `@producer(TOPIC_NAME)` decorator.
Note: The topic name passed to decorator must be first configured in the Component configuration's topic property -
+If you want to produce a raw message without any event type, you can use the `@genericProducer(TOPIC_NAME)` decorator, note that in this case, the topic name must be passed in the genericTopics property of the component configuration.
#### Example
diff --git a/src/__tests__/acceptance/application.test.ts b/src/__tests__/acceptance/application.test.ts
index 0294b63..d19c944 100644
--- a/src/__tests__/acceptance/application.test.ts
+++ b/src/__tests__/acceptance/application.test.ts
@@ -13,6 +13,7 @@ import {
setupConsumerApplication,
setupProducerApplication,
} from './test-helper';
+import {GenericProducerService} from './fixtures/producer/generic-producer.service';
describe('end-to-end', () => {
let consumerApp: Application;
@@ -93,6 +94,18 @@ describe('end-to-end', () => {
);
});
+ it('should produce from a generic producer without events for a single topic', async () => {
+ const producerService = producerApp.getSync(
+ `services.GenericProducerService`,
+ );
+ const message = 'message';
+ await producerService.produceMessage(message);
+ sinon.assert.called(genericHandler);
+ expect(genericHandler.getCalls()[0].args[0]).to.be.deepEqual({
+ data: message,
+ });
+ });
+
it('should consume from a generic consumer without events for a single topic', async () => {
const producerInstance = producerApp.getSync>(
producerKey(Topics.Generic),
diff --git a/src/__tests__/acceptance/fixtures/producer/generic-producer.service.ts b/src/__tests__/acceptance/fixtures/producer/generic-producer.service.ts
new file mode 100644
index 0000000..26ccf52
--- /dev/null
+++ b/src/__tests__/acceptance/fixtures/producer/generic-producer.service.ts
@@ -0,0 +1,19 @@
+import {genericProducer} from '../../../../decorators/generic-producer.decorator';
+import {GenericProducer} from '../../../../types';
+import {GenericStream} from '../stream';
+import {Topics} from '../topics.enum';
+
+export class GenericProducerService {
+ constructor(
+ @genericProducer(Topics.Generic)
+ private producer: GenericProducer,
+ ) {}
+
+ async produceMessage(message: string): Promise {
+ await this.producer.send([
+ {
+ data: message,
+ },
+ ]);
+ }
+}
diff --git a/src/__tests__/acceptance/fixtures/producer/producer-app.ts b/src/__tests__/acceptance/fixtures/producer/producer-app.ts
index 881ff58..5bfc55f 100644
--- a/src/__tests__/acceptance/fixtures/producer/producer-app.ts
+++ b/src/__tests__/acceptance/fixtures/producer/producer-app.ts
@@ -7,6 +7,7 @@ import {KafkaClientComponent} from '../../../../component';
import {KafkaClientBindings} from '../../../../keys';
import {KafkaClientStub} from '../../../stubs';
import {Topics} from '../topics.enum';
+import {GenericProducerService} from './generic-producer.service';
export class ProducerApp extends BootMixin(
ServiceMixin(RepositoryMixin(RestApplication)),
@@ -16,11 +17,13 @@ export class ProducerApp extends BootMixin(
this.configure(KafkaClientBindings.Component).to({
topics: Object.values(Topics) as string[],
+ genericTopics: [Topics.Generic],
});
this.bind(KafkaClientBindings.KafkaClient).to(
options.client,
);
this.component(KafkaClientComponent);
+ this.service(GenericProducerService);
this.projectRoot = __dirname;
// Customize @loopback/boot Booter Conventions here
diff --git a/src/__tests__/acceptance/fixtures/stream.ts b/src/__tests__/acceptance/fixtures/stream.ts
index ae3a68b..148844d 100644
--- a/src/__tests__/acceptance/fixtures/stream.ts
+++ b/src/__tests__/acceptance/fixtures/stream.ts
@@ -17,6 +17,6 @@ export interface TestStream extends IStreamDefinition {
export interface GenericStream extends IStreamDefinition {
topic: Topics.Generic;
messages: {
- [Events.close]: {};
+ data: string;
};
}
diff --git a/src/component.ts b/src/component.ts
index 02d875a..b75cdec 100644
--- a/src/component.ts
+++ b/src/component.ts
@@ -10,9 +10,12 @@ import {
} from '@loopback/core';
import {LoggerExtensionComponent} from '@sourceloop/core';
import {Kafka} from 'kafkajs';
-import {KafkaClientBindings, producerKey} from './keys';
+import {genericProducerKey, KafkaClientBindings, producerKey} from './keys';
import {KafkaObserver} from './observers';
-import {KafkaProducerFactoryProvider} from './providers';
+import {
+ GenericKafkaProducerFactoryProvider,
+ KafkaProducerFactoryProvider,
+} from './providers';
import {KafkaConsumerService} from './services/kafka-consumer.service';
import {KafkaClientOptions} from './types';
@@ -39,6 +42,11 @@ export class KafkaClientComponent implements Component {
.toProvider(KafkaProducerFactoryProvider)
.inScope(BindingScope.SINGLETON);
+ app
+ .bind(KafkaClientBindings.GenericProducerFactor)
+ .toProvider(GenericKafkaProducerFactoryProvider)
+ .inScope(BindingScope.SINGLETON);
+
app.service(KafkaConsumerService);
if (configuration?.topics) {
@@ -50,6 +58,18 @@ export class KafkaClientComponent implements Component {
.inScope(BindingScope.SINGLETON);
});
}
+
+ if (configuration?.genericTopics) {
+ const genericProducerFactory = app.getSync(
+ KafkaClientBindings.GenericProducerFactor,
+ );
+ configuration.genericTopics.forEach(topic => {
+ app
+ .bind(genericProducerKey(topic))
+ .to(genericProducerFactory(topic))
+ .inScope(BindingScope.SINGLETON);
+ });
+ }
if (configuration?.initObservers) {
app.lifeCycleObserver(KafkaObserver);
}
diff --git a/src/decorators/generic-producer.decorator.ts b/src/decorators/generic-producer.decorator.ts
new file mode 100644
index 0000000..abb596b
--- /dev/null
+++ b/src/decorators/generic-producer.decorator.ts
@@ -0,0 +1,6 @@
+import {inject} from '@loopback/core';
+import {genericProducerKey} from '../keys';
+
+export function genericProducer(topic: string) {
+ return inject(genericProducerKey(topic));
+}
diff --git a/src/decorators/index.ts b/src/decorators/index.ts
index 9c156ef..e41160f 100644
--- a/src/decorators/index.ts
+++ b/src/decorators/index.ts
@@ -2,3 +2,4 @@ export * from './producer.decorator';
export * from './handler.decorator';
export * from './consumer.decorator';
export * from './generic-consumer.decorator';
+export * from './generic-producer.decorator';
diff --git a/src/keys.ts b/src/keys.ts
index a5c0be9..38d8f6c 100644
--- a/src/keys.ts
+++ b/src/keys.ts
@@ -4,6 +4,8 @@ import {KafkaClientComponent} from './component';
import {KafkaConsumerService} from './services/kafka-consumer.service';
import {
ConsumerConfig,
+ GenericProducer,
+ GenericProducerFactoryType,
IStreamDefinition,
Producer,
ProducerFactoryType,
@@ -30,6 +32,9 @@ export namespace KafkaClientBindings {
export const ProducerFactory = BindingKey.create<
ProducerFactoryType
>(`${KafkaNamespace}.ProducerFactory`);
+ export const GenericProducerFactor = BindingKey.create<
+ GenericProducerFactoryType
+ >(`${KafkaNamespace}.GenericProducerFactory`);
export const LifeCycleGroup = `${KafkaNamespace}.KAFKA_OBSERVER_GROUP`;
}
@@ -38,6 +43,11 @@ export const producerKey = (topic: string) =>
`${KafkaNamespace}.producer.${topic}`,
);
+export const genericProducerKey = (topic: string) =>
+ BindingKey.create>(
+ `${KafkaNamespace}.generic.producer.${topic}`,
+ );
+
export const eventHandlerKey = <
Stream extends IStreamDefinition,
K extends keyof Stream['messages'],
diff --git a/src/providers/generic-kafka-producer-factory.provider.ts b/src/providers/generic-kafka-producer-factory.provider.ts
new file mode 100644
index 0000000..fbb9955
--- /dev/null
+++ b/src/providers/generic-kafka-producer-factory.provider.ts
@@ -0,0 +1,49 @@
+import {inject, Provider} from '@loopback/core';
+import {ILogger, LOGGER} from '@sourceloop/core';
+import {CompressionTypes, Kafka, ProducerConfig} from 'kafkajs';
+import {KafkaErrorKeys} from '../error-keys';
+import {GenericProducerFactoryType, IStreamDefinition} from '../types';
+import {KafkaClientBindings} from '../keys';
+
+/* The class `GenericKafkaProducerFactoryProvider` is a TypeScript class that provides a factory for creating
+Kafka producers to send messages to specified topics without events. */
+export class GenericKafkaProducerFactoryProvider
+ implements Provider>
+{
+ constructor(
+ @inject(KafkaClientBindings.KafkaClient)
+ private client: Kafka,
+ @inject(LOGGER.LOGGER_INJECT) private readonly logger: ILogger,
+ @inject(KafkaClientBindings.ProducerConfiguration, {optional: true})
+ private configuration?: ProducerConfig,
+ ) {}
+
+ value(): GenericProducerFactoryType {
+ return (topic: string) => {
+ return {
+ send: async (payload: T['messages'][], key?: string): Promise => {
+ const producer = this.client.producer(this.configuration);
+
+ try {
+ await producer.connect();
+ await producer.send({
+ topic: topic,
+ compression: CompressionTypes.GZIP,
+ messages: payload.map(message => ({
+ key,
+ value: JSON.stringify(message),
+ })),
+ });
+ await producer.disconnect();
+ } catch (e) {
+ this.logger.error(
+ `${KafkaErrorKeys.PublishFailed}: ${JSON.stringify(e)}`,
+ );
+ await producer.disconnect();
+ throw e;
+ }
+ },
+ };
+ };
+ }
+}
diff --git a/src/providers/index.ts b/src/providers/index.ts
index 2ed770c..4f91654 100644
--- a/src/providers/index.ts
+++ b/src/providers/index.ts
@@ -1 +1,2 @@
export * from './kafka-producer-factory.provider';
+export * from './generic-kafka-producer-factory.provider';
diff --git a/src/types.ts b/src/types.ts
index 47cffeb..9cdfced 100644
--- a/src/types.ts
+++ b/src/types.ts
@@ -4,6 +4,7 @@ export interface KafkaClientOptions {
connection: KafkaConfig;
topics?: string[];
initObservers?: boolean;
+ genericTopics?: string[];
}
export type ConsumerConfig = {
@@ -81,10 +82,18 @@ export interface Producer {
): Promise;
}
+export interface GenericProducer {
+ send(payload: Stream['messages'][], key?: string): Promise;
+}
+
export type ProducerFactoryType = (
topic: Stream['topic'],
) => Producer;
+export type GenericProducerFactoryType = (
+ topic: Stream['topic'],
+) => GenericProducer;
+
export type StreamHandler<
Stream extends IStreamDefinition,
K extends EventsInStream,