Transactions ensure that multiple messages produced or consumed by one or more clients are treated as a single unit of work: either all of them are committed or none are.
This tutorial shows how to create a basic Kafka producer transaction in Java.
Kafka Producer Transaction Example in Java
The following source code shows how we can implement a producer transaction in Java:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import java.util.Properties;

public class KafkaProducerTransactionsExample {
    private static final String TOPIC_NAME = "coordinates";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
        // Configure Kafka producer properties
        Properties props = new Properties();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.put("transactional.id", "prod1");
        // Serializers are required; keys and values are strings here
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Create a Kafka producer; try-with-resources closes it on exit
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // Register the transactional ID with the cluster (once per producer)
            producer.initTransactions();
            try {
                producer.beginTransaction();
                // Send the first message
                producer.send(new ProducerRecord<>(TOPIC_NAME, "key1", "value1"));
                // Send the second message
                producer.send(new ProducerRecord<>(TOPIC_NAME, "key2", "value2"));
                // Commit the transaction
                producer.commitTransaction();
                System.out.println("Messages sent successfully");
            } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
                // Fatal errors: the transaction cannot be aborted, so we only close the producer
                System.out.println("Fatal error, closing producer: " + e.getMessage());
            } catch (KafkaException e) {
                // Roll back the transaction for any other Kafka error
                producer.abortTransaction();
                System.out.println("Error sending messages: " + e.getMessage());
            }
        }
    }
}

In the given example, we start by configuring the Kafka producer with the required properties: the broker address, the transactional ID, and the key and value serializers.
We then create a Kafka producer instance and call its initTransactions() method, which registers the transactional ID with the cluster and must be called once before the first transaction.
Using the beginTransaction() method of the producer, we initiate the transaction and send two messages to the coordinates topic using the send() method.
Finally, we commit the transaction using the commitTransaction() method of the producer.
NOTE: We also implement simple error handling. For most errors, we roll back the transaction using the abortTransaction() method. ProducerFencedException, OutOfOrderSequenceException, and AuthorizationException are fatal, so in those cases we only close the producer.
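The configuration step can also be pulled into a small helper. The following is a minimal sketch that uses only java.util.Properties; the class name TransactionalClientConfig, its method names, and the group ID are hypothetical and chosen for illustration. The producer settings mirror the example, with enable.idempotence and acks spelled out explicitly (a transactional producer requires idempotence). The consumer settings are an addition that the example above does not cover: a consumer skips uncommitted and aborted messages only when isolation.level is set to read_committed, because the default is read_uncommitted.

```java
import java.util.Properties;

// Hypothetical helper: builds client settings for transactional use.
final class TransactionalClientConfig {

    // Producer settings: same keys as the example, plus explicit idempotence and acks.
    static Properties producerProps(String bootstrapServers, String transactionalId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("transactional.id", transactionalId);
        props.put("enable.idempotence", "true");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    // Consumer settings (assumption: a separate consumer reads the same topic).
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Only return messages from committed transactions
        props.put("isolation.level", "read_committed");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties producer = producerProps("localhost:9092", "prod1");
        Properties consumer = consumerProps("localhost:9092", "coordinates-readers");
        System.out.println("transactional.id = " + producer.getProperty("transactional.id"));
        System.out.println("isolation.level = " + consumer.getProperty("isolation.level"));
    }
}
```

The producer properties can be passed directly to the KafkaProducer constructor in place of the inline props object. Keeping the transactional ID as a parameter makes it easier to give each producer instance its own stable ID.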
Conclusion
This guide gave a simple overview of setting up a Kafka transactional producer in Java. Check the Kafka documentation to explore further.