-
Notifications
You must be signed in to change notification settings - Fork 42
Description
If an error occurs during deserialization of the Kafka message key, the consumer stops and the following error message is logged:
"Fatal error occurred processing the consumed message. The consumer will be stopped."
When using Schema Registry serializers, such as Avro, there are a couple of potential key deserialization problems, which we'd prefer not to cause a consumer to stop:
- Malformed message key content - Avro deserializer expects key bytes to contain magic byte and schema id
- Schema Registry outage - Deserializer makes a REST API call to SR before caching the schema for the topic key
This is specific to message key deserialization. Currently, keys are deserialized during an earlier consumer step in Silverback.Messaging.Broker.KafkaConsumer
, where there is no opportunity to handle the error. Errors encountered during message value deserialization are thrown within the consumer behaviour pipeline and can be handled by the error policy (move bad message to dlq topic, retry if there is a transient Schema Registry connectivity error). Ideally key deserialization errors could be handled in the same way.
I can help with a PR, but will wait to hear if maintainers have any advice on the best approach.
One suggestion:
- Extend
RawInboundEnvelope
with optionalRawKey
property - Extend
KafkaConsumer
to populate theRawKey
property (viaConsumer
) - Introduce a new
IMessageKeySerializer
interface type that extendsIMessageSerializer
with key serde methods.IKafkaMessageKeySerializer
inherits from this. This is just to avoid having Kafka-specific stuff in behaviors, which work with messaging abstractions. - Extend
IConsumerEndpoint
with aMessageKey
property (MessageKeySettings
withHasKey
andKeyHeader
properties) - Add a new
KeyDeserializerConsumerBehavior
to default pipeline. If endpoint's MessageKeySettings indicate that key is applicable, raw key is available and endpoint's serializer implementsIMessageKeySerializer
, deserialize the key and add the headers - Configuration to enable new key deserialization behaviour to preserve backwards compatibility