Besides its extensive use in desktop applications and video games, C# lets developers build robust web applications using frameworks such as Blazor.
This tutorial teaches us how to create a Kafka consumer client application using the C# programming language.
What Is a Kafka Consumer?
If you are just starting with Apache Kafka, check out our series to learn more. In short, a Kafka consumer is an application that subscribes to one or more topics in a Kafka cluster and receives messages from the partitions assigned to it. A Kafka consumer uses a pull-based model, so it fetches the available messages at its own pace. It tracks its position in each partition with offsets, which is what makes reliable message processing possible.
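To make the pull-and-offset idea concrete, here is a small in-memory sketch. It does not use Kafka at all: PartitionLog and PullConsumer are hypothetical names invented for this illustration, and a real consumer hands this bookkeeping to the client library and the broker.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for a single partition: an append-only log.
public class PartitionLog
{
    private readonly List<string> _messages = new List<string>();

    public void Append(string message) => _messages.Add(message);

    // Return up to maxCount messages starting at the given offset.
    public IReadOnlyList<string> Fetch(int offset, int maxCount)
    {
        if (offset >= _messages.Count)
        {
            return new List<string>();
        }
        int count = Math.Min(maxCount, _messages.Count - offset);
        return _messages.GetRange(offset, count);
    }
}

// Hypothetical consumer that pulls at its own pace and tracks its own offset.
public class PullConsumer
{
    private readonly PartitionLog _log;

    public int Offset { get; private set; }

    public PullConsumer(PartitionLog log, int startOffset = 0)
    {
        _log = log;
        Offset = startOffset;
    }

    // Ask the log for the next batch; nothing is pushed to the consumer.
    public IReadOnlyList<string> Poll(int maxCount)
    {
        var batch = _log.Fetch(Offset, maxCount);
        Offset += batch.Count; // advance only past what was actually read
        return batch;
    }
}
```

Because the consumer owns its offset, it can stop, restart, and resume from where it left off, or start again from offset 0 to re-read everything. Kafka applies the same idea per partition and per consumer group.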
Requirements:
To follow along with this tutorial and produce a working C# consumer app, you need the following:
- .NET Core SDK 2.1 or higher
- Confluent.Kafka NuGet package
- Apache Kafka cluster
Let us dive in!
Set Up a Sample Topic
The first step is to create the topic that the consumer application will subscribe to. We can use the Kafka CLI tools for that, as shown in the following command:
kafka-topics.sh --create \
--topic users \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1
The given command should create a new topic called “users” with the specified properties.
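As an alternative to the CLI, the topic can also be created from C#. The following is a minimal sketch, assuming the Confluent.Kafka package is installed in the project and that a broker is reachable at localhost:9092 (the same address used in the command above); adjust it for your cluster.

```csharp
using System;
using Confluent.Kafka;
using Confluent.Kafka.Admin;

// Assumption: the broker address matches the one passed to the CLI command.
var adminConfig = new AdminClientConfig { BootstrapServers = "localhost:9092" };
using var admin = new AdminClientBuilder(adminConfig).Build();

try
{
    // Same properties as the CLI command: one partition, replication factor 1.
    await admin.CreateTopicsAsync(new[]
    {
        new TopicSpecification { Name = "users", NumPartitions = 1, ReplicationFactor = 1 }
    });
    Console.WriteLine("Topic users created");
}
catch (CreateTopicsException ex)
{
    // Raised, for example, when the topic already exists.
    foreach (var report in ex.Results)
    {
        Console.WriteLine($"Could not create topic {report.Topic}: {report.Error.Reason}");
    }
}
```

If the topic was already created with the CLI, the call is expected to fail and report that the topic exists, which is harmless here.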
Project Setup
Let us now set up our project. This tutorial assumes that you have a recent version of the .NET SDK and the Visual Studio IDE installed.
Open the Visual Studio IDE and select “Create a New Project” from the “Get Started” page.
Search for “Console App” in the template section.
The next step is configuring the project details such as the project name, directory location, etc.
Choose “.NET 6.0” in the “Framework” selection.
Once completed, open the “Program.cs” file and add the source code for the producer and consumer. The source code is provided as follows:
using System;
using Confluent.Kafka;

class Program
{
    static void Main(string[] args)
    {
        // Producer: write ten sample messages to the "users" topic.
        var config = new ProducerConfig { BootstrapServers = "172.17.148.4:9092" };
        using var producer = new ProducerBuilder<Null, string>(config).Build();
        for (int i = 0; i < 10; ++i)
        {
            producer.Produce("users", new Message<Null, string> { Value = $"Hello, Kafka user! ({i})" });
        }
        // Wait until the queued messages are delivered or the timeout elapses.
        producer.Flush(TimeSpan.FromSeconds(10));
        Console.WriteLine("Produced 10 messages to topic users");

        // Consumer: read the "users" topic from the earliest available offset.
        var consumerConfig = new ConsumerConfig
        {
            BootstrapServers = "172.17.148.4:9092",
            GroupId = "group1",
            AutoOffsetReset = AutoOffsetReset.Earliest
        };
        using var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build();
        consumer.Subscribe("users");
        try
        {
            while (true)
            {
                // Returns null when no message arrives within the timeout.
                var result = consumer.Consume(TimeSpan.FromSeconds(1));
                if (result == null)
                {
                    continue;
                }
                Console.WriteLine($"Consumed message '{result.Message.Value}' at: '{result.TopicPartitionOffset}'.");
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Error: {ex.Message}");
        }
        finally
        {
            // Leave the consumer group cleanly before the consumer is disposed.
            consumer.Close();
        }
    }
}
Note that the previous code requires you to install the Confluent.Kafka package. You can do this by heading to Tools -> NuGet Package Manager -> Package Manager Console and running the following command:
Install-Package Confluent.Kafka
This command downloads the required package and installs it in your project.
Let us break down what the project entails:
In the previous program, we start by importing the required namespaces. In this case, we need the System and Confluent.Kafka namespaces.
Next, we create a ProducerConfig object with the address of the Kafka bootstrap server. You can change this value to reflect the address of your Kafka cluster.
We then use the ProducerBuilder to create a new Kafka producer instance with Null keys and string values. The producer then writes ten messages to the “users” topic.
In the next section, we create a ConsumerConfig and pass it the address of the Kafka server and the target consumer group ID. We also set the AutoOffsetReset parameter to “Earliest” so that a consumer group with no committed offset starts reading from the beginning of the topic.
We then pass this configuration to a ConsumerBuilder to build a Kafka consumer with “Ignore” keys and string values. Finally, we use the consumer to subscribe to the “users” topic and enter a loop that continuously polls for messages using the “Consume” method with a timeout of 1 second.
Once the message is received, the consumer prints the message value and topic partition offset to the console.
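The program above relies on the client's default behavior of committing offsets automatically in the background. If you want an offset to be committed only after a message has actually been processed, one option is to disable auto-commit and commit explicitly. The following is a minimal sketch of that variation, not part of the original program; the broker address localhost:9092 is an assumption that you should replace with your own.

```csharp
using System;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092", // assumption: replace with your broker address
    GroupId = "group1",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoCommit = false // offsets are committed explicitly below
};

using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
consumer.Subscribe("users");

while (true)
{
    var result = consumer.Consume(TimeSpan.FromSeconds(1));
    if (result == null)
    {
        continue; // nothing arrived within the timeout
    }

    // "Process" the message first...
    Console.WriteLine($"Processing '{result.Message.Value}' at '{result.TopicPartitionOffset}'.");

    // ...then record the progress, so a restart resumes after this message.
    consumer.Commit(result);
}
```

Committing after every message is simple but synchronous, so it costs throughput. Committing every N messages is a common compromise, at the price of possibly reprocessing a few messages after a crash.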
You can build and run the project from the Visual Studio IDE.
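Since the consume loop runs forever, you will typically stop the program with Ctrl+C. The following is a hedged sketch of one way to shut down cleanly using a cancellation token. It is a variation on the consumer part of the program, and the broker address localhost:9092 is an assumption.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092", // assumption: replace with your broker address
    GroupId = "group1",
    AutoOffsetReset = AutoOffsetReset.Earliest
};

using var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
    e.Cancel = true; // keep the process alive so the consumer can close cleanly
    cts.Cancel();
};

using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
consumer.Subscribe("users");

try
{
    while (true)
    {
        // Blocks until a message arrives or the token is cancelled.
        var result = consumer.Consume(cts.Token);
        Console.WriteLine($"Consumed message '{result.Message.Value}' at: '{result.TopicPartitionOffset}'.");
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine("Shutting down consumer...");
}
finally
{
    // Leave the consumer group so partitions are reassigned promptly.
    consumer.Close();
}
```

Calling Close before the consumer is disposed lets the group rebalance immediately instead of waiting for the session timeout to expire.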
Conclusion
You have now learned how to set up a Kafka producer and consumer application in .NET using the Confluent.Kafka package.