Transformations are among the most widely used features of Kafka Streams because they allow you to process and analyze data in real time.
Kafka Streams offers a wide range of transformations for performing various operations on streaming data.
Merge is one of the transformations available in the Kafka Streams DSL. In this tutorial, we will explore what the merge transformation does and walk through a basic example that shows how to use the method.
Kafka Streams Merge Transformations
The Kafka Streams merge transformation allows us to combine two data streams into one. It uses the merge method from the Kafka Streams DSL. The method is called on one stream, takes a second stream as its input, and returns a single stream that contains the records of both. To merge more than two streams, chain the calls, for example streamA.merge(streamB).merge(streamC).
Kafka Streams Merge Method
The merge method in Kafka Streams is defined in the KStream interface, which is the DSL abstraction for a stream of key-value records.
The method definition is as follows:
KStream<K, V> merge(KStream<K, V> stream)
The method is called on one KStream and takes another KStream with the same key and value types as its argument. It returns a new KStream object that contains all the records from both input streams.
It is good to note that the merge method forwards records as they arrive. The relative order of records that come from the same input stream is preserved, but there is no ordering guarantee between records that come from different input streams. The method also does not sort or filter the records in any way.
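To make the ordering behaviour concrete, here is a small plain-Java model. It does not use the Kafka Streams API; the class name MergeOrderSketch, the Event type, and the sample arrival sequence are all hypothetical and exist only for illustration. The sketch treats the merged stream as the records in whatever order they happen to arrive, and shows that each input's own relative order survives.

```java
import java.util.ArrayList;
import java.util.List;

public class MergeOrderSketch {
    // Hypothetical model of one arriving record: the input it came from and its value.
    public static final class Event {
        final String source;
        final String value;

        public Event(String source, String value) {
            this.source = source;
            this.value = value;
        }
    }

    // Model of merge: emit every record in arrival order, with no sorting or filtering.
    public static List<String> merge(List<Event> arrivals) {
        List<String> out = new ArrayList<>();
        for (Event e : arrivals) {
            out.add(e.value);
        }
        return out;
    }

    // Pick out the records of one input, in the order they appear in the arrivals.
    public static List<String> fromSource(List<Event> arrivals, String source) {
        List<String> out = new ArrayList<>();
        for (Event e : arrivals) {
            if (e.source.equals(source)) {
                out.add(e.value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // One possible interleaving (an assumption); a real application may see a different one.
        List<Event> arrivals = List.of(
                new Event("users", "u1"),
                new Event("orders", "o1"),
                new Event("orders", "o2"),
                new Event("users", "u2"));

        System.out.println(merge(arrivals));                // [u1, o1, o2, u2]
        System.out.println(fromSource(arrivals, "users"));  // [u1, u2]
        System.out.println(fromSource(arrivals, "orders")); // [o1, o2]
    }
}
```

In this model, u1 always comes before u2 and o1 before o2, but where the “orders” records fall relative to the “users” records depends entirely on the arrival sequence.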
The merge method does not remove duplicates either. If you need to drop unwanted or duplicate records, apply a transformation such as filter to the input streams before merging them, or to the merged stream afterwards.
NOTE: A merge transformation forwards every record from both input streams unchanged, whereas a join transformation combines records from the two inputs that share the same key into a single record.
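The difference between the two can be sketched with a small plain-Java model. This is not Kafka Streams code: the class MergeVsJoinSketch, its merge and innerJoin helpers, and the sample data are hypothetical, and the join here is a simple key lookup rather than one of the windowed or table-based joins that Kafka Streams actually provides.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MergeVsJoinSketch {
    // Merge model: the output holds every record from both inputs, unchanged.
    public static List<Map.Entry<String, String>> merge(
            List<Map.Entry<String, String>> left,
            List<Map.Entry<String, String>> right) {
        List<Map.Entry<String, String>> out = new ArrayList<>(left);
        // The real operator interleaves records by arrival; appending keeps the model short.
        out.addAll(right);
        return out;
    }

    // Inner-join model: only keys present on both sides produce a record, with the values combined.
    public static List<Map.Entry<String, String>> innerJoin(
            List<Map.Entry<String, String>> left,
            List<Map.Entry<String, String>> right) {
        Map<String, String> rightByKey = new HashMap<>();
        for (Map.Entry<String, String> r : right) {
            rightByKey.put(r.getKey(), r.getValue());
        }
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (Map.Entry<String, String> l : left) {
            String rightValue = rightByKey.get(l.getKey());
            if (rightValue != null) {
                out.add(Map.entry(l.getKey(), l.getValue() + "|" + rightValue));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> users =
                List.of(Map.entry("k1", "alice"), Map.entry("k2", "bob"));
        List<Map.Entry<String, String>> orders =
                List.of(Map.entry("k1", "order-17"));

        System.out.println(merge(users, orders));     // [k1=alice, k2=bob, k1=order-17]
        System.out.println(innerJoin(users, orders)); // [k1=alice|order-17]
    }
}
```

The merged output has three records, one per input record, while the joined output has a single record for the one key that appears on both sides.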
Example Application:
The following example application demonstrates how to use the merge transformation in Kafka Streams to combine the records of the “users” and “orders” topics into a single stream:
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Properties;

public class MergeStreamsExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "merge-streams-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();

        // Read both topics with String keys and values; merge requires matching types.
        KStream<String, String> usersStream =
                builder.stream("users", Consumed.with(Serdes.String(), Serdes.String()));
        KStream<String, String> ordersStream =
                builder.stream("orders", Consumed.with(Serdes.String(), Serdes.String()));

        // Combine the two streams into one; records are forwarded as they arrive.
        KStream<String, String> mergedStream = usersStream.merge(ordersStream);

        // Write the merged records to the "merged" topic.
        mergedStream.to("merged", Produced.with(Serdes.String(), Serdes.String()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // Register the shutdown hook before starting so the application closes cleanly.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
In the provided example, we create two KStream objects by reading from the “users” and “orders” topics, respectively.
We then use the merge method to combine the records of the two streams and write the resulting stream to a new topic called “merged”.
Conclusion
In this tutorial, we looked at the role of transformations in Kafka Streams. We also learned what the merge transformation is, how it works, and how to build a simple application that merges two streams.