Apache Kafka

Apache Kafka Producer Example in C

Kafka producers are one of the primary building blocks of Apache Kafka. A Kafka producer is a client application that is responsible for generating data and publishing it to a Kafka topic.

We can create a Kafka producer in various programming languages such as Python, C++, C, C#, Golang, etc.

When a Kafka producer sends a message to a topic, it first establishes a connection to a Kafka broker, a single node in the Kafka cluster that acts as a message broker. The producer then sends the message to the broker which forwards it to the appropriate partition in the topic.

Before sending the message, the producer must specify the topic name, partition number, and the actual message payload. We can set the partition number explicitly or allow the partition strategy to determine the target partition. Kafka provides various built-in partitioning strategies such as round-robin or hash-based partitioning.

Once a message is published on the topic, the producer receives an acknowledgment from the broker with the status. The acknowledgment can be synchronous or asynchronous, depending on the producer configuration.

In addition to sending messages, the Kafka producer provides various configuration options to control its behavior. For example, we can use the producer to specify the maximum size of records, the number of retries in case of failures, and the compression codec to compress the messages.

This tutorial discusses how we can build a simple Kafka producer and consumer application using C and the librdkafka library.

Requirements:

  1. C Build Toolchain
  2. Librdkafka
  3. Apache Kafka
  4. Zlib Development Package

Project Setup

Create a new directory to store the source code for your project.

$ mkdir kafka-c

Navigate into the directory and create a new file to store the source code for your producer application.

$ touch producer.c

C/C++ Client Installation

The next step is to install the librdkafka client package. On Debian-based distributions, we can use apt as shown in the following command:

$ sudo apt-get install librdkafka-dev

Next, install the zlib package with the following command:

$ sudo apt-get install zlib1g zlib1g-dev -y

Once installed, edit the “producer.c” file and add the source code as shown in the following example:

Kafka Producer Example:

#include <stdio.h>

#include <string.h>

#include <librdkafka/rdkafka.h>

/*
 * Minimal librdkafka producer example: publishes ten messages to
 * "sample_topic" on localhost:9092, letting the built-in partitioner
 * pick the partition (RD_KAFKA_PARTITION_UA).
 *
 * Returns 0 on success, 1 on any setup or produce failure.
 */
int main(int argc, char **argv) {
    char *brokers = "localhost:9092"; /* broker address */
    char *topic = "sample_topic";     /* target topic */
    char errstr[512];

    /* Producer configuration; ownership passes to the producer handle
     * on successful rd_kafka_new(). */
    rd_kafka_conf_t *conf = rd_kafka_conf_new();

    /* Create the producer instance. */
    rd_kafka_t *producer =
        rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (!producer) {
        fprintf(stderr, "Failed to create producer: %s\n", errstr);
        rd_kafka_conf_destroy(conf); /* conf is NOT freed by rd_kafka_new() on failure */
        return 1;
    }

    /* Add brokers to the producer. */
    if (rd_kafka_brokers_add(producer, brokers) == 0) {
        fprintf(stderr, "Failed to add brokers: %s\n",
                rd_kafka_err2str(rd_kafka_last_error()));
        rd_kafka_destroy(producer);
        return 1;
    }

    /* Topic handle with the default topic configuration. */
    rd_kafka_topic_t *kafka_topic = rd_kafka_topic_new(producer, topic, NULL);
    if (!kafka_topic) {
        fprintf(stderr, "Failed to create topic: %s\n",
                rd_kafka_err2str(rd_kafka_last_error()));
        rd_kafka_destroy(producer);
        return 1;
    }

    /* Messages to publish. */
    const char *messages[] = {
        "Hello, Kafka! 1",
        "Hello, Kafka! 2",
        "Hello, Kafka! 3",
        "Hello, Kafka! 4",
        "Hello, Kafka! 5",
        "Hello, Kafka! 6",
        "Hello, Kafka! 7",
        "Hello, Kafka! 8",
        "Hello, Kafka! 9",
        "Hello, Kafka! 10"
    };
    const size_t num_messages = sizeof messages / sizeof messages[0];

    /* Write records asynchronously: rd_kafka_produce() only enqueues the
     * message and returns immediately. RD_KAFKA_MSG_F_COPY tells librdkafka
     * to copy the payload, so the caller's buffer need not stay valid. */
    for (size_t i = 0; i < num_messages; i++) {
        if (rd_kafka_produce(kafka_topic, RD_KAFKA_PARTITION_UA,
                             RD_KAFKA_MSG_F_COPY,
                             (void *) messages[i], strlen(messages[i]),
                             NULL, 0, /* no message key */
                             NULL) == -1) {
            fprintf(stderr, "Failed to produce message: %s\n",
                    rd_kafka_err2str(rd_kafka_last_error()));
            rd_kafka_topic_destroy(kafka_topic);
            rd_kafka_destroy(producer);
            return 1;
        }
        /* Serve delivery reports and other queued callbacks (non-blocking). */
        rd_kafka_poll(producer, 0);
    }

    /* Wait up to 5 seconds for all enqueued records to be delivered. */
    rd_kafka_flush(producer, 5000);

    /* Cleanup */
    rd_kafka_topic_destroy(kafka_topic);
    rd_kafka_destroy(producer);

    return 0;
}

