Apache Kafka Streams Session Window Transformations

In Apache Kafka Streams, a session gap is the period of inactivity that defines the end of a session in a session window transformation.

When events occur in a session window, they are grouped based on how closely they occur in time. The session gap parameter determines how long a session can be inactive before it is over.

For example, if the session gap is set to 5 minutes, an event that arrives within 5 minutes of the previous event for the same key is added to the same session. An event that arrives after more than 5 minutes of inactivity starts a new session.
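The 5-minute rule can be sketched in plain Java. This is only an illustration of the grouping logic, not Kafka Streams code; the class name SessionGapDemo, the sessionize() helper, and the sample timestamps are made up for this example.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration of the session gap rule; this is not Kafka Streams code.
final class SessionGapDemo {

    // Splits sorted event timestamps (epoch milliseconds) into sessions.
    // An event joins the current session when it arrives no later than
    // `gap` after the previous event; otherwise it opens a new session.
    static List<List<Long>> sessionize(List<Long> sortedTimestamps, Duration gap) {
        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = null;
        long previous = 0L;
        for (long ts : sortedTimestamps) {
            if (current == null || ts - previous > gap.toMillis()) {
                current = new ArrayList<>();
                sessions.add(current);
            }
            current.add(ts);
            previous = ts;
        }
        return sessions;
    }

    public static void main(String[] args) {
        long minute = Duration.ofMinutes(1).toMillis();
        // Hypothetical events at minutes 0, 2, 6, 13 and 14 for a single key.
        List<Long> events = List.of(0L, 2 * minute, 6 * minute, 13 * minute, 14 * minute);
        System.out.println(sessionize(events, Duration.ofMinutes(5)).size() + " sessions");
    }
}
```

With these sample timestamps, the events at minutes 0, 2, and 6 form one session, and the 7-minute pause before minute 13 opens a second one.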

Setting the session gap too short can create many small sessions, which increases the processing overhead. Conversely, setting it too long can merge unrelated activity into a single session, which leads to incorrect results.

It is therefore worth tuning the session gap to the specific use case and the characteristics of the data in the stream.
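To get a feel for this trade-off, the following plain-Java sketch counts how many sessions the same events produce under different gaps. The class name GapTuningDemo, the countSessions() helper, and the timestamps are hypothetical and only serve the illustration.

```java
import java.time.Duration;

// Plain-Java illustration of how the gap size changes the number of sessions.
final class GapTuningDemo {

    // Counts the sessions in sorted timestamps (in minutes) for a given gap.
    static int countSessions(long[] sortedMinutes, Duration gap) {
        if (sortedMinutes.length == 0) {
            return 0;
        }
        int sessions = 1;
        for (int i = 1; i < sortedMinutes.length; i++) {
            // A pause longer than the gap closes the session and opens a new one.
            if (sortedMinutes[i] - sortedMinutes[i - 1] > gap.toMinutes()) {
                sessions++;
            }
        }
        return sessions;
    }

    public static void main(String[] args) {
        // Hypothetical event times, in minutes, for a single key.
        long[] events = {0, 2, 6, 13, 14, 40, 41};
        for (int gapMinutes : new int[] {1, 5, 30}) {
            int count = countSessions(events, Duration.ofMinutes(gapMinutes));
            System.out.println("gap=" + gapMinutes + "min -> " + count + " session(s)");
        }
    }
}
```

For this sample data, a 1-minute gap splits the activity into 5 sessions, a 5-minute gap into 3, and a 30-minute gap collapses everything into 1.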

Kafka Streams Session Window Transformation

Apache Kafka Streams provides session window transformations to allow us to group the related events within a defined session gap.

Unlike tumbling windows, the session window transformations do not break the stream into fixed-sized chunks. Instead, they group the events based on how close they occur in time.

To be precise, the session window groups together all the events that occur within the session gap period of each other.
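One consequence of this rule is that an event arriving out of order can bridge two existing sessions and merge them into one. The sketch below illustrates the idea in plain Java; it is not the Kafka Streams implementation, and the class name SessionMergeDemo, the add() helper, and the sample data are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of session merging; this is not the Kafka Streams implementation.
final class SessionMergeDemo {

    // Each session is a {start, end} pair in minutes. Adding an event absorbs
    // every existing session that lies within `gap` of the event's timestamp.
    static List<long[]> add(List<long[]> sessions, long ts, long gap) {
        long start = ts;
        long end = ts;
        List<long[]> result = new ArrayList<>();
        for (long[] s : sessions) {
            if (ts >= s[0] - gap && ts <= s[1] + gap) {
                // The event is close enough: merge this session into the new one.
                start = Math.min(start, s[0]);
                end = Math.max(end, s[1]);
            } else {
                result.add(s);
            }
        }
        result.add(new long[] {start, end});
        return result;
    }

    public static void main(String[] args) {
        long gap = 5;
        List<long[]> sessions = new ArrayList<>();
        // The last event (minute 6) arrives out of order.
        for (long ts : new long[] {0, 2, 9, 10, 6}) {
            sessions = add(sessions, ts, gap);
            System.out.println("after event at minute " + ts + ": " + sessions.size() + " session(s)");
        }
    }
}
```

In this run, the events at minutes 0 and 2 and those at minutes 9 and 10 first form two separate sessions. The late event at minute 6 is within 5 minutes of both, so they collapse into a single session spanning minutes 0 to 10.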

To use the session window transformation, we call the windowedBy() method on a grouped stream and pass it a SessionWindows object. A typical call looks as follows:

SessionWindowedKStream<String, String> windowedStream =
    stream.groupByKey()
          .windowedBy(SessionWindows.with(Duration.ofMinutes(<value>)));

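Refer to the Kafka Streams documentation to discover how the method works and its various signatures. Note that SessionWindows.with() is deprecated as of Kafka 3.0 in favor of SessionWindows.ofInactivityGapWithNoGrace() and SessionWindows.ofInactivityGapAndGrace().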

Example Application:

The following example demonstrates how to create a session window transformation:

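import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.time.Duration;
import java.util.Properties;

public class SessionWindowExample {

  public static void main(String[] args) {
    // Minimal configuration; the application ID and broker address are placeholders.
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "session-window-example");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> inputStream = builder.stream("users");

    // A session ends after 5 minutes of inactivity.
    SessionWindows sessionWindow = SessionWindows.with(Duration.ofMinutes(5));

    SessionWindowedKStream<String, String> windowedStream = inputStream
        .groupByKey()
        .windowedBy(sessionWindow);

    // Count the events in each session, per key.
    KTable<Windowed<String>, Long> sessionCounts = windowedStream.count();

    // The result key is a Windowed<String>, so the output topic needs explicit serdes.
    sessionCounts.toStream().to("users-output",
        Produced.with(WindowedSerdes.sessionWindowedSerdeFrom(String.class), Serdes.Long()));

    KafkaStreams streams = new KafkaStreams(builder.build(), config);
    streams.start();
  }
}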

In the given example, we define the input stream from a Kafka topic using the StreamsBuilder class.

Next, we create a SessionWindows object with a 5-minute session gap. Calling groupByKey() on the users stream returns a KGroupedStream, and windowedBy() turns it into a SessionWindowedKStream that applies the session window to each key.

The next step uses the count() method to count the events within each session window, creating a KTable object whose keys are the session windows and whose values are the counts.

Finally, we convert the table to a stream and use the to() method to write the results to a new Kafka topic.

NOTE: The config object that is passed to KafkaStreams must define the stream configuration, including at least the bootstrap servers and the stream application ID.

Conclusion

This tutorial explored the Kafka Streams session gap, what it means, and how it works. We also discussed session window transformations and how to create one in Java.

About the author

John Otieno