Apache Kafka

Apache Kafka Streams FlatMap Transformations

Apache Kafka Streams is a feature-rich and robust library which is built on top of Apache Kafka to build the real-time data processing applications. Kafka Streams provides a high-level Streams DSL that allows us to process and transform the stream data into various formats.

One of the transformations that is available in Kafka is a FlatMap operation. This transformation transforms the records from an input stream into zero, one, or multiple records.

This tutorial explores what this transformation does, how it works, and how we can use it in a Kafka Streams application.

Kafka Streams FlatMap Transformation

The FlatMap transform is only supported in KStream and Ktable objects in Kafka Streams. One of the significant reasons why a FlatMap transform is powerful is because the FlatMap transform genertaes a different number of output records for each input record.

Using FlatMap Transform on a KStream Object

To use a FlatMap transformation on a KStream object, we can use the flatMap() or flatMapValues() methods.

The following shows the definition for the flatMap() method:

<KR, VR> KStream<KR, VR> flatMap(KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper)

The following is the explanation for the parameters in the previous method:

K and V – It represent the key and value type of the input KStream, respectively.

KR and VR – represent the key and value types of the output KStream.

KeyValueMapper – It represents a functional interface for a user-defined transformation function.

Mapper – This defines an instance of the KeyValueMapper interface is used to implement the transformation logic.

The method then applies the mapper function to each input record in the stream and generates an iterable collection of key-value pairs for each input record.

The following shows a basic usage for the flatMap() method in action:

import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValue;

KStream outputStream = inputStream.flatMap(
    (key, value) -> {
        List<KeyValue> result = new ArrayList();
        for (String word : value.split(" ")) {
            result.add(KeyValue.pair(key, word));
        }
        return result;
    }
);

The next is the flatMapValues() method, which is closely similar to flatMap(), but instead, it only applies the transformation to the value, not both the key and value.

The following shows basic usage of the flatMapValues() method on a Kafka KStream object.

import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.KStream;


KStream outputStream = inputStream.flatMapValues(
    value -> Arrays.asList(value.split(" "))
);

Using a FlatMap Transform on A KTable Object

Unlike a KStream object, we cannot directly apply a flatMap() transform on a KTable object. However, we can first convert the KTable to a KStream object using the toStream() method, apply the flatMap() transform, and then convert the result back to a Ktable using the toTable() method.

The following shows a simple snippet of how we can accomplish the previous tasks:

import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KStream;

KStream intermediateStream = inputTable.toStream().flatMapValues(
    value -> Arrays.asList(value.split(" "))
);

KTable outputTable = intermediateStream.toTable();

The given example does require an existing KTable that you wish to transform.

Conclusion

This tutorial covers how to work with flatMap transformations in Apache Kafka streams. Feel free to check out the other articles to learn more.

About the author

John Otieno

My name is John and am a fellow geek like you. I am passionate about all things computers from Hardware, Operating systems to Programming. My dream is to share my knowledge with the world and help out fellow geeks. Follow my content by subscribing to LinuxHint mailing list