To run the previous code, use the following command:

$ gcc -o producer producer.c -lrdkafka -lz -lpthread

You can then execute the binary to write the messages to the topic.

Kafka Consumer Application

To read the messages that are written to the topic, create a consumer application and add the source code as shown in the following:

#include <stdio.h>

#include <stdlib.h>

#include <string.h>

#include <librdkafka/rdkafka.h>

/*
 * Handle one message fetched from Kafka: print its payload on success,
 * report end-of-partition and errors on stderr, then free the message.
 * Takes ownership of rkmessage (always destroys it). `opaque` is unused.
 */
static void msg_consume(rd_kafka_message_t *rkmessage, void *opaque) {
    if (rkmessage->err) {
        if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
            fprintf(stderr, "Reached end of partition\n");
        } else {
            fprintf(stderr, "Failed to consume message: %s\n",
                    rd_kafka_message_errstr(rkmessage));
        }
    } else {
        /* %zu matches size_t (%zd is for ssize_t — mismatched specifiers
         * are undefined behavior). %.*s bounds the print because the
         * payload is not guaranteed to be NUL-terminated. */
        printf("Received message (%zu bytes): %.*s\n",
               rkmessage->len, (int) rkmessage->len,
               (char *) rkmessage->payload);
    }

    /* Every consumed message must be destroyed by the application. */
    rd_kafka_message_destroy(rkmessage);
}

int main(int argc, char **argv) {

char *brokers = "localhost:9092"; // broker addr

char *topic = "sample_topic"; // target topic

int partition = RD_KAFKA_PARTITION_UA; // target partition

int timeout = 1000; // Poll timeout in milliseconds

// config params

rd_kafka_conf_t *conf = rd_kafka_conf_new();

char errstr[512];

// consumer instance

rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));

if (!consumer) {

fprintf(stderr, "Failed to create consumer: %s\n", errstr);

  return 1;

}

// add broker

if (rd_kafka_brokers_add(consumer, brokers) == 0) {

fprintf(stderr, "Failed to add brokers: %s\n", rd_kafka_err2str(rd_kafka_last_error()));

rd_kafka_destroy(consumer);

  return 1;

}

// kafka topic partition list

rd_kafka_topic_partition_list_t *topic_list = rd_kafka_topic_partition_list_new(1);

rd_kafka_topic_partition_list_add(topic_list, topic, partition);

// Start consuming messages from Kafka

if (rd_kafka_consume_start_queue(topic_list, partition, 0, NULL) == -1) {

fprintf(stderr, "Failed to start consuming messages: %s\n", rd_kafka_err2str(rd_kafka_last_error()));

rd_kafka_topic_partition_list_destroy(topic_list);

rd_kafka_destroy(consumer);

  return 1;

}

// Consume messages until interrupt

while (1) {

rd_kafka_message_t *rkmessage = rd_kafka_consume_queue(topic_list, partition, timeout);

if (!rkmessage) {

    continue;

  }

msg_consume(rkmessage, NULL);

  }

// Stop consuming messages

rd_kafka_consume_stop_queue(topic_list, partition);

rd_kafka_topic_partition_list_destroy(topic_list);

rd_kafka_destroy(consumer);

  return 0;

}

To compile the consumer app, run the following command:

gcc -o consumer consumer.c -lrdkafka -lz -lpthread

You can then run the consumer binary to read the messages from Kafka.

Conclusion

This article covered the basics of how to create a Kafka consumer and producer application using the librdkafka library. Check the docs to learn more.

About the author

John Otieno

My name is John and I am a fellow geek like you. I am passionate about all things computers, from hardware and operating systems to programming. My dream is to share my knowledge with the world and help out fellow geeks. Follow my content by subscribing to the LinuxHint mailing list.