Apache Kafka Producer Example in Golang

One of the best ways to understand how Apache Kafka works is to build custom consumer and producer applications in a programming language of your choice.

Golang has quickly become one of the most popular and widely admired languages for building large-scale applications. With its simplicity, efficiency, and speed, you can configure a Kafka consumer and producer in no time.

In this tutorial, we will learn how to build a simple Apache Kafka producer client using Golang. We will then read the messages from the specified topic using a console consumer.

Requirements:

This tutorial requires the following:

  1. Golang development tools
  2. A running Apache Kafka cluster
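
If you want to confirm that your Kafka cluster is reachable before continuing, you can list the existing topics. This assumes that the Kafka scripts are on your PATH and that the broker listens on localhost:9092:

$ kafka-topics.sh --bootstrap-server localhost:9092 --list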

Let us dive in!

Step 1: Create a Project

Start by setting up the directories for your project.

$ mkdir ~/projects/kafka-golang

Navigate into the projects directory:

$ cd ~/projects/kafka-golang

Once you are in the target directory, initialize a new Go module and install the required dependencies:

$ go mod init kafka-golang
go: creating new go.mod: module kafka-golang

Next, install the Go client package for interacting with Kafka.

$ go get github.com/confluentinc/confluent-kafka-go/kafka

Output:

go: downloading github.com/confluentinc/confluent-kafka-go v1.9.2
go: added github.com/confluentinc/confluent-kafka-go v1.9.2

Step 2: Set Up the Kafka Topic

The next step is to create a new Kafka topic where we publish and consume the messages. In this example, we create a new topic called “coordinates”. You can use the “kafka-topics.sh” utility to accomplish this.

The command is as shown:

$ kafka-topics.sh --bootstrap-server=localhost:9092 --create --topic coordinates --replication-factor 1 --partitions 1

Output:

Created topic coordinates.
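
If you wish, you can verify that the topic was created with the expected settings by describing it:

$ kafka-topics.sh --bootstrap-server=localhost:9092 --describe --topic coordinates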

Once the topic is created, we can set up the producer application.

Step 3: Create the Producer

Create a file to store the producer source code in the kafka-golang directory. The command is as follows:

$ touch producer.go

Add the source code as shown in the following:

package main

import (
    "fmt"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    // producer config
    config := &kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092", // kafka broker addr
    }

    topic := "coordinates"// target topic name

    // Create Kafka producer
    producer, err := kafka.NewProducer(config)
    if err != nil {
        panic(err)
    }

    // write 10 messages to the "coordinates" topic
    for i := 0; i < 10; i++ {
        value := fmt.Sprintf("message-%d", i)
        err := producer.Produce(&kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
            Value:          []byte(value),
        }, nil)
        if err != nil {
            fmt.Printf("Failed to produce message %d: %v\n", i, err)
        } else {
            fmt.Printf("Produced message %d: %s\n", i, value)
        }
    }

    // Flush pending messages (up to 15 seconds) and close the producer
    producer.Flush(15 * 1000)
    producer.Close()
}

In the previous code, we start by importing the necessary packages: fmt and confluent-kafka-go/kafka, which we installed in the previous steps.

Next, we set up the Kafka producer configuration by creating a ConfigMap object. In this case, we specify the address of the Kafka broker. If you have multiple Kafka brokers, you can specify them as a comma-separated list, as shown in the sketch below.
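
For example, a configuration that points at multiple brokers might look like the following sketch. The broker host names are placeholders, and the "acks" setting is optional; it is not required for this tutorial:

config := &kafka.ConfigMap{
    // comma-separated list of broker addresses (placeholder host names)
    "bootstrap.servers": "broker1:9092,broker2:9092,broker3:9092",
    // optional: require acknowledgment from all in-sync replicas
    "acks": "all",
}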

We also set the name of the topic to which we wish to write the messages.

Next, we create a new Kafka producer using the NewProducer() function from the confluent-kafka-go package. Here, we pass the configuration object as an argument. If an error occurs, the code panics and terminates with an error message.

Last but not least is the code that writes the messages to the topic. In this case, we use a simple “for” loop to write 10 messages to the specified topic.

To close off, we use the Flush() method to wait up to 15 seconds for any remaining messages in the producer’s buffer to be delivered, and then close the producer.
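
Note that Produce() is asynchronous: it only enqueues the message, and the actual delivery result is reported later on the producer's event channel. If you want to confirm delivery of each message, a minimal sketch (not part of the tutorial code above) is to drain producer.Events() in a goroutine before calling Flush():

// Sketch: report per-message delivery results from the event channel.
go func() {
    for e := range producer.Events() {
        if msg, ok := e.(*kafka.Message); ok {
            if msg.TopicPartition.Error != nil {
                fmt.Printf("Delivery failed: %v\n", msg.TopicPartition.Error)
            } else {
                fmt.Printf("Delivered message to %v\n", msg.TopicPartition)
            }
        }
    }
}()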

Step 4: Produce Messages

We can run the producer application with the following command:

$ go run producer.go

The previous command should start the producer application and write the messages to the coordinates topic. Example output is as follows:

Produced message 0: message-0
Produced message 1: message-1
Produced message 2: message-2
Produced message 3: message-3
Produced message 4: message-4
Produced message 5: message-5
Produced message 6: message-6
Produced message 7: message-7
Produced message 8: message-8
Produced message 9: message-9

Step 5: Read the Messages

For the sake of simplicity, we use the Kafka console consumer to read the messages. The command is as follows:

$ ./bin/kafka-console-consumer.sh --topic coordinates --bootstrap-server localhost:9092 --from-beginning

The previous command should launch a console consumer app and read the messages as shown in the output:

message-0
message-1
message-2
message-3
message-4
message-5
message-6
message-7
message-8
message-9

After the read operation, we can press CTRL + C to terminate it. If you wish to learn how to create a Kafka consumer application in Go, check our tutorial to learn more.
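
As a preview, a minimal Go consumer for the same topic might look like the following sketch. It uses the same confluent-kafka-go package; the group ID "golang-consumer" is an arbitrary placeholder:

package main

import (
    "fmt"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    // consumer config: broker address, consumer group, and offset behavior
    consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092",
        "group.id":          "golang-consumer", // placeholder group name
        "auto.offset.reset": "earliest",        // read from the start of the topic
    })
    if err != nil {
        panic(err)
    }
    defer consumer.Close()

    // subscribe to the "coordinates" topic
    if err := consumer.SubscribeTopics([]string{"coordinates"}, nil); err != nil {
        panic(err)
    }

    // read messages indefinitely
    for {
        msg, err := consumer.ReadMessage(-1)
        if err != nil {
            fmt.Printf("Consumer error: %v\n", err)
            continue
        }
        fmt.Printf("Received: %s\n", string(msg.Value))
    }
}

If you save this as consumer.go and run it with "go run consumer.go", it should print the same messages that the console consumer displayed.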

Conclusion

In this tutorial, we learned how to use the confluent-kafka-go package to create a simple Kafka producer application using Golang.
