This tutorial explores how to use the cogroup transformation of the Kafka Streams DSL (the KStream API) to combine several streams that share a key into a single aggregate.
Kafka Streams Cogroup Transformation Method
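In Kafka Streams (Apache Kafka 2.5 and later), cogroup() is not called on a KStream directly. Each input stream is first grouped by key, for example with groupByKey(), which returns a KGroupedStream<K, V>. Calling cogroup(aggregator) on the first grouped stream returns a CogroupedKStream<K, VOut>, further grouped streams are added with cogroup(groupedStream, aggregator), and aggregate(initializer) completes the operation and returns a KTable<K, VOut>.
Rather than passing the raw records of every input through, the operation folds them into one shared aggregate per key: each input stream supplies its own Aggregator, which receives the key, the incoming value, and the current aggregate, and returns the updated aggregate.
Here K is the common key type, each input can have its own value type (V1, V2, and so on), and VOut is the type of the combined aggregate, whose starting value for a new key comes from the Initializer. Overloads of aggregate() also accept a Materialized or Named argument to configure the underlying state store or to name the processor.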
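To make these semantics concrete without a broker, the following plain-Java model mimics what a cogrouped aggregation computes. It is not the Kafka Streams API: the names CogroupModel, Event, Summary, Agg, and cogroup are hypothetical, Agg only mirrors the (key, value, aggregate) shape of an Aggregator, and records are processed in list order, whereas a real topology interleaves records from the input topics as it consumes them.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Plain-Java model of what a cogrouped aggregation computes.
// NOT the Kafka Streams API: every name below is hypothetical.
public class CogroupModel {

    // One record read from one input ("customers" or "purchases").
    record Event(String input, String key, String value) {}

    // The shared per-key aggregate: latest customer name plus all purchases.
    record Summary(String customer, List<String> purchases) {}

    // Mirrors the shape of an aggregator: (key, value, current aggregate) -> new aggregate.
    interface Agg<A> {
        A apply(String key, String value, A aggregate);
    }

    // Folds each event into its key's aggregate using the aggregator registered for its input;
    // a key seen for the first time starts from initializer.get().
    static <A> Map<String, A> cogroup(List<Event> events,
                                      Map<String, Agg<A>> aggregators,
                                      Supplier<A> initializer) {
        Map<String, A> table = new LinkedHashMap<>();
        for (Event e : events) {
            A current = table.containsKey(e.key()) ? table.get(e.key()) : initializer.get();
            table.put(e.key(), aggregators.get(e.input()).apply(e.key(), e.value(), current));
        }
        return table;
    }

    static Map<String, Summary> example() {
        Map<String, Agg<Summary>> aggregators = Map.of(
                // customers: replace the customer name, keep the purchases.
                "customers", (key, name, agg) -> new Summary(name, agg.purchases()),
                // purchases: keep the customer name, append the purchase.
                "purchases", (key, item, agg) -> {
                    List<String> items = new ArrayList<>(agg.purchases());
                    items.add(item);
                    return new Summary(agg.customer(), List.copyOf(items));
                });
        List<Event> events = List.of(
                new Event("purchases", "c1", "book"),
                new Event("customers", "c1", "Alice"),
                new Event("purchases", "c2", "lamp"),
                new Event("purchases", "c1", "pen"));
        return cogroup(events, aggregators, () -> new Summary(null, List.of()));
    }

    public static void main(String[] args) {
        example().forEach((key, summary) ->
                System.out.println(key + " -> " + summary.customer() + " " + summary.purchases()));
    }
}
```

Running main prints c1 -> Alice [book, pen] and c2 -> null [lamp]: the purchase for c1 that arrived before its customer record is kept in the aggregate, and c2 has no customer record yet, so its customer field is still the initializer's null.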
Example Application:
Suppose we have a “customers” topic whose records use the customer ID as the key and a JSON object with the customer details as the value, and a “purchases” topic whose records use the same customer ID as the key and a JSON object with the purchase details as the value.
We can create a simple application that uses cogroup to combine the records of both topics, as follows:
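import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.UncheckedIOException;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
public class CustomerPurchaseJoinApp {
    private static final ObjectMapper MAPPER = new ObjectMapper();
    // Starting aggregate for a customer ID that has not been seen before.
    private static final String EMPTY = "{\"customer\":null,\"purchases\":[]}";
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "customer-purchase-join");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        StreamsBuilder builder = new StreamsBuilder();
        // cogroup() is defined on KGroupedStream, so group each input by its key (the customer ID).
        KGroupedStream<String, String> customers = builder.<String, String>stream("customers").groupByKey();
        KGroupedStream<String, String> purchases = builder.<String, String>stream("purchases").groupByKey();
        // One aggregator per input; both update the same JSON aggregate for a customer ID.
        Aggregator<String, String, String> customerAgg = (id, customerJson, agg) -> setCustomer(agg, customerJson);
        Aggregator<String, String, String> purchaseAgg = (id, purchaseJson, agg) -> addPurchase(agg, purchaseJson);
        KTable<String, String> customerPurchases = customers
                .cogroup(customerAgg)
                .cogroup(purchases, purchaseAgg)
                .aggregate(() -> EMPTY);
        // The result is a KTable; convert it to a stream to write its updates to the output topic.
        customerPurchases.toStream().to("customer-purchases");
        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
    // Replaces the "customer" field of the aggregate with the latest customer record.
    private static String setCustomer(String aggregate, String customerJson) {
        try {
            ObjectNode root = (ObjectNode) MAPPER.readTree(aggregate);
            root.set("customer", MAPPER.readTree(customerJson));
            return MAPPER.writeValueAsString(root);
        } catch (JsonProcessingException e) {
            // Aggregator.apply() cannot throw checked exceptions, so rethrow unchecked.
            throw new UncheckedIOException(e);
        }
    }
    // Appends a purchase record to the "purchases" array of the aggregate.
    private static String addPurchase(String aggregate, String purchaseJson) {
        try {
            ObjectNode root = (ObjectNode) MAPPER.readTree(aggregate);
            ((ArrayNode) root.get("purchases")).add(MAPPER.readTree(purchaseJson));
            return MAPPER.writeValueAsString(root);
        } catch (JsonProcessingException e) {
            throw new UncheckedIOException(e);
        }
    }
}
We start by defining the Kafka Streams configuration: the application ID, the Kafka broker address, and the default key and value serdes (serializers/deserializers).
Next, we create a StreamsBuilder instance, read the customers and purchases topics as streams, and group each one by its key with groupByKey(), because cogroup() is only available on a KGroupedStream.
Next, we define the cogroup operation, which combines the records from both streams based on the shared key, in this case the customer ID. Each input has its own aggregator: customer records set the “customer” field of the aggregate, and purchase records are appended to its “purchases” array. The aggregate() call supplies the initial, empty JSON document for a customer ID that has not been seen before and returns a KTable holding the latest aggregate for every customer.
Finally, we convert the KTable to a stream with toStream() and write it to the customer-purchases output topic.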
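In Kafka Streams, an aggregation such as cogroup(...).aggregate(...) produces a KTable, so toStream().to(...) forwards that table's updates: each time a record for a customer arrives, a new version of that customer's aggregate can be written to the output topic. Kafka Streams' record cache may collapse consecutive updates to the same key before they are forwarded, so not every intermediate version is guaranteed to appear. The plain-Java sketch below is not the Kafka Streams API (ChangelogModel, Summary, Update, and changelog are hypothetical names); it assumes no caching, lists every update in order, and uses a simplified name/purchase-list aggregate instead of JSON.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the update stream that toStream() forwards from a cogrouped KTable.
// NOT the Kafka Streams API; assumes record caching is disabled, so every update is emitted.
public class ChangelogModel {

    // Simplified aggregate: latest customer name plus every purchase seen so far.
    record Summary(String customer, List<String> purchases) {
        Summary withCustomer(String name) {
            return new Summary(name, purchases);
        }

        Summary withPurchase(String item) {
            List<String> items = new ArrayList<>(purchases);
            items.add(item);
            return new Summary(customer, List.copyOf(items));
        }
    }

    // One output record: the key and the new version of its aggregate.
    record Update(String key, Summary aggregate) {}

    // Each input is {topic, key, value}; returns one Update per input record, in order.
    static List<Update> changelog(List<String[]> records) {
        Map<String, Summary> table = new HashMap<>();
        List<Update> emitted = new ArrayList<>();
        for (String[] r : records) {
            String topic = r[0], key = r[1], value = r[2];
            Summary current = table.getOrDefault(key, new Summary(null, List.of()));
            Summary next = topic.equals("customers") ? current.withCustomer(value) : current.withPurchase(value);
            table.put(key, next);
            emitted.add(new Update(key, next));
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String[]> records = List.of(
                new String[] {"customers", "c1", "Alice"},
                new String[] {"purchases", "c1", "book"},
                new String[] {"purchases", "c1", "pen"});
        for (Update u : changelog(records)) {
            System.out.println(u.key() + " -> " + u.aggregate().customer() + " " + u.aggregate().purchases());
        }
    }
}
```

With these three input records the model emits three versions of the c1 aggregate (no purchases, then [book], then [book, pen]), so a consumer of the output topic should treat each record as the latest state for its key rather than as an independent event.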
Conclusion
We explored how to use the cogroup method to combine several Kafka topics that share a key into a single aggregated KTable.