Apache Kafka Streams is a client library built on top of the Apache Kafka platform that allows us to read, process, transform, and analyze real-time data streams.
One of the vital features of Kafka Streams is its support for stateful stream processing through state stores and stateful operators such as the reduce transformation.
In this tutorial, we will learn about one of these transformations: the reduce transformation.
What Are Kafka Streams Transformations?
Let us start with the basics and define a Kafka Streams transformation.
Kafka Streams transformations are operations that we can perform on data streams within a Kafka Streams application. Transformations allow us to filter, aggregate, join, and otherwise process the data that flows through the application.
Transformations can be stateless or stateful. If a transformation needs to store and update state information to produce its result, it is a stateful transformation; otherwise, it is stateless.
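To illustrate the distinction, here is a plain-Java sketch (no Kafka broker involved; the event list and names are made up for illustration) contrasting a stateless operation, which handles each record independently, with a stateful one, which must remember what it has already seen:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StatefulVsStateless {
    public static void main(String[] args) {
        List<String> events = List.of("login", "click", "login", "click", "click");

        // Stateless: filtering keeps or drops each record on its own,
        // with no memory of previous records.
        List<String> clicksOnly = events.stream()
            .filter(e -> e.equals("click"))
            .toList();

        // Stateful: a per-key count must store and update state
        // as records arrive.
        Map<String, Integer> counts = new HashMap<>();
        for (String e : events) {
            counts.merge(e, 1, Integer::sum); // update the stored count
        }

        System.out.println(clicksOnly.size());   // prints 3
        System.out.println(counts.get("login")); // prints 2
    }
}
```

In Kafka Streams terms, `filter()` is stateless, while operators such as `count()` and `reduce()` are stateful and are backed by a state store.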
As you can guess, there are various transformations that we can carry out in a Kafka stream. However, this tutorial focuses on the reduce transformation. You can check our other tutorials to learn more about the remaining types of Kafka transformations.
Kafka Streams Reduce Transformation
The Kafka Streams reduce transformation is used to aggregate and reduce the values of a given stream based on a specific function. Think of an aggregate function in SQL.
The reduce operation continuously updates the running aggregate as new values arrive in the stream, applying the defined function to each new record.
The resulting output from the reduce operation is emitted to downstream processors whenever a new result is computed.
Kafka Streams Reduce Method
We can use the reduce() method in Kafka Streams to perform a running aggregation over a stream of records with the same key, where each new record updates the current aggregate result.
The reduce() method is defined on the KGroupedStream interface. It accepts a Reducer&lt;V&gt;, a functional interface that defines the function to combine the values into a single value.
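As a broker-free sketch, the shape of this contract can be mirrored in plain Java. The interface below is a stand-in that mimics Kafka's org.apache.kafka.streams.kstream.Reducer, not the real class; it shows how the lambda we later pass to reduce() combines two values of the same type into one:

```java
// A self-contained mirror of the Reducer<V> shape (the real interface
// lives in org.apache.kafka.streams.kstream): combine two values into one.
@FunctionalInterface
interface Reducer<V> {
    V apply(V value1, V value2);
}

public class ReducerShapeDemo {
    public static void main(String[] args) {
        // The same style of lambda we later pass to reduce():
        // add a new price to the running total.
        Reducer<Double> sumPrices = (total, next) -> total + next;
        System.out.println(sumPrices.apply(10.0, 5.5)); // prints 15.5
    }
}
```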
The reduce() method returns a KTable&lt;K, V&gt; object that represents a table containing the latest aggregate result for each key in the stream.
From here, we can use the various methods of the KTable class to query and process the data in the table. For example, we can use the toStream() method to convert the table into a stream of records, or the groupBy() method to regroup the records by a different key.
Sample Application
The following shows a simple Java application which demonstrates the usage of the Kafka Streams reduce transformation:
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;

import java.util.Properties;

public class ProductPriceReducerDemo {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "product-price-reducer");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Double().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // Read the input topic; keys are product names, values are prices.
        KStream<String, Double> productPrices = builder.stream("product-prices");

        // Group by product key and keep a running total price per product.
        KTable<String, Double> totalPrices = productPrices
            .groupByKey()
            .reduce((totalPrice, newPrice) -> totalPrice + newPrice,
                    Materialized.as("product-price-total"));

        // Convert the table of running totals back to a stream and print it.
        totalPrices.toStream()
            .foreach((key, value) ->
                System.out.println("Product: " + key + " Total price: " + value));

        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
In the given example, we first define the Kafka Streams configuration properties, including the application ID, the bootstrap servers, and the default key and value serdes.
We then create a StreamsBuilder object to build the stream processing topology.
In the next step, we create a KStream from the product-prices topic and group it by key using the groupByKey() method.
Finally, we use the reduce() method with a Reducer<Double> function to add a new price to the running total.
The resulting totalPrices KTable contains the latest total price for each product in the input stream. We then use the toStream() method to convert the KTable to a KStream of key-value pairs and print each record to the console.
Conclusion
We discussed the reduce transformation in a Kafka stream, how it works, its method definition, return value, and more. We also built a simple Java application to demonstrate the use of the reduce() method in a Kafka stream.