In this post, we will quickly explore the Kafka Stream GroupByKey transformation and understand how it works.
What Are the GroupByKey Transformations?
In Kafka streams, the groupByKey transformations are used to group the events within a specific stream based on a particular key.
For example, suppose you have a stream of customer orders. We might want to group all the orders based on the customer ID. We can use the groupbByKey transformation to perform this operation and create a new stream where each event represents a group of events with the same key.
Kafka Stream GroupByMethod()
We can use the groupByKey() method in a Kafka Stream to perform the groupByKey transformation. The method takes no arguments and returns the KGroupedStream object. This object represents the grouped stream which we can pass to other methods for further processing.
The following shows the method definition for the groupByMethod():
Grouped<K, V> grouped
);
As you can see from the previous method, the method takes an optional grouped parameter that allows you to specify how to serialize and deserialize the key and values in the stream. If this value is not determined, the application uses the default serializer and deserializers.
Sample Application:
The following source code shows how we can use the groupByKey() method to perform a GroupByKey transformation:
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import java.util.Properties;
public class GroupByKeyExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "group-by-key-example");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Double().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream orders = builder.stream("customer-orders");
KGroupedStream ordersByCustomer = orders.groupByKey();
KTable salesByCustomer = ordersByCustomer.reduce(
(totalSales, newOrder) -> totalSales + newOrder,
Materialized.as("sales-by-customer-store")
);
salesByCustomer.toStream().to("sales-by-customer");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
}
In the previous example, we create a simple consumer application that consumes a stream of events from the customer-orders topic.
The application also groups the orders by the customer ID using the groupByKey() method.
It then calculates the total amount of sales for each customer using the reduce() method.
Finally, we store the results in a Kafka table called “sales-by-customer-store” and write them to a new topic called “sales-by-customer”.
Note: The given example is an extremely simple application for demonstration purposes only. In a real-world scenario, you need to configure the various parameters and ensure that the Kafka Streams library is installed in your application dependencies.
Conclusion
You now understand what the groupByKey transformation means when it comes to Kafka streams. You also learned about the Kafka groupByKey() method in Java, how it works, and how to use it.