A Kafka Stream, stream-to-table transformation allows us to convert a stream of data into a table using the pre-defined criteria. We can then query the converted table or pass the data to other transformations that only accept a specific KTable object.
In this tutorial, we will cover how to use the KTable API to convert a stream into a Kafka table.
Kafka Streams KTable API
As mentioned, we can use the KTable API in Kafka Streams to convert a stream into a table object. The KTable API is a high-level abstraction that allows us to perform stateful operations on a given stream and represent the resulting data as a table.
Example Application:
The following example application demonstrates how to use the Kafka Stream KTable API to convert a stream into a table.
Suppose we have a stream which contains the purchase events where each event contains a customer ID and the purchase value. We can create a table that quickly and efficiently depicts the total amount that is purchased by each customer using the KTable API.
An example source code is as follows:
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
public class StreamToTableExample {
public static void main(String[] args) {
String inputTopic = "purchases";
String outputTopic = "customer-purchases";
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-to-table-example");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
StreamsBuilder builder = new StreamsBuilder();
KStream purchases = builder.stream(inputTopic, Consumed.with(Serdes.String(), new PurchaseSerde()));
KStream customerPurchases = purchases
.groupBy((key, purchase) -> purchase.getCustomerId(),
Grouped.with(Serdes.String(), new PurchaseSerde()))
.aggregate(() -> 0L, (key, purchase, total) -> total + purchase.getAmount(),
Materialized.with(Serdes.String(), Serdes.Long()))
.toStream();
customerPurchases.to(outputTopic);
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
}
In the provided example, we start by defining the input and output topics. We then configure the Kafka streams using the StreamsBuilder class.
In the next step, we create a Kstream from the input topic using the builder.stream() method. We then group the purchases by the customer ID using the groupBy transformation and the aggregate transformation to determine the total amount for each customer.
Finally, we store the resulting KTable into a new topic called “customer-purchases”. Although the last step is not required, we write the result to the output topic using the to() method.
Conclusion
We demonstrated how we can work with the Kafka Streams KTable API to convert a Kafka stream into a table.