Consumers use offsets to track which records they have already processed. An offset identifies a record's position within a single partition of a topic. When a consumer connects to a Kafka cluster, it can start consuming records from a specific offset in a partition, either by specifying the offset explicitly or by using one of several predefined offset strategies (e.g., earliest, latest, or a specific timestamp). As the consumer processes records, it advances its offset to the next record in the partition.
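The choice of starting position described above can be sketched without a broker. The following is a simplified, Kafka-free model, not the client's actual implementation: the class StartOffsetSketch, the method resolveStart, and the ResetPolicy enum are hypothetical names introduced only for illustration. It assumes that a committed offset inside the retained range takes precedence, and that the earliest/latest strategy applies only when no usable committed offset exists.

```java
import java.util.OptionalLong;

// Simplified model of how a consumer might pick its starting offset in one partition.
// All names here are hypothetical; none of them are Kafka client APIs.
final class StartOffsetSketch {

    enum ResetPolicy { EARLIEST, LATEST }

    /**
     * @param committed       offset previously committed by the consumer group, if any
     * @param beginningOffset offset of the oldest record still retained in the partition
     * @param endOffset       offset that the next produced record will receive
     * @param policy          strategy to use when there is no usable committed offset
     */
    static long resolveStart(OptionalLong committed, long beginningOffset,
                             long endOffset, ResetPolicy policy) {
        // A committed offset inside the retained range wins over the reset policy.
        if (committed.isPresent()) {
            long c = committed.getAsLong();
            if (c >= beginningOffset && c <= endOffset) {
                return c;
            }
        }
        // No committed offset, or it points outside the retained range: apply the policy.
        return policy == ResetPolicy.EARLIEST ? beginningOffset : endOffset;
    }

    public static void main(String[] args) {
        // Assume the partition retains offsets 100..249, so the end offset is 250.
        System.out.println(resolveStart(OptionalLong.empty(), 100, 250, ResetPolicy.EARLIEST)); // 100
        System.out.println(resolveStart(OptionalLong.empty(), 100, 250, ResetPolicy.LATEST));   // 250
        System.out.println(resolveStart(OptionalLong.of(180), 100, 250, ResetPolicy.LATEST));   // 180
        System.out.println(resolveStart(OptionalLong.of(40), 100, 250, ResetPolicy.EARLIEST));  // 100
    }
}
```

In the last call the committed offset 40 is older than anything still retained, so the model falls back to the earliest available offset.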
In this tutorial, we will look at a simple Java implementation that we can use to get the offset of the next record to be fetched.
Source Code
The example source code is shown below:
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Main {
    private static final Logger log = LoggerFactory.getLogger(Main.class);

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Start from the earliest available offset when the group has no committed offset
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Create the Kafka consumer and subscribe to the "users" topic
        Consumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList("users"));

        // Poll indefinitely; stop the program externally (for example with Ctrl+C)
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
            for (ConsumerRecord<String, String> record : records) {
                // The consumer's next position in this record's partition is its offset plus one
                long nextOffset = record.offset() + 1;
                log.info("Next offset: {}", nextOffset);
            }
        }
    }
}
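In this example, the Main class defines a static main method that creates a Kafka consumer, subscribes to the "users" topic, and polls for new records in a loop.
For each ConsumerRecord in the ConsumerRecords object, we compute the offset of the next record to be fetched by adding 1 to the current record's offset, and then write it to the log. Because the poll loop never terminates, the program keeps logging offsets until it is stopped externally. Note that offsets are tracked per partition, so if the topic has more than one partition, the logged values come from several independent sequences.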
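Since offsets are scoped to a partition, it can help to keep one "next offset" per partition rather than a single value. The following is a minimal, Kafka-free sketch of that bookkeeping. The class NextOffsetSketch and its methods are hypothetical names introduced for illustration; in the real consumer loop, the partition and offset arguments would come from record.partition() and record.offset().

```java
import java.util.HashMap;
import java.util.Map;

// Tracks, per partition, the offset of the next record expected after the ones seen so far.
// Hypothetical helper for illustration; it is not part of the Kafka client library.
final class NextOffsetSketch {

    // partition number -> offset of the next record expected from that partition
    private final Map<Integer, Long> nextOffsets = new HashMap<>();

    // Call once per processed record with that record's partition and offset.
    void recordProcessed(int partition, long offset) {
        // Keep the largest value seen so far, in case records are reported out of order.
        nextOffsets.merge(partition, offset + 1, Long::max);
    }

    // Returns the next expected offset for a partition that has been seen at least once.
    long nextOffset(int partition) {
        Long next = nextOffsets.get(partition);
        if (next == null) {
            throw new IllegalStateException("No records processed for partition " + partition);
        }
        return next;
    }

    public static void main(String[] args) {
        NextOffsetSketch tracker = new NextOffsetSketch();
        tracker.recordProcessed(0, 5);
        tracker.recordProcessed(0, 6);
        tracker.recordProcessed(1, 41);
        System.out.println(tracker.nextOffset(0)); // 7
        System.out.println(tracker.nextOffset(1)); // 42
    }
}
```

As an alternative to manual arithmetic, the Kafka consumer also exposes position(TopicPartition), which reports the offset of the next record that will be fetched for an assigned partition.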
Conclusion
This post presented a simple Java implementation for determining the offset of the next record that a Kafka consumer application will fetch.