
Apache Kafka Streams Stream to Table Transformations

A Kafka table is a logical view of a continuously updated stream of data that can be queried and updated in real time. The Kafka Streams table API lets us create tables from Kafka topics, perform operations on them, and write the results back to Kafka topics or to external data stores.

A Kafka Streams stream-to-table transformation converts a stream of data into a table based on criteria that we define. We can then query the resulting table or pass it to other transformations that only accept a KTable object.
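Before looking at aggregations, here is a minimal sketch of the most direct stream-to-table conversion, KStream#toTable() (available since Kafka Streams 2.5). The topic name "events" and the String serdes are assumptions chosen purely for illustration:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class ToTableSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Read key/value records from an assumed "events" topic
        KStream<String, String> stream = builder.stream("events",
                Consumed.with(Serdes.String(), Serdes.String()));

        // toTable() treats each record as an upsert for its key,
        // so the latest value per key becomes the table's current row
        KTable<String, String> table = stream.toTable();
    }
}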

In this tutorial, we will cover how to use the KTable API to convert a stream into a Kafka table.

Kafka Streams KTable API

As mentioned, we can use the KTable API in Kafka Streams to convert a stream into a table object. The KTable API is a high-level abstraction that allows us to perform stateful operations on a given stream and represent the resulting data as a table.
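As a rough illustration of such a stateful operation, the following sketch counts records per key; the "clicks" topic name is an assumption, and count() returns a KTable whose rows are updated as new records arrive:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class CountByKeySketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Assumed input topic "clicks" with String keys and values
        KStream<String, String> clicks = builder.stream("clicks",
                Consumed.with(Serdes.String(), Serdes.String()));

        // Counting records per key is a stateful operation
        // whose result is represented as a continuously updated KTable
        KTable<String, Long> clicksPerKey = clicks
                .groupByKey()
                .count();
    }
}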

Example Application:

The following example application demonstrates how to use the Kafka Stream KTable API to convert a stream into a table.

Suppose we have a stream of purchase events, where each event contains a customer ID and a purchase amount. Using the KTable API, we can build a table that tracks the total amount purchased by each customer.
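The example below relies on a Purchase value class and a matching PurchaseSerde. Neither is defined in the original listing, so the following is one minimal way they might look, using a simple "customerId:amount" string encoding chosen purely for illustration:

import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

// Simple value object for a purchase event
class Purchase {
    private final String customerId;
    private final long amount;

    Purchase(String customerId, long amount) {
        this.customerId = customerId;
        this.amount = amount;
    }

    public String getCustomerId() { return customerId; }
    public long getAmount() { return amount; }
}

// Minimal Serde that encodes a Purchase as "customerId:amount"
class PurchaseSerde implements Serde<Purchase> {
    private final Serde<Purchase> inner = Serdes.serdeFrom(
            (topic, purchase) ->
                    (purchase.getCustomerId() + ":" + purchase.getAmount())
                            .getBytes(StandardCharsets.UTF_8),
            (topic, bytes) -> {
                String[] parts = new String(bytes, StandardCharsets.UTF_8).split(":");
                return new Purchase(parts[0], Long.parseLong(parts[1]));
            });

    @Override
    public Serializer<Purchase> serializer() { return inner.serializer(); }

    @Override
    public Deserializer<Purchase> deserializer() { return inner.deserializer(); }
}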

An example source code is as follows:

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public class StreamToTableExample {

    public static void main(String[] args) {
        String inputTopic = "purchases";
        String outputTopic = "customer-purchases";

        // Basic Streams configuration: application ID and broker address
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-to-table-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();

        // Read the purchase events; Purchase and PurchaseSerde are user-defined classes
        KStream<String, Purchase> purchases = builder.stream(inputTopic,
                Consumed.with(Serdes.String(), new PurchaseSerde()));

        // Group the events by customer ID and aggregate the amounts into a KTable
        KTable<String, Long> customerPurchases = purchases
                .groupBy((key, purchase) -> purchase.getCustomerId(),
                        Grouped.with(Serdes.String(), new PurchaseSerde()))
                .aggregate(() -> 0L,
                        (customerId, purchase, total) -> total + purchase.getAmount(),
                        Materialized.with(Serdes.String(), Serdes.Long()));

        // Convert the table back to a stream and write it to the output topic
        customerPurchases.toStream()
                .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Close the Streams application cleanly on shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

In the provided example, we start by defining the input and output topics and the basic Kafka Streams configuration. We then build the processing topology using the StreamsBuilder class.

Next, we create a KStream from the input topic using the builder.stream() method. We then group the purchases by customer ID with the groupBy transformation and use the aggregate transformation to compute the running total for each customer, which gives us a KTable.

Finally, we convert the resulting KTable back to a stream with toStream() and write it to the output topic "customer-purchases" using the to() method. This last step is optional; the aggregated table can also be queried directly from the running application.
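For example, if the aggregate were materialized under a named store (say Materialized.as("customer-purchases-store"), which the listing above does not do), a rough sketch of querying the table from the running application could look like this:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class QueryTableSketch {

    // Look up a single customer's running total from the state store backing the KTable.
    // "customer-purchases-store" is an assumed store name passed via Materialized.as(...)
    static Long totalFor(KafkaStreams streams, String customerId) {
        ReadOnlyKeyValueStore<String, Long> store = streams.store(
                StoreQueryParameters.fromNameAndType(
                        "customer-purchases-store",
                        QueryableStoreTypes.<String, Long>keyValueStore()));
        return store.get(customerId);
    }
}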

Conclusion

We demonstrated how we can work with the Kafka Streams KTable API to convert a Kafka stream into a table.
