Merged
15 changes: 11 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
</a>
</p>


## Overview

A Kafka Client for Loopback4 built on top of [KafkaJS](https://kafka.js.org/).
Expand Down Expand Up @@ -97,7 +96,13 @@ export class TestStream implements IStreamDefinition {

### Consumer

A Consumer is a [`loopback extension`](https://loopback.io/doc/en/lb4/Extension-point-and-extensions.html) that is used by the [`KafkaConsumerService`](./src/services/kafka-consumer.service.ts) to initialize consumers. It must implement the `IConsumer` interface and should be using the `@consumer()` decorator. If you want the consumers to start at the start of your application, you should pass the `initObservers` config to the Component configuration.
A Consumer is a [`loopback extension`](https://loopback.io/doc/en/lb4/Extension-point-and-extensions.html) that is used by the [`KafkaConsumerService`](./src/services/kafka-consumer.service.ts) to initialize consumers. It must implement one of the `IConsumer`, `ISharedConsumer`, or `IGenericConsumer` interfaces and should use the `asConsumer` binding template. If you want the consumers to start when your application starts, pass the `initObservers` config to the Component configuration.

- `IConsumer` - a simple consumer for a single event in a stream
- `ISharedConsumer` - a consumer that consumes data for multiple events in a stream (defined with an array of events)
- `IGenericConsumer` - a consumer that consumes data for all events in a stream/topic (defined without any event). By default it is not triggered for an event if a more specific consumer is bound for that event; this behaviour can be changed with the `alwaysRunGenericConsumer` option in the consumer configuration.

You can bind any consumer-related configuration using the `KafkaClientBindings.ConsumerConfiguration` key. It accepts all KafkaJS consumer options, along with one additional option, `alwaysRunGenericConsumer`, which always runs any available generic consumer even when a more specific consumer is bound by the client (if this option is false or not provided, only the specific consumer runs).
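For instance, a minimal sketch of binding this configuration in an application (the `groupId` value here is illustrative; `alwaysRunGenericConsumer` is the extra option described above):

```ts
// application.ts - a hypothetical sketch of binding consumer configuration
this.bind(KafkaClientBindings.ConsumerConfiguration).to({
  groupId: 'my-consumer-group', // illustrative KafkaJS consumer option
  alwaysRunGenericConsumer: true, // also run generic consumers for matched events
});
```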

##### Example

Expand All @@ -112,13 +117,14 @@ this.configure(KafkaConnectorComponentBindings.COMPONENT).to({

```ts
// start.consumer.ts
// use @genericConsumer for a generic consumer
@consumer<TestStream, Events.start>()
export class StartConsumer implements IConsumer<TestStream, Events.start> {
constructor(
@inject('test.handler.start')
public handler: StreamHandler<TestStream, Events.start>,
) {}
topic: Topics.First = Topics.First;
topic = Topics.First;
event: Events.start = Events.start;
// you can write the handler as a method
handler(payload: StartEvent) {
Expand Down Expand Up @@ -149,7 +155,7 @@ export class StartConsumer implements IConsumer<TestStream, Events.start> {
@eventHandler<TestStream>(Events.Start)
public handler: StreamHandler<TestStream, Events.start>,
) {}
topic: Topics.First = Topics.First;
topic = Topics.First;
event: Events.start = Events.start;
}
```
Expand All @@ -158,6 +164,7 @@ export class StartConsumer implements IConsumer<TestStream, Events.start> {

A Producer is a loopback service for producing messages for a particular topic; you can inject a producer using the `@producer(TOPIC_NAME)` decorator.
Note: the topic name passed to the decorator must first be configured in the Component configuration's `topics` property.
If you want to produce a raw message without any event type, use the `@genericProducer(TOPIC_NAME)` decorator; note that in this case, the topic name must be passed in the `genericTopics` property of the component configuration.
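As an illustrative sketch (the service and payload names here are hypothetical, modeled on the acceptance-test fixtures in this PR):

```ts
// a hypothetical service that injects a generic producer
export class NotificationService {
  constructor(
    @genericProducer(Topics.Generic)
    private producer: GenericProducer<GenericStream>,
  ) {}

  async notify(data: string): Promise<void> {
    // raw message, no event type attached
    await this.producer.send([{data}]);
  }
}
```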

#### Example

Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default.

24 changes: 19 additions & 5 deletions src/__tests__/acceptance/application.test.ts
Expand Up @@ -13,6 +13,7 @@ import {
setupConsumerApplication,
setupProducerApplication,
} from './test-helper';
import {GenericProducerService} from './fixtures/producer/generic-producer.service';

describe('end-to-end', () => {
let consumerApp: Application;
Expand Down Expand Up @@ -93,18 +94,31 @@ describe('end-to-end', () => {
);
});

it('should produce from a generic producer without events for a single topic', async () => {
const producerService = producerApp.getSync<GenericProducerService>(
`services.GenericProducerService`,
);
const message = 'message';
await producerService.produceMessage(message);
sinon.assert.called(genericHandler);
expect(genericHandler.getCalls()[0].args[0]).to.be.deepEqual({
data: message,
});
});

it('should consume from a generic consumer without events for a single topic', async () => {
const producerInstance = producerApp.getSync<Producer<TestStream>>(
producerKey(Topics.First),
producerKey(Topics.Generic),
);
const close = {
closeTime: new Date(),
};
await producerInstance.send(Events.close, [close]);
sinon.assert.called(genericHandler);
expect(genericHandler.getCalls()[0].args[0]).to.be.deepEqual(
JSON.parse(JSON.stringify(close)),
);
expect(genericHandler.getCalls()[0].args[0]).to.be.deepEqual({
data: JSON.parse(JSON.stringify(close)),
event: Events.close,
});
});

it('should not handle an unspecified event', async () => {
Expand All @@ -118,7 +132,7 @@ describe('end-to-end', () => {
};
await producerInstance.connect();
producerInstance.send({
topic: Topics.First,
topic: Topics.Generic,
messages: [payload],
});

Expand Down
@@ -1,15 +1,14 @@
import {inject, injectable} from '@loopback/core';
import {asConsumer} from '../../../../keys';
import {TestStream} from '../stream';
import {IGenericConsumer, StreamHandler} from '../../../../types';
import {GenericStream} from '../stream';
import {GenericStreamHandler, IGenericConsumer} from '../../../../types';
import {Topics} from '../topics.enum';
import {Events} from '../events.enum';

@injectable(asConsumer)
export class GenericConsumer implements IGenericConsumer<TestStream> {
export class GenericConsumer implements IGenericConsumer<GenericStream> {
constructor(
@inject('eventHandler.generic')
public handler: StreamHandler<TestStream, Events>,
public handler: GenericStreamHandler<GenericStream>,
) {}
topic: Topics.First = Topics.First;
topic: Topics.Generic = Topics.Generic;
}
@@ -0,0 +1,19 @@
import {genericProducer} from '../../../../decorators/generic-producer.decorator';
import {GenericProducer} from '../../../../types';
import {GenericStream} from '../stream';
import {Topics} from '../topics.enum';

export class GenericProducerService {
constructor(
@genericProducer(Topics.Generic)
private producer: GenericProducer<GenericStream>,
) {}

async produceMessage(message: string): Promise<void> {
await this.producer.send([
{
data: message,
},
]);
}
}
3 changes: 3 additions & 0 deletions src/__tests__/acceptance/fixtures/producer/producer-app.ts
Expand Up @@ -7,6 +7,7 @@ import {KafkaClientComponent} from '../../../../component';
import {KafkaClientBindings} from '../../../../keys';
import {KafkaClientStub} from '../../../stubs';
import {Topics} from '../topics.enum';
import {GenericProducerService} from './generic-producer.service';

export class ProducerApp extends BootMixin(
ServiceMixin(RepositoryMixin(RestApplication)),
Expand All @@ -16,11 +17,13 @@ export class ProducerApp extends BootMixin(

this.configure(KafkaClientBindings.Component).to({
topics: Object.values(Topics) as string[],
genericTopics: [Topics.Generic],
});
this.bind<KafkaClientStub>(KafkaClientBindings.KafkaClient).to(
options.client,
);
this.component(KafkaClientComponent);
this.service(GenericProducerService);

this.projectRoot = __dirname;
// Customize @loopback/boot Booter Conventions here
Expand Down
7 changes: 7 additions & 0 deletions src/__tests__/acceptance/fixtures/stream.ts
Expand Up @@ -13,3 +13,10 @@ export interface TestStream extends IStreamDefinition {
[Events.close]: {};
};
}

export interface GenericStream extends IStreamDefinition {
topic: Topics.Generic;
messages: {
data: string;
};
}
1 change: 1 addition & 0 deletions src/__tests__/acceptance/fixtures/topics.enum.ts
@@ -1,4 +1,5 @@
export enum Topics {
First = 'first',
Second = 'second',
Generic = 'generic',
}
24 changes: 22 additions & 2 deletions src/component.ts
Expand Up @@ -10,9 +10,12 @@ import {
} from '@loopback/core';
import {LoggerExtensionComponent} from '@sourceloop/core';
import {Kafka} from 'kafkajs';
import {KafkaClientBindings, producerKey} from './keys';
import {genericProducerKey, KafkaClientBindings, producerKey} from './keys';
import {KafkaObserver} from './observers';
import {KafkaProducerFactoryProvider} from './providers';
import {
GenericKafkaProducerFactoryProvider,
KafkaProducerFactoryProvider,
} from './providers';
import {KafkaConsumerService} from './services/kafka-consumer.service';
import {KafkaClientOptions} from './types';

Expand All @@ -39,6 +42,11 @@ export class KafkaClientComponent implements Component {
.toProvider(KafkaProducerFactoryProvider)
.inScope(BindingScope.SINGLETON);

app
.bind(KafkaClientBindings.GenericProducerFactor)
.toProvider(GenericKafkaProducerFactoryProvider)
.inScope(BindingScope.SINGLETON);

app.service(KafkaConsumerService);

if (configuration?.topics) {
Expand All @@ -50,6 +58,18 @@ export class KafkaClientComponent implements Component {
.inScope(BindingScope.SINGLETON);
});
}

if (configuration?.genericTopics) {
const genericProducerFactory = app.getSync(
KafkaClientBindings.GenericProducerFactor,
);
configuration.genericTopics.forEach(topic => {
app
.bind(genericProducerKey(topic))
.to(genericProducerFactory(topic))
.inScope(BindingScope.SINGLETON);
});
}
if (configuration?.initObservers) {
app.lifeCycleObserver(KafkaObserver);
}
Expand Down
9 changes: 9 additions & 0 deletions src/decorators/generic-consumer.decorator.ts
@@ -0,0 +1,9 @@
import {Constructor, injectable} from '@loopback/core';
import {asConsumer} from '../keys';
import {IGenericConsumer, IStreamDefinition} from '../types';

export function genericConsumer<T extends IStreamDefinition>() {
return injectable(asConsumer) as (
target: Constructor<IGenericConsumer<T>>,
) => void;
}
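A minimal usage sketch of this decorator (the class name, stream, and handler body are illustrative; the generic consumer is defined without any `event` property, per the README):

```ts
// hypothetical catch-all consumer bound via @genericConsumer
@genericConsumer<GenericStream>()
export class CatchAllConsumer implements IGenericConsumer<GenericStream> {
  topic = Topics.Generic;

  handler = async (payload: GenericStream['messages']) => {
    console.log('received', payload);
  };
}
```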
6 changes: 6 additions & 0 deletions src/decorators/generic-producer.decorator.ts
@@ -0,0 +1,6 @@
import {inject} from '@loopback/core';
import {genericProducerKey} from '../keys';

export function genericProducer(topic: string) {
return inject(genericProducerKey(topic));
}
2 changes: 2 additions & 0 deletions src/decorators/index.ts
@@ -1,3 +1,5 @@
export * from './producer.decorator';
export * from './handler.decorator';
export * from './consumer.decorator';
export * from './generic-consumer.decorator';
export * from './generic-producer.decorator';
10 changes: 10 additions & 0 deletions src/keys.ts
Expand Up @@ -4,6 +4,8 @@ import {KafkaClientComponent} from './component';
import {KafkaConsumerService} from './services/kafka-consumer.service';
import {
ConsumerConfig,
GenericProducer,
GenericProducerFactoryType,
IStreamDefinition,
Producer,
ProducerFactoryType,
Expand All @@ -30,6 +32,9 @@ export namespace KafkaClientBindings {
export const ProducerFactory = BindingKey.create<
ProducerFactoryType<IStreamDefinition>
>(`${KafkaNamespace}.ProducerFactory`);
export const GenericProducerFactor = BindingKey.create<
GenericProducerFactoryType<IStreamDefinition>
>(`${KafkaNamespace}.GenericProducerFactory`);
export const LifeCycleGroup = `${KafkaNamespace}.KAFKA_OBSERVER_GROUP`;
}

Expand All @@ -38,6 +43,11 @@ export const producerKey = (topic: string) =>
`${KafkaNamespace}.producer.${topic}`,
);

export const genericProducerKey = (topic: string) =>
BindingKey.create<GenericProducer<IStreamDefinition>>(
`${KafkaNamespace}.generic.producer.${topic}`,
);
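The resulting key string can be sketched standalone (the `KafkaNamespace` value below is illustrative, not the library's actual constant):

```ts
// standalone sketch of the generic-producer binding-key format
const KafkaNamespace = 'sf.kafka'; // illustrative namespace value
const genericProducerKeyString = (topic: string): string =>
  `${KafkaNamespace}.generic.producer.${topic}`;

const key = genericProducerKeyString('orders');
console.log(key);
```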

export const eventHandlerKey = <
Stream extends IStreamDefinition,
K extends keyof Stream['messages'],
Expand Down
49 changes: 49 additions & 0 deletions src/providers/generic-kafka-producer-factory.provider.ts
@@ -0,0 +1,49 @@
import {inject, Provider} from '@loopback/core';
import {ILogger, LOGGER} from '@sourceloop/core';
import {CompressionTypes, Kafka, ProducerConfig} from 'kafkajs';
import {KafkaErrorKeys} from '../error-keys';
import {GenericProducerFactoryType, IStreamDefinition} from '../types';
import {KafkaClientBindings} from '../keys';

/* `GenericKafkaProducerFactoryProvider` provides a factory for creating
Kafka producers that send messages to a given topic without event types. */
export class GenericKafkaProducerFactoryProvider<T extends IStreamDefinition>
implements Provider<GenericProducerFactoryType<T>>
{
constructor(
@inject(KafkaClientBindings.KafkaClient)
private client: Kafka,
@inject(LOGGER.LOGGER_INJECT) private readonly logger: ILogger,
@inject(KafkaClientBindings.ProducerConfiguration, {optional: true})
private configuration?: ProducerConfig,
) {}

value(): GenericProducerFactoryType<T> {
return (topic: string) => {
return {
send: async (payload: T['messages'][], key?: string): Promise<void> => {
const producer = this.client.producer(this.configuration);

try {
await producer.connect();
await producer.send({
topic: topic,
compression: CompressionTypes.GZIP,
messages: payload.map(message => ({
key,
value: JSON.stringify(message),
})),
});
await producer.disconnect();
} catch (e) {
this.logger.error(
`${KafkaErrorKeys.PublishFailed}: ${JSON.stringify(e)}`,
);
await producer.disconnect();
throw e;
}
},
};
};
}
}
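The payload-to-message mapping inside `send` can be illustrated in isolation (a standalone sketch of the transformation before KafkaJS takes over; the payload type is assumed):

```ts
// standalone sketch: generic payloads become keyed, JSON-serialized messages
type Payload = {data: string};

function toKafkaMessages(payload: Payload[], key?: string) {
  return payload.map(message => ({
    key,
    value: JSON.stringify(message), // each payload object serialized as JSON
  }));
}

const messages = toKafkaMessages([{data: 'hello'}, {data: 'world'}], 'k1');
console.log(messages);
```

Note that the provider creates, connects, and disconnects a producer on every `send` call, trading connection reuse for simplicity.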
1 change: 1 addition & 0 deletions src/providers/index.ts
@@ -1 +1,2 @@
export * from './kafka-producer-factory.provider';
export * from './generic-kafka-producer-factory.provider';
4 changes: 2 additions & 2 deletions src/services/kafka-consumer.service.ts
Expand Up @@ -41,7 +41,7 @@ export class KafkaConsumerService<T extends IStreamDefinition> {

const {consumerMap, genericConsumerMap} = await this.buildConsumerMaps();
const topics: string[] = Array.from(consumerMap.keys());

topics.push(...Array.from(genericConsumerMap.keys()));
await kafkaConsumerClient.subscribe({
topics,
});
Expand Down Expand Up @@ -70,7 +70,7 @@ export class KafkaConsumerService<T extends IStreamDefinition> {
(!consumer || this.configuration.alwaysRunGenericConsumer) &&
genericConsumer
) {
await genericConsumer.handler(message.data);
await genericConsumer.handler(message);
}
} else {
this.logger.warn(
Expand Down
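The dispatch rule in the hunk above can be sketched as a standalone function (assumed semantics, paraphrased from the service):

```ts
// standalone sketch: which handlers run for an incoming event
type Handler = (msg: unknown) => void;

function selectHandlers(
  specific: Handler | undefined,
  generic: Handler | undefined,
  alwaysRunGenericConsumer = false,
): Handler[] {
  const selected: Handler[] = [];
  if (specific) selected.push(specific);
  // the generic consumer is a fallback unless the flag forces it to always run
  if ((!specific || alwaysRunGenericConsumer) && generic) {
    selected.push(generic);
  }
  return selected;
}

const noop: Handler = () => {};
console.log(selectHandlers(noop, noop).length); // specific consumer wins
```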