How to handle poison pills in Spring Kafka?

Updated: 29 June 2021 19:07 - 13 min read

Modern applications are increasingly built from smaller components that communicate with each other through events, a so-called event-driven architecture. Event-driven architectures have three key components: event producers, an event broker, and event consumers. It is important that the consumers can consume the events the producers have produced. When this is not the case and you are not prepared for that scenario, a single event can bring your systems to a halt. Such an event is a poison pill.

In this blog I will discuss what exactly poison pills are, how they occur and how to deal with them. I also created an example project where you can see the effects of a poison pill and what you can do to handle it.

Serialization and deserialization

To understand what a poison pill is and how it occurs, we first need to learn what serialization and deserialization are exactly. Wikipedia has, in my opinion, one of the best explanations:

Serialization is the process of converting an object state into a format (series of bytes) that can be stored or transmitted and reconstructed later possibly in a different computer environment. The opposite operation, converting a series of bytes to an object state, is called deserialization.

image

In the context of Kafka, serialization is used to convert a Java object into a byte array that is subsequently produced by the producer to a topic. Deserialization is again used in the opposite manner, consuming a series of bytes from a topic and converting it into a Java object. Both the key and value of the record are serialized and later when consumed deserialized.
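
To make this round trip concrete, here is a minimal sketch that uses only the JDK. The class name StringRoundTrip is hypothetical and the code is not Kafka's own StringSerializer, it only mirrors the idea of converting a Java object into bytes and back again:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, only meant to illustrate a serializer/deserializer pair.
final class StringRoundTrip {

    // Serialization: turn the Java object (here a String) into a byte array.
    static byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Deserialization: turn the byte array back into a Java object.
    static String deserialize(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] onTheWire = serialize("order-42");
        System.out.println(onTheWire.length + " bytes on the wire");
        System.out.println("Deserialized: " + deserialize(onTheWire));
    }
}
```

As long as both sides agree on the format (UTF-8 text in this sketch), the consumer gets back exactly what the producer sent.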

What is a poison pill

A poison pill is a record that has been produced to a Kafka topic, but cannot be consumed by a consumer, regardless of how many times it is attempted. The three most likely causes of a poison pill are corrupted records, an error during the deserialization of the record or a schema mismatch. A schema is used to describe the format of the produced and consumed events. If a consumed event does not adhere to the schema, the consumer will return an error as well. Deserialization errors and schema mismatches are the most likely to occur. In the two years I have been using Kafka, I have never seen a corrupt record, but I have seen quite a few deserialization errors and schema mismatches. I never caused any of them of course 😉.

Cause of a poison pill

As I explained in the previous paragraph, corrupted records could be the cause of a poison pill. Luckily, a corrupted record only ends up in a Kafka topic in rare circumstances. A network hiccup is an example of what can result in a corrupted record. If a bit is flipped during transport, the CRC checksum will fail and the data can no longer be trusted. Since corrupt records do not occur often and can have many peculiar explanations, this blog will not focus on dealing with corrupted records. The remainder of this blog will focus on deserialization errors instead. Deserialization errors and schema mismatches can be used interchangeably in the context of a poison pill, and the same solutions can be used to handle schema mismatches. Whenever you read deserialization error, you can think of a schema mismatch as well.
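
As a side note on the checksum: the sketch below shows how a single flipped bit is detected, using the CRC32C implementation that ships with the JDK (Java 9 and later). The class and method names are hypothetical, and this is an illustration of the principle rather than the code Kafka itself runs:

```java
import java.util.zip.CRC32C;

// Hypothetical demo class: detects a flipped bit by comparing checksums.
final class ChecksumDemo {

    // Computes the CRC32C checksum over the full payload.
    static long checksum(byte[] data) {
        CRC32C crc = new CRC32C();
        crc.update(data, 0, data.length);
        return crc.getValue();
    }

    // Returns a copy of the payload with the lowest bit of one byte flipped,
    // simulating corruption during transport.
    static byte[] flipBit(byte[] data, int index) {
        byte[] copy = data.clone();
        copy[index] ^= 0x01;
        return copy;
    }

    // The receiver recomputes the checksum and compares it with the one that was sent.
    static boolean isIntact(byte[] received, long expectedChecksum) {
        return checksum(received) == expectedChecksum;
    }
}
```

If isIntact returns false, the receiver knows the bytes changed somewhere along the way, but not which bytes, which is why such a record can no longer be trusted.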

If the producer and consumer are using compatible serializers and deserializers everything will work as expected. In the image below an example of compatible JSON serializers and deserializers is given. The poison pill scenario is in this case avoided.

image

Deserialization errors will occur when producers and consumers have incompatible serializers and deserializers. Both the key and value deserializers should be compatible in order to prevent deserialization errors. In the example below the producer is using a StringSerializer while the consumer is using the incompatible deserializer FloatDeserializer.

image
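
The mismatch can also be reproduced without a broker. The following sketch assumes a simplified stand-in for a float deserializer that, like Kafka's FloatDeserializer as far as I know, insists on exactly 4 bytes. The class MismatchDemo and its methods are hypothetical names:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class: a String producer paired with a float consumer.
final class MismatchDemo {

    // What a String serializer does: encode the text as UTF-8 bytes.
    static byte[] serializeString(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Simplified stand-in for a float deserializer: a float is exactly 4 bytes.
    static float deserializeFloat(byte[] data) {
        if (data.length != Float.BYTES) {
            throw new IllegalArgumentException(
                    "Expected 4 bytes for a float but received " + data.length);
        }
        return ByteBuffer.wrap(data).getFloat();
    }
}
```

Calling deserializeFloat(serializeString("order-42")) throws, because the 8 bytes of text can never be a float. Note that a string of exactly 4 bytes would not throw at all in this sketch, it would silently produce a meaningless number, which is arguably even worse than a poison pill.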

Poison pill in action

If you want to see for yourself what a poison pill looks like, I have created an example project that uses an incompatible serializer and deserializer to mimic a poison pill. The kafka-poison-pill example project can be found on GitHub. In case you don’t have a local Kafka environment, the Confluent documentation can help you set up Kafka locally. If you’re already running Kafka, don’t forget to update application.yml with your configuration if necessary. You can also read along to see what will happen. Consider this your spoiler alert!

After starting the application, you can run the following command or paste the URL in your browser to trigger the poison pill:

curl http://localhost:8080/produce-order

In the logs you will now see a continuous stream of error logs. Here is an example of a log message:

2021-06-13 13:21:27.196 ERROR 69190 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception

java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:194) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1598) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1210) ~[spring-kafka-2.7.1.jar:2.7.1]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition orders-0 at offset 32. If needed, please seek past the record to continue consumption.

Here is what happened:

  1. The consumer tries to deserialize the record. Since the record is serialized using the StringSerializer and the consumer expects JSON, the consumer is not able to deserialize the series of bytes.
  2. The consumer is not able to acknowledge the record, and thus the offset is not moving ahead.
  3. Now the consumer will try to deserialize the record again, which will fail again, and thus the consumer ends up in an infinite loop. Each deserialization failure is logged, and after just a few moments thousands of messages have already been logged.
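
The three steps above can be sketched as a small simulation. This is not how the Kafka consumer is implemented, it is a deliberately simplified model with hypothetical names in which a partition is a list, the offset is an index, and the loop is capped by maxPolls so that it terminates:

```java
import java.util.List;

// Hypothetical simulation of a consumer that gets stuck on a poison pill.
final class StuckConsumerDemo {

    // Hypothetical deserializer: accepts JSON-looking payloads only.
    static String deserialize(String raw) {
        if (!raw.startsWith("{")) {
            throw new IllegalStateException("Cannot deserialize: " + raw);
        }
        return raw;
    }

    // Polls at most maxPolls times and returns the offset the consumer ends up at.
    static int consume(List<String> partition, int maxPolls) {
        int offset = 0;
        for (int poll = 0; poll < maxPolls && offset < partition.size(); poll++) {
            try {
                deserialize(partition.get(offset));
                offset++; // only a successfully deserialized record moves the offset
            } catch (IllegalStateException e) {
                // The failure is logged, the offset stays where it is, and the
                // next poll reads the very same record again.
                System.err.println("poll " + poll + ": " + e.getMessage());
            }
        }
        return offset;
    }
}
```

With a partition containing a valid record, a poison pill and another valid record, the returned offset stays at 1 no matter how high maxPolls is set. The third record is never reached.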

Consequences

You can imagine that if for every deserialization failure a message is logged, many gigabytes of logs are written to disk rapidly. Many production grade systems will also ship their logs to a centralized system where flooding this system can lead to huge costs. Also, finding other non-Kafka logs will become basically impossible. Depending on the size of the storage you have in place, all the storage could be filled by Kafka logs which will probably result in a non-functioning machine. So, in short, a poison pill could have quite a severe impact.

How to deal with poison pills?

There are four options for dealing with poison pills; however, I would strongly recommend only one.

The first option is to wait until the retention period of the Kafka topic has passed, which is 7 days by default. After the retention period the poison pill is discarded. Waiting 7 days until the consumer can consume messages again is far from ideal. It is possible to set the retention period to a lower number, which will discard poison pills earlier. However, records that are produced shortly after the poison pill will likely be discarded as well. Depending on the situation this can be even more damaging.

The second option is to manually move the offset past the offset of the poison pill. The advantage of this option is that you have a lot of control. You can set the offset to exactly the offset after the poison pill. The disadvantage is that it is not straightforward. You must have access to the production Kafka cluster - which is never a good sign - and you will need to have knowledge about the Kafka binaries.

In case you do need to reset the offset manually, you can use the following command:

kafka-consumer-groups --bootstrap-server $BOOTSTRAP_SERVER --group $CONSUMER_GROUP --reset-offsets --to-offset $OFFSET --topic $TOPIC --execute

Note: The consumer should be stopped before resetting the offset.

The third option is easier than the second and does not require executing any commands: change the consumer group and start consuming from the latest offset. The disadvantage of this ‘solution’ is that messages between the poison pill and the last record produced to the topic won’t be consumed and will be lost. Should another poison pill occur in the future, the consumer group will need to be changed again. The image below shows what happens after the consumer group is changed.

image

The last and recommended option is to configure an ErrorHandlingDeserializer using Spring Kafka.

ErrorHandlingDeserializer setup

The first thing we have to do is replace the deserializers with the ErrorHandlingDeserializer for both the key and the value. Since the ErrorHandlingDeserializer delegates the deserialization to the real deserializers, we need to tell it which classes to delegate to. The delegate classes can be added to the config by adding the spring.deserializer.key.delegate.class and spring.deserializer.value.delegate.class properties:

application.yml

spring:
  kafka:
    consumer:
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
    properties:
      spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
      spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer

How does the ErrorHandlingDeserializer work in practice? The ErrorHandlingDeserializer tries to deserialize the key and value using the delegate class. If deserialization succeeds, the record is passed to the consumer and everything works as normal. However, if a DeserializationException is thrown, the record is no longer passed to the consumer. Instead, the configured ErrorHandler is called with the thrown exception and the failing record. After the ErrorHandler has handled the error, the consumer resumes consuming as if nothing has happened, because the offset has been moved forward. The ErrorHandler has swallowed the poison pill, and there is no longer a continuous stream of error messages.
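
The delegation idea can be modelled in a few lines of plain Java. To be clear, this is a simplified sketch and not Spring Kafka's implementation: the names SafeDeserializerDemo, Result, errorHandling and consume are all hypothetical, and a partition is again just a list of byte arrays. The key point it tries to show is that a failure is captured instead of thrown, so the offset can always advance:

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical model of a deserializer that captures failures of its delegate.
final class SafeDeserializerDemo {

    // Outcome of one deserialization attempt: either a value or the captured error.
    static final class Result<T> {
        final T value;
        final RuntimeException error;

        Result(T value, RuntimeException error) {
            this.value = value;
            this.error = error;
        }

        boolean failed() {
            return error != null;
        }
    }

    // Wraps a delegate so that its exceptions are captured instead of thrown.
    static <T> Function<byte[], Result<T>> errorHandling(Function<byte[], T> delegate) {
        return data -> {
            try {
                return new Result<>(delegate.apply(data), null);
            } catch (RuntimeException e) {
                return new Result<>(null, e);
            }
        };
    }

    // Consumes every record. Failed records go to the error list (the "error
    // handler"), good records are delivered, and the offset advances either way.
    static <T> int consume(List<byte[]> partition,
                           Function<byte[], Result<T>> deserializer,
                           List<T> delivered,
                           List<RuntimeException> handledErrors) {
        int offset = 0;
        while (offset < partition.size()) {
            Result<T> result = deserializer.apply(partition.get(offset));
            if (result.failed()) {
                handledErrors.add(result.error);
            } else {
                delivered.add(result.value);
            }
            offset++;
        }
        return offset;
    }
}
```

Compared to the stuck consumer from earlier, the poison pill now costs exactly one handled error, and the records behind it are delivered as usual.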

Optionally, a simple bean can be configured that will log some additional information about the failing record compared to the default SeekToCurrentErrorHandler:

@Bean
public LoggingErrorHandler errorHandler() {
    return new LoggingErrorHandler();
}

In case you have multiple consumers, it is also possible to configure the ErrorHandlingDeserializer programmatically. The following example demonstrates how to configure a JsonDeserializer with the LoggingErrorHandler:

private final KafkaProperties kafkaProperties;

private ConsumerFactory<String, String> jsonConsumerFactory() {
    JsonDeserializer jsonDeserializer = new JsonDeserializer();
    jsonDeserializer.addTrustedPackages("your.package");
    ErrorHandlingDeserializer errorHandlingDeserializer = new ErrorHandlingDeserializer(jsonDeserializer);

    Map<String, Object> props = kafkaProperties.buildConsumerProperties();
    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), errorHandlingDeserializer);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> jsonListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(jsonConsumerFactory());
    factory.setErrorHandler(new LoggingErrorHandler());
    return factory;
}

I have also created an example project, where the ErrorHandlingDeserializer is configured, which can be found on GitHub as well. In this project you can again trigger the poison pill by running the following command or pasting the url in your browser:

curl http://localhost:8080/produce-order

With the ErrorHandlingDeserializer now configured you should see only one log message with the DeserializationException. Crisis averted 😃 !

Dead letter topics

You might have noticed that the LoggingErrorHandler does not actually log the value of the poison pill, but instead only shows the sequence of bytes. It is possible to log the actual value of the poison pill by publishing it to a so-called dead letter topic (DLT). You can think of a dead letter topic as a backup topic to which records that could not be consumed are sent.
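
Conceptually, a dead letter topic works like the in-memory sketch below. The class DeadLetterDemo is hypothetical and stands in for a broker. The only thing borrowed from the real setup is the naming convention of appending .DLT to the original topic name, which is also what the orders.DLT topic used later in this blog relies on:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for a broker with a dead letter topic.
final class DeadLetterDemo {

    // Topic name -> raw record values, in the order they were published.
    private final Map<String, List<byte[]>> topics = new HashMap<>();

    void publish(String topic, byte[] value) {
        topics.computeIfAbsent(topic, name -> new ArrayList<>()).add(value);
    }

    List<byte[]> records(String topic) {
        return topics.getOrDefault(topic, List.of());
    }

    // Forwards the untouched bytes of a failed record to "<sourceTopic>.DLT".
    void recover(String sourceTopic, byte[] failedValue) {
        publish(sourceTopic + ".DLT", failedValue);
    }
}
```

Because the bytes are forwarded untouched, nothing needs to be deserialized at this point. Inspecting the value is postponed to a separate consumer that reads the dead letter topic as raw bytes.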

You can configure a DeadLetterPublishingRecoverer that will send the poison pill to the dead letter topic. It can be configured as follows:

@Bean
public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
    return new SeekToCurrentErrorHandler(deadLetterPublishingRecoverer);
}

@Bean
public DeadLetterPublishingRecoverer publisher() {
    DefaultKafkaProducerFactory<String, byte[]> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties(), new StringSerializer(), new ByteArraySerializer());
    KafkaTemplate<String, byte[]> bytesKafkaTemplate = new KafkaTemplate<>(defaultKafkaProducerFactory);
    return new DeadLetterPublishingRecoverer(bytesKafkaTemplate);
}

Since you have most likely already configured a serializer and deserializer that do not use the byte[] type, we’ll need to configure a separate serializer and deserializer for the DeadLetterPublishingRecoverer:

private ConsumerFactory<String, byte[]> bytesArrayConsumerFactory() {
    Map<String, Object> props = kafkaProperties.buildConsumerProperties();
    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new ByteArrayDeserializer());
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, byte[]> bytesArrayListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(bytesArrayConsumerFactory());
    return factory;
}

You can now create a separate consumer that consumes messages from the DLT:

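@Service
public class DltConsumer {

    // SLF4J logger (org.slf4j.Logger / LoggerFactory), available by default in Spring Boot
    private static final Logger log = LoggerFactory.getLogger(DltConsumer.class);

    // Reads the raw bytes from the dead letter topic of the orders topic
    @KafkaListener(topics = {"orders.DLT"}, containerFactory = "bytesArrayListenerContainerFactory")
    public void recoverDLT(@Payload ConsumerRecord<String, byte[]> consumerRecord) {
        // Assumes the poison pill value is a UTF-8 encoded string
        log.info("Poison pill value: {}", new String(consumerRecord.value(), StandardCharsets.UTF_8));
    }
}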

In the example project the DLT logic can be enabled by setting the property spring.kafka.dlt.enable to true. With the ErrorHandlingDeserializer and the DeadLetterPublishingRecoverer now configured you should see two log messages. One log message for the deserialization exception and one that displays the value of the poison pill.

Note: In the consumer above the poison pill's value is converted from bytes to a string. This conversion only works if the event value is a serialized string. In case the event value is of a different type, the poison pill consumer should convert it to that type instead.
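
If you don't know up front what type the poison pill has, one possible approach is to try decoding it as text and fall back to a hex dump. The sketch below uses only the JDK; PoisonPillFormatter and describe are hypothetical names and are not part of the example project. Keep in mind that some non-text payloads happen to be valid UTF-8, so the text branch is a best guess rather than a guarantee:

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for logging a poison pill of unknown type.
final class PoisonPillFormatter {

    // Returns the value as text when it is valid UTF-8, otherwise as a hex dump.
    static String describe(byte[] value) {
        try {
            // A strict decoder reports invalid input instead of silently replacing it.
            return StandardCharsets.UTF_8.newDecoder()
                    .onMalformedInput(CodingErrorAction.REPORT)
                    .onUnmappableCharacter(CodingErrorAction.REPORT)
                    .decode(ByteBuffer.wrap(value))
                    .toString();
        } catch (CharacterCodingException e) {
            StringBuilder hex = new StringBuilder("0x");
            for (byte b : value) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        }
    }
}
```

In the DLT consumer this could replace the direct new String(...) call, so that binary payloads end up in the log in a readable form instead of as garbled characters.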

In short

A poison pill is a record that cannot be consumed, no matter how many times the consumer attempts it. Poison pills occur because of a corrupt record, a deserialization error or a schema mismatch. Without a configured ErrorHandlingDeserializer, many gigabytes of logs can be rapidly written to disk, which can cause the system to halt. The recommended way to handle a deserialization error is to configure an ErrorHandlingDeserializer, which will make sure that the consumer can continue processing new records. In case you need the actual value of a poison pill, a DeadLetterPublishingRecoverer can be configured, which allows you to log the actual value of the record.

I also provided an example project in which you could see the effects of a poison pill and I showed how you can survive a poison pill. If you have any questions or improvements, feel free to start a new discussion on GitHub.

Follow my blog for more upcoming posts about Kafka and related topics!