One of the key features of Kafka Streams is its support for stateful stream processing through its state stores, alongside simpler stateless operations such as the ForEach transformation.
In this tutorial, we will learn about one of the transformations that you can apply to a Kafka stream: the “ForEach” transformation.
What Are Kafka Streams Transformations?
Let us start at the basics and define a Kafka stream transformation.
Kafka stream transformations are a set of operations that we can perform on data streams within a Kafka Streams application. Transformations allow us to perform tasks such as mapping, filtering, aggregating, and joining the data that flows through the application.
Transformations can be stateless or stateful, depending on the requirements. If a transformation needs to store and update state information between records, it is a stateful transformation; if each record can be handled on its own, it is stateless.
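The difference between the two kinds can be sketched in plain Java, without a broker. This is only a model and not the Kafka Streams API: the class and method names are hypothetical, and the HashMap stands in for a state store.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Plain-Java model only; none of these names come from the Kafka Streams API.
final class StatelessVsStateful {

    // Stateless: the result depends only on the record being handled.
    static String toUpper(String value) {
        return value.toUpperCase(Locale.ROOT);
    }

    // Stateful: the result depends on the records seen before.
    // The map is a stand-in for a state store keyed by record key.
    static final class KeyCounter {
        private final Map<String, Long> counts = new HashMap<>();

        // Increments and returns the number of times this key has been seen.
        long record(String key) {
            return counts.merge(key, 1L, Long::sum);
        }
    }

    public static void main(String[] args) {
        KeyCounter counter = new KeyCounter();
        for (String key : List.of("alice", "bob", "alice")) {
            System.out.printf("%s -> %s, seen %d time(s)%n",
                    key, toUpper(key), counter.record(key));
        }
    }
}
```

The uppercase step gives the same answer no matter what came earlier, while the counter's answer for "alice" changes the second time the key appears. That remembered count is the kind of information a stateful transformation keeps.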
As you can guess, there are various transformations that we can carry out in a Kafka stream. However, for this one, we will focus on the ForEach transformation. You can check our tutorials on the same topic to learn more about the other types of Kafka transformations.
Kafka Streams ForEach Transformation
The ForEach transformation in Kafka Streams applies a user-defined function to each record in a given stream. It is a stateless operation that is used for its side effects, such as printing the values to the console or writing the data to an external system.
Kafka ForEach Method
We can use the foreach() method of KStream to perform a ForEach transformation on a stream. The method is declared as follows:
void foreach(ForeachAction<? super K, ? super V> action)
The method accepts a ForeachAction, a functional interface that represents the action to carry out on each record in the stream.
The functional interface has a single apply() method that takes two arguments: a key of type K and a value of type V.
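The method does not return anything. It is a terminal operation that applies the action purely for its side effects, so no further transformations can be chained after it.
Once we call the foreach() method on a KStream object, the function provided as the ForeachAction is executed for each record in the stream. In current Kafka Streams versions, KTable does not offer foreach(), so a table must first be converted with toStream().
It is good to remember that records are processed in the order in which the application consumes them. That is offset order within each partition; no particular order is guaranteed across partitions.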
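To make this contract concrete without a running cluster, here is a minimal plain-Java model of it. It is a sketch under stated assumptions: RecordAction is a hypothetical stand-in with the same shape as Kafka's ForeachAction, and forEachRecord is a hypothetical helper that plays the role of foreach() over an in-memory list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java model of the foreach contract; not the Kafka Streams API.
final class ForeachContractModel {

    // Hypothetical stand-in with the same shape as ForeachAction<K, V>.
    @FunctionalInterface
    interface RecordAction<K, V> {
        void apply(K key, V value);
    }

    // Runs the action once per record, in arrival order, and returns nothing.
    static <K, V> void forEachRecord(List<Map.Entry<K, V>> records,
                                     RecordAction<? super K, ? super V> action) {
        for (Map.Entry<K, V> record : records) {
            action.apply(record.getKey(), record.getValue());
        }
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> records = List.of(
                Map.entry("u1", "alice"),
                Map.entry("u2", "bob"));

        // The only observable result is the side effect on this list.
        List<String> sink = new ArrayList<>();
        forEachRecord(records, (key, value) -> sink.add(key + "=" + value));

        System.out.println(sink); // prints [u1=alice, u2=bob]
    }
}
```

Because the action returns void, the only way to observe it is through whatever it writes to: here a list, in a real application typically a log, a console, or an external system.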
Example Application:
The following shows an example of how we can use the ForEach transformation to print the items from the “users” topic to the console:
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

public class PrintUsersToConsole {
    public static void main(String[] args) {
        // Set up Kafka Streams properties
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("application.id", "print-users-to-console");

        // Create a StreamsBuilder object
        StreamsBuilder builder = new StreamsBuilder();

        // Build a KStream object that reads String keys and values from the "users" topic
        KStream<String, String> usersStream =
                builder.stream("users", Consumed.with(Serdes.String(), Serdes.String()));

        // Apply the foreach transformation to print each record to the console
        usersStream.foreach((key, value) -> System.out.printf("Key: %s, Value: %s%n", key, value));

        // Create a KafkaStreams object and start it
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Add shutdown hook to stop KafkaStreams gracefully when the JVM is terminated
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
In the given example, we apply the ForEach transformation to the KStream object that reads the records from the “users” topic. The ForEach transformation then prints each record to the console.
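Once the class is compiled, and with the Kafka Streams library and its dependencies on the classpath, you can run the previous code using the following command:
$ java PrintUsersToConsole
The application then prints each message from the “users” topic to the console as it arrives.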
Conclusion
We learned about the ForEach transformation in Kafka Streams: how it works and how to use it to act on each record in a given stream.