Building resilient event-driven architectures with Apache Kafka

Efficiency and reliability in message processing are crucial for maintaining a consistent data flow in messaging environments such as Apache Kafka. However, as messages grow in size and volume, consumers often struggle to process them quickly enough, leading to issues that degrade overall system performance.

"When you have a hammer, all problems tend to look like nails."

One of these issues revolves around the maxPollTimeout parameter (Kafka's max.poll.interval.ms consumer configuration): when a consumer exceeds it, a rebalance is triggered among the consumers within its consumer group, interrupting message processing.

In this article, we will explore how the use of pause-resume methods in combination with the asynchronous implementation provided by Spring Boot can be a good solution to avoid these problems.

Problem

Message consumption is a fundamental process in data transmission architectures. However, when messages are large or consumers hit processing bottlenecks, the maxPollTimeout limit can be exceeded. The consumer then signals to its coordinator that it could not process the messages within the expected time, which triggers a rebalance of the consumer group. During this rebalance, consumers disconnect and reorganize to redistribute the workload. This interruption not only affects data processing continuity but can also lead to reduced overall system efficiency, message reprocessing, and other undesired behavior.

Let's analyze the possible causes of this problem.

Large messages

The maxPollTimeout parameter plays a crucial role in the performance and stability of consumers in a Kafka environment, especially when dealing with large messages in topics. This value, set by the max.poll.interval.ms configuration, defines the maximum time a consumer may wait between two consecutive "poll" calls to fetch new message records before it is considered inactive and a rebalance is triggered. In scenarios where messages are considerably large, the time required to process each message can be substantial. The maxPollTimeout limit may then be reached before a consumer has had the opportunity to process all the messages retrieved in a single "poll" call, causing the consumer to be considered inactive and effectively triggering a rebalance.

Messages with complex processing

As in the previous case, messages that require complex processing can push consumers past the maxPollTimeout limit. These messages, which involve intensive operations, elaborate calculations, database inserts, network latencies, and so on, pose additional challenges in balancing processing time against data flow continuity.

Many messages from different sources

When a consumer is subscribed to many topics/partitions, and there are a large number of messages to process in these partitions, there is a possibility of exceeding the maxPollTimeout. This can happen either because some messages are complex, as explained earlier, or simply because each poll fetches a quantity of messages from each topic/partition that cannot be processed within the timeout (e.g., due to lack of CPU).

Addressing the problem

To mitigate issues related to maxPollTimeout, several measures can be taken:

Adjust maxPollTimeout

Increasing the value of maxPollTimeout could give consumers more time to process large and/or complex messages in a single "poll" call. However, this approach must be carefully balanced to avoid consumers remaining inactive for extended periods, which could impact the processing of new messages.
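As a hedged sketch of what this tuning can look like in a Spring Boot application.properties file (the values shown are illustrative, not recommendations):

```properties
# Allow up to 10 minutes between consecutive poll() calls before the consumer
# is considered inactive and a rebalance is triggered (default: 5 minutes).
spring.kafka.consumer.properties.max.poll.interval.ms=600000

# Fetch fewer records per poll so each batch can be processed within the interval.
spring.kafka.consumer.max-poll-records=100
```

Lowering max-poll-records is often combined with raising the interval: both reduce the work that must finish between two polls.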

Asynchronous processing

Instead of processing large messages synchronously inside the "poll" call, consumers could opt for an asynchronous approach. This involves receiving messages in a "poll" call and then processing them in separate threads or another concurrency model, allowing the consumer to handle large messages without blocking the polling thread. With this approach, bear in mind that Kafka only guarantees ordering within a partition; if records from the same partition are processed in parallel, it can no longer be guaranteed that they will be processed in order.
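As a framework-free sketch of this hand-off (the class and method names here are our own invention, not Spring Kafka API), the records returned by one poll can be submitted to an executor so the polling thread returns immediately; a single-threaded executor preserves per-partition order, while a thread pool would trade that ordering for throughput:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class AsyncBatchSketch {

    // Hand off one batch of records (as returned by a hypothetical poll() call)
    // to an executor. A single-threaded executor preserves the order of records
    // from one partition; a thread pool would increase throughput but lose it.
    public static List<String> processBatch(List<String> records) throws Exception {
        ExecutorService perPartitionExecutor = Executors.newSingleThreadExecutor();
        List<Future<String>> futures = new ArrayList<>();
        for (String record : records) {
            // submit() returns immediately, so a real polling thread would be
            // free to call poll() again before max.poll.interval.ms expires.
            futures.add(perPartitionExecutor.submit(() -> "processed:" + record));
        }
        // In a real consumer you would NOT block on the futures in the polling
        // thread; we join here only to return a deterministic result.
        List<String> results = new ArrayList<>();
        for (Future<String> future : futures) {
            results.add(future.get());
        }
        perPartitionExecutor.shutdown();
        return results;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(processBatch(List.of("a", "b", "c")));
        // prints: [processed:a, processed:b, processed:c]
    }
}
```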

Message partitioning

If possible, producers can split large messages into smaller segments before sending them to the Kafka topic. This allows consumers to process individual parts of a large message more efficiently and reduces the pressure on maxPollTimeout. In this approach, again, care must be taken to process new messages in the correct order when necessary.
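A minimal, framework-free sketch of such producer-side splitting (the "index/total" header format and class name are assumptions for illustration, not a Kafka convention):

```java
import java.util.ArrayList;
import java.util.List;

public class MessageChunker {

    // Split a large payload into fixed-size segments, each prefixed with an
    // "index/total" header so the consumer can reassemble them in order.
    public static List<String> split(String payload, int chunkSize) {
        int total = (payload.length() + chunkSize - 1) / chunkSize;
        List<String> chunks = new ArrayList<>();
        for (int i = 0; i < total; i++) {
            int start = i * chunkSize;
            int end = Math.min(start + chunkSize, payload.length());
            chunks.add((i + 1) + "/" + total + ":" + payload.substring(start, end));
        }
        return chunks;
    }

    public static void main(String[] args) {
        System.out.println(split("abcdefgh", 3));
        // prints: [1/3:abc, 2/3:def, 3/3:gh]
    }
}
```

In a real producer, each chunk would be sent with the same message key so that all segments land on the same partition and arrive in order.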

Use of pause-resume

The pause-resume strategy can be especially useful for large messages. When a consumer faces a lengthy and/or complex message, it can temporarily pause fetching new messages from a topic/partition and resume consumption once it is ready to process again. During this "pause", the consumer must keep communicating with the brokers of its assigned partitions so they know not to perform a rebalance and reassignment, but simply to wait.

Implementing the Solution

An effective solution to mitigate this problem is the implementation of the "pause-resume" function. This feature allows consumers to pause the reception of new messages until they are ready to process them. In the context of Spring Boot, this functionality is achieved using the Kafka client provided by Spring Kafka. When a consumer encounters difficulties in processing messages, instead of simply closing the connection or consuming messages without proper processing, it can pause the subscription to that topic/partition for new messages until it is ready to handle them again.

Along with the pause-resume strategy, Spring Boot offers the possibility of implementing message processing asynchronously. In this approach, consumers receive messages in a "poll" call and then process them in a separate thread. This allows the consumer to handle complex tasks without blocking the main thread, improving efficiency and avoiding triggering maxPollTimeout and subsequent rebalances. By keeping the main thread unblocked, the consumer in "pause" mode continues making "poll" invocations but without receiving new messages. This communicates to the brokers that the consumer is active and still subscribed to that topic/partition, only taking a long time to process.

Let's get to work!

Following the implementation path of the pause-resume pattern, we have developed a service that incorporates key modifications to address the challenges posed by large or processing-intensive messages. Below, we detail the implemented changes:

Enabling asynchronous execution

To enable support for asynchronous method execution, you need to use the @EnableAsync annotation in the main class of the application:

@SpringBootApplication
@EnableAsync
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

This annotation allows tasks to be processed in the background, which is essential for efficient message handling.

Kafka container support service

A new service named KafkaContainerSupportMethods has been created. This service exposes methods to pause and resume consumption in Kafka containers:

@Service
public class KafkaContainerSupportMethods {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    public void pauseConsume(String containerId) {
        getContainer(containerId).ifPresent(MessageListenerContainer::pause);
    }

    public void resumeConsumer(String containerId) {
        getContainer(containerId).ifPresent(MessageListenerContainer::resume);
    }

    private Optional<MessageListenerContainer> getContainer(String containerId) {
        return Optional.ofNullable(registry.getListenerContainer(containerId));
    }
}

These methods are implemented using a container identifier (containerId) and allow pausing and resuming message consumption in Kafka containers.
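Note that the containerId passed to these methods must match the id of the listener container registered by Spring Kafka. In this article it comes from configuration; a minimal sketch of that property (the name kafka.container.id is taken from the consumer's @Value annotation, while the value is an assumption):

```properties
# Shared identifier: used both as the @KafkaListener id and for pause/resume lookups
kafka.container.id=bigMessageConsumerContainer
```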

Asynchronous consumer with pause-resume strategy

The consumer class integrates the pause and resume functionality with asynchronous task execution. Additionally, a service named LongRunningJob and an asynchronous executor (AsyncListenableTaskExecutor) have been created to process messages efficiently:

@Service
@KafkaListener(topics = {"myBigMessageTopic"}, id = "${kafka.container.id}", concurrency = "20", idIsGroup = false,
        properties = {"max.poll.interval.ms=30000", "max.poll.records=1"})
public class Consumer {

    @Autowired
    private LongRunningJob longRunningJob;

    @Autowired
    private AsyncListenableTaskExecutor executor;

    @Autowired
    private KafkaContainerSupportMethods containerSupportMethods;

    @Value("${kafka.container.id}")
    private String containerId;

    @KafkaHandler
    public void handleEvent(@Payload String event,
            @Header(KafkaHeaders.RECEIVED_PARTITION) String partition,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.RECEIVED_TIMESTAMP) Long timeStamp,
            @Header(KafkaHeaders.OFFSET) String offSet) {

        MessageMetadata metadata = new MessageMetadata(event, partition, topic, timeStamp, offSet);
        System.out.println("Handling pause container for " + metadata);

        containerSupportMethods.pauseConsume(containerId);
        executor.submitListenable(() -> longRunningJob.run(metadata))
                .addCallback(result -> {
                            containerSupportMethods.resumeConsumer(containerId);
                            System.out.println("Success callback " + metadata);
                        },
                        ex -> {
                            // Perform a retry mechanism, such as publishing to a dead letter queue, here
                            containerSupportMethods.resumeConsumer(containerId);
                            System.out.println("Error callback " + metadata);
                        }
                );
    }
}

The Consumer class utilizes the pause-resume strategy and the asynchronous executor to manage the processing of complex messages. The @KafkaHandler annotation marks the method that handles incoming messages and applies the necessary logic.

Finally, the AsyncListenableTaskExecutor has been incorporated, enabling asynchronous task execution and providing notifications about their status.
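For completeness, here is a minimal, framework-free sketch of the two helper types the consumer references, MessageMetadata and LongRunningJob. Their fields, method signature, and behavior are assumptions for illustration; in the real application, LongRunningJob would be a @Service containing the actual business logic:

```java
import java.util.concurrent.TimeUnit;

public class LongRunningJobSketch {

    // Simple holder for the payload and its Kafka metadata (field names assumed).
    record MessageMetadata(String event, String partition, String topic,
                           Long timeStamp, String offSet) {}

    // Stand-in for the long-running business logic; in the Spring application
    // this would be annotated with @Service and injected into the Consumer.
    static class LongRunningJob {
        public String run(MessageMetadata metadata) {
            try {
                // Simulate slow work (elaborate calculations, database inserts, etc.).
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Job interrupted", e);
            }
            return "processed " + metadata.topic() + "-" + metadata.partition()
                    + "@" + metadata.offSet();
        }
    }

    public static void main(String[] args) {
        MessageMetadata metadata =
                new MessageMetadata("payload", "0", "myBigMessageTopic", 0L, "42");
        System.out.println(new LongRunningJob().run(metadata));
        // prints: processed myBigMessageTopic-0@42
    }
}
```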

Conclusion

Together, these modifications efficiently address the challenges posed by large or processing-intensive messages in Kafka, preserving system stability and performance even in complex situations.

The implementation of pause-resume and asynchronous processing strategies in Spring Boot offers effective solutions to avoid rebalances and disconnections in Kafka consumers. These techniques allow consumers to face message processing challenges, such as large messages or intensive processing, without compromising the stability and efficiency of the overall system.

This doesn't mean that the pause-resume pattern should be present in every consumer implementation. Rather, the point is that there are various message consumption strategies, and development teams should choose the most suitable one based on the combination of topics and messages they need to consume.

By adopting these practices, companies can maintain consistent and reliable data flows, ensuring an efficient messaging environment in diverse scenarios.