Getting Started with Confluent Kafka in Python

Shikha Pandey
5 min readSep 9, 2023

--

Kafka is a distributed streaming platform that allows you to build real-time data pipelines and streaming applications. Confluent Kafka is a popular distribution of Kafka that provides additional tools and features for managing Kafka clusters effectively. In this article, we’ll explore how to use Confluent Kafka with Python to create a real-time stock price project. NOTE:- This article introduces the basics of Confluent Kafka for real-time stock price tracking. Kafka has extensive applications in finance, e-commerce, healthcare, and more, powering real-time data processing like fraud detection and recommendations. This is just the beginning of Kafka’s real-world potential.

Prerequisites

Before we dive into the world of Kafka, make sure you have the following prerequisites:

  • Python: You need Python installed on your system. You can download it from the official Python website (https://www.python.org/downloads/).
  • Confluent Kafka Cluster: You should have access to a Confluent Kafka cluster. If you don’t have one, you can set up a Confluent Cloud cluster (https://www.confluent.io/confluent-cloud/) for testing purposes.
  • Confluent Kafka Python Library: Install the Confluent Kafka Python library using pip:
pip install confluent-kafka
  • Yahoo Finance Python Library: We’ll use the Yahoo Finance library to fetch stock prices. Install it using pip:
pip install yfinance

Project Overview

In this mini project, we’ll create a real-time stock price tracking application using Confluent Kafka and Yahoo Finance data. The application consists of three main components:

  1. Producer: This component fetches real-time stock prices from Yahoo Finance and publishes them to a Kafka topic.
  2. Consumer: The consumer subscribes to the Kafka topic and receives real-time stock price updates. It can be used to process or display this data in real-time.
  3. Web Application: We’ll create a simple web application using Flask to display historical and real-time stock price charts.

Producer

Let’s start with the producer component.

The Producer is like a messenger in our project. It’s responsible for going out and fetching the real-time stock prices for us. Imagine it as a reporter who’s constantly checking stock prices and telling us what they are.

We’ll use the yfinance library to fetch stock prices and the confluent-kafka library to publish them to a Kafka topic. Here's a simplified version of the producer code:

import yfinance as yf
from confluent_kafka import Producer

# Initialize the Kafka producer with SASL_SSL authentication
producer = Producer({
'bootstrap.servers': '<KAFKA_BROKER>',
'sasl.mechanisms': 'PLAIN',
'security.protocol': 'SASL_SSL',
'sasl.username': '<YOUR_API_KEY>',
'sasl.password': '<YOUR_API_SECRET>',
})

def fetch_and_produce_stock_price(symbol):
try:
stock = yf.Ticker(symbol)
price = stock.history(period='1m')['Close'].iloc[-1]
message = f'{symbol}:{price}' # Combine symbol and price
producer.produce('<KAFKA_TOPIC>', value=message)
producer.flush() # Ensure the message is sent immediately
print(f'Sent {symbol} price: {price}')
except Exception as e:
print(f'Error sending data: {e}')

Make sure to replace <KAFKA_BROKER>, <YOUR_API_KEY>, <YOUR_API_SECRET>, and <KAFKA_TOPIC> with your Kafka cluster details.

Why do we need it?

  • We use the producer because we want to keep track of real-time stock prices.
  • It fetches the stock prices from a source (Yahoo Finance in this case) and then sends these prices to a central place called Kafka.
  • Kafka is like a notice board where information can be posted, and anyone interested can read it.

Consumer

Next, we have the consumer component. The consumer subscribes to the Kafka topic and receives real-time stock price updates. Here’s the consumer code:

from confluent_kafka import Consumer, KafkaError

# Initialize the Kafka consumer with SASL_SSL authentication
consumer = Consumer({
'bootstrap.servers': '<KAFKA_BROKER>',
'sasl.mechanisms': 'PLAIN',
'security.protocol': 'SASL_SSL',
'sasl.username': '<YOUR_API_KEY>',
'sasl.password': '<YOUR_API_SECRET>',
'group.id': 'stock_price_group',
'auto.offset.reset': 'latest', # Start from the latest message
})

# Subscribe to the Kafka topic
consumer.subscribe(['<KAFKA_TOPIC>'])

try:
while True:
msg = consumer.poll(1.0)

if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
print(f'Error while consuming: {msg.error()}')
else:
# Parse the received message
value = msg.value().decode('utf-8')
symbol, price = value.split(':')
print(f'Received {symbol} price: {price}')

except KeyboardInterrupt:
pass
finally:
# Close the consumer gracefully
consumer.close()

Again, replace <KAFKA_BROKER>, <YOUR_API_KEY>, <YOUR_API_SECRET>, and <KAFKA_TOPIC> with your Kafka cluster details.

Why do we need it?

  • We use the consumer because we want to know the stock prices in real-time.
  • It looks at the notice board (Kafka) and reads the prices that the producer has posted there.
  • This way, we can stay updated with the latest stock prices without constantly checking ourselves.

Exploring Kafka’s Versatility

Before we conclude, let’s delve into some common use cases for Kafka that showcase its incredible versatility:

  • ETL (Extract, Transform, Load): Kafka is widely used in ETL processes to ingest data from various sources, transform it, and load it into data lakes or data warehouses.
  • Machine Learning: Kafka can be used to feed real-time data into machine learning models for training or making predictions.
  • Log Aggregation: Kafka is a popular choice for aggregating logs and events from multiple services in real-time, making it easier to monitor and troubleshoot applications.
  • IoT (Internet of Things): Kafka is suitable for handling large volumes of data generated by IoT devices in real-time.

While Kafka is a powerful solution, it’s worth noting that there are alternative technologies available, such as RabbitMQ, Apache Pulsar, and AWS Kinesis, each with its own strengths and use cases. Choosing the right tool depends on your specific requirements and objectives.

Kafka’s extensive utility extends well beyond these use cases, making it a valuable tool in various industries and scenarios. Whether you’re ensuring fraud detection in finance, enhancing user recommendations in e-commerce, or monitoring vital signs in healthcare, Kafka is a trusted ally for real-time data streaming.

Conclusion

In this article, we explored how to use Confluent Kafka with Python to create a real-time stock price tracking project. We created a producer to fetch stock prices and publish them to a Kafka topic, a consumer to receive and process these updates, and a simple web application to display stock price charts.

Kafka’s real-time data streaming capabilities make it a powerful tool for building applications that require real-time data processing and analysis. You can further enhance this project by adding more features, such as storing data in a database or integrating with additional data sources.

Now that you have a basic understanding of using Confluent Kafka with Python, you can explore more advanced use cases and build robust real-time data applications. If you want to dive deeper into the project and access the full source code, feel free to check out the GitHub repository here.

Happy coding and may your real-time data applications thrive with Confluent Kafka!

--

--

Shikha Pandey
Shikha Pandey

Written by Shikha Pandey

Software Engineer - Tech Enthusiast - Startup Enthusiast. Reach me out at https://shikhapandey.me/:)

No responses yet