
Apache Kafka Streams Merge Transformations

In Kafka Streams, transformations are the operations that we can apply to the records flowing through a stream. Transformations allow us to modify the data and then write the results to another Kafka topic, print them to the console, or pass them on to further transformations.

Transformations are among the most common uses of Kafka Streams because they allow you to process and analyze data in real time.

Kafka Streams offers a wide range of transformations, such as map, filter, and join, which allow you to perform various operations on stream data.

Merge is one of the transformations available in Kafka Streams. In this tutorial, we explore what the merge transformation does and walk through a basic example that shows how to use it.

Kafka Streams Merge Transformations

The Kafka Streams merge transformation allows us to merge two or more streams. It uses the merge method from the Kafka Streams DSL, which combines the records of the input streams into a single output stream.

Kafka Streams Merge Method

The merge method is defined in the KStream interface, which represents a stream of records, such as the records read from a Kafka topic.

In recent versions of Kafka Streams, the interface declares two overloads:

KStream<K, V> merge(final KStream<K, V> stream);
KStream<K, V> merge(final KStream<K, V> stream, final Named named);

The method is called on one stream and takes a second stream with the same key and value types as its argument. It returns a new KStream that contains all the records from both streams. The second overload also accepts a Named parameter, which sets the name of the merge processor in the topology. To merge more than two streams, chain the calls, for example streamA.merge(streamB).merge(streamC).
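As a minimal sketch of both overloads, the following class builds a topology that merges three streams by chaining merge calls. It assumes String keys and values, reuses the “users” and “orders” topic names from this tutorial, and adds a hypothetical “payments” topic purely to show chaining; the processor name “merge-users-orders” is also just an illustrative choice. Building and describing a topology does not contact a broker, so this runs without a Kafka cluster as long as the kafka-streams library is on the classpath.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;

public class MergeOverloads {

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        // Assumption: all topics carry String keys and String values.
        Consumed<String, String> strings = Consumed.with(Serdes.String(), Serdes.String());

        KStream<String, String> users = builder.stream("users", strings);
        KStream<String, String> orders = builder.stream("orders", strings);
        // Hypothetical third topic, used only to demonstrate chaining.
        KStream<String, String> payments = builder.stream("payments", strings);

        // merge(KStream, Named): the merge processor gets an explicit name.
        KStream<String, String> usersAndOrders =
                users.merge(orders, Named.as("merge-users-orders"));

        // merge(KStream): chaining adds a third stream to the merged result.
        KStream<String, String> all = usersAndOrders.merge(payments);

        all.to("merged", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }

    public static void main(String[] args) {
        // Prints the sources, processors, and sinks of the topology.
        System.out.println(buildTopology().describe());
    }
}
```

Naming the merge processor is optional, but it makes the output of describe() and any related metrics easier to read than the auto-generated KSTREAM-MERGE-... names.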

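It is good to note that the merge method does not simply append one stream after the other. Records from the input streams are interleaved as they arrive, and there is no ordering guarantee between records that come from different input streams; only the relative order of records within each input stream is preserved. The method also does not sort, filter, or deduplicate the records.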
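The KStream.merge Javadoc states that there is no ordering guarantee between records of the two input streams, while records within each input stream keep their relative order. To make that concrete, here is a small plain-Java model. It is not Kafka code and not how Kafka Streams is implemented: it assumes each input is a single ordered sequence (for example, a single-partition topic) and uses a seeded random choice to stand in for whichever input happens to deliver a record next.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical model of merge ordering; it only imitates the documented guarantees.
public class MergeOrderModel {

    // Produces one possible interleaving: at each step it takes the next record
    // from either input, but never reorders records within the same input.
    public static List<String> interleave(List<String> a, List<String> b, long seed) {
        Random random = new Random(seed);
        List<String> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < a.size() || j < b.size()) {
            // Take from 'a' if 'b' is exhausted, otherwise pick randomly while 'a' has records.
            boolean takeA = j >= b.size() || (i < a.size() && random.nextBoolean());
            out.add(takeA ? a.get(i++) : b.get(j++));
        }
        return out;
    }

    // Returns true if the records of 'input' appear in 'merged' in their original order.
    public static boolean preservesOrder(List<String> merged, List<String> input) {
        int k = 0;
        for (String record : merged) {
            if (k < input.size() && record.equals(input.get(k))) {
                k++;
            }
        }
        return k == input.size();
    }

    public static void main(String[] args) {
        List<String> users = List.of("u1", "u2", "u3");
        List<String> orders = List.of("o1", "o2");
        for (long seed = 0; seed < 3; seed++) {
            List<String> merged = interleave(users, orders, seed);
            System.out.println(merged
                    + " users in order: " + preservesOrder(merged, users)
                    + ", orders in order: " + preservesOrder(merged, orders));
        }
    }
}
```

Different seeds produce different interleavings, yet each input's own sequence always survives intact, which mirrors the guarantee described above.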

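If you need to drop unwanted records, apply a transformation such as filter to the input streams before passing them to the merge method. Keep in mind that filter evaluates each record on its own, so removing duplicates that span several records requires a stateful operation instead.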
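A minimal sketch of filtering before merging might look like the following. The rule used here, dropping records whose value is null or blank, is a hypothetical example of an “unwanted record”; substitute whatever condition fits your data. The topic names follow this tutorial and String serdes are assumed.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class FilterThenMerge {

    // Hypothetical rule: keep only records that have a non-blank value.
    static boolean keep(String key, String value) {
        return value != null && !value.isBlank();
    }

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        Consumed<String, String> strings = Consumed.with(Serdes.String(), Serdes.String());

        // Filter each input stream on its own before the streams are combined.
        KStream<String, String> cleanUsers = builder.stream("users", strings).filter(FilterThenMerge::keep);
        KStream<String, String> cleanOrders = builder.stream("orders", strings).filter(FilterThenMerge::keep);

        // Only records that passed the filter reach the merged output topic.
        cleanUsers.merge(cleanOrders).to("merged", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }

    public static void main(String[] args) {
        System.out.println(buildTopology().describe());
    }
}
```

Filtering before the merge keeps the rule next to the source it belongs to; if both inputs share the same rule, filtering once after the merge would work just as well.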

NOTE: A merge transformation combines the records of the input streams into one stream without matching them, while a join transformation combines records from two streams based on a joining key.

Example Application:

The following example application demonstrates how to use the merge transformation in Kafka Streams to combine the records of the “users” and “orders” topics into a single stream:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Properties;

public class MergeStreamsExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "merge-streams-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();

        // Read both topics with explicit String serdes so the example does not
        // depend on the default serde configuration.
        KStream<String, String> usersStream =
                builder.stream("users", Consumed.with(Serdes.String(), Serdes.String()));

        KStream<String, String> ordersStream =
                builder.stream("orders", Consumed.with(Serdes.String(), Serdes.String()));

        // Combine the two streams: records from both topics flow into one stream.
        KStream<String, String> mergedStream = usersStream.merge(ordersStream);

        // Write every merged record to the "merged" output topic.
        mergedStream.to("merged", Produced.with(Serdes.String(), Serdes.String()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // Register the shutdown hook before starting so the application closes cleanly.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}

In the provided example, we create two KStream objects by reading from the “users” and “orders” topics, respectively.

We then use the merge method to combine the records of both streams and write the resulting stream to a new topic called “merged”.
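One way to check a topology like the one in the example without a running broker is Kafka's TopologyTestDriver. The sketch below assumes the kafka-streams-test-utils artifact is on the classpath and that your Kafka version provides the TestInputTopic and TestOutputTopic helpers. It rebuilds the same “users” plus “orders” to “merged” topology and pipes in a few sample records; the keys, values, and the “dummy:1234” bootstrap address are placeholders for illustration only.

```java
import java.util.List;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class MergeTopologyTest {

    // Same shape as the tutorial's topology: users + orders -> merged.
    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        Consumed<String, String> strings = Consumed.with(Serdes.String(), Serdes.String());
        builder.stream("users", strings)
                .merge(builder.stream("orders", strings))
                .to("merged", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }

    public static List<String> runMerge() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "merge-streams-test");
        // The test driver never connects to this address; it is only a placeholder.
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

        try (TopologyTestDriver driver = new TopologyTestDriver(buildTopology(), props)) {
            TestInputTopic<String, String> usersIn =
                    driver.createInputTopic("users", new StringSerializer(), new StringSerializer());
            TestInputTopic<String, String> ordersIn =
                    driver.createInputTopic("orders", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> mergedOut =
                    driver.createOutputTopic("merged", new StringDeserializer(), new StringDeserializer());

            // Sample records; the driver processes each one as soon as it is piped in.
            usersIn.pipeInput("u1", "alice");
            ordersIn.pipeInput("o1", "book");
            usersIn.pipeInput("u2", "bob");

            return mergedOut.readValuesToList();
        }
    }

    public static void main(String[] args) {
        System.out.println(runMerge());
    }
}
```

Because the test driver processes each record synchronously, the output here follows the order in which records were piped in. A real deployment reading from a broker gives no such cross-topic ordering guarantee.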

Conclusion

In this tutorial, we looked at the role of transformations in Kafka Streams. We also learned what the merge transformation is, how it works, and how to build a simple application that merges two streams.

About the author

John Otieno

My name is John and I am a fellow geek like you. I am passionate about all things computers, from hardware and operating systems to programming. My dream is to share my knowledge with the world and help out fellow geeks. Follow my content by subscribing to the LinuxHint mailing list.