For example, suppose there is a stream that contains product information. We can use branching to split the stream into two substreams based on each product's price: one for products priced above a specified threshold and one for products priced at or below it.
As you can guess, Kafka Streams provides a dedicated transformation for exactly this operation.
In this tutorial, you will learn the basics of using the Kafka Streams branch transformation to split a stream into multiple substreams.
What Is a Kafka Streams Branch Transformation?
As the name suggests, the branch transformation allows us to create multiple substreams from a single source stream based on a set of conditions. Each branch holds the subset of the original records that satisfy that branch's condition.
Kafka Streams Branch Method
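We can use the branch() method of the KStream interface to perform a branch transformation. The method is declared as KStream<K, V>[] branch(Predicate<? super K, ? super V>... predicates). Note that branch() has been deprecated since Kafka 2.8 in favor of the split() method.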
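The method takes a variable number of predicates. These are instances of the Kafka Streams Predicate interface (org.apache.kafka.streams.kstream.Predicate, not java.util.function.Predicate), and each one evaluates a record to true or false based on its condition.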
Predicate<K, V> is a functional interface with a single method, test(K key, V value), which takes a record's key of type K and its value of type V and returns a boolean indicating whether the record matches the condition.
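To make the predicate contract concrete, here is a minimal, self-contained sketch that does not depend on Kafka. KeyValuePredicate is a hypothetical stand-in with the same shape as Kafka Streams' Predicate<K, V> (a single test(key, value) method); the product IDs, prices and the "promo-" key prefix are illustrative assumptions.

```java
class PredicateDemo {
    // Hypothetical stand-in with the same shape as Kafka Streams'
    // org.apache.kafka.streams.kstream.Predicate<K, V>
    @FunctionalInterface
    interface KeyValuePredicate<K, V> {
        boolean test(K key, V value);
    }

    // Matches records (key = product ID, value = price) priced above 500
    static final KeyValuePredicate<String, Double> HIGH_PRICE =
        (productId, price) -> price > 500;

    // A predicate may also inspect the key, not just the value
    static final KeyValuePredicate<String, Double> PROMO_ITEM =
        (productId, price) -> productId.startsWith("promo-");

    public static void main(String[] args) {
        System.out.println(HIGH_PRICE.test("p-1", 799.0));    // true
        System.out.println(HIGH_PRICE.test("p-2", 500.0));    // false
        System.out.println(PROMO_ITEM.test("promo-7", 10.0)); // true
    }
}
```

Because the interface has exactly one abstract method, a lambda such as (key, value) -> ... can be passed wherever a predicate is expected, which is how the predicates are written in the example application.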
The branch() method returns an array of KStream<K, V>, where each element is a substream containing the records that matched the corresponding predicate: the element at index i holds the records routed by the i-th predicate.
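When a new record arrives in the source stream, the predicates are evaluated in the order in which they were passed to branch(). The record is placed only in the substream of the first predicate that returns true, and the remaining predicates are not evaluated for it. A record that matches none of the predicates is dropped.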
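The routing rule can be sketched in plain Java without a broker. This is not the Kafka Streams implementation: BranchSemantics.branch is a hypothetical helper that works on an in-memory list and uses java.util.function.BiPredicate in place of Kafka's Predicate, but it applies the same first-match rule and drops unmatched records.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

class BranchSemantics {
    // Mimics the routing rule of KStream#branch() over an in-memory list:
    // a record goes only to the branch of the FIRST predicate that returns true,
    // and a record that matches no predicate is dropped.
    @SafeVarargs
    static <K, V> List<List<Map.Entry<K, V>>> branch(
            List<Map.Entry<K, V>> records,
            BiPredicate<? super K, ? super V>... predicates) {
        // One output list per predicate, in the same order as the predicates
        List<List<Map.Entry<K, V>>> branches = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            branches.add(new ArrayList<>());
        }
        for (Map.Entry<K, V> entry : records) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(entry.getKey(), entry.getValue())) {
                    branches.get(i).add(entry);
                    break; // later predicates are not evaluated for this record
                }
            }
        }
        return branches;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Double>> products = List.of(
            Map.entry("laptop", 1200.0),
            Map.entry("mouse", 25.0),
            Map.entry("monitor", 300.0));
        // Overlapping predicates: "laptop" and "monitor" satisfy both,
        // but each is routed only to branch 0
        List<List<Map.Entry<String, Double>>> branches = branch(products,
            (name, price) -> price > 100,
            (name, price) -> price > 200);
        System.out.println(branches.get(0)); // [laptop=1200.0, monitor=300.0]
        System.out.println(branches.get(1)); // []
        // "mouse" matched neither predicate, so it appears in no branch
    }
}
```

The usage shows why predicate order matters: with overlapping conditions, a later branch can end up empty because every matching record was already claimed by an earlier one.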
Example Application
The following simple Java application splits a stream of products into two substreams based on the product price:
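```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
public class ProductPriceSorter {
    // Assumed Product type (its definition is not part of the original example)
    record Product(String name, double price) {
        String getName() { return name; }
        double getPrice() { return price; }
    }
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "product-price-sorter");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Illustrative value serde: a Product is stored as "name,price" UTF-8 text
        Serde<Product> productSerde = Serdes.serdeFrom(
            (topic, p) -> p == null ? null : (p.getName() + "," + p.getPrice()).getBytes(StandardCharsets.UTF_8),
            (topic, bytes) -> {
                if (bytes == null) return null;
                String s = new String(bytes, StandardCharsets.UTF_8);
                int i = s.lastIndexOf(',');
                return new Product(s.substring(0, i), Double.parseDouble(s.substring(i + 1)));
            });
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Product> productsStream = builder.stream("products", Consumed.with(Serdes.String(), productSerde));
        // Create two substreams based on the product price
        KStream<String, Product>[] priceBranches = productsStream.branch(
            (key, product) -> product.getPrice() > 500,
            (key, product) -> product.getPrice() <= 500
        );
        // Process the high-price products substream
        priceBranches[0].mapValues(product -> "High-price product: " + product.getName())
            .to("high-price-products", Produced.with(Serdes.String(), Serdes.String()));
        // Process the low-price products substream
        priceBranches[1].mapValues(product -> "Low-price product: " + product.getName())
            .to("low-price-products", Produced.with(Serdes.String(), Serdes.String()));
        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.start();
    }
}
```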
In this example, we process each substream separately, mapping each product record to a string that indicates whether the product is high-priced or low-priced. We then write the resulting values to the high-price-products or low-price-products topic, respectively. Because the second predicate uses <=, a product priced at exactly 500 is routed to the low-price substream.
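To reason about which topic each product ends up in without running a cluster, the topology's logic can be mirrored in memory. The sketch below is an illustration under stated assumptions, not Kafka code: PriceRoutingSketch, its Product record and route() are hypothetical, and each output topic is modeled as a list of strings. For testing a real topology, Kafka provides TopologyTestDriver in the kafka-streams-test-utils artifact.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PriceRoutingSketch {
    // Hypothetical product record; the article does not show its Product class
    record Product(String name, double price) {}

    // Mirrors the example topology in memory: the first predicate (price > 500)
    // sends a product to "high-price-products", the second (price <= 500) to
    // "low-price-products", and each value is mapped to a label as mapValues() does.
    static Map<String, List<String>> route(List<Product> products) {
        Map<String, List<String>> topics = new LinkedHashMap<>();
        topics.put("high-price-products", new ArrayList<>());
        topics.put("low-price-products", new ArrayList<>());
        for (Product p : products) {
            if (p.price() > 500) {
                topics.get("high-price-products").add("High-price product: " + p.name());
            } else if (p.price() <= 500) {
                topics.get("low-price-products").add("Low-price product: " + p.name());
            }
            // A product matching neither predicate (e.g. a NaN price) is dropped,
            // just as branch() drops records that match no predicate
        }
        return topics;
    }

    public static void main(String[] args) {
        Map<String, List<String>> topics = route(List.of(
            new Product("Laptop", 1200.0),
            new Product("Headphones", 500.0),
            new Product("Mouse", 25.0)));
        topics.forEach((topic, values) -> System.out.println(topic + " -> " + values));
        // high-price-products -> [High-price product: Laptop]
        // low-price-products -> [Low-price product: Headphones, Low-price product: Mouse]
    }
}
```

The usage highlights the boundary case: Headphones at exactly 500 fails the first predicate and satisfies the second, so it lands in low-price-products.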
Conclusion
In this tutorial, we explored how to use the Kafka Streams branch transformation to split the records of a source stream into multiple substreams based on conditions.