Python also has a rich set of libraries that extend its functionality for specific needs. Because of its simple, modular nature, Python is an excellent choice for scripts and automation workflows, even in large environments.
This tutorial shows how to create a Kafka consumer client in Python using the kafka-python library.
Requirements
Before proceeding with this tutorial, ensure that a recent version of the Python 3 interpreter is installed on your machine, along with the kafka-python package (for example, with pip install kafka-python). You also need an Apache Kafka cluster set up and running on your local machine.
Project Setup
For this tutorial, we build a Kafka producer and a Kafka consumer application that write data to and read data from the Kafka cluster.
Start by creating the directory to store your Kafka project:
Navigate into the directory and initialize the consumer and producer applications:
The next step is to create the data to store in the Kafka topic. Save the following sample records, one JSON object per line, in a file named user_data.json:
{"first_name":"Catlaina","email":"[email protected]", "ip_address":"90.112.53.189"}
{"first_name":"Kathryne","email":"[email protected]", "ip_address":"156.225.255.84"}
{"first_name":"Raynell","email":"[email protected]", "ip_address":"144.202.164.52"}
{"first_name":"Erma","email":"[email protected]", "ip_address":"4.237.156.25"}
{"first_name":"Larine","email":"[email protected]", "ip_address":"145.104.178.124"}
{"first_name":"Vanni","email":"[email protected]", "ip_address":"221.84.169.166"}
{"first_name":"Fonzie","email":"[email protected]", "ip_address":"21.197.120.160"}
{"first_name":"Roddie","email":"[email protected]", "ip_address":"172.50.221.72"}
{"first_name":"Clemmie","email":"[email protected]", "ip_address":"212.174.81.188"}
{"first_name":"Jo","email":"[email protected]", "ip_address":"163.247.214.43"}
{"first_name":"Barry","email":"[email protected]", "ip_address":"166.111.152.163"}
{"first_name":"Junina","email":"[email protected]", "ip_address":"138.125.82.77"}
{"first_name":"Randi","email":"[email protected]", "ip_address":"62.178.44.225"}
{"first_name":"Avie","email":"[email protected]", "ip_address":"5.226.100.57"}
{"first_name":"Cathy","email":"[email protected]", "ip_address":"77.194.185.187"}
{"first_name":"Davy","email":"[email protected]", "ip_address":"240.213.147.178"}
{"first_name":"Lynnette","email":"[email protected]", "ip_address":"227.155.101.183"}
{"first_name":"Tremain","email":"[email protected]", "ip_address":"206.242.212.211"}
{"first_name":"Catherine","email":"[email protected]", "ip_address":"242.109.202.238"}
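Because each line of the file is a standalone JSON object, it can be worth checking the records before publishing them. The following is a minimal sketch that uses only the standard library; the helper name parse_records and the set of required keys are assumptions made for illustration, based on the fields in the sample data above.

```python
import json

# Keys that every sample record above contains (assumed to be required).
REQUIRED_KEYS = {"first_name", "email", "ip_address"}


def parse_records(lines):
    """Parse newline-delimited JSON, skipping blank lines.

    Raises ValueError if a line is not valid JSON or lacks an expected key.
    """
    records = []
    for number, line in enumerate(lines, start=1):
        line = line.strip()
        if not line:
            continue
        record = json.loads(line)
        missing = REQUIRED_KEYS - set(record)
        if missing:
            raise ValueError(f"line {number}: missing keys {sorted(missing)}")
        records.append(record)
    return records


# One record copied from the sample data, followed by a blank line.
sample = [
    '{"first_name":"Jo","email":"[email protected]", "ip_address":"163.247.214.43"}\n',
    "\n",
]
print(parse_records(sample)[0]["first_name"])
```

To check the whole file, pass an open file object instead of the sample list, for example parse_records(open("user_data.json")).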
Next, edit the producer file and add the code as shown in the following:
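from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")
with open("user_data.json", "r") as f:
    for line in f:
        # send each record as bytes, without the trailing newline
        producer.send("users", line.strip().encode())
# block until all buffered messages have been sent
producer.flush()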
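The same logic can be wrapped in a function that takes the producer as an argument, which makes it possible to try it without a broker. This is only a sketch: publish_lines and main are hypothetical names, and main assumes that kafka-python is installed and that a broker listens on localhost:9092.

```python
def publish_lines(producer, topic, lines):
    """Send every non-empty line to `topic` as UTF-8 bytes and return the count."""
    sent = 0
    for line in lines:
        payload = line.strip()
        if not payload:
            continue  # skip blank lines instead of publishing empty messages
        producer.send(topic, payload.encode("utf-8"))
        sent += 1
    producer.flush()  # block until buffered messages are handed to the broker
    return sent


def main():
    # Assumption: kafka-python is installed and a broker listens on localhost:9092.
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers="localhost:9092")
    with open("user_data.json", "r", encoding="utf-8") as f:
        print(publish_lines(producer, "users", f), "messages sent")
    producer.close()
```

Calling main() runs the function against the real broker. Any object with send and flush methods can stand in for the producer when you only want to exercise the file handling.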
The previous code assumes that a Kafka broker is running on the local machine at port 9092. It also assumes that a topic called “users” already exists on the cluster. If the topic does not exist yet, create it first, for example with the kafka-topics.sh tool that ships with Kafka.
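The topic can also be created from Python. The sketch below relies on the KafkaAdminClient and NewTopic classes from kafka-python's admin module; ensure_topic and main are hypothetical names, and the single partition and replication factor of 1 are assumptions that suit a one-broker local setup rather than values taken from this tutorial.

```python
def ensure_topic(admin, topic_factory, name, partitions=1, replication=1):
    """Create `name` if the cluster does not list it; return True when created."""
    if name in admin.list_topics():
        return False
    topic = topic_factory(
        name=name, num_partitions=partitions, replication_factor=replication
    )
    admin.create_topics([topic])
    return True


def main():
    # Assumption: kafka-python is installed and a broker listens on localhost:9092.
    from kafka.admin import KafkaAdminClient, NewTopic

    admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
    try:
        created = ensure_topic(admin, NewTopic, "users")
        print("created" if created else "already exists")
    finally:
        admin.close()
```

Calling main() checks the local cluster and creates the topic only when it is missing, so it is safe to run more than once.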
Once complete, run the following producer script:
The script should read each line in the JSON file and write it to the “users” topic in the Kafka cluster.
Consumer Application
Once the messages are written to the Kafka topic, we can consume them by subscribing to the “users” topic. Edit the consumer file and add the following source code:
from kafka import KafkaConsumer

# create Kafka consumer instance
consumer = KafkaConsumer("users", bootstrap_servers="localhost:9092", auto_offset_reset="earliest", enable_auto_commit=False)
# iterate over the messages in the topic and print them to the console
for message in consumer:
    print(message.value.decode())
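Since every message holds one JSON record, the consumer can decode it into a dictionary instead of printing the raw text. The following is a sketch under stated assumptions: read_users and main are hypothetical names, main assumes kafka-python and a broker on localhost:9092, and the consumer_timeout_ms setting (a kafka-python option that ends the iteration after the given idle time) is added so that the loop does not wait forever.

```python
import json
from types import SimpleNamespace


def read_users(messages):
    """Yield each message value decoded from UTF-8 JSON into a dict."""
    for message in messages:
        yield json.loads(message.value.decode("utf-8"))


def main():
    # Assumption: kafka-python is installed and a broker listens on localhost:9092.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        "users",
        bootstrap_servers="localhost:9092",
        auto_offset_reset="earliest",
        enable_auto_commit=False,
        consumer_timeout_ms=5000,  # stop iterating after 5 s without new messages
    )
    for user in read_users(consumer):
        print(user["first_name"], user["ip_address"])
    consumer.close()


# Stand-in message so the decoding logic can be tried without a broker.
fake = [SimpleNamespace(value=b'{"first_name":"Jo","ip_address":"163.247.214.43"}')]
print(list(read_users(fake)))
```

Note that no group_id is set and auto-commit is disabled, so, as far as we understand kafka-python's defaults, no offsets are stored and each run reads the topic from the earliest message again.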
Run the consumer app. This should read the messages and print them to the console.
Conclusion
You have now learned how to create a Kafka producer and consumer using the Python programming language.