Problem with Kafka streams!?Back 12/07/17
What is Kafka Streams?
Before diving straight into the main topic, let me introduce you with Kafka Streams first.
We have all heard about Apache Kafka as it has been used extensively in the Big Data and Stream processing world. For those that are new in theKafka world, a good intro can be a blog written by Nikola Ivancevic. Kafka Streams is a library that comes with Apache Kafka, it enables easy and powerful stream processing of Kafka events. This way, existing applications can use Kafka Streams API by simply importing the library. This means that all applications that use Kafka Streams can be run in virtually any environment.
Kafka Streams API is implemented in Java. For now only applications written in JVM can utilize this library.
How it works
Kafka Streams library can be used in two distinct ways:
High-Level Stream DSL
High-Level Stream DSL
Using Stream DSL,a user can express transformation, aggregations, grouping, etc, with a simple use of several provided methods. In several lines of code and within a couple of minutes it’s possible to create and deploy a working application, at least in theory. When transforming data from one format to another, internal Kafka Topics are used for storing intermediate results. This means that, for each transformation in chain, Kafka Stream will create one internal topic. With this approach, users enjoy all the benefits and processing security of Kafka without having to invest time in developing any. If anything unexpected occurs during runtime, the application is able to continue from where it was before the breakdown.
All this sounds like a neat solution but it has some serious drawbacks:
With each transformation, data has to be serialized and written into the topic, then for the next operation in the chain it has to be read from the topic, meaning that all side operations happen for every entity like partition key calculation, persisting to disk, etc.
Kafka Streams assumes that Serde class used for Serialization/Deserialization is the one provided in the config. With changing the format of data in operation chain user has to provide the appropriate Serde. If existing Serdes can not handle the used format, the user has to create a custom Serde. No big deal, just extend the Serde class and implement custom Serializer and custom Deserializer. From one class we ended up with 4, kinda not optimal. So, for each custom format of data in the operation chain we create three additional classes. An alternative approach is to use generic JSON or AVRO Serdes. One more thing, the user has to specify a serde for both key and value part of the message,
Restarting of the application. After the breakdown the application will go through each internal topic to the last valid offset, and this can take some time, especially if log compaction is not used and/or retention period is not set up.
Another way to use Kafka Streams is to use Processor API. Processor API provides complete flexibility, the user can implement anything not supported by Stream DSL. It is intended for users that have not completely grasped Stream DSL or when some exotic functionality is required. To use Processor API, the user has to provide concrete implementation of ProcessorSuplier that returns the user’s implementation of the Processor.
The user’s implementation of Processor class needs to override four methods:
init: a method that will be called upon creating an instance of Processor class. It provides a ProcessorContext object for the user. ProcessorContext is commonly used to schedule punctuation period, to forward transformed messages to output topic or to commit processors state,
process: a method that is called for each arriving record, this is where the transformations usually happen,
punctuate: a scheduled method that is called on the configured period of times, often used to send messages in batches, export metrics, populate database, etc,
close: a method called on terminating the app, used to close connections.
Where does the problem lie?
To completely understand the problem, we will first go into detail how the ingest and process occur by default in Kafka Stream. For example purposes, the punctuate method is configured to occur every 10 seconds and in input stream we have exactly 1 message per second. The purpose of the job is to parse input messages, collect them and, in the punctuate method, do batch insert in the database, then to send metrics.
After running the Kafka Stream application the Processor will be created followed by init method. Here is where all the connections are established. Upon a successful start, the application will listen to input topic for incoming messages. It will remain idle until the first message arrives. When the first message arrives the process method is called - this is where transformations occur and the result is stored for later use. If no messages are in input topic, the application will go idle again, waiting for the next message. After each successful process the application checks if punctuate should be called. In our case we will have ten process calls followed by onepunctuate call, with this cycle repeating indefinitely as long as there are messages.
A pretty obvious behavior, isn’t it? Then why is ‘one’ bolded?
This is where the things get tricky. An application can call more than one punctuate methods subsequently. To understand why, read the following sentence again: “After each successful process the application checks if punctuate should be called.”
What would happen if we have processed nine messages and there are no more messages in input topic?
The application will wait until new messages arrive in the topic. But we have started the punctuate period, and now more than 10 seconds have passed. The application is still idle and will remain like that without regard to the started punctuate period. For the purpose of example, the next message arrives one hour later. That message will be processed and put in the same punctuate period before calling insert for the entire batch. After the first expected punctuate theapplication will call 359 more punctuate methods (60 minutes * 6 punctuate per minute). Only after finishing these 360 punctuate calls the application will resume consuming input messages.
There are two problems with this approach:
Messages that arrive long after last processed can end up in a wrong batch,
Multiple punctuate methods can be called in succession.
If we want to group messages in timed windows so that each window contains only records that arrived in Kafka in the exact time bucket before adding a message to bulk we should check the timestamp of the arriving message. After bulk insert (to database, Kafka, etc) save current time and use that time to manually check if arriving messages belong to the next batch. The first message that doesn’t belong to the batch will trigger bulk insert and start a new batch.
What if we are outputting metrics during the punctuate method? One solution is to remember the time of the last punctuate. New punctuate will do output only if the correct amount of time has passed. Again, for the purpose of example, let’s say that the first punctuate takes 1s to complete and the remaining 359 punctuate methods in succession take 1s to check (without metrics output), the application will process messages for 8 seconds before bulk insert. The batch collect period has been shortened by 20% because of punctuate methods.
Why does it last only 8 seconds?
Kafka Stream maps periods on regular and “correct” intervals by default. It will start counting from 0 always, for our configuration intervals this will look like: [0, 10000),[10000, 20000), … Note that Kafka Streams counts the duration of intervals in milliseconds. This means that the scheduled time for the first punctuate method is 10000ms from starting time. No matter how many input messages arrive, the first process method that finishes after 10000ms will trigger punctuate. The second punctuate is scheduled to occur 20000ms from starting time, it will not be affected by the duration of punctuate or process methods.
This will cause wrong metrics data (number of processed messages, latency time, etc).
Multiple punctuate calls in succession can occur if punctuate method lasts more than intended. For example, network slowdown occurs during database population and high database usage is present (large insert in the same tables, thus locking the table and preventing access), results in punctuate last longer that 10 seconds (our interval period); for example, it lasted 30 seconds. This means that the next processed will trigger punctuate again, followed by two unwanted punctuate calls.
One can argue that, if we have such “slow” input, then stream processing is perhaps not the correct solution. This scenario can occur in systems with “fast” input; it all depends on the rate of messages and punctuate period. Many legacy systems still have regular downtimes, when batch jobs are run, database is updated, new deploy is in progress, etc. Scheduled downtimes in the legacy part of the system can affect the correctness of the stream processing part of the system and developers have to keep that in mind.
Another way to avoid the mentioned problems is to apply stateless processing. After each processed message sends the result to output topic, leave the batching to Kafka Producer. This will only shift problems down the line, as this introduces more database calls, more tcp/ip packets over the network and more IO calls, to replicate and permanently store Kafka messages on disks.
The Kafka Streams library provides a performant and scalable solution that can be incorporated into virtually any existing application. It’s developed with ease of use in mind. It provides most commonly used functions while allowing users to implement their own.
Kafka Streams is by no means a silver bullet and the same solution might not work in different use cases. Processor API is intended to be flexible, but that only places the implementation of corner cases on the user. High-Level Stream DSL provides several solutions for windowing and aggregations, while not covering all.
Some frameworks like Apache Flink put a strong emphasis on windowing and correctness, at some cost in performance or usability. No matter the framework, corner cases always require special care.