Apache Kafka Streams ForEach Transformations

Apache Kafka Streams is a client library that ships with Apache Kafka. It allows us to read, process, transform, and analyze real-time data streams.

One of the popular features of Kafka Streams is its support for stateful stream processing through state stores and stateful operators such as aggregations and joins. It also provides simple stateless operations such as the ForEach transformation.

In this tutorial, we will learn about one of the transformations that you can apply to a Kafka stream: the “ForEach” transformation.

What Are Kafka Streams Transformations?

Let us start with the basics and define a Kafka stream transformation.

Kafka stream transformations are a set of operations that we can perform on data streams within a Kafka stream application. Transformations allow us to perform tasks such as transforming, filtering, aggregating, joining, and processing the data that flows within the stream application.

Transformations can be stateless or stateful. A stateless transformation handles each record on its own, while a transformation that needs to store and update state information across records, such as a count or an aggregation, is stateful.
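To make the distinction concrete, here is a minimal plain-Java sketch. It does not use the Kafka Streams API, and the class and method names are hypothetical. The stateless method looks only at the current value, while the stateful one keeps a running count per key, similar in spirit to what a state store holds.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

class StatelessVsStateful {

    // Stateless: the result depends only on the current record.
    static String toUpperCase(String value) {
        return value.toUpperCase(Locale.ROOT);
    }

    // Stateful: the result depends on records seen earlier,
    // so a per-key count must be kept between calls.
    private final Map<String, Long> countsByKey = new HashMap<>();

    long countSoFar(String key) {
        // merge() stores 1 for a new key, otherwise adds 1 to the stored count
        return countsByKey.merge(key, 1L, Long::sum);
    }

    public static void main(String[] args) {
        StatelessVsStateful counter = new StatelessVsStateful();
        System.out.println(toUpperCase("alice"));
        System.out.println(counter.countSoFar("user-1"));
        System.out.println(counter.countSoFar("user-1"));
        System.out.println(counter.countSoFar("user-2"));
    }
}
```

Calling countSoFar() twice with the same key gives different results, which is exactly what makes it stateful. The ForEach transformation falls on the stateless side.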

As you can guess, there are various transformations that we can carry out in a Kafka stream. However, in this tutorial, we will focus on the ForEach transformation. You can check our tutorials on the same topic to learn more about the other types of Kafka transformations.

Kafka Streams ForEach Transformation

The ForEach transformation in Kafka Streams is used to apply a user-defined function to each record in a given stream. One example is printing the values to the console or writing the data to an external system.

Kafka ForEach Method

We can use the foreach() method to perform a ForEach transformation on a stream. The method definition is as follows:

void foreach(ForeachAction<? super K, ? super V> action)

The method accepts a ForeachAction, a functional interface that represents an action to carry out on each record in the stream.

The functional interface has the apply() method which takes two arguments: a key of type K and a value of type V.

The method does not return anything, which makes foreach() a terminal operation: no further transformations can be chained after it. The action that is defined in the function is applied purely for its side effects.
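The shape of such an action can be sketched in plain Java. The RecordAction interface and the forEachRecord() helper below are hypothetical stand-ins: the interface mirrors the single apply(key, value) method of ForeachAction so that the snippet runs without Kafka on the classpath. In a real application, you pass the lambda directly to foreach().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class ForeachActionSketch {

    // Hypothetical stand-in for org.apache.kafka.streams.kstream.ForeachAction:
    // one method, two arguments, no return value.
    @FunctionalInterface
    interface RecordAction<K, V> {
        void apply(K key, V value);
    }

    // Simulates the per-record behavior: call the action once per record, in list order.
    static <K, V> void forEachRecord(List<Map.Entry<K, V>> records,
                                     RecordAction<? super K, ? super V> action) {
        for (Map.Entry<K, V> record : records) {
            action.apply(record.getKey(), record.getValue());
        }
    }

    static List<String> demo() {
        List<Map.Entry<String, String>> records = List.of(
                Map.entry("u1", "alice"),
                Map.entry("u2", "bob"));
        List<String> seen = new ArrayList<>();
        // The action returns nothing; its only effect is the side effect on "seen".
        forEachRecord(records, (key, value) -> seen.add(key + "=" + value));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the action produces no result, anything it achieves (printing, collecting, or calling an external system) happens as a side effect inside the lambda.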

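Once we call the foreach() method on a KStream object, the function that is provided in the ForeachAction is executed for each record within the stream. Recent versions of the KTable interface do not offer foreach(); to use it on a table, first convert the table to a stream with toStream().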

It is good to remember that the records are processed in the order in which the consumer receives them, which follows the offset order within each topic partition. No ordering is guaranteed across different partitions.

Example Application

The following shows an example of how we can use the ForEach transformation to print the items from the “users” topic to the console:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

public class PrintUsersToConsole {

    public static void main(String[] args) {

        // Set up Kafka Streams properties
        Properties props = new Properties();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "print-users-to-console");

        // Create a StreamsBuilder object
        StreamsBuilder builder = new StreamsBuilder();

        // Build a typed KStream object that reads String keys and values from the "users" topic
        KStream<String, String> usersStream =
                builder.stream("users", Consumed.with(Serdes.String(), Serdes.String()));

        // Apply the foreach transformation to print each record to the console
        usersStream.foreach((key, value) -> System.out.printf("Key: %s, Value: %s%n", key, value));

        // Create a KafkaStreams object
        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // Register the shutdown hook before starting so that KafkaStreams stops gracefully when the JVM is terminated
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

        // Start processing
        streams.start();
    }
}

In the given example, we apply the ForEach transformation to the KStream object that reads the records from the “users” topic. The ForEach transformation then prints each record to the console.

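You can compile and run the previous code using the following commands. The Kafka Streams JAR files and their dependencies must be on the classpath. Here, we assume that they are stored in a directory called libs, so adjust the path to match your setup (on Windows, use ; instead of : as the classpath separator):

$ javac -cp "libs/*" PrintUsersToConsole.java
$ java -cp "libs/*:." PrintUsersToConsole

This should print the messages from the “users” topic to the console.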

Conclusion

We learned about the ForEach transformation in Kafka Streams. We covered how it works and how to use it to operate on each record in a given stream.

About the author

John Otieno
