Go provides a powerful standard library focused on simplicity, making it easier for developers to write reliable and efficient code. It also supports low-level programming, with the ability to interact directly with memory and build system-level software.
Go’s syntax is clean and concise, promoting readability and maintainability. Its strict typing makes it easier to catch errors at compile time, and its garbage collector frees memory as needed, reducing the risk of memory leaks. Additionally, Go has excellent support for concurrency and parallelism, making it well suited to building highly concurrent systems.
Introduction to Kafka
Apache Kafka is a free, open-source, highly scalable, distributed, and fault-tolerant publish-subscribe messaging system. It is designed to handle high-volume, high-throughput, real-time data streams, making it suitable for many use cases including log aggregation, real-time analytics, and event-driven architectures.
Kafka is based on a distributed architecture that allows it to handle large amounts of data across multiple servers. It uses a publish-subscribe model where producers send messages to topics and consumers subscribe to those topics to receive them. This decouples producers from consumers, providing high scalability and flexibility.
Kafka provides strong durability guarantees, with messages stored on disk and replicated across multiple servers for fault tolerance. It also supports batch processing, which allows for efficient handling of large amounts of data.
This tutorial explores how to build producer and consumer applications using the Go programming language.
NOTE: This tutorial uses the latest Go version at the time of writing (1.20) and the latest Apache Kafka version on a local deployment.
Requirements
Although this tutorial is beginner friendly, we do not cover the basics of installing and setting up Apache Kafka or the Go toolchain on your machine. You can check our tutorials on those topics to learn more.
Step 1: Create a Project
Start by creating a directory for your project and navigating into it:
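The directory name kafka-golang is an assumption that matches the module name used in the next step:
mkdir kafka-golang
cd kafka-golang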
Once you are in the target directory, initialize a new Go module:
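The module name kafka-golang below matches the output that follows; any module path will work:
go mod init kafka-golang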
go: creating new go.mod: module kafka-golang
Next, install the confluent-kafka-go client package used to interact with Kafka.
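Note that go get fetches the latest release, which may be newer than the v1.9.2 shown in the output below:
go get github.com/confluentinc/confluent-kafka-go/kafka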
Output:
go: added github.com/confluentinc/confluent-kafka-go v1.9.2
Step 2: Set Up the Kafka Topic
The next step is to create the Kafka topic to which we publish and from which we consume messages. In this example, we create a topic called “coordinates”, matching the topic name used in the producer and consumer code later in this tutorial. You can use the kafka-topics.sh utility to accomplish this.
The command is as shown in the following:
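The exact path to kafka-topics.sh depends on your Kafka installation, and the single partition and replication factor of 1 are assumptions for a local, single-broker setup:
kafka-topics.sh --create --topic coordinates --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1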
Output:
Created topic coordinates.
With the topic in place, we can set up the producer and consumer applications.
Create the Producer
Create a file to store the producer source code in the kafka-golang directory. The command is as shown:
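The producer.go file name is an assumption; any name works as long as you use the same name when running the program later:
touch producer.go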
Add the source code as shown in the following:
package main

import (
    "fmt"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    // Producer configuration.
    config := &kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092", // Kafka broker address
    }

    // Target topic name.
    topic := "coordinates"

    // Create the Kafka producer.
    producer, err := kafka.NewProducer(config)
    if err != nil {
        panic(err)
    }

    // Write 10 messages to the "coordinates" topic.
    for i := 0; i < 10; i++ {
        value := fmt.Sprintf("message-%d", i)
        err := producer.Produce(&kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
            Value:          []byte(value),
        }, nil)
        if err != nil {
            fmt.Printf("Failed to produce message %d: %v\n", i, err)
        } else {
            fmt.Printf("Produced message %d: %s\n", i, value)
        }
    }

    // Flush any messages still in the producer's buffer (the argument is a timeout
    // in milliseconds) and close the producer.
    producer.Flush(15 * 1000)
    producer.Close()
}
In the previous code, we start by importing the necessary packages. In this case, we need the fmt package and the confluent-kafka-go/kafka package installed in the previous steps.
Next, we set up the Kafka producer configuration by creating a ConfigMap object. Here, we specify the address of the Kafka broker. If you have multiple Kafka brokers, you can specify them as a comma-separated list.
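For example, a configuration pointing at a three-broker cluster might look like this (the host names are placeholders):
config := &kafka.ConfigMap{
    "bootstrap.servers": "broker1:9092,broker2:9092,broker3:9092",
}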
We also set the name of the topic to which we wish to write the messages.
We then create a new Kafka producer using the NewProducer() function from the confluent-kafka-go package, passing the configuration object as an argument. If an error occurs, the code panics and terminates with an error message.
Last but not least, we write the messages to the topic. Here, we use a simple “for” loop to write 10 messages to the specified topic.
To close off, we call the Flush() method to flush any remaining messages in the producer’s buffer (the argument is a timeout in milliseconds, so 15 * 1000 waits for up to 15 seconds) and then close the producer.
Produce Messages
We can run the producer application with the following command:
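This assumes the code was saved as producer.go:
go run producer.go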
The previous command starts the producer application and writes the messages to the “coordinates” topic. Example output is shown below:
Produced message 0: message-0
Produced message 1: message-1
Produced message 2: message-2
Produced message 3: message-3
Produced message 4: message-4
Produced message 5: message-5
Produced message 6: message-6
Produced message 7: message-7
Produced message 8: message-8
Produced message 9: message-9
Create the Consumer
The next step is to build a simple consumer application that subscribes to the “coordinates” topic and reads the available messages from the beginning.
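As with the producer, create a file for the consumer source code; the consumer.go name is again an assumption:
touch consumer.go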
The source code is as shown:
package main

import (
    "fmt"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    // Consumer configuration.
    config := &kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092",         // broker address
        "group.id":          "console-consumer-21564", // consumer group
        "auto.offset.reset": "earliest",               // start from the earliest offset
    }

    // Create the Kafka consumer.
    consumer, err := kafka.NewConsumer(config)
    if err != nil {
        panic(err)
    }
    // Close the consumer when main returns.
    defer consumer.Close()

    // Subscribe to the target topic.
    if err := consumer.SubscribeTopics([]string{"coordinates"}, nil); err != nil {
        panic(err)
    }

    // Consume messages until the process is interrupted.
    for {
        msg, err := consumer.ReadMessage(-1)
        if err == nil {
            fmt.Printf("Received message: %s\n", string(msg.Value))
        } else {
            fmt.Printf("Error while consuming message: %v (%v)\n", err, msg)
        }
    }
}
The consumer code follows the same pattern as the producer. This time, however, we use the SubscribeTopics() function to subscribe to the Kafka topic, and we defer Close() so the consumer is cleaned up when the program exits.
We then use the ReadMessage() function to read messages from the topic in a loop until the process is interrupted.
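Passing -1 to ReadMessage() blocks until a message arrives. If you prefer to poll with a timeout instead, ReadMessage() also accepts a time.Duration. The following is a minimal sketch of what the loop body could look like in that case; the five-second timeout is just an example, and it requires adding "time" to the imports:
msg, err := consumer.ReadMessage(5 * time.Second)
if err != nil {
    if kerr, ok := err.(kafka.Error); ok && kerr.Code() == kafka.ErrTimedOut {
        // No message arrived within the timeout; keep polling.
        continue
    }
    fmt.Printf("Error while consuming message: %v\n", err)
    continue
}
fmt.Printf("Received message: %s\n", string(msg.Value))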
We can run the code as follows:
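This assumes the consumer code was saved as consumer.go:
go run consumer.go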
This should return the messages from the topic as follows:
Received message: message-0
Received message: message-1
Received message: message-2
Received message: message-3
Received message: message-4
Received message: message-5
Received message: message-6
Received message: message-7
Received message: message-8
Received message: message-9
There you have it! A way to create a simple Kafka consumer application in Golang.
Conclusion
In this tutorial, you learned how to create a simple Kafka producer and consumer in Go using the confluent-kafka-go library. You can check the package documentation to learn more and to build more complex applications.