Apache Kafka Consumer Seek Operation

The “seek” operation in an Apache Kafka Consumer allows a consumer to manually set the offset within a particular partition from which it starts consuming. This lets the consumer rewind or fast-forward to a specific point in the partition and read the messages from there.

The seek operation is useful when a consumer wants to reprocess the data or has missed some messages. It is typically combined with the “poll” method to retrieve the messages starting from the specified offset.
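Before touching the real client API, the mechanics can be illustrated with a purely conceptual Python sketch (this is not the Kafka client; the `ToyPartition` class is an invented stand-in) that models a partition as an append-only log with a consumer position: seek() moves the position, and poll() returns records from there onward.

```python
# Illustration only: a toy model of a partition log, not the Kafka client API.
class ToyPartition:
    def __init__(self, records):
        self.records = records  # the partition's append-only log
        self.position = 0       # next offset the consumer will read

    def seek(self, offset):
        # Move the consumer's position; the next poll starts here.
        self.position = offset

    def poll(self, max_records=3):
        # Return up to max_records records from the current position onward.
        batch = self.records[self.position:self.position + max_records]
        self.position += len(batch)
        return batch

partition = ToyPartition([f"msg-{i}" for i in range(10)])
partition.seek(5)            # rewind/fast-forward to offset 5
print(partition.poll())      # ['msg-5', 'msg-6', 'msg-7']
print(partition.poll())      # ['msg-8', 'msg-9']
```

The real consumer behaves analogously: seeking only repositions the consumer, and it is the subsequent poll calls that actually fetch the records.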

This tutorial shows how to use the seek() API in Apache Kafka to replay the messages in a Kafka topic from a specific offset.

Setup Sample Topic

Let’s set up a simple topic for this tutorial’s demonstration. The command is as follows:

$ kafka-topics.sh --bootstrap-server=localhost:9092 --create --topic users --replication-factor 1 --partitions 1

The previous command should create a new topic called “users” with a replication factor of 1 and a single partition.

Once the topic is ready, we can produce the sample messages. Start by creating a sample “user_data.json” file which contains the messages that we wish to publish.

Add the sample data as shown in the following:

{"first_name":"Gustavo","email":"[email protected]"}
{"first_name":"Letti","email":"[email protected]"}
{"first_name":"Merrie","email":"[email protected]"}
{"first_name":"Hulda","email":"[email protected]"}
{"first_name":"Lynea","email":"[email protected]"}
{"first_name":"Ynes","email":"[email protected]"}
{"first_name":"Miles","email":"[email protected]"}
{"first_name":"Booth","email":"[email protected]"}
{"first_name":"Osborne","email":"[email protected]"}
{"first_name":"Henri","email":"[email protected]"}

The previous file contains the information of 10 users in JSON format, one object per line. We then use a simple Python script to read the file and publish each line as a message to the topic.
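Since the loader publishes each line of the file verbatim, it can help to check beforehand that every non-empty line is valid JSON. The following standard-library sketch (no broker needed; the helper name `validate_lines` is my own, not part of any Kafka package) shows one way to do that:

```python
import json

def validate_lines(lines):
    """Return the parsed JSON objects for all non-empty lines,
    raising ValueError on the first malformed line."""
    parsed = []
    for number, line in enumerate(lines, start=1):
        line = line.strip()
        if not line:
            continue  # skip blank lines
        try:
            parsed.append(json.loads(line))
        except json.JSONDecodeError as err:
            raise ValueError(f"line {number} is not valid JSON: {err}")
    return parsed

sample = [
    '{"first_name":"Gustavo","email":"[email protected]"}\n',
    '{"first_name":"Letti","email":"[email protected]"}\n',
]
users = validate_lines(sample)
print(len(users))              # 2
print(users[0]["first_name"])  # Gustavo
```

Running this against the file before producing catches malformed records early, rather than discovering them later on the consumer side.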

Start by creating the file as follows:

$ touch loader.py

Next, install the Kafka package for Python:

$ pip3 install kafka-python

Edit the “loader.py” file:

$ vim loader.py

Add the code as shown in the following:

from kafka import KafkaProducer

# Connect to the local Kafka broker.
producer = KafkaProducer(bootstrap_servers='localhost:9092')

# Publish each non-empty line of the file as a separate message.
with open('user_data.json', 'r') as file:
    for message in file:
        message = message.strip()
        if message:
            producer.send('users', message.encode())

# Block until all buffered messages are delivered.
producer.flush()

Run the loader script to write the messages to the “users” topic:

$ python3 loader.py

This should read the messages from the “user_data.json” file and write them to the “users” topic in the Kafka cluster.

Using the seek() API to Read the Messages From a Specific Offset

The following example code demonstrates how we can use the seek() API to read the data starting from a specific offset:

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerSeek {
    public static void main(String[] args) {
        Logger log = LoggerFactory.getLogger(ConsumerSeek.class.getName());

        String bootstrapServer = "127.0.0.1:9092";
        String targetTopic = "users";

        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // Manually assign partition 0 of the topic (no consumer group subscription).
        TopicPartition partitionToReadFrom = new TopicPartition(targetTopic, 0);
        consumer.assign(Arrays.asList(partitionToReadFrom));

        // Move the consumer's position to the target offset.
        long targetOffset = 5;
        consumer.seek(partitionToReadFrom, targetOffset);

        // Poll the messages starting from the target offset.
        int messagesToRead = 5;
        boolean keepReading = true;
        while (keepReading) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                log.info("Offset: " + record.offset() + ", Value: " + record.value());
                if (--messagesToRead == 0) {
                    keepReading = false;
                    break;
                }
            }
        }

        consumer.close();
        log.info("Terminating.");
    }
}

The previous code configures a Kafka Consumer with the bootstrap server, the key and value deserializers, and the auto offset reset policy. Instead of subscribing to the topic, the consumer is manually assigned partition 0 of the “users” topic, and the seek method moves its position to offset 5 so that subsequent poll() calls return the records starting from that offset.
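To connect this back to the sample data: the ten messages that the loader produced occupy offsets 0 through 9 in partition 0, so seeking to offset 5 replays the last five messages. A tiny sketch of that offset arithmetic (the helper name `replayed_offsets` is my own, purely for illustration, not part of the Kafka API):

```python
def replayed_offsets(seek_to, log_end_offset):
    # A partition holding N records uses offsets 0 .. N-1; the
    # log-end offset is the offset the NEXT record would receive (N).
    # Seeking to `seek_to` means poll() delivers offsets seek_to .. N-1.
    return list(range(seek_to, log_end_offset))

# Ten messages were produced, so the log-end offset is 10.
print(replayed_offsets(5, 10))       # [5, 6, 7, 8, 9] -> the last five messages
print(len(replayed_offsets(0, 10)))  # 10 -> a full replay from the beginning
```

Seeking to 0 therefore replays the whole partition, while seeking to the log-end offset yields nothing until new messages arrive.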

Conclusion

In this tutorial, you learned how to work with the seek operation in a Kafka Consumer application using Java. The seek operation lets you control the consumer’s position within a topic partition. With seek(), you can move the consumer to a specific offset, which is useful for reprocessing the data or skipping irrelevant messages.

About the author

John Otieno

My name is John and I am a fellow geek like you. I am passionate about all things computers, from hardware and operating systems to programming. My dream is to share my knowledge with the world and help out fellow geeks. Follow my content by subscribing to the LinuxHint mailing list.