Prerequisite
You have to install the necessary Python library to read data from Kafka. This tutorial uses Python 3 to write the producer and consumer scripts. If pip is not already installed on your Linux operating system, install it before installing the Kafka library for Python. This tutorial uses python3-kafka to read data from Kafka. Run the following command to install the library.
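Before running the scripts, it can help to confirm that the library is importable. The following is a minimal sketch that uses only the standard library. It assumes the library provides a top-level module named kafka, which is what the "from kafka import ..." lines in this tutorial import. The helper name module_available is hypothetical.

```python
import importlib.util


def module_available(name):
    """Return True if a top-level module with this name can be imported."""
    return importlib.util.find_spec(name) is not None


# "kafka" is the module name used by the scripts in this tutorial (assumption)
if module_available("kafka"):
    print("Kafka library found")
else:
    print("Kafka library missing: install it before running the scripts")
```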
Reading simple text data from Kafka
Different types of data can be sent by the producer to a particular topic, where the consumer can read them. This part of the tutorial shows how simple text data can be sent to and received from Kafka using a producer and a consumer.
Create a file named producer1.py with the following Python script. The KafkaProducer class is imported from the Kafka library. The broker list must be defined when the producer object is initialized so that it can connect to the Kafka server. The default port of Kafka is ‘9092’. The bootstrap_servers argument defines the hostname with the port. ‘First_Topic’ is set as the topic name to which the producer sends the text message. Next, a simple text message, ‘Hello from kafka...’, is sent to the topic ‘First_Topic’ using the send() method of KafkaProducer.
producer1.py:
from kafka import KafkaProducer
# Define server with port
bootstrap_servers = ['localhost:9092']
# Define topic name where the message will be published
topicName = 'First_Topic'
# Initialize producer variable
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
# Publish text in defined topic (without a serializer the value must be bytes)
producer.send(topicName, b'Hello from kafka...')
# send() is asynchronous; flush() blocks until the message has been delivered
producer.flush()
# Print message
print("Message Sent")
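To make the two values producer1.py passes to Kafka more concrete, here is a small sketch that builds a ‘host:port’ entry for bootstrap_servers and encodes a text message to bytes. The helper names bootstrap_address and to_message_bytes are hypothetical and not part of the Kafka library. The encoding step reflects that, when no value_serializer is configured, the producer expects message values as bytes, which is why the script uses the b'...' literal.

```python
DEFAULT_KAFKA_PORT = 9092  # Kafka's default broker port


def bootstrap_address(host, port=DEFAULT_KAFKA_PORT):
    """Hypothetical helper: return a 'host:port' string for bootstrap_servers."""
    return "%s:%d" % (host, port)


def to_message_bytes(text, encoding="utf-8"):
    """Hypothetical helper: encode text to bytes, as the producer needs without a serializer."""
    return text.encode(encoding)


bootstrap_servers = [bootstrap_address("localhost")]
payload = to_message_bytes("Hello from kafka...")

print(bootstrap_servers)  # ['localhost:9092']
print(payload)            # b'Hello from kafka...'
```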
Create a file named consumer1.py with the following Python script. The KafkaConsumer class is imported from the Kafka library to read data from Kafka. The sys module is used here to terminate the script. The consumer script uses the same hostname and port number as the producer script to read data from Kafka. The topic name of the consumer and the producer must be the same, that is, ‘First_Topic’. Next, the consumer object is initialized with the topic name, the group id, and the server information. A for loop is used here to read the text sent by the Kafka producer. The loop keeps waiting for new messages until it is interrupted, for example with Ctrl+C.
consumer1.py:
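from kafka import KafkaConsumer
# Import sys module
import sys
# Define server with port
bootstrap_servers = ['localhost:9092']
# Define topic name from where the message will be received
topicName = 'First_Topic'
# Initialize consumer variable; 'earliest' lets a new group read messages
# that were published before the consumer started
consumer = KafkaConsumer(topicName, group_id='group1',
                         bootstrap_servers=bootstrap_servers,
                         auto_offset_reset='earliest')
# Read and print messages; the loop waits for new messages until Ctrl+C
try:
    for msg in consumer:
        print("Topic Name=%s,Message=%s" % (msg.topic, msg.value))
except KeyboardInterrupt:
    pass
finally:
    # Close the connection to the broker
    consumer.close()
# Terminate the script
sys.exit()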
Output:
Run the following command from one terminal to execute the producer script.
The following output will appear after sending the message.
Run the following command from another terminal to execute the consumer script.
The output shows the topic name and the text message sent from the producer.
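Because consumer1.py sets no value_deserializer, msg.value holds the raw bytes sent by the producer, so the message is typically printed with a b'...' prefix. The sketch below runs without a broker: it uses a hypothetical stand-in Record type that models only the topic and value fields, and shows how decoding the value yields plain text.

```python
from collections import namedtuple

# Hypothetical stand-in for a consumer record: only the two fields that
# consumer1.py prints are modelled, so no Kafka broker is needed.
Record = namedtuple("Record", ["topic", "value"])


def format_record(msg, encoding="utf-8"):
    """Build the consumer1.py output line with the bytes value decoded to text."""
    return "Topic Name=%s,Message=%s" % (msg.topic, msg.value.decode(encoding))


msg = Record(topic="First_Topic", value=b"Hello from kafka...")
# Raw bytes, formatted the way consumer1.py prints them
print("Topic Name=%s,Message=%s" % (msg.topic, msg.value))
# → Topic Name=First_Topic,Message=b'Hello from kafka...'
# Decoded text
print(format_record(msg))
# → Topic Name=First_Topic,Message=Hello from kafka...
```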
Reading JSON formatted data from Kafka
JSON-formatted data can be sent by the Kafka producer and read by the Kafka consumer using Python's json module. This part of the tutorial shows how JSON data can be serialized before sending and deserialized after receiving using the python3-kafka library.
Create a Python script named producer2.py with the following code. The json module is imported along with the KafkaProducer class. The value_serializer argument is passed together with bootstrap_servers when the Kafka producer object is initialized. It converts each value to a JSON string and encodes it with the ‘utf-8’ character set before sending. Next, JSON-formatted data is sent to the topic named JSONtopic.
producer2.py:
from kafka import KafkaProducer
# Import JSON module to serialize data
import json
# Initialize producer variable and set parameter for JSON encode
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))
# Send data in JSON format
producer.send('JSONtopic', {'name': 'fahmida', 'email': '[email protected]'})
# send() is asynchronous; flush() blocks until the message has been delivered
producer.flush()
# Print message
print("Message Sent to JSONtopic")
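The value_serializer lambda in producer2.py can be tried on its own, without a broker. This sketch rewrites it as a named function (json_serializer, a hypothetical name) and applies it to a dictionary shaped like the one sent to JSONtopic; the email address is a hypothetical placeholder.

```python
import json


def json_serializer(value):
    """Same logic as producer2.py's value_serializer: object -> UTF-8 JSON bytes."""
    return json.dumps(value).encode("utf-8")


data = {"name": "fahmida", "email": "user@example.com"}  # hypothetical address
payload = json_serializer(data)

print(type(payload))  # <class 'bytes'>
print(payload)        # b'{"name": "fahmida", "email": "user@example.com"}'
```

The result is bytes, which is the type the producer sends over the wire once the serializer has run.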
Create a Python script named consumer2.py with the following code. The KafkaConsumer class and the sys and json modules are imported in this script. KafkaConsumer is used to read JSON-formatted data from Kafka. The json module is used to decode the encoded JSON data sent by the Kafka producer. The sys module is used to terminate the script. The value_deserializer argument is passed together with bootstrap_servers to define how the JSON data will be decoded. Next, a for loop is used to print each consumer record and the JSON data retrieved from Kafka.
consumer2.py:
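from kafka import KafkaConsumer
# Import sys module
import sys
# Import json module to deserialize data
import json
# Initialize consumer variable and set property for JSON decode;
# 'earliest' lets the consumer read messages published before it started
consumer = KafkaConsumer('JSONtopic', bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest',
                         value_deserializer=lambda m: json.loads(m.decode('utf-8')))
# Read data from kafka; the loop waits for new messages until Ctrl+C
try:
    for message in consumer:
        print("Consumer records:\n")
        print(message)
        print("\nReading from JSON data\n")
        # message.value holds the deserialized JSON object (same field as message[6])
        print("Name:", message.value['name'])
        print("Email:", message.value['email'])
except KeyboardInterrupt:
    pass
finally:
    consumer.close()
# Terminate the script
sys.exit()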
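Records yielded by KafkaConsumer are named tuples, so the decoded JSON can be reached either by position or through the value attribute. The sketch below runs without a broker: it applies the same value_deserializer logic to a hypothetical JSON payload and wraps the result in a stand-in record whose fields are assumed to follow the first seven fields of kafka-python's ConsumerRecord (topic, partition, offset, timestamp, timestamp_type, key, value).

```python
import json
from collections import namedtuple

# Stand-in record type; field order is assumed to mirror the start of
# kafka-python's ConsumerRecord, where "value" is at index 6.
ConsumerRecordStandIn = namedtuple(
    "ConsumerRecordStandIn",
    ["topic", "partition", "offset", "timestamp", "timestamp_type", "key", "value"],
)


def json_deserializer(raw):
    """Same logic as consumer2.py's value_deserializer: UTF-8 bytes -> object."""
    return json.loads(raw.decode("utf-8"))


raw = b'{"name": "fahmida", "email": "user@example.com"}'  # hypothetical payload
message = ConsumerRecordStandIn("JSONtopic", 0, 0, 0, 0, None, json_deserializer(raw))

# Index 6 and the .value attribute refer to the same field
print(message[6] is message.value)  # True
print("Name:", message.value["name"])
print("Email:", message.value["email"])
```

Reading message.value is usually clearer than message[6], because it does not depend on the position of the field.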
Output:
Run the following command from one terminal to execute the producer script.
The script will print the following message after sending the JSON data.
Run the following command from another terminal to execute the consumer script.
The following output will appear after running the script.
Conclusion:
Data can be sent to and received from Kafka in different formats using Python. The received data can also be stored in a database and retrieved from it using Kafka and Python. I hope this tutorial helps Python users start working with Kafka.
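As a minimal sketch of the database idea, assuming SQLite (Python's built-in sqlite3 module) as the database and a hypothetical users table, deserialized values like those read by consumer2.py could be stored and read back as follows. The records list stands in for message.value from a live consumer, and the address is a hypothetical placeholder.

```python
import sqlite3

# Stand-ins for the deserialized message.value objects (hypothetical values)
records = [
    {"name": "fahmida", "email": "user@example.com"},
]

# In-memory database for illustration; a file path would persist the data
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, email TEXT)")
for value in records:
    # In a real consumer this insert would run inside the "for message in consumer" loop
    conn.execute("INSERT INTO users (name, email) VALUES (?, ?)",
                 (value["name"], value["email"]))
conn.commit()

# Retrieve the stored rows
rows = conn.execute("SELECT name, email FROM users").fetchall()
print(rows)  # [('fahmida', 'user@example.com')]
conn.close()
```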