Skip to content

Exception throw during Kafka message key deserialization causes fatal consumer error #249

@danmalcolm

Description

@danmalcolm

If an error occurs during deserialization of the Kafka message key, the consumer stops and the following error message is logged:

"Fatal error occurred processing the consumed message. The consumer will be stopped."

When using Schema Registry serializers, such as Avro, there are a couple of potential key deserialization problems, which we'd prefer not to cause a consumer to stop:

  • Malformed message key content - Avro deserializer expects key bytes to contain magic byte and schema id
  • Schema Registry outage - Deserializer makes a REST API call to SR before caching the schema for the topic key

This is specific to message key deserialization. Currently, keys are deserialized during an earlier consumer step in Silverback.Messaging.Broker.KafkaConsumer, where there is no opportunity to handle the error. Errors encountered during message value deserialization are thrown within the consumer behaviour pipeline and can be handled by the error policy (move bad message to dlq topic, retry if there is a transient Schema Registry connectivity error). Ideally key deserialization errors could be handled in the same way.

I can help with a PR, but will wait to hear if maintainers have any advice on the best approach.

One suggestion:

  • Extend RawInboundEnvelope with optional RawKey property
  • Extend KafkaConsumer to populate the RawKey property (via Consumer)
  • Introduce a new IMessageKeySerializer interface type that extends IMessageSerializer with key serde methods. IKafkaMessageKeySerializer inherits from this. This is just to avoid having Kafka-specific stuff in behaviors, which work with messaging abstractions.
  • Extend IConsumerEndpoint with a MessageKey property (MessageKeySettings with HasKey and KeyHeader properties)
  • Add a new KeyDeserializerConsumerBehavior to default pipeline. If endpoint's MessageKeySettings indicate that key is applicable, raw key is available and endpoint's serializer implements IMessageKeySerializer, deserialize the key and add the headers
  • Configuration to enable new key deserialization behaviour to preserve backwards compatibility

Metadata

Metadata

Assignees

Labels

bugSomething isn't working

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions