If you want to get the fundamentals out of the way, you’ve come to the right place. This tutorial explores how to set up a basic Kafka consumer application using the C programming language.
Since Kafka operates on a pub-sub model, consumer applications are crucial in processing and consuming the data from the topics.
Requirements:
- C Build Toolchain
- Librdkafka
- Apache Kafka
- Zlib Development Package
Project Setup
Create a new directory to store the source code for your project.
Navigate into the directory and create a new file named producer.c to store the source code for your producer application.
C/C++ Client Installation
The next step is to install the librdkafka client package. We can use apt on Debian-based distributions as shown in the following command:
Next, install the zlib package with the following command:
Once installed, edit the producer.c file and add the source code as shown in the following:
Kafka Producer Example:
#include <stdio.h>
#include <string.h>
#include <librdkafka/rdkafka.h>

/*
 * Minimal Kafka producer: writes ten messages to "sample_topic" on a
 * local broker using the librdkafka C client.
 *
 * Returns 0 on success, 1 on any configuration or produce error.
 */
int main(int argc, char **argv) {
    const char *brokers = "localhost:9092"; /* broker address list */
    const char *topic = "sample_topic";     /* target topic */
    char errstr[512];

    /* Configure the broker list via "bootstrap.servers"; this replaces the
     * deprecated rd_kafka_brokers_add(). On success, rd_kafka_new() takes
     * ownership of conf, so it is only destroyed here on the error path. */
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
                          errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "Failed to set brokers: %s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }

    /* Create the producer instance. */
    rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
                                        errstr, sizeof(errstr));
    if (!producer) {
        fprintf(stderr, "Failed to create producer: %s\n", errstr);
        return 1;
    }

    /* Topic handle used by rd_kafka_produce(). */
    rd_kafka_topic_t *kafka_topic = rd_kafka_topic_new(producer, topic, NULL);
    if (!kafka_topic) {
        fprintf(stderr, "Failed to create topic handle: %s\n",
                rd_kafka_err2str(rd_kafka_last_error()));
        rd_kafka_destroy(producer);
        return 1;
    }

    /* Messages to produce. */
    const char *messages[] = {
        "Hello, Kafka! 1",
        "Hello, Kafka! 2",
        "Hello, Kafka! 3",
        "Hello, Kafka! 4",
        "Hello, Kafka! 5",
        "Hello, Kafka! 6",
        "Hello, Kafka! 7",
        "Hello, Kafka! 8",
        "Hello, Kafka! 9",
        "Hello, Kafka! 10"
    };
    const size_t num_messages = sizeof(messages) / sizeof(messages[0]);

    /* Write records asynchronously. The payload and length are passed
     * directly to rd_kafka_produce(); RD_KAFKA_MSG_F_COPY tells librdkafka
     * to copy the payload, so our buffers stay ours. */
    for (size_t i = 0; i < num_messages; i++) {
        if (rd_kafka_produce(kafka_topic, RD_KAFKA_PARTITION_UA,
                             RD_KAFKA_MSG_F_COPY,
                             (void *) messages[i], strlen(messages[i]),
                             NULL, 0,   /* no key */
                             NULL) == -1) {
            fprintf(stderr, "Failed to produce message: %s\n",
                    rd_kafka_err2str(rd_kafka_last_error()));
            rd_kafka_topic_destroy(kafka_topic);
            rd_kafka_destroy(producer);
            return 1;
        }
        /* Serve delivery reports and other callbacks (non-blocking). */
        rd_kafka_poll(producer, 0);
    }

    /* Block up to 5 seconds waiting for outstanding messages to be sent. */
    rd_kafka_flush(producer, 5000);

    /* Cleanup. */
    rd_kafka_topic_destroy(kafka_topic);
    rd_kafka_destroy(producer);
    return 0;
}
To run the given code, use the following command:
You can then execute the binary to write the messages to the topic.
Setting Up the Consumer Application
Once you have the producer application configured on your machine, it’s time to write the consumer application to read the messages.
The following source code shows a simple application that can read the messages from the defined topic and print the messages to the console:
#include <stdio.h>
#include <string.h>
#include <signal.h>
#include <librdkafka/rdkafka.h>

/* Set to 0 by the SIGINT handler to request a clean shutdown of the
 * poll loop below. */
static volatile sig_atomic_t running = 1;

static void stop_handler(int sig) {
    (void) sig;
    running = 0;
}

/*
 * Minimal Kafka consumer: subscribes to "sample_topic" with the
 * high-level (KafkaConsumer) API and prints each message payload.
 * Press Ctrl-C to stop; returns 0 on clean shutdown, 1 on error.
 */
int main(int argc, char **argv) {
    const char *brokers = "localhost:9092"; /* broker address list */
    const char *topic = "sample_topic";     /* topic to subscribe to */
    const char *group = "sample_group";     /* consumer group id (required
                                             * by the high-level consumer) */
    char errstr[512];

    /* Consumer configuration. "auto.offset.reset=earliest" makes a new
     * group start reading from the beginning of the topic. On success,
     * rd_kafka_new() takes ownership of conf. */
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
                          errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
        rd_kafka_conf_set(conf, "group.id", group,
                          errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
        rd_kafka_conf_set(conf, "auto.offset.reset", "earliest",
                          errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "Failed to configure consumer: %s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }

    /* Create the consumer instance. */
    rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
                                        errstr, sizeof(errstr));
    if (!consumer) {
        fprintf(stderr, "Failed to create consumer: %s\n", errstr);
        return 1;
    }

    /* Redirect the main queue to the consumer queue so that
     * rd_kafka_consumer_poll() serves all callbacks. */
    rd_kafka_poll_set_consumer(consumer);

    /* Subscribe to the topic; partition assignment is handled by the
     * group protocol, so RD_KAFKA_PARTITION_UA is used here. */
    rd_kafka_topic_partition_list_t *topics =
        rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(topics, topic, RD_KAFKA_PARTITION_UA);
    rd_kafka_resp_err_t err = rd_kafka_subscribe(consumer, topics);
    rd_kafka_topic_partition_list_destroy(topics);
    if (err) {
        fprintf(stderr, "Failed to subscribe: %s\n", rd_kafka_err2str(err));
        rd_kafka_destroy(consumer);
        return 1;
    }

    /* Allow Ctrl-C to break out of the loop so cleanup code runs. */
    signal(SIGINT, stop_handler);

    /* Continuously poll for messages until interrupted. */
    while (running) {
        rd_kafka_message_t *message = rd_kafka_consumer_poll(consumer, 1000);
        if (!message) {
            continue; /* poll timeout: no message within 1000 ms */
        }
        if (message->err) {
            if (message->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
                fprintf(stderr, "Reached end of partition\n");
            } else {
                fprintf(stderr, "Error while consuming message: %s\n",
                        rd_kafka_message_errstr(message));
            }
        } else {
            printf("Received message: %.*s\n",
                   (int) message->len, (char *) message->payload);
        }
        rd_kafka_message_destroy(message);
    }

    /* Leave the group and commit final offsets, then free the handle. */
    rd_kafka_consumer_close(consumer);
    rd_kafka_destroy(consumer);
    return 0;
}
The previous example consumer uses the librdkafka library to consume the messages from the subscribed topic. The consumer reads the messages from the beginning and continuously polls for new messages using the rd_kafka_consumer_poll() function until it is interrupted or an error is encountered.
Conclusion
This article covered the basics of how to create a Kafka consumer and producer application using the librdkafka library. Check the official librdkafka documentation to learn more.