We can use Kafka Streams to build stream processing applications that process and analyze data in real time using a high-level DSL.
Properties of Kafka Streams
The following are some points that you may need to understand about the Apache Kafka Streams platform:
Kafka Streams is essentially a client library that is embedded in your application, not in the Kafka broker. It runs on the JVM and uses Kafka itself as its storage and messaging layer.
The Kafka Streams platform provides the Processor API and the Streams DSL API. The Processor API gives us access to a low-level interface for creating custom processors and connecting them to Kafka topics. The Streams DSL API provides a high-level interface for data processing operations such as filtering, joining, and aggregating.
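As a rough sketch of the Streams DSL (the topic names events and non-empty-events are made up for illustration), the following builds a topology that filters out records with empty values:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class DslFilterSketch {

    // build a topology that drops records whose value is null or empty
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> events =
                builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()));

        events.filter((key, value) -> value != null && !value.isEmpty())
              .to("non-empty-events", Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }
}
```

Expressing the same logic with the Processor API would mean writing a processor class by hand and wiring it to source and sink nodes, which is why the DSL is usually the starting point.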
Another feature of Kafka Streams is windowing support. Windowing allows us to process data over a specific time interval. Windows in Kafka Streams are time-based: you can choose among tumbling, hopping, sliding, and session windows.
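As a sketch (assuming Kafka Streams 3.x for TimeWindows.ofSizeWithNoGrace, and a hypothetical clicks topic), counting records per key in tumbling five-minute windows looks like this:

```java
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.TimeWindows;

public class WindowedCountSketch {

    // build a topology that counts clicks per key in tumbling 5-minute windows
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        builder.stream("clicks", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey()
               .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
               .count();

        return builder.build();
    }
}
```

Each key then gets one count per five-minute window rather than a single running total.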
Last but not least is processing semantics. Kafka Streams supports at-least-once and exactly-once processing. At-least-once guarantees that every record is processed at least once (duplicates are possible after a failure), while exactly-once guarantees that every record affects the results exactly once.
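The guarantee is selected with a single configuration property. A sketch, assuming Kafka Streams 3.0+ (which introduced the exactly_once_v2 mode) and brokers that support transactions:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class ProcessingGuaranteeSketch {

    public static Properties config() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "guarantee-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // the default is at-least-once; exactly-once must be opted into explicitly
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        return props;
    }
}
```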
In this tutorial, we will explore the selectKey transformation, one of Kafka Streams' transformation features, which lets us change the key of each record in a stream.
Apache Kafka SelectKey Transformation
One of the most common Kafka Streams transformations is selectKey, which modifies the key of each record in a stream.

The transformation is applied through the selectKey() method, which takes a KeyValueMapper function that maps each record's key and value to a new key.
The new key can have a different type from the original key. The value of each record, and its type, are left unchanged.
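To illustrate the key type changing (the topic names words and words-by-length are hypothetical), the following re-keys a stream of String keys by the length of each value, producing Integer keys:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class RekeyByLengthSketch {

    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> input =
                builder.stream("words", Consumed.with(Serdes.String(), Serdes.String()));

        // the key type changes from String to Integer; the value is untouched
        KStream<Integer, String> byLength =
                input.selectKey((key, value) -> value.length());

        // serdes for the new key type must be given when writing out
        byLength.to("words-by-length", Produced.with(Serdes.Integer(), Serdes.String()));

        return builder.build();
    }
}
```

Because downstream grouping and join operations need data co-partitioned by key, changing the key also marks the stream for repartitioning before any such operation.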
The following shows the method definition for the selectKey() method:

<KR> KStream<KR, V> selectKey(KeyValueMapper<? super K, ? super V, ? extends KR> mapper)

The method takes a KeyValueMapper function as its argument. The KeyValueMapper functional interface has the following signature:

public interface KeyValueMapper<K, V, R> {
    R apply(K key, V value);
}
The apply() method of the KeyValueMapper function takes a record’s original key and value as its input, and returns a new key-value pair with the modified key.
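The lambdas used elsewhere in this tutorial are shorthand for this interface. Spelled out as an explicit class (a sketch, with the same uppercase-key logic as the example below), the mapper looks like:

```java
import org.apache.kafka.streams.kstream.KeyValueMapper;

// maps a (String, String) record to a new String key; values are not touched
public class UppercaseKeyMapper implements KeyValueMapper<String, String, String> {

    @Override
    public String apply(String key, String value) {
        return key.toUpperCase();
    }
}
```

An instance of this class can be passed to selectKey() wherever a lambda would be used.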
Finally, the selectKey() method returns a new KStream instance with the same values as the original stream but with the modified keys.
Example Application:
The following shows how to build a basic Java application that demonstrates the selectKey transformation:
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Properties;

public class SelectKeyExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "select-key-java");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // default serdes so String keys and values can be (de)serialized
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // create a stream of key-value pairs from the "users" topic
        KStream<String, String> stream = builder.stream("users");

        // use selectKey to convert the key of each record to uppercase
        KStream<String, String> modifiedStream =
                stream.selectKey((key, value) -> key.toUpperCase());

        // write the modified stream to an output topic
        modifiedStream.to("users-output");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // close the application cleanly on shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
In the provided example, we use the StreamsBuilder to create a new stream of key-value pairs from the "users" topic. We then use the selectKey transformation to modify the key of each record by converting it to uppercase.
Finally, we write the modified stream to the output topic using the to() method.
Conclusion
We explored the basics of Kafka Streams and how to use the selectKey() transformation to modify the keys of records in a stream.