Apache Kafka Streams Branch Transformations

In Kafka Streams, a branch or substream refers to a subset of data which is created from a single source stream based on some condition. Branches allow us to split a single stream into multiple substreams, each representing a section of the original data and matching a specific condition.

For example, suppose a stream contains product information. We can use branches to split it into two substreams based on price: one for products priced above a specified threshold and one for products at or below it.

As you might guess, Kafka Streams provides a transformation that performs exactly this operation.

In this tutorial, you will learn the basics of using the Kafka Streams branch transformation to split a stream into multiple substreams.

What Is a Kafka Streams Branch Transformation?

As the name suggests, the branch transformation allows us to create multiple substreams from a single stream source based on a given condition. Each branch holds a subset of the original data that matches the defined parameters.

Kafka Streams Branch Method

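We can use the branch() method of the KStream interface to perform a branch transformation. Note that branch() is deprecated since Apache Kafka 2.8 in favor of split(), so this tutorial applies to versions in which branch() is still available. The method definition is as follows: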

public KStream<K, V>[] branch(Predicate<? super K, ? super V>... predicates)

The method takes one or more predicates (as varargs) of type org.apache.kafka.streams.kstream.Predicate. Each predicate evaluates to true or false for a given record.

Predicate is a functional interface whose test() method takes a key of type K and a value of type V and returns a boolean that indicates whether the record matches the condition. This is Kafka Streams' own two-argument interface, not java.util.function.Predicate.
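To make the shape of such a condition concrete, here is a minimal plain-Java sketch. It does not use Kafka at all: java.util.function.BiPredicate stands in for the Kafka Streams Predicate because both accept a key and a value and return a boolean. The class name PricePredicateSketch and the helper priceAbove() are hypothetical names chosen for illustration.

```java
import java.util.function.BiPredicate;

// Plain-Java illustration only; BiPredicate stands in for Kafka's Predicate<K, V>.
class PricePredicateSketch {

  // Hypothetical helper: returns a condition that ignores the key
  // and matches prices strictly above the given threshold.
  static BiPredicate<String, Double> priceAbove(double threshold) {
    return (key, price) -> price > threshold;
  }

  public static void main(String[] args) {
    BiPredicate<String, Double> isHighPrice = priceAbove(500.0);
    System.out.println(isHighPrice.test("laptop", 899.99)); // true
    System.out.println(isHighPrice.test("cable", 9.99));    // false
  }
}
```

With the real Kafka Streams interface, the same condition would be written as a lambda of the form (key, value) -> ..., as the example application later in this tutorial does.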

The return value of the branch method is an array of KStream<K, V>, where the element at index i is the substream that contains the records matched by the i-th predicate.

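When a new record arrives in the source stream, it is evaluated against the predicates in the order in which they are passed to the branch method. The record is placed in the substream of the first predicate that returns true and is not evaluated against the remaining ones. A record that matches no predicate is dropped.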
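The evaluation order matters when predicates overlap. The following plain-Java sketch simulates that routing rule without Kafka: each record goes to the branch of the first predicate it satisfies, and records that satisfy none are discarded. The class FirstMatchBranchSketch and its route() method are hypothetical, and BiPredicate again stands in for the Kafka Streams Predicate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

// Plain-Java simulation of the routing rule; this is not Kafka Streams code.
class FirstMatchBranchSketch {

  // Routes each record's value to the branch of the FIRST predicate it satisfies.
  // Records that satisfy no predicate are dropped.
  static <K, V> List<List<V>> route(List<Map.Entry<K, V>> records,
                                    List<BiPredicate<K, V>> predicates) {
    List<List<V>> branches = new ArrayList<>();
    for (int i = 0; i < predicates.size(); i++) {
      branches.add(new ArrayList<>());
    }
    for (Map.Entry<K, V> record : records) {
      for (int i = 0; i < predicates.size(); i++) {
        if (predicates.get(i).test(record.getKey(), record.getValue())) {
          branches.get(i).add(record.getValue());
          break; // stop at the first match
        }
      }
    }
    return branches;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Integer>> prices = List.of(
        Map.entry("laptop", 900),
        Map.entry("keyboard", 120),
        Map.entry("cable", 15));

    // The predicates overlap on purpose: 900 satisfies both of them.
    List<BiPredicate<String, Integer>> predicates = List.of(
        (key, price) -> price > 500,
        (key, price) -> price > 100);

    // 900 lands only in branch 0, 120 in branch 1, and 15 is dropped.
    System.out.println(route(prices, predicates)); // [[900], [120]]
  }
}
```

If every record must end up somewhere, a common approach is to add a final catch-all predicate such as (key, value) -> true.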

Example Application

The following shows a simple Java application that sorts the products into two substreams based on the product price:

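import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class ProductPriceSorter {

  public static void main(String[] args) {
    // Minimal configuration. The application ID and broker address are placeholders.
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "product-price-sorter");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    StreamsBuilder builder = new StreamsBuilder();

    // Product is assumed to be an application class that exposes getPrice() and getName().
    // A serde for it must also be configured (for example, as the default value serde).
    KStream<String, Product> productsStream = builder.stream("products");

    // Create two substreams based on the product price
    KStream<String, Product>[] priceBranches = productsStream.branch(
        (key, product) -> product.getPrice() > 500,
        (key, product) -> product.getPrice() <= 500
    );

    // Process the high-price products substream
    priceBranches[0].mapValues(product -> "High-price product: " + product.getName())
        .to("high-price-products", Produced.with(Serdes.String(), Serdes.String()));

    // Process the low-price products substream
    priceBranches[1].mapValues(product -> "Low-price product: " + product.getName())
        .to("low-price-products", Produced.with(Serdes.String(), Serdes.String()));

    KafkaStreams streams = new KafkaStreams(builder.build(), config);
    streams.start();

    // Close the streams instance cleanly when the JVM shuts down
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  }
}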

In this example, we process each substream separately, mapping each product record to a string that indicates whether the product is high-priced or low-priced. We then write the resulting values to either the high-price-products topic or the low-price-products topic.
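The example relies on a Product class that this tutorial does not show. The sketch below is one possible minimal shape for it, assuming only the getName() and getPrice() accessors that the example calls. The ProductLabelSketch class is a hypothetical plain-Java helper that reproduces the two conditions and the labels, so that the expected output values can be checked without a running Kafka cluster.

```java
// Hypothetical value class; the tutorial does not show its definition.
class Product {
  private final String name;
  private final double price;

  Product(String name, double price) {
    this.name = name;
    this.price = price;
  }

  String getName() { return name; }

  double getPrice() { return price; }
}

// Plain-Java stand-in for the branch + mapValues steps of the example.
class ProductLabelSketch {

  // Applies the same threshold (500) and the same label prefixes as the example.
  static String label(Product product) {
    String prefix = product.getPrice() > 500
        ? "High-price product: "
        : "Low-price product: ";
    return prefix + product.getName();
  }

  public static void main(String[] args) {
    System.out.println(label(new Product("Laptop", 899.99)));  // High-price product: Laptop
    System.out.println(label(new Product("USB cable", 9.99))); // Low-price product: USB cable
  }
}
```

In a real application, the Product class would also need a matching serde so that Kafka Streams can read it from the products topic.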

Conclusion

We explored how to use the Kafka Streams branch transformation to split the records of a source stream into multiple substreams based on a set of conditions.

About the author

John Otieno
