Producing and Consuming Messages in Python

Producing and consuming messages in Python is commonly done using Apache Kafka. Python applications act as:

  • Producers → Send messages to Kafka
  • Consumers → Read messages from Kafka

This is a core concept in real-time data pipelines.

Step 1: Install Kafka Python Library

The most commonly used library is:

pip install kafka-python

Kafka Producer in Python

A producer sends messages to a Kafka topic.

Example: Sending Messages

from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

data = {
    "user_id": 101,
    "action": "purchase",
    "amount": 250
}

producer.send('sales_topic', value=data)
producer.flush()

print("Message sent successfully")

Explanation:

  • bootstrap_servers → Kafka broker address
  • sales_topic → Topic name
  • value_serializer → Converts data to bytes

Kafka Consumer in Python

A consumer reads messages from a Kafka topic.

Example: Receiving Messages

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'sales_topic',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='sales-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

for message in consumer:
    print("Received:", message.value)

Explanation:

  • auto_offset_reset='earliest' → Start reading from the beginning of the topic
  • group_id → Consumer group name
  • value_deserializer → Converts the raw bytes back into a Python object
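The serializer and deserializer used above can be exercised without a running broker. This minimal sketch round-trips a message exactly as the producer and consumer would (dict → bytes → dict):

```python
import json

# Producer side: Python dict -> UTF-8 JSON bytes (Kafka transports raw bytes)
serialize = lambda v: json.dumps(v).encode('utf-8')

# Consumer side: UTF-8 JSON bytes -> Python dict
deserialize = lambda x: json.loads(x.decode('utf-8'))

raw = serialize({"user_id": 101, "action": "purchase"})
print(type(raw))            # bytes on the wire
print(deserialize(raw))     # original dict recovered on the consumer side
```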

How It Works

  1. Producer sends event to Kafka topic
  2. Kafka stores event in partition
  3. Consumer reads event using offset
  4. Offset moves forward after processing
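The offset mechanics in steps 3 and 4 can be sketched in plain Python. This is not Kafka, just an illustrative in-memory model of a consumer tracking its position in a partition:

```python
# A partition is an ordered log of events; the consumer keeps an offset
# marking how far it has read.
partition_log = ["event-0", "event-1", "event-2"]
offset = 0  # consumer's current position

while offset < len(partition_log):
    event = partition_log[offset]   # step 3: read the event at the offset
    print("Processing:", event)
    offset += 1                     # step 4: offset moves forward after processing
```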

Real-Time Example Use Case

E-commerce System:

  • Producer → Sends order events
  • Consumer → Updates dashboard
  • Another consumer → Detects fraud
  • Another consumer → Stores data in warehouse

One message, multiple consumers.
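This fan-out works because each consumer group keeps its own offset into the same partition, so every group sees every message independently. A toy in-memory sketch (group names are illustrative, not a Kafka API):

```python
# One shared log, one independent offset per consumer group
partition_log = [{"order_id": "ORD1001"}, {"order_id": "ORD1002"}]
group_offsets = {"dashboard-group": 0, "fraud-group": 0, "warehouse-group": 0}

for group in group_offsets:
    # Each group reads the full log from its own position
    while group_offsets[group] < len(partition_log):
        message = partition_log[group_offsets[group]]
        print(group, "received:", message)
        group_offsets[group] += 1   # offsets advance independently per group
```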

Message Format Best Practice

Always:

  • Use JSON format
  • Include a timestamp
  • Include a unique ID
  • Avoid sending large payloads

Example:

{
    "order_id": "ORD1001",
    "timestamp": "2026-03-03T10:30:00",
    "amount": 500
}
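These practices can be wrapped in a small helper. The function name and fields below are illustrative (not part of any Kafka API); Python's standard uuid and datetime modules supply the unique ID and timestamp:

```python
import json
import uuid
from datetime import datetime, timezone

def build_event(amount):
    """Build a small JSON-serializable event following the best practices above."""
    return {
        "order_id": str(uuid.uuid4()),                        # unique ID
        "timestamp": datetime.now(timezone.utc).isoformat(),  # timestamp
        "amount": amount,                                     # keep the payload small
    }

event = build_event(500)
print(json.dumps(event))  # ready to pass to producer.send(...)
```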

Error Handling Example

Producer with basic error handling:

try:
    future = producer.send('sales_topic', value=data)
    # send() is asynchronous; get() blocks until the broker acknowledges
    # the write and raises an exception if delivery failed
    future.get(timeout=10)
except Exception as e:
    print("Error:", e)

Scaling with Consumer Groups

If multiple consumers share the same group_id:

  • Kafka distributes partitions among them
  • Enables parallel processing
  • Improves performance
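Kafka's actual partition assignment is handled by the broker-side group coordinator, but the idea can be sketched as a simple round-robin spread of partitions over the members of one group (consumer names here are illustrative):

```python
# 6 partitions of a topic, 3 consumers sharing the same group_id
partitions = [0, 1, 2, 3, 4, 5]
consumers = ["consumer-a", "consumer-b", "consumer-c"]

# Round-robin assignment: each partition goes to exactly one group member
assignment = {c: [] for c in consumers}
for i, p in enumerate(partitions):
    assignment[consumers[i % len(consumers)]].append(p)

print(assignment)  # each consumer processes its own partitions in parallel
```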

Interview Answer (Short Version)

Producing and consuming messages in Python involves using the kafka-python library to send messages to Kafka topics as a producer and read them as a consumer. It enables real-time data processing and event-driven architectures.

Final Summary

Producing and consuming messages in Python allows you to:

  • Build real-time pipelines
  • Process live events
  • Scale using consumer groups
  • Integrate with data warehouses and dashboards

It is a fundamental skill in modern streaming and data engineering systems.
