One of the transformations that is available in Kafka is a FlatMap operation. This transformation transforms the records from an input stream into zero, one, or multiple records.
This tutorial explores what this transformation does, how it works, and how we can use it in a Kafka Streams application.
Kafka Streams FlatMap Transformation
The FlatMap transform is only supported in KStream and Ktable objects in Kafka Streams. One of the significant reasons why a FlatMap transform is powerful is because the FlatMap transform genertaes a different number of output records for each input record.
Using FlatMap Transform on a KStream Object
To use a FlatMap transformation on a KStream object, we can use the flatMap() or flatMapValues() methods.
The following shows the definition for the flatMap() method:
The following is the explanation for the parameters in the previous method:
K and V – It represent the key and value type of the input KStream, respectively.
KR and VR – represent the key and value types of the output KStream.
KeyValueMapper – It represents a functional interface for a user-defined transformation function.
Mapper – This defines an instance of the KeyValueMapper interface is used to implement the transformation logic.
The method then applies the mapper function to each input record in the stream and generates an iterable collection of key-value pairs for each input record.
The following shows a basic usage for the flatMap() method in action:
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValue;
…
KStream outputStream = inputStream.flatMap(
(key, value) -> {
List<KeyValue> result = new ArrayList();
for (String word : value.split(" ")) {
result.add(KeyValue.pair(key, word));
}
return result;
}
);
The next is the flatMapValues() method, which is closely similar to flatMap(), but instead, it only applies the transformation to the value, not both the key and value.
The following shows basic usage of the flatMapValues() method on a Kafka KStream object.
import org.apache.kafka.streams.kstream.KStream;
…
KStream outputStream = inputStream.flatMapValues(
value -> Arrays.asList(value.split(" "))
);
Using a FlatMap Transform on A KTable Object
Unlike a KStream object, we cannot directly apply a flatMap() transform on a KTable object. However, we can first convert the KTable to a KStream object using the toStream() method, apply the flatMap() transform, and then convert the result back to a Ktable using the toTable() method.
The following shows a simple snippet of how we can accomplish the previous tasks:
import org.apache.kafka.streams.kstream.KStream;
…
KStream intermediateStream = inputTable.toStream().flatMapValues(
value -> Arrays.asList(value.split(" "))
);
KTable outputTable = intermediateStream.toTable();
The given example does require an existing KTable that you wish to transform.
Conclusion
This tutorial covers how to work with flatMap transformations in Apache Kafka streams. Feel free to check out the other articles to learn more.