Apache Kafka Producer Transactions Example in Java

Kafka transactions let applications write messages atomically across multiple topics and partitions, which is the basis of Kafka's exactly-once semantics. They are coordinated by a broker-side transaction coordinator using a protocol similar to two-phase commit (2PC), and they cover both producing messages and committing consumer offsets.

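A transaction groups the messages that a producer sends, possibly to several topics and partitions, into a single unit of work: either all of them are committed or none of them are. Consumer offsets can be committed as part of the same transaction, which keeps consume-and-produce pipelines consistent.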
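On the consuming side, whether messages from open or aborted transactions are hidden depends on the consumer's isolation.level setting, which defaults to read_uncommitted. The following is a minimal sketch of consumer properties that return only committed messages. The class name, the group ID, and the choice of String deserializers are assumptions made for illustration; only standard Kafka consumer configuration keys are used, and the properties would still need to be passed to a KafkaConsumer.

```java
import java.util.Properties;

// Hypothetical helper: builds consumer properties that hide uncommitted
// and aborted transactional messages.
final class ReadCommittedConsumerConfig {

    private ReadCommittedConsumerConfig() {}

    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Assumption: keys and values are plain strings
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Return only messages from committed transactions
        // (the default, read_uncommitted, returns everything)
        props.put("isolation.level", "read_committed");
        return props;
    }

    public static void main(String[] args) {
        // "coordinates-readers" is a made-up group ID for this sketch
        Properties props = build("localhost:9092", "coordinates-readers");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

With read_committed, a consumer sees the messages of a transaction only after that transaction has been committed, and never sees messages from an aborted one.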

This tutorial shows how to create a basic transactional Kafka producer in Java.

Kafka Producer Transaction Example in Java

The following source code shows how we can implement a producer transaction in Java:

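import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerTransactionsExample {

    private static final String TOPIC_NAME = "coordinates";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
        // Configure Kafka producer properties
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Setting a transactional ID enables transactions for this producer
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "prod1");

        // Create a Kafka producer
        Producer<String, String> producer = new KafkaProducer<>(props);

        // Register the transactional ID with the transaction coordinator (call once)
        producer.initTransactions();

        try {
            // Start the transaction
            producer.beginTransaction();

            // Send the first message
            producer.send(new ProducerRecord<>(TOPIC_NAME, "key1", "value1"));

            // Send the second message
            producer.send(new ProducerRecord<>(TOPIC_NAME, "key2", "value2"));

            // Commit the transaction
            producer.commitTransaction();

            System.out.println("Messages sent successfully");
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // Fatal errors: the producer cannot recover, so we only close it
            System.out.println("Fatal error sending messages: " + e.getMessage());
        } catch (KafkaException e) {
            // Roll back the transaction for any other Kafka error
            producer.abortTransaction();
            System.out.println("Error sending messages: " + e.getMessage());
        } finally {
            // Close the producer
            producer.close();
        }
    }
}

In the given example, we start by configuring the Kafka producer with the required properties: the broker address, the key and value serializers, and the transactional ID. Setting transactional.id is what enables transactions, and it also enables idempotence automatically.

We then create a Kafka producer instance and call its initTransactions() method once. This registers the transactional ID with the transaction coordinator and fences off any older producer instance that uses the same ID.

Using the beginTransaction() method of the producer, we initiate the transaction and send two messages to the coordinates topic using the send() method.

Finally, we commit the transaction using the commitTransaction() method, which flushes any unsent records before committing.

NOTE: We also implement simple error handling. For most Kafka errors, we roll back the transaction using the abortTransaction() method. A ProducerFencedException, OutOfOrderSequenceException, or AuthorizationException is fatal, so the only option is to close the producer.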
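The begin, send, commit-or-abort flow can be factored into a small reusable helper. The sketch below is not part of the Kafka client library: TxProducer is a hypothetical interface whose method names mirror those of the Kafka producer, and InMemoryTxProducer is a made-up stand-in so the pattern can be run without a broker. It also treats every runtime exception as abortable, which is a simplification; with a real producer, fatal errors such as ProducerFencedException should lead to closing the producer instead of aborting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical minimal view of a transactional producer
// (method names mirror the Kafka producer, the interface itself is made up).
interface TxProducer {
    void beginTransaction();
    void send(String key, String value);
    void commitTransaction();
    void abortTransaction();
}

// In-memory stand-in used only to exercise the helper without a broker.
final class InMemoryTxProducer implements TxProducer {
    private final List<String> pending = new ArrayList<>();
    final List<String> committed = new ArrayList<>();

    public void beginTransaction() { pending.clear(); }
    public void send(String key, String value) { pending.add(key + "=" + value); }
    public void commitTransaction() { committed.addAll(pending); pending.clear(); }
    public void abortTransaction() { pending.clear(); }
}

final class Transactions {

    private Transactions() {}

    // Runs the given work inside one transaction.
    // Returns true if it was committed, false if it was aborted.
    static boolean runInTransaction(TxProducer producer, Consumer<TxProducer> work) {
        producer.beginTransaction();
        try {
            work.accept(producer);
            producer.commitTransaction();
            return true;
        } catch (RuntimeException e) {
            // Simplification: every runtime error is treated as abortable
            producer.abortTransaction();
            return false;
        }
    }

    public static void main(String[] args) {
        InMemoryTxProducer producer = new InMemoryTxProducer();
        boolean ok = runInTransaction(producer, tx -> {
            tx.send("key1", "value1");
            tx.send("key2", "value2");
        });
        System.out.println("committed=" + ok + ", messages=" + producer.committed);
    }
}
```

Keeping the commit and abort calls in one place means each caller only supplies the messages to send, and a failure part-way through never leaves some of the messages committed.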

Conclusion

This guide gave a brief overview of setting up a transactional Kafka producer in Java. Check the Apache Kafka documentation to explore the topic further.

About the author

John Otieno

My name is John and I am a fellow geek like you. I am passionate about all things computers, from hardware and operating systems to programming. My dream is to share my knowledge with the world and help out fellow geeks. Follow my content by subscribing to the LinuxHint mailing list.