Interest in event-driven architecture has accelerated over the last couple of years, with broad adoption and modernization efforts across enterprises. Apache Kafka, one of the most pervasive streaming middleware technologies, is being tried and tested by many development teams, and high performance is a critical goal for these teams.
There are numerous resources for sizing, configuring, and benchmarking your Kafka cluster. Such guides and benchmarks naturally involve producers and consumers, but their primary aim is the performance of the Kafka cluster itself. Recently, I was asked for pointers on how to fine-tune Kafka producers for high throughput. Many guides explain the most important Kafka producer configurations, their relationship to performance, and the trade-offs involved. But there isn't much benchmark data showing how different configuration combinations affect producer message throughput.
In this article, I show the throughput outcomes of various producer configurations I used in a recent test setup. My hope is to help other developers and architects better understand the relationship between producer configurations and message throughput. You can use this information to make educated guesses when configuring your own Kafka producers.
The test environment
You will need the following technologies if you want to reproduce my test environment:
- Quarkus 1.11.1 (compiled as a Java application with OpenJDK 11)
- Red Hat OpenShift Container Platform 4.5
- Red Hat AMQ Streams 7.7 on Red Hat OpenShift
- Apache Kafka 2.6
- Prometheus 2.16
- Grafana 7.1.1
Note that I used the Red Hat-supported operators to deploy AMQ Streams, Prometheus, and Grafana. Also note that you cannot replicate the work explained in this article on OpenShift Container Platform versions earlier than 4.5 or on Kubernetes versions earlier than 1.18.
Setting up the test environment on OpenShift
I conducted my tests with a simple Quarkus application. In a nutshell, I deployed a Kafka cluster using the Red Hat AMQ Streams 7.7 distribution on an OpenShift Container Platform 4.5 cluster. I also deployed a Prometheus instance to collect metrics from both the Quarkus (Java) application and the Kafka cluster, and a Grafana instance to visualize those metrics.
In the tests, the application sends bursts of messages to the Kafka cluster for a period of time. I initiated the requests with different combinations of producer configurations on the application side and observed the metrics on a Grafana dashboard configured to show data from Kafka producers.
My test setup consists of three projects, or namespaces, in the OpenShift cluster:
- kafka-cluster: Where AMQ Streams is deployed.
- application-monitoring: Where the Prometheus and Grafana instances are deployed.
- kafka-performance-test: Where the Kafka client is deployed (on one pod).
I built my Quarkus application using OpenJDK 11 and deployed it using the Quarkus Helm chart and the Helm chart values found here. You will need an image pull secret to use the same OpenJDK image stream.
Setting up the Kafka cluster
There are two ways client applications can connect to the Kafka brokers. One way is to set up AMQ Streams to create OpenShift routes for external access to the Kafka brokers. This approach is best if the clients are deployed outside the OpenShift cluster, or if they are deployed in different network zones and you want the network traffic to flow through a firewall.
The other way is to allow network traffic from the kafka-performance-test project to the kafka-cluster project, which is achieved with NetworkPolicy resources. By default, the AMQ Streams Operator creates network policies that allow incoming traffic from across the OpenShift cluster on certain ports, and I relied on this default behavior in my setup. (Note that you might need the right roles to perform these deployments.)
My Kafka cluster consists of three brokers, each with a limit of two CPUs and 6 GB of memory, deployed with persistent storage. The custom resource (CR) I used to deploy the Kafka cluster with Strimzi is in my Quarkus-Kafka GitHub repository.
I created a topic using the KafkaTopic custom resource provided with the AMQ Streams Operator. The topic has three partitions, each with three replicas.
Prometheus and Grafana
Finally, I deployed Prometheus and a Grafana instance to collect metrics from my application and analyze the data. You can find all the resources I created related to monitoring here. All of them are created in the application-monitoring namespace. I labeled my application with monitor: 'true' because I configured the related PodMonitor to target pods with that label.
The first test I performed used the default producer configuration. Subsequently, I focused on a few fundamental configurations and tested the client application with combinations of different values for them. The application exposes an API endpoint that receives a payload to send to Kafka, and then sends that same message to a Kafka emitter, serially, as many times as requested. In each test scenario, I sent parallel requests with different payloads and the desired number of messages to this endpoint in a loop.
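To make the test procedure more concrete, here is a minimal Python sketch of a load driver that fires requests in parallel and adds up the messages the endpoint reports as sent. It is an illustration under assumptions, not the harness I used: run_burst and fake_endpoint are hypothetical names, and the stub stands in for the real HTTP call to the Quarkus endpoint. Note that this measures client-side throughput, while the figures in this article come from the producer metrics shown in Grafana.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Sequence, Tuple


def run_burst(
    send_request: Callable[[str, int], int],
    payloads: Sequence[str],
    messages_per_request: int,
    parallelism: int = 4,
) -> Tuple[int, float]:
    """Send one request per payload in parallel; return (messages sent, seconds)."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=parallelism) as pool:
        # Each request asks the (hypothetical) endpoint to emit the payload N times.
        results = pool.map(lambda payload: send_request(payload, messages_per_request), payloads)
        sent = sum(results)
    return sent, time.perf_counter() - start


def fake_endpoint(payload: str, count: int) -> int:
    """Hypothetical stand-in for the HTTP call; pretends every message was sent."""
    return count


sent, seconds = run_burst(fake_endpoint, ['{"a": 1}', '{"b": 2}', '{"c": 3}'], messages_per_request=1000)
print(sent, "messages in", round(seconds, 4), "s")
```

Passing the request function in as a parameter keeps the timing logic testable without a running cluster; a real driver would replace fake_endpoint with an HTTP client call.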
Here are the configurations I played with for my producer application:
- batch.size in bytes: Has little effect unless linger.ms is non-zero, because with the default linger.ms of 0 the producer sends a batch as soon as it can rather than waiting for it to fill. Batching lets the producer package messages and send them together, which reduces the number of requests to the cluster.
- linger.ms in milliseconds: Determines the maximum time a message is buffered in the current batch before the batch is sent. In other words, the producer sends the current batch either when batch.size is reached or when linger.ms has passed since the batch started to be filled.
- compression.type: For applications that produce large messages, compression can help improve throughput.
- acks: As a business requirement, you might need to replicate messages across your Kafka cluster. In some cases, you might need acknowledgments from all replicas; in others, an acknowledgment from the partition leader alone might be enough. I also looked at the impact of this configuration.
- CPU limit: The computational power of the client application considerably affects throughput from the producer's perspective. Although I don't focus on this parameter here, the results give some insight into it.
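The interplay between batch.size and linger.ms can be sketched as a small simulation. This Python sketch assumes a single partition and a sender that is always idle, so a batch leaves as soon as it is full or has lingered long enough; the real producer also sends batches for other reasons (for example, when the buffer is exhausted or on flush), and simulate_batching is a hypothetical helper, not part of any Kafka client.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class OpenBatch:
    """A batch that is still accepting records (single partition, simplified)."""
    opened_at_ms: float
    sizes: List[int] = field(default_factory=list)

    @property
    def total_bytes(self) -> int:
        return sum(self.sizes)


def simulate_batching(
    records: List[Tuple[float, int]], batch_size: int, linger_ms: float
) -> List[List[int]]:
    """Group (arrival_ms, size_bytes) records into the batches that would be sent.

    Assumption: the sender is always idle, so a batch is sent as soon as adding
    the next record would exceed batch_size or linger_ms has elapsed since the
    batch was opened. With linger_ms=0 every record leaves in its own batch.
    """
    sent: List[List[int]] = []
    current: Optional[OpenBatch] = None
    for arrival_ms, size in records:
        if current is not None:
            lingered = arrival_ms - current.opened_at_ms >= linger_ms
            full = current.total_bytes + size > batch_size
            if lingered or full:
                sent.append(current.sizes)
                current = None
        if current is None:
            current = OpenBatch(opened_at_ms=arrival_ms)
        current.sizes.append(size)
    if current is not None:
        sent.append(current.sizes)  # flush the last open batch
    return sent


if __name__ == "__main__":
    burst = [(0, 100), (1, 100), (2, 100), (3, 100), (50, 100)]
    print(simulate_batching(burst, batch_size=16384, linger_ms=0))
    # [[100], [100], [100], [100], [100]]
    print(simulate_batching(burst, batch_size=16384, linger_ms=10))
    # [[100, 100, 100, 100], [100]]
```

The second run shows why a non-zero linger.ms matters: the same burst produces two requests instead of five.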
Other noteworthy parameters are buffer.memory and max.block.ms, which both govern the producer's send buffer: buffer.memory caps the memory available for records waiting to be sent, and max.block.ms caps how long a send blocks when that buffer is full. The buffer.memory parameter defaults to 32 MB (33,554,432 bytes). I leave analyzing the effect of these parameters to future work. For the curious, the Kafka Producers Grafana dashboard I provided includes a metric showing how the producers use the buffer.
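A little arithmetic shows what the 32 MB default means in practice. The following Python sketch, assuming constant produce and drain rates and an initially empty buffer, computes how many full batches fit in the buffer and how long it would take to fill; once it is full, send() blocks for up to max.block.ms. The helper names are hypothetical.

```python
DEFAULT_BUFFER_MEMORY = 32 * 1024 * 1024  # buffer.memory default: 33,554,432 bytes


def batches_that_fit(buffer_memory: int, batch_size: int) -> int:
    """How many completely full batches the send buffer could hold at once."""
    return buffer_memory // batch_size


def seconds_until_buffer_full(
    buffer_memory: int, produce_bytes_per_s: float, drain_bytes_per_s: float
) -> float:
    """Time until send() starts blocking, assuming constant rates and an empty buffer."""
    backlog_growth = produce_bytes_per_s - drain_bytes_per_s
    if backlog_growth <= 0:
        return float("inf")  # the brokers keep up, so the buffer never fills
    return buffer_memory / backlog_growth


for size in (16384, 65536, 262144):
    print(size, batches_that_fit(DEFAULT_BUFFER_MEMORY, size))
# 16384 2048
# 65536 512
# 262144 128
```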
I used default values for all the remaining configurations.
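The configurations varied in this article can be laid out as a test grid. This Python sketch builds one complete producer configuration per combination, taking everything else from the Kafka 2.6 defaults for these four settings (batch.size 16384, linger.ms 0, compression.type none, acks 1). The value lists come from the values discussed in this article; treat the full Cartesian product as an assumption, since not every combination necessarily corresponds to a run I performed.

```python
from itertools import product
from typing import Dict, Iterator, List, Union

Value = Union[int, str]

# Kafka 2.6 producer defaults for the four settings under test.
DEFAULTS: Dict[str, Value] = {
    "batch.size": 16384,
    "linger.ms": 0,
    "compression.type": "none",
    "acks": "1",
}

# Values mentioned in this article (assumed grid, not a record of actual runs).
GRID: Dict[str, List[Value]] = {
    "batch.size": [16384, 65536, 262144],
    "linger.ms": [0, 10, 20, 40],
    "compression.type": ["none", "snappy"],
    "acks": ["1", "all"],
}


def scenarios(grid: Dict[str, List[Value]], defaults: Dict[str, Value]) -> Iterator[Dict[str, Value]]:
    """Yield one full producer configuration per combination of grid values."""
    keys = list(grid)
    for values in product(*(grid[k] for k in keys)):
        config = dict(defaults)  # settings not in the grid keep their defaults
        config.update(zip(keys, values))
        yield config


configs = list(scenarios(GRID, DEFAULTS))
print(len(configs))  # 3 * 4 * 2 * 2 = 48
```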
For the different test scenarios, I generated random JSON dictionaries to use as payloads. I combined them in looped requests to my Kafka producer to achieve different average payload sizes, which serve as an additional test parameter in my experiments. The two average payload sizes are 1.1 KB and 6 KB: the former comes from a mixture of small and medium payloads, and the latter from a mixture of medium and large payloads.
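Payloads with a controlled average size can be generated along these lines. In this Python sketch, the individual sizes (200, 2,000, and 10,000 bytes) are hypothetical choices that average 1.1 KB and 6 KB when alternated; they are not the sizes I actually used, and make_payload and payload_mix are illustrative names. Sizes are in decimal kilobytes.

```python
import json
import random
import string
from itertools import cycle, islice
from typing import List


def make_payload(target_bytes: int, rng: random.Random) -> str:
    """Return a random JSON document whose encoding is exactly target_bytes long."""
    doc = {"id": rng.randrange(1_000_000), "data": ""}
    overhead = len(json.dumps(doc))
    if target_bytes < overhead:
        raise ValueError(f"target_bytes must be at least {overhead}")
    # ASCII letters need no JSON escaping, so each one adds exactly one byte.
    doc["data"] = "".join(rng.choices(string.ascii_letters, k=target_bytes - overhead))
    return json.dumps(doc)


def payload_mix(sizes: List[int], count: int, seed: int = 42) -> List[str]:
    """Cycle through the given sizes so the mix has a predictable average size."""
    rng = random.Random(seed)
    return [make_payload(size, rng) for size in islice(cycle(sizes), count)]


small_and_medium = payload_mix([200, 2000], count=100)    # hypothetical sizes
medium_and_large = payload_mix([2000, 10000], count=100)  # hypothetical sizes
for mix in (small_and_medium, medium_and_large):
    print(sum(len(p.encode()) for p in mix) / len(mix))
# 1100.0
# 6000.0
```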
Note: My benchmarks do not necessarily include the best possible combination for a given scenario, so pay attention to the trends rather than to the absolute numbers.
Analyzing producer message throughput
Let's start by considering an overview of what the producer message throughput looks like with the default configuration, shown in Table 1.
Table 1: Average producer message throughput with the default configuration, for an average payload of 1.1 KB or 6 KB, running on one CPU or 0.5 CPU.
With the default configuration, the producer message throughput averages around 14,700 for an average payload size of 1.1 KB when running on one core. For a larger average payload size of 6 KB, the throughput is, unsurprisingly, lower at approximately 6,000. When I decreased the CPU allocated to my application to 0.5 cores, throughput dropped drastically for each payload size. CPU allocation alone, then, dramatically affects the producer's message throughput.
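Message counts alone hide how much data moves. Assuming the throughput figures above are messages per second and counting only payload bytes (no record headers, batch framing, or protocol overhead), this Python sketch converts them into a data rate; data_rate_mb_per_s is a hypothetical helper.

```python
def data_rate_mb_per_s(messages_per_s: float, avg_payload_kb: float) -> float:
    """Payload throughput in decimal MB/s (1 MB = 1,000 KB), payload bytes only."""
    return messages_per_s * avg_payload_kb / 1000


print(round(data_rate_mb_per_s(14_700, 1.1), 2))  # 16.17
print(round(data_rate_mb_per_s(6_000, 6), 2))     # 36.0
```

Under those assumptions, the 6 KB scenario moves roughly 2.2 times as many payload bytes per second as the 1.1 KB scenario, despite sending far fewer messages.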
Producer message throughput over batch size
Now, let's see how the numbers change when we start adjusting the aforementioned parameters. Figure 1 consists of four graphs showing throughput over various batch sizes under different scenarios. The top-row plots break the results down by linger.ms, and the bottom-row plots by average payload size. The left column shows the results without compression, and the right column shows the results with snappy compression.
At first glance, it is clear that in this particular test the default configuration does not provide the best throughput. Setting linger.ms to a positive value, and thus letting batches fill before they are sent, immediately increases throughput by 10% to 20%. However, as a reminder, this outcome applies to this particular experiment, with average payload sizes of 1.1 KB and 6 KB in different test scenarios. Without compression, increasing the batch size together with setting linger.ms to a non-zero value improves throughput relative to the default configuration. All the 1.1 KB payload scenarios reach a throughput of approximately 15,500 to 17,500, a considerable improvement over the default configuration.
Without compression, linger.ms has no significant impact. The differences between the results for the linger.ms values used in these experiments might be circumstantial, because there does not seem to be a pattern; this aspect would require further research and experiments. Remember that linger.ms sets how long a producer waits for a batch to fill before transmitting it to the Kafka brokers. Setting this parameter to a very high value is not an option to consider unless your application architecture and use case fit it. Nevertheless, the values I used (10, 20, and 40) are quite typical, and you can safely start with similar values.
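Whether batch.size or linger.ms closes a batch first depends on how fast data arrives. This Python sketch estimates that for a hypothetical steady rate of 15,000 messages per second with 1,100-byte payloads, assuming all records go into a single partition's batch and ignoring compression. Real traffic is burstier and spread over the topic's three partitions, so treat the result as a rough guide; first_trigger is a hypothetical helper.

```python
from typing import Tuple


def first_trigger(
    msgs_per_s: float, avg_payload_bytes: float, batch_size: int, linger_ms: float
) -> Tuple[str, float]:
    """Estimate which limit closes a batch first, and after how many milliseconds.

    Assumptions: steady arrival rate, one partition, uncompressed sizes.
    """
    fill_ms = batch_size / (msgs_per_s * avg_payload_bytes) * 1000
    if fill_ms < linger_ms:
        return "batch.size", fill_ms
    return "linger.ms", float(linger_ms)


for batch_size in (16384, 65536, 262144):
    for linger_ms in (10, 20, 40):
        limit, after_ms = first_trigger(15_000, 1_100, batch_size, linger_ms)
        print(f"batch.size={batch_size:>6} linger.ms={linger_ms:>2} -> {limit} after {after_ms:.1f} ms")
```

At that rate, a 16 KB batch fills in about 1 ms, well before any of the linger.ms values tested, while a 256 KB batch takes about 16 ms, so the two limits trade places between linger.ms=10 and linger.ms=20.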
What happens when message compression is enabled?
Kafka supports compressing messages with standard compression algorithms. Compression is applied to whole batches of data, so its benefit is not limited to large messages. Nevertheless, let's see whether there is any difference when the average message size increases considerably.
Because I'm not focusing on comparing compression types in this benchmark, I ran tests with only one compression type: snappy. You can find numerous comparisons of Kafka's compression types online. One benefit worth mentioning is that compressed messages mean lower disk usage.
An interesting detail that affects not only throughput but especially storage is the average batch size. In my Kafka Producers Grafana dashboard, I noticed that the average batch size reported in the metrics is significantly lower than the batch.size setting. This makes sense, as batches are first filled up to batch.size and then compressed, if compression is enabled. The result is not only faster transmission of network packets but also lower storage requirements on the broker side. When considering a compression mechanism for your producers, keep this dimension in mind.
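The effect of compressing whole batches can be illustrated with Python's standard library. This sketch substitutes zlib for snappy (which is not in the standard library), and the order messages are hypothetical; it compares the raw size of a batch, the sum of compressing each message separately, and compressing the batch as one unit.

```python
import json
import zlib
from typing import List, Tuple


def make_batch(count: int) -> List[bytes]:
    """Hypothetical, similar-looking JSON messages, as producers often send."""
    return [
        json.dumps({"order_id": i, "status": "CREATED", "customer": f"customer-{i % 10}"}).encode()
        for i in range(count)
    ]


def compare_compression(messages: List[bytes]) -> Tuple[int, int, int]:
    """Return (raw bytes, sum of per-message compressed sizes, compressed batch size)."""
    raw = sum(len(m) for m in messages)
    per_message = sum(len(zlib.compress(m)) for m in messages)
    whole_batch = len(zlib.compress(b"".join(messages)))
    return raw, per_message, whole_batch


raw, per_message, whole_batch = compare_compression(make_batch(200))
print(f"raw={raw} per-message={per_message} batch={whole_batch}")
```

With repetitive JSON like this, the compressed batch is a small fraction of the raw bytes because the codec can exploit redundancy across messages, which is consistent with the lower average batch size I saw in the metrics; actual ratios depend on the codec and the data.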
Enabling compression adds computational overhead on the producer side. With compression enabled, increasing batch.size alone does not yield higher throughput, as it does without compression. In fact, throughput seems to be negatively affected by a higher batch.size, especially when the payload size is relatively small. You can see this clearly in the plot of throughput over batch size by average payload size. When the average payload size is approximately 6 KB, increasing the batch size does not seem to affect throughput with compression enabled, whereas without compression a higher batch size results in slightly better throughput. Also notice the trade-off between batch.size and linger.ms with compression enabled. A high linger.ms results in higher throughput for a batch.size of 64 KB (65,536 bytes), while the result clearly reverses for a batch.size of 256 KB (262,144 bytes). However, this does not seem to indicate a specific pattern. For the small batch.size (16 KB), linger.ms has an impact similar to the one observed without compression.
Finally, it is clear in these experiments that throughput improves with compression enabled when the average payload size is large.
How acknowledgments impact message throughput
Unsurprisingly, when I set the acks parameter to all, the message throughput decreases considerably. In this case, higher batch.size values seem to help achieve relatively satisfactory message throughput. In other words, the default batch.size does not result in optimal message throughput when acks is set to all. This parameter can also be set to 0, meaning that a producer does not wait for any acknowledgment from the Kafka brokers. In certain streaming scenarios this option could be desirable, but many enterprise applications require stronger data resilience, so I did not include it in the tests. Still, it is safe to guess that throughput with acks=0 would be at least as good as the acks=1 results shown in Figure 2.
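The acks semantics can be summarized in a few lines. This Python sketch (with the hypothetical helper acks_needed) returns how many replica writes must complete before the producer treats a send as successful; for the three-replica topic used here, with all replicas in sync, acks=all waits for three.

```python
def acks_needed(acks: str, in_sync_replicas: int) -> int:
    """Replica writes that must complete before a send counts as successful.

    acks=0: the producer does not wait for any acknowledgment.
    acks=1: only the partition leader's own write.
    acks=all (equivalently -1): every replica currently in the in-sync set.
    """
    if acks == "0":
        return 0
    if acks == "1":
        return 1
    if acks in ("all", "-1"):
        return in_sync_replicas
    raise ValueError(f"unsupported acks value: {acks!r}")


print(acks_needed("1", 3), acks_needed("all", 3))  # 1 3
```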
During the tests, the Kafka brokers reached a maximum CPU usage of 0.8 CPU and around 5.55 GB of memory usage. The Kafka cluster was therefore not a limiting factor for throughput.
Many applications don't produce a high volume of messages nonstop; often, high message volumes occur only sporadically. If your application matches this scenario, you can prepare for it by setting appropriate resource requests and limits for your deployments and by enabling automatic pod scaling with low requests. This would make better use of your computational resources than my setup, which ran the application in a single pod.
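For the autoscaling suggestion, the Kubernetes Horizontal Pod Autoscaler computes the desired replica count as ceil(currentReplicas * currentMetricValue / desiredMetricValue). The following Python sketch reproduces that formula with the default 0.1 tolerance; it omits the minimum and maximum replica bounds and the stabilization windows that the real controller applies, and desired_replicas is a hypothetical name.

```python
import math


def desired_replicas(
    current_replicas: int,
    current_utilization: float,
    target_utilization: float,
    tolerance: float = 0.1,
) -> int:
    """HPA-style replica count: ceil(current * current_metric / target).

    CPU utilization is measured as a percentage of the pod's CPU request, so a
    low request makes the same absolute usage look busier and triggers scaling
    sooner. Ratios within the tolerance leave the replica count unchanged.
    """
    ratio = current_utilization / target_utilization
    if abs(ratio - 1.0) <= tolerance:
        return current_replicas
    return math.ceil(current_replicas * ratio)


print(desired_replicas(1, 90, 50))  # 2
```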
One follow-up to this experiment would be a broader look at performance with both producers and consumers. My Kafka cluster served only a single producer application and was not bogged down by consumers. Consumers might decrease producer throughput by adding to the Kafka cluster's workload. The rate at which a consumer can consume messages from a Kafka topic would then be another subject for experimentation.
Another area for investigation is the impact of additional producer parameters, such as buffer.memory. Its relationship to linger.ms should reveal interesting directions for fine-tuning.
Last but not least, I focused this article solely on message throughput. In many enterprises, disk space might be a limiting criterion. Analyzing the trade-offs between throughput, producer parameters, and disk usage would nicely complement this work.
This article demonstrated how setting various parameters on a Kafka producer leads to different message throughput, with some trade-offs between them. I hope these results help you fine-tune your producers by letting you decide where to start in a more informed manner. In addition to the empirical results presented here, there are many valuable guides on this subject. Optimizing Kafka producers is one such guide; it explains important parameters in depth and how they might impact the performance of your producers. For further resources, consider the following:
- Download the resources used in this experiment from my Quarkus-Kafka GitHub repository. The Quarkus application as well as the resources to create and deploy the Kafka cluster, Prometheus, and Grafana (including the dashboards for producers and Kafka clusters) are all in the repository. Remember that you need to install the operators for Kafka, Prometheus, and Grafana on your Kubernetes or OpenShift cluster.
- Get instructions for installing and configuring Prometheus on an OpenShift cluster using the Prometheus Operator.
- See some Strimzi metrics examples.
- Check out the Kafka 2.6 documentation.
- I recommend reading Optimizing Kafka producers (Paul Mello, 2020).