Apache Kafka

Apache Kafka Consumer Example in C

One of the best ways to understand how Kafka works is by creating custom consumer and producer applications using your favorite programming languages.

If you want to get the fundamentals out of the way, you’ve come to the right place. This tutorial explores how to set up a basic Kafka consumer application using the C programming language.

Since Kafka operates on a pub-sub model, consumer applications are crucial in processing and consuming the data from the topics.

Requirements:

  1. C Build Toolchain
  2. Librdkafka
  3. Apache Kafka
  4. Zlib Development Package

Project Setup

Create a new directory to store the source code for your project.

$ mkdir kafka-c

Navigate into the directory and create a new file to store the source code for your producer application.

$ touch producer.c

C/C++ Client Installation

The next step is to install the librdkafka client package. We can use apt on Debian-based distributions as shown in the following command:

$ sudo apt-get install librdkafka-dev

Next, install the zlib package with the following command:

$ sudo apt-get install zlib1g zlib1g-dev -y

Once installed, edit the producer.c file and add the source code as shown in the following:

Kafka Producer Example:

#include <stdio.h>

#include <string.h>

#include <librdkafka/rdkafka.h>

int main(int argc, char **argv) {

  char *brokers = "localhost:9092"; // broker addr

  char *topic = "sample_topic"; // target topic

  // kafka config param

  rd_kafka_conf_t *conf = rd_kafka_conf_new();

  char errstr[512];

  // new producer instance

  rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));

  if (!producer) {

    fprintf(stderr, "Failed to create producer: %s\n", errstr);

    return 1;

  }

  // Add brokers to producer

  if (rd_kafka_brokers_add(producer, brokers) == 0) {

    fprintf(stderr, "Failed to add brokers: %s\n", rd_kafka_err2str(rd_kafka_last_error()));

    rd_kafka_destroy(producer);

    return 1;

  }

  // create kafka topic instance

    rd_kafka_topic_t *kafka_topic = rd_kafka_topic_new(producer, topic, NULL);

  // configure producer messages

  const char *messages[10] = {

    "Hello, Kafka! 1",

    "Hello, Kafka! 2",

    "Hello, Kafka! 3",

    "Hello, Kafka! 4",

    "Hello, Kafka! 5",

    "Hello, Kafka! 6",

    "Hello, Kafka! 7",

    "Hello, Kafka! 8",

    "Hello, Kafka! 9",

    "Hello, Kafka! 10"

  };

  size_t message_lens[10] = {0};

  for (int i = 0; i < 10; i++) {

    message_lens[i] = strlen(messages[i]);

  }

  // write records asynchronously

  for (int i = 0; i < 10; i++) {

     rd_kafka_message_t *rkmessage = rd_kafka_message_new();

     rd_kafka_message_set_topic(rkmessage, topic);

     rd_kafka_message_set_payload(rkmessage, (void *) messages[i], message_lens[i]);

     if (rd_kafka_produce(kafka_topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,

     rkmessage->payload, rkmessage->len, NULL, 0, NULL) == -1) {

     fprintf(stderr, "Failed to produce message: %s\n", rd_kafka_err2str(rd_kafka_last_error()));

     rd_kafka_message_destroy(rkmessage);

     rd_kafka_topic_destroy(kafka_topic);

     rd_kafka_destroy(producer);

     return 1;

  }

     rd_kafka_poll(producer, 0);

     rd_kafka_message_destroy(rkmessage);

  }

  // Wait for records to be sent asynchronously

  rd_kafka_flush(producer, 5000);

  // Cleanup

  rd_kafka_topic_destroy(kafka_topic);

  rd_kafka_destroy(producer);

  return 0;

}

To compile the given code, use the following command:

$ gcc -o producer producer.c -lrdkafka -lz -lpthread

You can then execute the binary to write the messages to the topic.

Setting Up the Consumer Application

Once you have the producer application working on your machine, it’s time to write the consumer application to read the messages.

The following source code shows a simple application that can read the messages from the defined topic and print the messages to the console:

#include <stdio.h>

#include <string.h>

#include <librdkafka/rdkafka.h>

int main(int argc, char **argv) {

  char *brokers = "localhost:9092"; // broker addr

  char *topic = "sample_topic"; // target topic
 
  // kafka config param

  rd_kafka_conf_t *conf = rd_kafka_conf_new();

  char errstr[512];

  // new consumer instance

  rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));

  if (!consumer) {

    fprintf(stderr, "Failed to create consumer: %s\n", errstr);

    return 1;

  }

  // Add brokers to consumer

  if (rd_kafka_brokers_add(consumer, brokers) == 0) {

    fprintf(stderr, "Failed to add brokers: %s\n", rd_kafka_err2str(rd_kafka_last_error()));

    rd_kafka_destroy(consumer);

    return 1;

  }

  // create kafka topic instance

  rd_kafka_topic_t *kafka_topic = rd_kafka_topic_new(consumer, topic, NULL);

  // create consumer topic partition list

  rd_kafka_topic_partition_list_t *partitions = rd_kafka_topic_partition_list_new(1);

  rd_kafka_topic_partition_list_add(partitions, topic, RD_KAFKA_PARTITION_UA);

  // start consuming from the beginning

  rd_kafka_resp_err_t err = rd_kafka_consume_start_offsets(consumer, partitions, RD_KAFKA_OFFSET_BEGINNING, 0);

  if (err) {

    fprintf(stderr, "Failed to start consuming: %s\n", rd_kafka_err2str(err));

    rd_kafka_topic_partition_list_destroy(partitions);

    rd_kafka_topic_destroy(kafka_topic);

    rd_kafka_destroy(consumer);

    return 1;

  }

  // continuously poll for messages

  while (1) {

    rd_kafka_message_t *message = rd_kafka_consumer_poll(consumer, 1000);

    if (!message) {

    continue;

  }

    if (message->err) {

    if (message->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {

    fprintf(stderr, "Reached end of partition\n");

  } else {

  fprintf(stderr, "Error while consuming message: %s\n", rd_kafka_message_errstr(message));

  }

  } else {

  printf("Received message: %.*s\n", (int) message->len, (char *) message->payload);

  }

  rd_kafka_message_destroy(message);

  }

  // stop consuming

  rd_kafka_consume_stop(partitions, RD_KAFKA_PARTITION_UA);

  rd_kafka_topic_partition_list_destroy(partitions);

  // Cleanup

  rd_kafka_topic_destroy(kafka_topic);

  rd_kafka_destroy(consumer);

  return 0;

}

The previous example consumer uses the librdkafka library to consume the messages from the subscribed topic. The consumer reads the messages from the beginning of the topic and keeps polling for new messages using the rd_kafka_consumer_poll() function, printing each message that arrives and reporting any error that it encounters.

Top of Form

Conclusion

This article covered the basics of how to create a Kafka consumer and producer application using the librdkafka library. Check the librdkafka documentation to learn more.

About the author

John Otieno

My name is John and I am a fellow geek like you. I am passionate about all things computers, from hardware and operating systems to programming. My dream is to share my knowledge with the world and help out fellow geeks. Follow my content by subscribing to the LinuxHint mailing list.