
Apache Kafka Consumer Example in Python

Python is a high-level, dynamically typed, interpreted programming language that Guido van Rossum created in the late 1980s as a simple, readable, and easy-to-learn language. Python emphasizes code readability and maintainability through a simple syntax.

Python also has a rich set of libraries that let us extend its functionality for specific needs. Because it is simple and modular, Python is an excellent choice for scripts and automation workflows, even in large environments.

This tutorial teaches us how to create a Kafka consumer client in Python using the kafka-python library.

Requirements

Before proceeding with this tutorial, ensure that you have installed the latest version of the Python 3 interpreter on your machine, along with the kafka-python library (pip install kafka-python). You also need a recent Apache Kafka cluster set up and running on your local machine.
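A quick way to confirm the Python side of these requirements is a small check like the following. It looks at the interpreter version and at whether the importable kafka package (which kafka-python provides) can be found. The minimum version of 3.8 and the function name check_environment are assumptions made for this sketch, not requirements stated by the tutorial.

```python
import importlib.util
import sys

# Assumed minimum interpreter version for this sketch (not stated in the tutorial)
MIN_PYTHON = (3, 8)


def check_environment() -> list:
    """Return a list of human-readable problems; an empty list means ready."""
    problems = []
    if sys.version_info < MIN_PYTHON:
        problems.append(
            "Python %d.%d or newer is assumed; found %d.%d"
            % (MIN_PYTHON + tuple(sys.version_info[:2]))
        )
    # kafka-python installs an importable package named "kafka"
    if importlib.util.find_spec("kafka") is None:
        problems.append("kafka-python is not installed; try: pip install kafka-python")
    return problems


if __name__ == "__main__":
    issues = check_environment()
    print("\n".join(issues) if issues else "Environment looks ready.")
```

An empty list means both checks passed; anything else is printed as a hint to fix before running the producer and consumer.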

Project Setup

For this tutorial, we build a Kafka producer and consumer application that writes data to and reads data from the Kafka cluster.

Start by creating the directory to store your Kafka project:

$ mkdir kafkapy

Navigate into the directory and create the files for the consumer and producer applications:

$ cd kafkapy && touch consumer.py producer.py

The next step is to create the data to store in the Kafka topic. Save the following example records in a file called user_data.json in the project directory, with one JSON object per line:

{"first_name":"Abigail","email":"[email protected]", "ip_address":"159.164.228.226"}
{"first_name":"Catlaina","email":"[email protected]", "ip_address":"90.112.53.189"}
{"first_name":"Kathryne","email":"[email protected]", "ip_address":"156.225.255.84"}
{"first_name":"Raynell","email":"[email protected]", "ip_address":"144.202.164.52"}
{"first_name":"Erma","email":"[email protected]", "ip_address":"4.237.156.25"}
{"first_name":"Larine","email":"[email protected]", "ip_address":"145.104.178.124"}
{"first_name":"Vanni","email":"[email protected]", "ip_address":"221.84.169.166"}
{"first_name":"Fonzie","email":"[email protected]", "ip_address":"21.197.120.160"}
{"first_name":"Roddie","email":"[email protected]", "ip_address":"172.50.221.72"}
{"first_name":"Clemmie","email":"[email protected]", "ip_address":"212.174.81.188"}
{"first_name":"Jo","email":"[email protected]", "ip_address":"163.247.214.43"}
{"first_name":"Barry","email":"[email protected]", "ip_address":"166.111.152.163"}
{"first_name":"Junina","email":"[email protected]", "ip_address":"138.125.82.77"}
{"first_name":"Randi","email":"[email protected]", "ip_address":"62.178.44.225"}
{"first_name":"Avie","email":"[email protected]", "ip_address":"5.226.100.57"}
{"first_name":"Cathy","email":"[email protected]", "ip_address":"77.194.185.187"}
{"first_name":"Davy","email":"[email protected]", "ip_address":"240.213.147.178"}
{"first_name":"Lynnette","email":"[email protected]", "ip_address":"227.155.101.183"}
{"first_name":"Tremain","email":"[email protected]", "ip_address":"206.242.212.211"}
{"first_name":"Catherine","email":"[email protected]", "ip_address":"242.109.202.238"}
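The producer reads user_data.json one line at a time, so the file needs one complete JSON object per line (the JSON Lines layout) rather than a single JSON array. A small validator like the following can catch a malformed record before it reaches the topic. The required field names come from the sample above; the function name validate_jsonl is hypothetical.

```python
import json

# Field names taken from the sample records above
REQUIRED_FIELDS = ("first_name", "email", "ip_address")


def validate_jsonl(path: str) -> list:
    """Parse every non-blank line of a JSON Lines file and return the records.

    Raises ValueError naming the offending line if it is not a JSON object
    or is missing one of the required fields.
    """
    records = []
    with open(path, "r", encoding="utf-8") as f:
        for lineno, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue  # tolerate blank lines, e.g. a trailing newline
            try:
                record = json.loads(line)
            except json.JSONDecodeError as exc:
                raise ValueError(f"line {lineno}: invalid JSON ({exc.msg})") from exc
            if not isinstance(record, dict):
                raise ValueError(f"line {lineno}: expected a JSON object")
            missing = [k for k in REQUIRED_FIELDS if k not in record]
            if missing:
                raise ValueError(f"line {lineno}: missing fields {missing}")
            records.append(record)
    return records
```

Calling validate_jsonl("user_data.json") before running the producer returns one dictionary per record, or stops at the first bad line with its line number.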

Next, edit the producer.py file and add the following code:

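from kafka import KafkaProducer

# connect to the Kafka broker running on the local machine
producer = KafkaProducer(bootstrap_servers="localhost:9092")

# send each line of the JSON file as a separate message to the "users" topic
with open("user_data.json", "r", encoding="utf-8") as f:
    for line in f:
        line = line.strip()  # drop the trailing newline
        if line:  # skip blank lines instead of sending empty messages
            producer.send("users", line.encode("utf-8"))

# block until all buffered messages are delivered, then release resources
producer.flush()
producer.close()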

The previous code assumes that a Kafka broker is running on the local machine at port 9092. It also assumes that a topic called “users” already exists on the cluster. If it does not, create it with the following command (in the Apache Kafka distribution, the script is named kafka-topics.sh):

$ kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic users
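If you prefer to create the topic from Python instead of the command line, kafka-python includes an admin client. The sketch below mirrors the command above (one partition, replication factor 1) and assumes a broker at localhost:9092. The helper name ensure_topic is hypothetical, and the imports sit inside the function so the module loads even where kafka-python is not installed.

```python
def ensure_topic(name: str = "users",
                 bootstrap_servers: str = "localhost:9092",
                 partitions: int = 1,
                 replication_factor: int = 1) -> bool:
    """Create the topic if needed; return True if created, False if it existed."""
    # Imported lazily so this module loads without kafka-python installed
    from kafka.admin import KafkaAdminClient, NewTopic
    from kafka.errors import TopicAlreadyExistsError

    admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    try:
        admin.create_topics(new_topics=[
            NewTopic(name=name,
                     num_partitions=partitions,
                     replication_factor=replication_factor)
        ])
        return True
    except TopicAlreadyExistsError:
        return False  # the topic is already there, which is fine for this tutorial
    finally:
        admin.close()
```

Running ensure_topic() before the producer makes the script safe to rerun, since an existing topic is treated as success rather than an error.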

Once complete, run the producer script:

$ python3 producer.py

The script should read each line in the JSON file and write it to the “users” topic in the Kafka cluster.
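Sending raw lines works, but kafka-python also accepts a value_serializer callable, which lets the producer work with Python dictionaries and keeps the encoding in one place. The following sketch assumes the same broker, topic, and file; serialize_user and produce_users are hypothetical names. send() returns a future, and calling get() on it blocks until the broker acknowledges the record and returns its metadata (topic, partition, and offset).

```python
import json


def serialize_user(record: dict) -> bytes:
    """Encode one user record as compact UTF-8 JSON bytes."""
    return json.dumps(record, separators=(",", ":")).encode("utf-8")


def produce_users(path: str = "user_data.json", topic: str = "users",
                  bootstrap_servers: str = "localhost:9092") -> int:
    """Send every record in a JSON Lines file; return the number sent."""
    # Imported lazily so serialize_user can be used without kafka-python
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
                             value_serializer=serialize_user)
    sent = 0
    try:
        with open(path, "r", encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    continue  # skip blank lines instead of sending empty messages
                future = producer.send(topic, json.loads(line))
                # Block until the broker acknowledges this record
                metadata = future.get(timeout=10)
                print(f"{metadata.topic}[{metadata.partition}] @ offset {metadata.offset}")
                sent += 1
    finally:
        producer.flush()
        producer.close()
    return sent
```

Waiting on each future makes the producer synchronous, which is convenient for confirming delivery in a tutorial but slower than letting the client batch records and calling flush() once at the end.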

Consumer Application

Once the messages are written to the Kafka topic, we can consume them by subscribing to the “users” topic. Open the consumer file:

$ vim consumer.py

The source code is as follows:

from kafka import KafkaConsumer
# create a consumer subscribed to the "users" topic; auto_offset_reset="earliest"
# starts from the oldest available message when no committed offset exists
consumer = KafkaConsumer("users", bootstrap_servers="localhost:9092",
                         auto_offset_reset="earliest", enable_auto_commit=False)
# iterating over the consumer polls the broker and blocks until new messages arrive
for message in consumer:
    print(message.value.decode("utf-8"))
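Because the sample records are JSON, kafka-python's value_deserializer option can turn each message back into a dictionary so the loop works with fields rather than raw bytes. This sketch assumes the records were written as JSON; deserialize_user and consume_users are hypothetical names. The deserializer returns None for empty or null payloads so a stray message does not crash the loop.

```python
import json
from typing import Optional


def deserialize_user(raw: Optional[bytes]) -> Optional[dict]:
    """Decode a UTF-8 JSON message value; return None for empty or null payloads."""
    if not raw:
        return None  # null (tombstone) or zero-length value
    text = raw.decode("utf-8").strip()
    return json.loads(text) if text else None


def consume_users(topic: str = "users", bootstrap_servers: str = "localhost:9092") -> None:
    # Imported lazily so deserialize_user can be tested without kafka-python
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(topic,
                             bootstrap_servers=bootstrap_servers,
                             auto_offset_reset="earliest",
                             enable_auto_commit=False,
                             value_deserializer=deserialize_user)
    for message in consumer:
        user = message.value  # already a dict (or None) thanks to the deserializer
        if user is not None:
            print(user.get("first_name"), user.get("ip_address"))
```

Keeping decoding in the deserializer means the loop body only deals with application data, and the same helper can be reused by any other consumer of the topic.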

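Run the consumer app:

$ python3 consumer.py

This should read the messages from the beginning of the topic and print them to the console. The script then keeps waiting for new messages until you stop it, for example with Ctrl+C.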
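Because the consumer waits indefinitely for new messages by default, a script that should exit on its own needs a stopping condition. kafka-python's consumer_timeout_ms option ends the iteration after the given period passes with no new messages. The sketch below assumes a five-second idle timeout; drain and read_all_users are hypothetical names, and drain accepts any iterable of objects with a value attribute so it can be exercised without a broker.

```python
from typing import Iterable, List, Optional


def drain(messages: Iterable, limit: Optional[int] = None) -> List[str]:
    """Decode message values until the iterable ends or `limit` is reached."""
    values = []
    for message in messages:
        values.append(message.value.decode("utf-8").strip())
        if limit is not None and len(values) >= limit:
            break
    return values


def read_all_users(topic: str = "users",
                   bootstrap_servers: str = "localhost:9092") -> List[str]:
    # Imported lazily so drain() can be used without kafka-python installed
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(topic,
                             bootstrap_servers=bootstrap_servers,
                             auto_offset_reset="earliest",
                             enable_auto_commit=False,
                             consumer_timeout_ms=5000)  # stop after 5 s with no messages
    try:
        return drain(consumer)
    finally:
        consumer.close()  # release network resources even if decoding fails
```

Separating the reading loop from consumer construction keeps the stopping logic testable and makes it easy to switch between a bounded read and the original open-ended loop.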

Conclusion

You have now learned how to create a Kafka producer and consumer using the Python programming language.

About the author

John Otieno

My name is John, and I am a fellow geek like you. I am passionate about all things computers, from hardware and operating systems to programming. My dream is to share my knowledge with the world and help out fellow geeks. Follow my content by subscribing to the LinuxHint mailing list.