We can create a Kafka topic in various programming languages such as Python, C++, C, C#, Golang, etc.
When a Kafka producer sends a message to a topic, it first establishes a connection to a Kafka broker, a single node in the Kafka cluster that acts as a message broker. The producer then sends the message to the broker which forwards it to the appropriate partition in the topic.
Before sending the message, the producer must specify the topic name, partition number, and the actual message payload. We can set the partition number explicitly or allow the partition strategy to determine the target partition. Kafka provides various built-in partitioning strategies such as round-robin or hash-based partitioning.
Once a message is published on the topic, the publisher receives an acknowledgment from the broker with the status. The acknowledgment can be synchronous or asynchronous, depending on the producer configuration.
In addition to sending messages, the Kafka producer provides various configuration options to control its behavior. For example, we can use the producer to specify the maximum size of records, the number of retries in case of failures, and the compression codec to compress the messages.
This tutorial discusses how we can build a simple Kafka consumer and producer application using C and the librdkafka.
Requirements:
- C Build Toolchain
- Librdkafka
- Apache Kafka
- Zlib Development Package
Project Setup
Create a new directory to store the source code for your project.
Navigate into the directory and create a new file to store the source code for your producer application.
C/C++ Client Installation
The next step is to install the librdkafka client package. On Debian-based distributions, we can use apt as shown in the following command:
Next, install the zlib package with the following command:
Once installed, edit the “producer.c” file and add the source code as shown in the following example:
Kafka Producer Example:
#include <string.h>
#include <librdkafka/rdkafka.h>
int main(int argc, char **argv) {
char *brokers = "localhost:9092"; // broker addr
char *topic = "sample_topic"; // target topic
// kafka config param
rd_kafka_conf_t *conf = rd_kafka_conf_new();
char errstr[512];
// new producer instance
rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (!producer) {
fprintf(stderr, "Failed to create producer: %s\n", errstr);
return 1;
}
// Add brokers to producer
if (rd_kafka_brokers_add(producer, brokers) == 0) {
fprintf(stderr, "Failed to add brokers: %s\n", rd_kafka_err2str(rd_kafka_last_error()));
rd_kafka_destroy(producer);
return 1;
}
// create kafka topic instance
rd_kafka_topic_t *kafka_topic = rd_kafka_topic_new(producer, topic, NULL);
// configure producer messages
const char *messages[10] = {
"Hello, Kafka! 1",
"Hello, Kafka! 2",
"Hello, Kafka! 3",
"Hello, Kafka! 4",
"Hello, Kafka! 5",
"Hello, Kafka! 6",
"Hello, Kafka! 7",
"Hello, Kafka! 8",
"Hello, Kafka! 9",
"Hello, Kafka! 10"
};
size_t message_lens[10] = {0};
for (int i = 0; i < 10; i++) {
message_lens[i] = strlen(messages[i]);
}
// write records asynchronously
for (int i = 0; i < 10; i++) {
rd_kafka_message_t *rkmessage = rd_kafka_message_new();
rd_kafka_message_set_topic(rkmessage, topic);
rd_kafka_message_set_payload(rkmessage, (void *) messages[i], message_lens[i]);
if (rd_kafka_produce(kafka_topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
rkmessage->payload, rkmessage->len, NULL, 0, NULL) == -1) {
fprintf(stderr, "Failed to produce message: %s\n", rd_kafka_err2str(rd_kafka_last_error()));
rd_kafka_message_destroy(rkmessage);
rd_kafka_topic_destroy(kafka_topic);
rd_kafka_destroy(producer);
return 1;
}
rd_kafka_poll(producer, 0);
rd_kafka_message_destroy(rkmessage);
}
// Wait for records to be sent asynchronously
rd_kafka_flush(producer, 5000);
// Cleanup
rd_kafka_topic_destroy(kafka_topic);
rd_kafka_destroy(producer);
return 0;
}
To run the previous code, use the following command:
You can then execute the binary to write the messages to the topic.
Kafka Consumer Application
To read the messages that are written to the topic, create a consumer application and add the source code as shown in the following:
#include <stdlib.h>
#include <string.h>
#include <librdkafka/rdkafka.h>
static void msg_consume(rd_kafka_message_t *rkmessage, void *opaque) {
if (rkmessage->err) {
if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
fprintf(stderr, "Reached end of partition\n");
} else {
fprintf(stderr, "Failed to consume message: %s\n", rd_kafka_message_errstr(rkmessage));
}
} else {
printf("Received message (%zd bytes): %.*s\n",
rkmessage->len, (int) rkmessage->len, (char *) rkmessage->payload);
}
rd_kafka_message_destroy(rkmessage);
}
int main(int argc, char **argv) {
char *brokers = "localhost:9092"; // broker addr
char *topic = "sample_topic"; // target topic
int partition = RD_KAFKA_PARTITION_UA; // target partition
int timeout = 1000; // Poll timeout in milliseconds
// config params
rd_kafka_conf_t *conf = rd_kafka_conf_new();
char errstr[512];
// consumer instance
rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
if (!consumer) {
fprintf(stderr, "Failed to create consumer: %s\n", errstr);
return 1;
}
// add broker
if (rd_kafka_brokers_add(consumer, brokers) == 0) {
fprintf(stderr, "Failed to add brokers: %s\n", rd_kafka_err2str(rd_kafka_last_error()));
rd_kafka_destroy(consumer);
return 1;
}
// kafka topic partition list
rd_kafka_topic_partition_list_t *topic_list = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(topic_list, topic, partition);
// Start consuming messages from Kafka
if (rd_kafka_consume_start_queue(topic_list, partition, 0, NULL) == -1) {
fprintf(stderr, "Failed to start consuming messages: %s\n", rd_kafka_err2str(rd_kafka_last_error()));
rd_kafka_topic_partition_list_destroy(topic_list);
rd_kafka_destroy(consumer);
return 1;
}
// Consume messages until interrupt
while (1) {
rd_kafka_message_t *rkmessage = rd_kafka_consume_queue(topic_list, partition, timeout);
if (!rkmessage) {
continue;
}
msg_consume(rkmessage, NULL);
}
// Stop consuming messages
rd_kafka_consume_stop_queue(topic_list, partition);
rd_kafka_topic_partition_list_destroy(topic_list);
rd_kafka_destroy(consumer);
return 0;
}
To compile the consumer app, run the following command:
You can then run the consumer binary to read the messages from Kafka.
Conclusion
This article covered the basics of how to create a Kafka consumer and producer application using the librdkafka library. Check the docs to learn more.