Apache Kafka

Apache Kafka Consumer Example in Golang

Golang, commonly known as Go, is a free, open-source, statically typed, concurrent, garbage-collected programming language developed by Google. It is designed to have a small and straightforward syntax while still providing high performance and scalability through its use of lightweight concurrent processes known as goroutines, and channels for communication between these processes.

Go provides a powerful standard library which is focused on simplicity, making it easier for the developers to write a reliable and efficient code. It also includes support for low-level programming, with the ability to interact directly with the memory and create a system-level software.

Go’s syntax is clean and concise, promoting readability and maintainability. It provides strict typing, making it easier to catch errors at compile time. Its garbage collector frees up memory as needed, reducing the risk of memory leaks. Additionally, Go has excellent support for concurrency and parallelism, making it well-suited to develop some highly concurrent systems.

Introduction to Kafka

Apache Kafka is a free, open-source, highly scalable, distributed, and fault-tolerant publish-subscribe messaging system. It is designed to handle a high volume, high throughput, and real-time data stream, making it suitable for many use cases including log aggregation, real-time analytics, and event-driven architectures.

Kafka is based on a distributed architecture which allows it to handle large amounts of data across multiple servers. It uses a publish-subscribe model where producers send messages to the topics, and consumers subscribe to them to receive them. This allows for decoupled communication between the producers and consumers, providing high scalability and flexibility.

Kafka provides strong durability guarantees, with the messages stored on the disk and replicated across multiple servers for fault tolerance. It also supports batch processing which allows for the efficient handling of large amounts of data.

This tutorial explores how we can build a producer and consumer application using the Go programming language.

NOTE: This tutorial demonstrates how to build a Go app using the latest Go version (v1.20) as of writing this tutorial. We use the latest Apache Kafka version on a local deployment.

Requirements

Although this tutorial is beginner friendly, we do not cover the basics of installing and setting up the Apache Kafka or the Golang toolchain on your machine. You can check our tutorials on the topics to learn more.

Step 1: Create a Project

Start by setting up the directories for your project.

$ mkdir ~/projects/kafka-golang

Navigate into the projects directory:

$ cd ~/projects/kafka-golang

Once you are in the target directory, initialize a new Go module and install the required dependencies:

$ go mod init kafka-golang
go: creating new go.mod: module kafka-golang

Next, install the golang package to interact with Kafka.

$ go get github.com/confluentinc/confluent-kafka-go/kafka

Output:

go: downloading github.com/confluentinc/confluent-kafka-go v1.9.2

go: added github.com/confluentinc/confluent-kafka-go v1.9.2

Step 2: Setup the Kafka Topic

The next step is to create a new Kafka topic where we publish and consume the messages. In this example, we create a new topic called “users”. You can use the kafka-topics.sh utility to accomplish this.

The command is as shown in the following:

$ kafka-topics.sh --bootstrap-server=localhost:9092 --create --topic coordinates --replication-factor 1 --partitions 1

Output:

Created topic users.

Once we created the topic that we wish, we can set up the producer and consumer applications.

Create the Producer

Create a file to store the producer source code in the kafka-golang directory. The command is as shown:

$ touch producer.go

Add the source code as shown in the following:

package main

import (

  "fmt"

  "github.com/confluentinc/confluent-kafka-go/kafka"

)

func main() {

  // producer config

  config := &kafka.ConfigMap{

    "bootstrap.servers": "localhost:9092", // kafka broker addr

  }

  topic := "coordinates" // target topic name

  // Create Kafka producer

  producer, err := kafka.NewProducer(config)

  if err != nil {

    panic(err)

  }

  // write 10 messages to the "coordinates" topic

  for i := 0; i < 10; i++ {

    value := fmt.Sprintf("message-%d", i)

    err := producer.Produce(&kafka.Message{

      TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},

      Value: []byte(value),

    }, nil)

    if err != nil {

        fmt.Printf("Failed to produce message %d: %v\n", i, err)

    } else {

        fmt.Printf("Produced message %d: %s\n", i, value)

    }

  }

  // Close Kafka producer

  producer.Flush(15 * 1000)

  producer.Close()

}

In the previous code, we start by importing the necessary packages. In this case, we need the fmt and the confluent-kafka-go/kafka as installed in the previous steps.

In the next step, we setup the Kafka producer configuration by creating a ConfigMap object. In this case, we specify the address to the Kafka address. If you have multiple Kafka brokers, you can specify them as a comma-separated list.

We also proceed to set the name of the topic to which we wish to write the messages.

In the preceding section, we create a new Kafka producer using the NewProducer() function from the confluent-kafka-go package. Here, we pass the configuration object as an argument. In case of errors, the code will panic and terminate with an error message.

The last but not the least is the code to write the messages to the topic. In this case, we use a simple “for” loop to write 10 simple messages to the specified topic.

To close off, we use the Flush() method to flush any remaining messages in the producer’s buffer and close the producer.

Produce Messages

We can run the producer application with the following command:

$ go run producer.go

The previous command should start the producer application and write the messages to the “coordinates” topic. An example producer is as shown:

Produced message 0: message-0
Produced message 1: message-1
Produced message 2: message-2
Produced message 3: message-3
Produced message 4: message-4
Produced message 5: message-5
Produced message 6: message-6
Produced message 7: message-7
Produced message 8: message-8

Create the Consumer

The next step is to build a simple consumer application that subscribes to the “coordinates” topic and read the available messages from the beginning.

The source code is as shown:

package main

import (

  "fmt"

  "github.com/confluentinc/confluent-kafka-go/kafka"

)

func main() {

  // Set up Kafka consumer configuration

  config := &kafka.ConfigMap{

      "bootstrap.servers": "localhost:9092", // broker addr

      "group.id": "console-consumer-21564", // consumer group

      "auto.offset.reset": "earliest", // earliest offset

  }

  // Create Kafka consumer

  consumer, err := kafka.NewConsumer(config)

  if err != nil {

      panic(err)

  }

  // subscribe to target topic

  consumer.SubscribeTopics([]string{"coordinates"}, nil)

  // Consume messages

  for {

      msg, err := consumer.ReadMessage(-1)

      if err == nil {

            fmt.Printf("Received message: %s\n", string(msg.Value))

      } else {

            fmt.Printf("Error while consuming message: %v (%v)\n", err, msg)

      }

  }

// Close Kafka consumer

consumer.Close()

}

The previous code behaves similarly to the producer application. Instead, it uses the SubscribeTopics() function to subscribe to a Kafka topic.

We also use the ReadMessage() function to read the messages from the topic until it is interrupted.

We can run the code as follows:

$ go run consumer.go

This should return the messages from the topic as follows:

Received message: message-0

Received message: message-1

Received message: message-2

Received message: message-3

Received message: message-4

Received message: message-5

Received message: message-6

Received message: message-7

Received message: message-8

Received message: message-9

There you have it! A way to create a simple Kafka consumer application in Golang.

Conclusion

You discovered how to create a simple Kafka producer and consumer in Go using the confluent-kafka-go library. You can check the documentation to learn more about this package and how to create more complex applications.

About the author

John Otieno

My name is John and am a fellow geek like you. I am passionate about all things computers from Hardware, Operating systems to Programming. My dream is to share my knowledge with the world and help out fellow geeks. Follow my content by subscribing to LinuxHint mailing list