Blog

Structured Streaming Part 2 - Arbitrary Stateful Processing

Back Blog
Share

In the previous part of this miniseries I talked a little about Event time and watermarking in Structured Streaming. You’ve seen the benefits that Spark offers and the main idea behind them. But that’s not all!

I got a lot of questions from devs asking stuff like, what if there’s a need for a different and more complicated concepts of windows? Or what if you want some other transformations other than aggregation? Preceding explored processing based on event time is a very common scenario but it can’t answer these questions.

And that’s what this second part is all about - the answer: Arbitrary Stateful Processing.

As the name suggests itself, Arbitrary Stateful Processing gives us the opportunity to write our own custom code in order to manipulate it with multiple groups of data. With this tool we have the power to define how will the user-defined state look, how it will be updated over time, and what are the conditions for removing this state from memory. This way, everything is in the hands of the developer.

When to use Arbitrary Stateful Processing:

 

1. When we want to have windows based on the number of events (not on event time)

From the image above you can see that besides the typical window operations marked with start and end timestamp, you can now have count-based windows. In this example, the intermediate state preserved in memory will be cleared and printed to sink on every 100 incoming hashtags.

2. Unspecified time windows with a series of events 

Where you need arbitrary logic to maintain and update your state over time (e.g. sessionization – one session per user, the time-out state when the user is not active for a certain period of time)

In this case,you can introduce different sizes of windows. The size of a window is proportional to the duration of a user session. When the user wasn’t active for a certain period of time, the session is terminated, the state is dropped, and this user’s characteristics can be later used to compare to other users.

3. Custom aggregations instead of ordinary/built-in aggregations

You can play with creating custom output in relation to the one you’re using in aggregation.

Spark offers two different set of APIs for this task:

mapGroupsWithState and flatMapGroupsWithState.

Note that they work only with groups of data, and they are only called after the groupByKey method. mapGroupsWithState runs computations over each group of data and returns (at most) one result per group. Meanwhile flatMapGroupsWithState is a more powerful tool and can produce one or more outputs per each group.

For both APIs you need to define:

  • Class definitions for input data, state, and optionally an output (state can be output)
  • A function with the following parameters: key, an Iterator of events and state. With this function you can update the state on a given key with a set of events
  • A time-out (described below)

The hard part of arbitrary stateful processing is that the user must specify how and when to drop the state. Because of this, Spark introduces Time-outs, similar to watermarks. With this you can put the state in time-out based on processing time (GroupStateTimeout.ProcessingTimeTimeout) or event time (GroupStateTimeout.EventTimeTimeout). What’s great about time-outs is that they are calculated for each group from which one global time-out is derived and used across all groups.

Another thing to note is that it doesn’t have strict constraints about when the time-out will actually occur. What does this mean? Just like in stateful processing with event time, where the intermediate state is dropped only after a trigger - well, this is also true for time-outs. But the way Spark thinks is that if there isn’t any data in the input stream, there is no need for a trigger, so it will stick the current state in memory. The trigger will occur only when there is data in the stream.

Supported output modes for arbitrary operations are:

  • Update – for mapGroupWithState and flatMapGroupWithState (“after aggregations” are not allowed)
  • Append – for flatMapGroupWithState (“after aggregations” are allowed)

Arbitrary stateful operations are awesome.

They give you an opportunity to define how your state should look like, how to update it, and when to remove it from memory. But be warned, it’s complex. You are in the driver's seat and you are responsible for pretty much everything. From my point of view, the most fun thing is knowing that Spark is just starting to warm up and we have a lot of progress and a whole set of new possibilities to look forward to.


 

 

Next post

Milivoje Popovac