Kafka Streams is a client library for building stateful streaming applications, with a flexible deployment model. As part of the Apache Kafka ecosystem, Streams stores both input and output data in a Kafka cluster, combining the ease of developing and deploying standard Java and Scala applications on the client side with the robust server-side cluster capabilities of Kafka.
Kafka Streams can run stateful or stateless. When it is stateful, it uses a storage engine to store state so that events can be processed in the context of other events. The storage engine plays a crucial role in the overall performance of the application because it is part of the data path: every stateful operation, simple or complicated, goes through the storage engine while live events stream in.
Why does the storage engine’s performance matter in Kafka Streams?
In Kafka Streams, stateful processing involves maintaining and updating the state as new events are processed. This state could include aggregations, counts, or any other information derived from the input data. To ensure fault tolerance and resiliency, Kafka Streams applications need to be able to recover their state after an unexpected shutdown or failure. When a Kafka Streams application restarts after a failure, it must restore its state to continue processing where it left off. This involves reading the checkpoint data from the changelog topic and restoring the state of the application accordingly.
The state restore operation can be heavy, and its performance depends on the storage engine used to store the state.
Here is a comparison of state restore time when Kafka Streams is running with different storage engines:
When a failure occurs, this difference in restore time directly affects the user experience.
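To see how long restoration takes in your own application, Kafka Streams lets you register a StateRestoreListener. The sketch below is a minimal example rather than a benchmark harness: the class name RestoreTimingListener is hypothetical, and it simply prints the elapsed time per store and changelog partition.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

// Hypothetical helper: logs how long each state store takes to restore.
public class RestoreTimingListener implements StateRestoreListener {

    // Start time (System.nanoTime) per store and changelog partition.
    private final Map<String, Long> startNanos = new ConcurrentHashMap<>();

    private static String key(final TopicPartition partition, final String storeName) {
        return storeName + "/" + partition;
    }

    @Override
    public void onRestoreStart(final TopicPartition partition, final String storeName,
                               final long startingOffset, final long endingOffset) {
        startNanos.put(key(partition, storeName), System.nanoTime());
    }

    @Override
    public void onBatchRestored(final TopicPartition partition, final String storeName,
                                final long batchEndOffset, final long numRestored) {
        // Progress could be reported here; this sketch only measures total time.
    }

    @Override
    public void onRestoreEnd(final TopicPartition partition, final String storeName,
                             final long totalRestored) {
        final Long start = startNanos.remove(key(partition, storeName));
        if (start != null) {
            final long elapsedMs = (System.nanoTime() - start) / 1_000_000;
            System.out.printf("Restored %d records into %s (%s) in %d ms%n",
                    totalRestored, storeName, partition, elapsedMs);
        }
    }
}
```

The listener is registered with KafkaStreams#setGlobalStateRestoreListener before the application is started.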
Today Kafka Streams uses RocksDB as the default storage engine. This storage engine has become very popular in the past few years but also has known issues and limitations that most users are dealing with.
In this blog post, I will provide a few tips on how to improve the performance of a stateful Kafka Streams application with some RocksDB tuning, and also introduce an alternative to RocksDB, “RocksDB on steroids”: Speedb.
Write Buffer Size
The write buffer, also known as the memtable, is an in-memory data structure that holds newly written data before it is flushed to disk. Data is written to the write buffer, and when the memtable is full, it is flushed to disk.
The size of the write buffer has different effects on the LSM structure, as well as on the flush schedule and sizes.
Today, the default write buffer size is 16MB.
Changing it to 64MB can provide significant performance improvements: up to 50% more write workload performance and a 30% decrease in write amplification.
The cost is an increase in RAM footprint of roughly 200MB per column family (CF): (64MB recommended - 16MB default) * 4 (maximum number of memtables) = 192MB. For that, you gain a 30% performance increase!
Note that you can increase the write buffer size of a specific column family, based on your needs. This is not a system-wide parameter.
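In Kafka Streams, RocksDB options are changed through a RocksDBConfigSetter. The following is a minimal sketch of raising the write buffer size to 64MB; the class name LargerWriteBufferConfigSetter is hypothetical, and the value should be validated against your own memory budget.

```java
import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

// Hypothetical config setter: raises the memtable (write buffer) size for RocksDB state stores.
public class LargerWriteBufferConfigSetter implements RocksDBConfigSetter {

    // 64MB, the size recommended in this post (the Kafka Streams default is 16MB).
    private static final long WRITE_BUFFER_SIZE_BYTES = 64L * 1024 * 1024;

    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        // storeName can be inspected here to tune only selected stores.
        options.setWriteBufferSize(WRITE_BUFFER_SIZE_BYTES);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // Nothing to release: this setter creates no RocksDB objects of its own.
    }
}
```

The class is registered by setting StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG (rocksdb.config.setter) to LargerWriteBufferConfigSetter.class in the application properties.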
Compaction Method
RocksDB provides several compaction methods; the most popular are Leveled compaction and Universal compaction (the default in Kafka Streams). While Universal compaction can provide better performance in some use cases, its space amplification cost is not negligible, nor is its write amplification. Some tests show that changing to Leveled compaction can result in a 60% reduction in space amplification and 30% less write amplification compared to Universal compaction.
*The tests were run with 80M keys of 1KB
Read and seek performance also improves in these scenarios:
We also observed that, with a 16MB write buffer size, Leveled compaction shows a 10% decrease in write performance compared to Universal compaction.
Results of changing to Leveled compaction:
- 60% lower space amplification
- 22% lower write amplification
- 12% improvement in mixed workload
- 38% improvement in Seek random
- 10% reduction in write workload
But if you also increase the write buffer size to 64MB (in addition to changing the compaction method), you gain 10% more write workload performance!
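Both changes from this section can be applied together in a single RocksDBConfigSetter. This is a sketch under the assumption that every store in the application should get the same settings; the class name LeveledCompactionConfigSetter is hypothetical.

```java
import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.CompactionStyle;
import org.rocksdb.Options;

// Hypothetical config setter: Leveled compaction plus a 64MB write buffer.
public class LeveledCompactionConfigSetter implements RocksDBConfigSetter {

    private static final long WRITE_BUFFER_SIZE_BYTES = 64L * 1024 * 1024;

    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        // Kafka Streams defaults to Universal compaction; switch to Leveled.
        options.setCompactionStyle(CompactionStyle.LEVEL);
        // Larger memtable to recover the write performance lost with Leveled compaction.
        options.setWriteBufferSize(WRITE_BUFFER_SIZE_BYTES);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // No RocksDB objects were created here, so there is nothing to close.
    }
}
```

As with any config setter, it takes effect once it is configured through the rocksdb.config.setter property of the Streams application.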
Performance Or Memory Consumption?
Let's talk about tradeoffs. Using RocksDB comes with a tradeoff between memory consumption and performance. In a heavy write workload scenario, the storage engine can get into a situation where the write buffer size is not big enough to handle all of the new write requests. In such cases, you can either experience performance degradation or allow the application to use extra memory without affecting the performance.
The allow_stalls parameter, which is part of the write buffer manager settings, controls this behavior. By enabling allow_stalls, you prioritize memory consumption at the cost of performance degradation. When allow_stalls is disabled, the storage engine can potentially use unlimited memory to handle the write requests, which can cause out-of-memory conditions.
There is no single optimal configuration. It depends on the workload and your preferences.
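In the RocksDB Java API used by Kafka Streams, this choice is, to my knowledge, exposed as the allowStall argument of the WriteBufferManager constructor. The sketch below assumes a rocksdbjni version that ships the three-argument constructor (older versions only have the two-argument one); the class name and the memory sizes are illustrative, not recommendations.

```java
import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;
import org.rocksdb.WriteBufferManager;

// Hypothetical config setter: caps total memtable memory across all stores.
public class BoundedMemtableConfigSetter implements RocksDBConfigSetter {

    // Illustrative sizes only; pick values that fit your own memory budget.
    private static final long CACHE_BYTES = 512L * 1024 * 1024;
    private static final long TOTAL_MEMTABLE_BYTES = 256L * 1024 * 1024;

    // true: stall writes when the limit is hit (bounded memory, possible slowdowns).
    // false: keep accepting writes (stable throughput, memory may grow past the limit).
    private static final boolean ALLOW_STALL = true;

    // Shared by every store in this JVM so the limit applies to the application as a whole.
    private static final Cache CACHE = new LRUCache(CACHE_BYTES);
    private static final WriteBufferManager WRITE_BUFFER_MANAGER =
            new WriteBufferManager(TOTAL_MEMTABLE_BYTES, CACHE, ALLOW_STALL);

    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        options.setWriteBufferManager(WRITE_BUFFER_MANAGER);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // The cache and write buffer manager are shared, so they are not closed per store.
    }
}
```

Keeping the cache and the write buffer manager static is deliberate: Kafka Streams creates one RocksDB instance per store partition, and a per-instance limit would not bound the total memory.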
Speedb - A RocksDB Alternative
Speedb is a drop-in replacement for RocksDB, designed to solve the known limitations RocksDB has and to improve the overall performance and the user experience of the storage engine.
One of the projects Speedb has implemented is a new Memory Manager. The purpose of the memory manager is to handle this tradeoff better and ensure you can maximize your system's resources without compromising on performance.
The memory management project handles both clean and dirty data. Dirty data refers to data that has not been flushed yet, while clean data refers to data that has been flushed to disk. The dirty data manager consists of 2 main changes: an enhanced delayed write mechanism and proactive flushes. We changed the delayed write mechanism to be more efficient and moderate, to eliminate stalls and provide stable performance over time. The proactive flushes feature changes the flushing schedule and triggers so that memory is released when possible, and it prevents the creation of many small files in L0, which lead to performance degradation.
The graphs below show the benefits of the 2 mentioned improvements separately. The picture is clear: the performance is stable and higher.
The following graphs show the benefits of the dirty memory manager Speedb introduced: high performance without stalls and lower memory usage, compared to RocksDB running the same workload and experiencing many stalls.
(Lower is better)
This memory vs performance tradeoff also exists for clean data (data that has been flushed to disk). Today RocksDB offers a pinning mechanism that improves read performance by pinning metadata (index and filter blocks) in the cache. The issue with this mechanism is that if the metadata is larger than the system memory, it can lead to an out-of-memory incident.
Speedb added a safety belt to avoid that. With Speedb, metadata is pinned only up to a predefined threshold; once the threshold is reached, LRU is used instead. This static pinning mechanism allows you to enjoy the benefit of pinning without any risk to the system's stability.
Speedb’s new pinning policy can be created via TablePinningPolicy::CreateFromString.
For example, the call TablePinningPolicy::CreateFromString(..., "id=speedb_scoped_pinning_policy; capacity=1G; bottom_percent=10; mid_percent=80", ...) creates a new scoped pinning policy that will:
- Pin up to 1G of memory
- Limit the bottom-most level to no more than 10% of 1G
- Limit the mid-level pinning to 80% of 1G.
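To make those limits concrete, here is the arithmetic as a small self-contained sketch. It assumes that 1G means 2^30 bytes and that both percentages are taken from the total capacity, as the list above states; the PinningBudget class is purely illustrative and is not part of Speedb.

```java
// Illustrative arithmetic only; this class is not part of Speedb or RocksDB.
final class PinningBudget {

    // Returns the given percentage of the capacity, rounded down to whole bytes.
    static long share(final long capacityBytes, final int percent) {
        return capacityBytes * percent / 100;
    }

    public static void main(final String[] args) {
        final long capacity = 1L << 30; // assumption: 1G = 2^30 bytes
        System.out.println("Total pinning capacity: " + capacity + " bytes");
        System.out.println("Bottom-most level cap:  " + share(capacity, 10) + " bytes");
        System.out.println("Mid-level cap:          " + share(capacity, 80) + " bytes");
    }
}
```

Under these assumptions the bottom-most level can pin roughly 102MiB and the mid levels roughly 819MiB.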
The following graph shows the benefit of pinning vs LRU and the results are very clear: there is a big advantage when using pinning, and now with Speedb’s safety belt, there is no risk using it.
These are just a few examples of the benefits Speedb introduced. In the Speedb open source project, you can find these features and many more.
Let’s summarize the tips:
- Increase the write buffer size from 16MB to 64MB
- Change the compaction method to Leveled compaction
- Understand the memory vs performance tradeoff
- Use the Speedb storage engine in your Kafka Streams application for higher and more stable performance
For any RocksDB/Speedb questions, join our community on Discord! https://discord.gg/5guy9eCNC3