Skip to content

Commit d69aab1

Browse files
committed
fix(providers): add provider for generic producer
GH-0
1 parent cfe60bf commit d69aab1

File tree

11 files changed

+134
-3
lines changed

11 files changed

+134
-3
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ export class StartConsumer implements IConsumer<TestStream, Events.start> {
164164

165165
A Producer is a loopback service for producing message for a particular topic, you can inject a producer using the `@producer(TOPIC_NAME)` decorator.
166166
Note: The topic name passed to decorator must be first configured in the Component configuration's topic property -
167+
If you want to produce a raw message without any event type, you can use the `@genericProducer(TOPIC_NAME)` decorator, note that in this case, the topic name must be passed in the genericTopics property of the component configuration.
167168

168169
#### Example
169170

src/__tests__/acceptance/application.test.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import {
1313
setupConsumerApplication,
1414
setupProducerApplication,
1515
} from './test-helper';
16+
import {GenericProducerService} from './fixtures/producer/generic-producer.service';
1617

1718
describe('end-to-end', () => {
1819
let consumerApp: Application;
@@ -93,6 +94,18 @@ describe('end-to-end', () => {
9394
);
9495
});
9596

97+
it('should produce from a generic producer without events for a single topic', async () => {
98+
const producerService = producerApp.getSync<GenericProducerService>(
99+
`services.GenericProducerService`,
100+
);
101+
const message = 'message';
102+
await producerService.produceMessage(message);
103+
sinon.assert.called(genericHandler);
104+
expect(genericHandler.getCalls()[0].args[0]).to.be.deepEqual({
105+
data: message,
106+
});
107+
});
108+
96109
it('should consume from a generic consumer without events for a single topic', async () => {
97110
const producerInstance = producerApp.getSync<Producer<TestStream>>(
98111
producerKey(Topics.Generic),
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import {genericProducer} from '../../../../decorators/generic-producer.decorator';
2+
import {GenericProducer} from '../../../../types';
3+
import {GenericStream} from '../stream';
4+
import {Topics} from '../topics.enum';
5+
6+
export class GenericProducerService {
7+
constructor(
8+
@genericProducer(Topics.Generic)
9+
private producer: GenericProducer<GenericStream>,
10+
) {}
11+
12+
async produceMessage(message: string): Promise<void> {
13+
await this.producer.send([
14+
{
15+
data: message,
16+
},
17+
]);
18+
}
19+
}

src/__tests__/acceptance/fixtures/producer/producer-app.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {KafkaClientComponent} from '../../../../component';
77
import {KafkaClientBindings} from '../../../../keys';
88
import {KafkaClientStub} from '../../../stubs';
99
import {Topics} from '../topics.enum';
10+
import {GenericProducerService} from './generic-producer.service';
1011

1112
export class ProducerApp extends BootMixin(
1213
ServiceMixin(RepositoryMixin(RestApplication)),
@@ -16,11 +17,13 @@ export class ProducerApp extends BootMixin(
1617

1718
this.configure(KafkaClientBindings.Component).to({
1819
topics: Object.values(Topics) as string[],
20+
genericTopics: [Topics.Generic],
1921
});
2022
this.bind<KafkaClientStub>(KafkaClientBindings.KafkaClient).to(
2123
options.client,
2224
);
2325
this.component(KafkaClientComponent);
26+
this.service(GenericProducerService);
2427

2528
this.projectRoot = __dirname;
2629
// Customize @loopback/boot Booter Conventions here

src/__tests__/acceptance/fixtures/stream.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,6 @@ export interface TestStream extends IStreamDefinition {
1717
export interface GenericStream extends IStreamDefinition {
1818
topic: Topics.Generic;
1919
messages: {
20-
[Events.close]: {};
20+
data: string;
2121
};
2222
}

src/component.ts

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,12 @@ import {
1010
} from '@loopback/core';
1111
import {LoggerExtensionComponent} from '@sourceloop/core';
1212
import {Kafka} from 'kafkajs';
13-
import {KafkaClientBindings, producerKey} from './keys';
13+
import {genericProducerKey, KafkaClientBindings, producerKey} from './keys';
1414
import {KafkaObserver} from './observers';
15-
import {KafkaProducerFactoryProvider} from './providers';
15+
import {
16+
GenericKafkaProducerFactoryProvider,
17+
KafkaProducerFactoryProvider,
18+
} from './providers';
1619
import {KafkaConsumerService} from './services/kafka-consumer.service';
1720
import {KafkaClientOptions} from './types';
1821

@@ -39,6 +42,11 @@ export class KafkaClientComponent implements Component {
3942
.toProvider(KafkaProducerFactoryProvider)
4043
.inScope(BindingScope.SINGLETON);
4144

45+
app
46+
.bind(KafkaClientBindings.GenericProducerFactor)
47+
.toProvider(GenericKafkaProducerFactoryProvider)
48+
.inScope(BindingScope.SINGLETON);
49+
4250
app.service(KafkaConsumerService);
4351

4452
if (configuration?.topics) {
@@ -50,6 +58,18 @@ export class KafkaClientComponent implements Component {
5058
.inScope(BindingScope.SINGLETON);
5159
});
5260
}
61+
62+
if (configuration?.genericTopics) {
63+
const genericProducerFactory = app.getSync(
64+
KafkaClientBindings.GenericProducerFactor,
65+
);
66+
configuration.genericTopics.forEach(topic => {
67+
app
68+
.bind(genericProducerKey(topic))
69+
.to(genericProducerFactory(topic))
70+
.inScope(BindingScope.SINGLETON);
71+
});
72+
}
5373
if (configuration?.initObservers) {
5474
app.lifeCycleObserver(KafkaObserver);
5575
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
import {inject} from '@loopback/core';
2+
import {genericProducerKey} from '../keys';
3+
4+
export function genericProducer(topic: string) {
5+
return inject(genericProducerKey(topic));
6+
}

src/keys.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import {KafkaClientComponent} from './component';
44
import {KafkaConsumerService} from './services/kafka-consumer.service';
55
import {
66
ConsumerConfig,
7+
GenericProducer,
8+
GenericProducerFactoryType,
79
IStreamDefinition,
810
Producer,
911
ProducerFactoryType,
@@ -30,6 +32,9 @@ export namespace KafkaClientBindings {
3032
export const ProducerFactory = BindingKey.create<
3133
ProducerFactoryType<IStreamDefinition>
3234
>(`${KafkaNamespace}.ProducerFactory`);
35+
export const GenericProducerFactor = BindingKey.create<
36+
GenericProducerFactoryType<IStreamDefinition>
37+
>(`${KafkaNamespace}.GenericProducerFactory`);
3338
export const LifeCycleGroup = `${KafkaNamespace}.KAFKA_OBSERVER_GROUP`;
3439
}
3540

@@ -38,6 +43,11 @@ export const producerKey = (topic: string) =>
3843
`${KafkaNamespace}.producer.${topic}`,
3944
);
4045

46+
export const genericProducerKey = (topic: string) =>
47+
BindingKey.create<GenericProducer<IStreamDefinition>>(
48+
`${KafkaNamespace}.generic.producer.${topic}`,
49+
);
50+
4151
export const eventHandlerKey = <
4252
Stream extends IStreamDefinition,
4353
K extends keyof Stream['messages'],
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import {inject, Provider} from '@loopback/core';
2+
import {ILogger, LOGGER} from '@sourceloop/core';
3+
import {CompressionTypes, Kafka, ProducerConfig} from 'kafkajs';
4+
import {KafkaErrorKeys} from '../error-keys';
5+
import {GenericProducerFactoryType, IStreamDefinition} from '../types';
6+
import {KafkaClientBindings} from '../keys';
7+
8+
/* The class `GenericKafkaProducerFactoryProvider` is a TypeScript class that provides a factory for creating
9+
Kafka producers to send messages to specified topics without events. */
10+
export class GenericKafkaProducerFactoryProvider<T extends IStreamDefinition>
11+
implements Provider<GenericProducerFactoryType<T>>
12+
{
13+
constructor(
14+
@inject(KafkaClientBindings.KafkaClient)
15+
private client: Kafka,
16+
@inject(LOGGER.LOGGER_INJECT) private readonly logger: ILogger,
17+
@inject(KafkaClientBindings.ProducerConfiguration, {optional: true})
18+
private configuration?: ProducerConfig,
19+
) {}
20+
21+
value(): GenericProducerFactoryType<T> {
22+
return (topic: string) => {
23+
return {
24+
send: async (payload: T['messages'][], key?: string): Promise<void> => {
25+
const producer = this.client.producer(this.configuration);
26+
27+
try {
28+
await producer.connect();
29+
await producer.send({
30+
topic: topic,
31+
compression: CompressionTypes.GZIP,
32+
messages: payload.map(message => ({
33+
key,
34+
value: JSON.stringify(message),
35+
})),
36+
});
37+
await producer.disconnect();
38+
} catch (e) {
39+
this.logger.error(
40+
`${KafkaErrorKeys.PublishFailed}: ${JSON.stringify(e)}`,
41+
);
42+
await producer.disconnect();
43+
throw e;
44+
}
45+
},
46+
};
47+
};
48+
}
49+
}

src/providers/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
export * from './kafka-producer-factory.provider';
2+
export * from './generic-kafka-producer-factory.provider';

0 commit comments

Comments
 (0)