We can use Kafka Streams to build stream processing applications with a high-level DSL that processes and analyzes data in real time.
Properties of Kafka Streams
The following are some points that you may need to understand about the Apache Kafka Streams platform:
Kafka Streams is essentially a client library that is embedded in your own application rather than in a Kafka broker. Like the broker, it runs on the JVM. It uses Kafka as its storage layer: input and output data live in topics, and local state is backed up to changelog topics.
The Kafka Streams platform provides the Processor API and the Streams DSL. The Processor API gives us access to a low-level interface to create custom processors and connect them to Kafka topics. The Streams DSL provides a high-level interface for processing the data with operations such as filtering, joining, and reducing.
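Another feature of Kafka Streams is windowing support. Windowing allows us to process the data over a specific time interval. Windows are defined in terms of time, for example fixed-size tumbling or hopping windows, or session windows that close after a period of inactivity. There is no built-in window based on a set number of records.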
Last but not least are the processing semantics. Kafka Streams supports at-least-once and exactly-once processing. At-least-once ensures that every record is processed at least once, so a record may be processed again after a failure, while exactly-once ensures that the effects of processing each record are applied only once.
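As a minimal sketch of how the guarantee is selected: Kafka Streams reads it from the processing.guarantee setting, which defaults to at_least_once. The snippet below uses plain string keys with java.util.Properties so that it compiles without the Kafka libraries. The class and method names are hypothetical, and the exactly_once_v2 value assumes Kafka Streams 3.0 or later with brokers at version 2.5 or later.

```java
import java.util.Properties;

public class GuaranteeConfigSketch {
    // Hypothetical helper: builds a base configuration for a Streams app
    // and selects the processing guarantee.
    public static Properties buildConfig(boolean exactlyOnce) {
        Properties props = new Properties();
        props.put("application.id", "count-users-app");
        props.put("bootstrap.servers", "localhost:9092");
        // "at_least_once" is the default; "exactly_once_v2" enables exactly-once.
        props.put("processing.guarantee", exactlyOnce ? "exactly_once_v2" : "at_least_once");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildConfig(true).getProperty("processing.guarantee"));
    }
}
```

The resulting Properties object can be passed to the KafkaStreams constructor in the same way as the properties in the example application later in this tutorial.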
In this tutorial, we will explore one of the Kafka Streams transformations, count, and use it to count the records in a stream for each key.
Kafka Streams Count Transformation
As mentioned, Kafka Streams provides a set of transformations that we can apply to the data streams. One such transformation is the count transformation.
The count transformation allows us to count the records in a given stream. It applies to a KStream or KTable object that has first been grouped, for example with groupByKey() or groupBy().
The count transformation then generates a KTable object which contains the count of records for every key in the input stream/table.
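To make the per-key result concrete, here is a plain-Java sketch of the semantics only. It does not use the Kafka Streams API: a map stands in for the KTable, a list of keys stands in for the input records, and all class and method names are hypothetical. The updates() method mimics the sequence of updated counts that the table would emit as a stream, assuming record caching is disabled; with caching enabled, consecutive updates to the same key may be collapsed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CountSemanticsSketch {
    // Final state of the "table": one running total per key.
    public static Map<String, Long> countByKey(List<String> keys) {
        Map<String, Long> table = new LinkedHashMap<>();
        for (String key : keys) {
            table.merge(key, 1L, Long::sum);
        }
        return table;
    }

    // Every incoming record produces one updated count for its key.
    public static List<String> updates(List<String> keys) {
        Map<String, Long> table = new LinkedHashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String key : keys) {
            long count = table.merge(key, 1L, Long::sum);
            emitted.add(key + "=" + count);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("alice", "bob", "alice");
        System.out.println(countByKey(keys)); // prints {alice=2, bob=1}
        System.out.println(updates(keys));    // prints [alice=1, bob=1, alice=2]
    }
}
```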
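Kafka Streams Count Method
We use the count() method to call the count transformation on a grouped stream/table. In its simplest form, the method definition is as shown in the following:
KTable<K, Long> count()
In this form, the method takes no arguments. It is called on the KGroupedStream or KGroupedTable object that grouping returns, where K is the key type. It then returns a KTable that maps each key to its count of records as a Long.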
Example Application:
The following sample application demonstrates how to use the count transformation using Java:
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;

import java.util.Properties;

public class CountUsersApp {
    public static void main(String[] args) {
        // set up the properties for the Kafka Streams application
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "count-users-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // keys and values are read and written as strings by default
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // create a new StreamsBuilder instance
        StreamsBuilder builder = new StreamsBuilder();

        // create a KStream from the "users" topic
        KStream<String, String> usersStream = builder.stream("users");

        // group the stream by key and count the number of records for each key
        KTable<String, Long> countTable = usersStream
            .groupBy((key, value) -> key)
            .count();

        // convert the table to a stream of updates and print each count to the console
        countTable.toStream().foreach((key, value) -> System.out.println("Count for key " + key + " is " + value));

        // create a new KafkaStreams instance using the properties and the StreamsBuilder
        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // start the KafkaStreams instance
        streams.start();

        // add shutdown hook to close the KafkaStreams instance gracefully
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
In the provided example, we start by setting the properties of the Kafka Streams app.
We then create a new StreamsBuilder instance. Next, we create a KStream from the “users” topic and group the stream by key using the groupBy method.
Using the count() method, we count the number of records for each key.
Finally, we convert the resulting KTable to a stream with toStream() and print each updated count to the console using foreach.
Conclusion
We explored the fundamentals of working with Kafka Streams transformations by learning about the count transformation.