What Is a Kafka Consumer?
If you are starting with Apache Kafka, check out our series to discover more. However, a Kafka consumer is a simple application that subscribes to a specific Kafka topic within a Kafka cluster to receive the messages from a given partition. A Kafka consumer utilizes a pull-based model to consume the available messages at the defined pace and provide a reliable message processing using configurable offset values.
Requirements
To follow along with this tutorial and produce a working C# consumer app, you need the following:
- .NET Core SDK 2.1 or higher
- Confluent.Kafka NuGet package
- Apache Kafka cluster
Let us dive in!
Setup Sample Topic
The first step is to create the topic that we will subscribe to using the consumer application. For that, we can use the Kafka CLI tools. The command is as shown in the following:
--topic users \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1
The previous command should create a new topic called “users” with the specified properties.
Project Setup
Let us now set up our project. First, it is good to remember that this tutorial assumes you have the latest version of the .NET SDK and Visual Studio IDE.
Open the Visual Studio IDE and select “Create a New Project” from the “Get Started” page.
Search for “Console App” in the template section.
The next step is configuring the project details such as the project name, directory location, etc.
Choose “.NET 6.0” in the “Framework” selection.
Once completed, open the “program.cs” file and add the source code for the consumer and producer applications. The source code is provided in the following:
using System;
class Program
{
static void Main(string[] args)
{
var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
using var producer = new ProducerBuilder<Null, string>(config).Build();
for (int i = 0; i < 10; ++i)
{
producer.Produce("users", new Message<Null, string> { Value = $"Hello, Kafka user! ({i})" });
}
producer.Flush(TimeSpan.FromSeconds(10));
Console.WriteLine("Produced 10 messages to topic users");
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "localhost:9092",
GroupId="group1",
AutoOffsetReset = AutoOffsetReset.Earliest
};
using var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build();
consumer.Subscribe("users");
try
{
while (true)
{
var result = consumer.Consume(TimeSpan.FromSeconds(1));
if (result == null)
{
continue;
}
Console.WriteLine($"Consumed message '{result.Message.Value}' at: '{result.TopicPartitionOffset}'.");
}
}
catch (Exception ex)
{
Console.WriteLine($"Error: {ex.Message}");
}
}
}
Note that the previous code requires you to install the Confluent.Kafka package. You can do this by heading to the Tools -> Nuget Package Manager – Packet Manager Console and run the following command:
This command downloads the required package and installs it in your project.
Let us break down what the project entails.
In the previous program, we start by importing the required packages. In this case, we need the System and Confluent.kafka packages.
Next, we create a ProducerConfig object with the address of the Kafka bootstrap server. You can change this value to reflect the address of your Kafka cluster.
We then use the ProducerBuilder to create a new Kafka producer instance with the NULL keys and string values. The producer then writes ten messages to the cluster users’ topic.
In the next section, we use the ConsumerConfig and pass the address to the Kafka server and the target consumer group ID. We also define the AutoOffsetReset parameter and set its value to “Earliest”.
We then use this configuration to create a ConsumerBuilder to build a Kafka consumer of “Ignore” keys and string values. Finally, we use the consumer to subscribe to the “users” topic and enter a loop to continuously poll for the messages using the “Consume” method with a timeout of 1 second.
Once the message is received, the consumer prints the message value and topic partition offset to the console.
You can build and run the project from the Visual Studio IDE.
Conclusion
You now discovered how you can configure a simple producer and consumer applications in C# using the open source packages and simple commands.