How to Implement Real-Time Data Streaming in Python

Implementing real-time data streaming in Python is an essential skill in today’s data-driven world. This guide explores the core steps and tools for doing so. From selecting a suitable framework like Apache Kafka or Apache Pulsar to writing Python code that produces, consumes, and processes the data, we will acquire the skills needed to build efficient real-time data pipelines.

Example 1: Implementation of Real-Time Data Streaming in Python

In this example, we walk through the process of building a real-time data streaming system using Apache Kafka and Python in Google Colab.

Before we start coding, we need to set up the environment in Google Colab. The first step is to install the necessary library. We use the “kafka-python” library for Kafka integration.

!pip install kafka-python

 
This command installs the “kafka-python” library, which is a Python client for Apache Kafka. Next, we import what the project requires. “KafkaProducer” and “KafkaConsumer” are the classes from the “kafka-python” library that allow us to interact with Kafka brokers. “json” is the Python standard library module for working with JSON data, which we use to serialize and deserialize the messages.

from kafka import KafkaProducer, KafkaConsumer
import json

 
Creation of a Kafka Producer

A Kafka producer sends data to a Kafka topic. In our example, we create a producer that sends simulated real-time data to a topic called “real-time-topic”.

We create a “KafkaProducer” instance and specify the Kafka broker’s address as “localhost:9092”. The “value_serializer” argument is a function that serializes the data before it is sent to Kafka. In our case, a lambda function converts the data to JSON and encodes it as UTF-8 bytes. Now, let’s simulate some real-time data and send it to the Kafka topic.

producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))
# Simulated real-time data
data = {'sensor_id': 1, 'temperature': 25.5, 'humidity': 60.2}
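# Sending data to the topic (send() is asynchronous and only queues the message)
producer.send('real-time-topic', data)
# Block until the queued message has actually been delivered to the broker
producer.flush()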

 
In these lines, we define a “data” dictionary that represents simulated sensor data. We then use the “send” method to publish this data to “real-time-topic”.
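
To see what the “value_serializer” does to the dictionary before it reaches the broker, the same logic can be run on its own, without any Kafka connection. The following is a minimal sketch that pairs it with the matching deserializer that a consumer would use. The function names “serialize” and “deserialize” are hypothetical and chosen only for illustration.

```python
import json


def serialize(value):
    """Same logic as the value_serializer lambda: dict -> UTF-8 JSON bytes."""
    return json.dumps(value).encode('utf-8')


def deserialize(raw):
    """Inverse step, as a consumer would apply it: UTF-8 JSON bytes -> dict."""
    return json.loads(raw.decode('utf-8'))


data = {'sensor_id': 1, 'temperature': 25.5, 'humidity': 60.2}

payload = serialize(data)        # bytes, the form in which Kafka stores the value
restored = deserialize(payload)  # back to a Python dictionary

print(type(payload).__name__)
print(payload)
print(restored == data)
```

The round trip returns a dictionary that is equal to the original, which is why the producer and the consumer must agree on the same encoding.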

Next, we create a Kafka consumer, which reads data from a Kafka topic. Our consumer consumes and processes the messages in “real-time-topic”. We create a “KafkaConsumer” instance, specifying the topic that we want to consume (“real-time-topic”) and the Kafka broker’s address. The “value_deserializer” argument is a function that deserializes the data received from Kafka. In our case, a lambda function decodes the UTF-8 bytes and parses them as JSON.

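consumer = KafkaConsumer('real-time-topic',
                         bootstrap_servers='localhost:9092',
                         # Start from the oldest available message so that data
                         # sent before the consumer started is also read
                         auto_offset_reset='earliest',
                         value_deserializer=lambda x: json.loads(x.decode('utf-8')))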

 
We use a “for” loop to continuously consume and process the messages from the topic.

# Reading and processing real-time data
for message in consumer:
    data = message.value
    print(f"Received Data: {data}")

 
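Inside the loop, we retrieve each message’s value, which is our simulated sensor data, and print it to the console. To run the producer and consumer, we execute the code cells in Google Colab one at a time. This assumes that a Kafka broker is running and reachable at “localhost:9092”, the address used in the code. The producer sends the simulated data to the Kafka topic, and the consumer reads and prints the received data. Note that the consumer loop keeps waiting for new messages until the cell is interrupted.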
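
Since both the producer and the consumer depend on a broker at “localhost:9092”, it can help to confirm that something is listening on that port before running them. The following is a minimal sketch that uses only the standard library. The function name “broker_reachable” is hypothetical, and a successful connection only shows that the port is open, not that the service behind it is Kafka.

```python
import socket


def broker_reachable(host="localhost", port=9092, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution errors
        return False


# Prints True or False depending on whether a broker is listening locally
print(broker_reachable())
```

If the check prints “False”, creating the “KafkaProducer” or “KafkaConsumer” instance is expected to fail as well, so the broker should be started first.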


Analysis of the Output as the Code Runs

As the code runs, we observe real-time data being produced and consumed. The data format may vary depending on our simulation or actual data source. This example covered the entire process of setting up a real-time data streaming system using Apache Kafka and Python in Google Colab, along with the purpose of each step. Real-time data streaming is a powerful capability, and this example serves as a foundation for more complex real-world applications.
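
Because the data format may vary, a consumer loop usually does more than print each message. The following is a minimal sketch of one possible processing step: averaging the “temperature” field while skipping records that lack it. “FakeMessage” and “average_temperature” are hypothetical names, and the list of sample messages is a stand-in for the “KafkaConsumer” so that the sketch runs without a broker. It relies only on the “value” attribute that the consumer loop above already uses.

```python
from collections import namedtuple

# Stand-in for the records that a KafkaConsumer yields; only `.value` is used
FakeMessage = namedtuple("FakeMessage", ["value"])


def average_temperature(messages):
    """Average the 'temperature' field, skipping records that lack it."""
    total = 0.0
    count = 0
    for message in messages:
        temperature = message.value.get("temperature")
        if not isinstance(temperature, (int, float)):
            continue  # tolerate records that have a different shape
        total += temperature
        count += 1
    return total / count if count else None


# Made-up sample records for illustration
messages = [
    FakeMessage({"sensor_id": 1, "temperature": 25.5, "humidity": 60.2}),
    FakeMessage({"sensor_id": 2, "humidity": 55.0}),  # no temperature field
    FakeMessage({"sensor_id": 1, "temperature": 26.5, "humidity": 59.8}),
]

print(average_temperature(messages))  # 26.0
```

With a real broker, the same function could be given the consumer itself instead of the list, although the consumer would then need a way to stop iterating.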

Example 2: Implementing Real-Time Data Streaming in Python Using Stock Market Data

Let’s look at another example of real-time data streaming in Python with a different scenario: stock market data. We create a real-time data streaming system that captures stock price changes and processes them using Apache Kafka and Python in Google Colab. As in the previous example, we start by configuring our environment in Google Colab. First, we install the required libraries:

!pip install kafka-python yfinance

 
Here, we add the “yfinance” library, which allows us to fetch stock market data. Next, we import the necessary libraries. We continue to use the “KafkaProducer” and “KafkaConsumer” classes from the “kafka-python” library for Kafka interaction, and “json” to work with JSON data. We also import “yfinance” to fetch the stock data and the “time” module to add a delay between updates.

from kafka import KafkaProducer, KafkaConsumer
import json
import yfinance as yf
import time

 
Now, we create a Kafka producer for stock data. Our Kafka producer fetches the latest stock data and sends it to a Kafka topic named “stock-price”.

producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

while True:
    stock = yf.Ticker("AAPL")  # Example: Apple Inc. stock
    # Fetch the current day's price history
    stock_data = stock.history(period="1d")
    # Take the most recent closing value as a plain Python float
    last_price = float(stock_data["Close"].iloc[-1])
    data = {"symbol": "AAPL", "price": last_price}
    producer.send('stock-price', data)
    time.sleep(10)  # Wait 10 seconds before fetching the next update

 
In this code, we create a “KafkaProducer” instance with the Kafka broker’s address. Inside the loop, we use “yfinance” to fetch the latest price data for Apple Inc. (“AAPL”). We then extract the most recent closing price and send it to the “stock-price” topic. Finally, we pause for 10 seconds so that a new update is sent roughly every 10 seconds. Note that the loop runs until it is interrupted.
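
Since the producer loop never ends on its own, it can be convenient while experimenting to bound the number of iterations and to keep the fetching and sending steps replaceable. The following is a minimal sketch under those assumptions. “poll_prices”, “fetch_price”, and “send” are hypothetical names: in real use, “fetch_price” would wrap the “yfinance” call and “send” would wrap “producer.send”. Here, both are replaced with stand-ins and made-up prices so that the sketch runs without a broker or network access.

```python
import time


def poll_prices(symbol, fetch_price, send, iterations, delay_seconds=10):
    """Fetch a price and pass it to `send`, repeating `iterations` times."""
    for i in range(iterations):
        price = float(fetch_price(symbol))  # plain float for JSON serialization
        send({"symbol": symbol, "price": price})
        if i < iterations - 1:
            time.sleep(delay_seconds)  # no need to wait after the last update


# Stand-ins: made-up prices and a list that collects the "sent" messages
fake_prices = iter([189.5, 189.7, 189.4])
outbox = []

poll_prices("AAPL",
            fetch_price=lambda symbol: next(fake_prices),
            send=outbox.append,
            iterations=3,
            delay_seconds=0)

print(outbox)
```

Keeping the two steps as parameters also makes the loop easy to test, because no external service is needed to check that the messages have the expected shape.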

Let’s create a Kafka consumer to read and process the stock price data from the “stock-price” topic.

consumer = KafkaConsumer('stock-price',
                         bootstrap_servers='localhost:9092',
                         value_deserializer=lambda x: json.loads(x.decode('utf-8')))

for message in consumer:
    stock_data = message.value
    print(f"Received Stock Data: {stock_data['symbol']} - Price: {stock_data['price']}")

 
This code is similar to the previous example’s consumer setup. It continuously reads the messages from the “stock-price” topic and prints the stock symbol and price to the console. Because the producer’s “while True” loop never returns, the cell that contains it does not finish, so the consumer must run in a separate process that can reach the same broker. The producer fetches and sends the stock price updates while the consumer reads and displays this data.
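
Beyond printing each price, the consumer can derive something from the stream, such as the change since the previous update. The following is a minimal sketch of that idea. “price_changes” is a hypothetical helper, and the sample list uses made-up prices in place of the values that the consumer would receive.

```python
def price_changes(messages):
    """Yield (symbol, price, change); change is None for a symbol's first price."""
    last_seen = {}
    for stock_data in messages:
        symbol = stock_data["symbol"]
        price = stock_data["price"]
        previous = last_seen.get(symbol)
        change = None if previous is None else round(price - previous, 4)
        last_seen[symbol] = price
        yield symbol, price, change


# Made-up messages in the same shape that the producer sends
sample = [
    {"symbol": "AAPL", "price": 100.0},
    {"symbol": "AAPL", "price": 101.5},
    {"symbol": "AAPL", "price": 101.0},
]

for symbol, price, change in price_changes(sample):
    print(symbol, price, change)
```

With a real consumer, the helper could be fed the message values directly, for example “price_changes(m.value for m in consumer)”.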

When the code runs, we observe the stock price updates for Apple Inc. being produced and consumed.

Conclusion

In this guide, we demonstrated the implementation of real-time data streaming in Python using Apache Kafka, first with simulated sensor data and then with the “yfinance” library to capture and process stock market data. We explained the purpose of each step of the code. Real-time data streaming can be applied in many fields, including finance and IoT, to build real-world applications.

About the author

Kalsoom Bibi

Hello, I am a freelance writer and usually write about Linux and other technology-related topics.