What is the Producer-Consumer Problem?
The producer-consumer problem is a fundamental concept in concurrent programming. It describes a problem involving two types of processes:
- producers, which generate data,
- and consumers, which utilize that data.
The core challenge lies in coordinating access to a shared resource, commonly a data buffer, so that producers do not exceed its capacity and consumers do not attempt to retrieve data from an empty buffer. This problem underpins many modern software systems where asynchronous communication and shared resource management are essential.
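The following is a minimal sketch of the classic bounded-buffer solution, using Python's threading.Condition. The BoundedBuffer class is illustrative only and is not part of the project described below. Producers wait while the buffer is full, and consumers wait while it is empty. After changing the buffer, each side notifies the other.

```python
import threading
from collections import deque


class BoundedBuffer:
    """Illustrative fixed-capacity FIFO buffer shared by producer and consumer threads."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.items = deque()
        self.lock = threading.Lock()
        # Two conditions on one lock: producers wait on not_full, consumers on not_empty
        self.not_full = threading.Condition(self.lock)
        self.not_empty = threading.Condition(self.lock)

    def put(self, item):
        with self.not_full:
            while len(self.items) >= self.capacity:
                self.not_full.wait()  # buffer full: block the producer
            self.items.append(item)
            self.not_empty.notify()  # wake one waiting consumer

    def get(self):
        with self.not_empty:
            while not self.items:
                self.not_empty.wait()  # buffer empty: block the consumer
            item = self.items.popleft()
            self.not_full.notify()  # wake one waiting producer
            return item


if __name__ == "__main__":
    buf = BoundedBuffer(capacity=2)
    results = []
    producer = threading.Thread(target=lambda: [buf.put(i) for i in range(5)])
    consumer = threading.Thread(target=lambda: [results.append(buf.get()) for _ in range(5)])
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()
    print(results)  # [0, 1, 2, 3, 4]
```

Both conditions share a single lock. The standard library's queue.Queue uses the same arrangement internally, so in practice you would usually reach for queue.Queue directly.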
What is a Message Queuing System?
A message queuing system (also referred to as a message broker) serves as an intermediary layer for communication between components. It allows producers to push messages to the queue and consumers to pull messages from it, independently. This decoupling ensures that both components can operate asynchronously and at different speeds.
A message queuing system provides a structured and reliable mechanism for managing communication flow, handling backpressure, and ensuring that messages are eventually processed in the intended order. It is critical in building scalable, maintainable, and fault-tolerant systems.
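Backpressure is easiest to see with the standard library's queue.Queue. A bounded queue makes put() block while it is full, which slows a fast producer down to the consumer's pace. This is an illustrative sketch. publish() and slow_consumer() are hypothetical helpers, and the None sentinel that stops the consumer is an assumption of this example rather than a feature of any particular broker.

```python
import queue
import threading
import time


def publish(q, message, timeout=None):
    """Put a message on a bounded queue, waiting while it is full.

    timeout=None waits indefinitely (pure backpressure). A number of seconds
    makes the call give up and return False if no slot frees up in time.
    """
    try:
        q.put(message, timeout=timeout)
        return True
    except queue.Full:
        return False


def slow_consumer(q, handled, delay):
    """Drain the queue at a fixed pace, standing in for a slower downstream service."""
    while True:
        message = q.get()
        try:
            if message is None:  # hypothetical sentinel: the producer has finished
                return
            time.sleep(delay)
            handled.append(message)
        finally:
            q.task_done()


if __name__ == "__main__":
    q = queue.Queue(maxsize=2)  # at most two messages may wait at once
    handled = []
    worker = threading.Thread(target=slow_consumer, args=(q, handled, 0.05))
    worker.start()
    for i in range(5):
        publish(q, f"msg-{i}")  # blocks whenever two messages are already waiting
    publish(q, None)
    worker.join()
    print(handled)  # ['msg-0', 'msg-1', 'msg-2', 'msg-3', 'msg-4']
```

Passing a timeout lets publish() return False instead of blocking forever. That is one simple way for a producer to shed load when consumers fall behind.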
Design a Message Queuing System
System design interviews often ask candidates to build simplified versions of real-world systems within constrained timeframes. One such popular machine coding challenge is to design and implement a basic Message Queuing System from scratch, while maintaining correctness, modularity, and thread safety.
Problem Statement
Design an in-memory message queuing system that supports the following:
- Multiple producers can publish messages.
- Multiple consumers can register themselves and process messages.
- Each message should be processed only once by each eligible consumer (one whose filter accepts it).
- Consumers can define filtering rules to selectively process certain messages.
- Consumers may have dependencies on other consumers (i.e., they should process a message only after the consumers they depend on have done so).
- Support basic retry logic for failed processing attempts.
Constraints:
- The system should be thread-safe.
- The queue must have a bounded size, and producers should block if the queue is full.
- Processing should happen asynchronously.
- Logs should be written to disk to track which consumer processed which message and when.
Optional Requirements:
- Implement a dashboard to display live metrics (e.g., messages processed per consumer).
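To satisfy the logging constraint, logs/processed_log.csv could hold one row per (timestamp, consumer, message_id). The sketch below assumes that column layout and a hypothetical log_processed() helper. The project's actual log format may differ. A module-level lock stops rows from concurrent consumer threads from interleaving, and it also ensures the header is written only once.

```python
import csv
import os
import tempfile
import threading
from datetime import datetime, timezone

# One lock for all writers, so rows from concurrent consumers never interleave
_log_lock = threading.Lock()


def log_processed(consumer_name, message_id, path="logs/processed_log.csv"):
    """Hypothetical helper: append (timestamp, consumer, message_id) to a CSV log."""
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    timestamp = datetime.now(timezone.utc).isoformat()  # UTC, ISO 8601
    with _log_lock:
        write_header = not os.path.exists(path)  # checked under the lock, so only once
        with open(path, "a", newline="") as f:
            writer = csv.writer(f)
            if write_header:
                writer.writerow(["timestamp", "consumer", "message_id"])
            writer.writerow([timestamp, consumer_name, message_id])


if __name__ == "__main__":
    log_path = os.path.join(tempfile.mkdtemp(), "processed_log.csv")
    threads = [
        threading.Thread(target=log_processed, args=(name, f"msg-{i}", log_path))
        for i, name in enumerate(["A", "B", "C"])
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    with open(log_path, newline="") as f:
        print(len(list(csv.reader(f))))  # 4: one header row plus three data rows
```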
Several popular tools utilise message queues for building real-world data pipelines and applications:
RabbitMQ: An open-source message broker that uses AMQP (Advanced Message Queuing Protocol), a widely adopted open standard for message-oriented middleware.
Apache Kafka: Optimized for high-throughput distributed messaging, widely used in event-driven architectures.
Amazon SQS: A fully managed message queuing service offered by AWS.
Google Pub/Sub: A scalable event ingestion and delivery system, well-suited for real-time analytics.
These systems are commonly used to connect microservices, build data-processing pipelines, handle event streams, and coordinate distributed workflows.
Project Overview: Structure and Execution
In this project, we’ll develop a lightweight in-memory message queuing system using Python. The project will implement the producer-consumer model with additional support for message filtering and consumer dependencies.
Project Structure
message-queue (root)/
├── app.py # Entry point
├── dashboard_api.py # Flask API & dashboard
├── input.txt # Source for messages
├── logs/ # Processed and error logs
│ ├── error_log.txt
│ └── processed_log.csv
├── message_queue/ # Core system logic
│ ├── core.py # Queue, Consumer, dependencies
│ ├── utils.py # Logging, hashing
│ ├── watcher.py # File watcher
│ └── __init__.py
├── templates/
│ └── dashboard.html # Live Bootstrap dashboard
├── requirements.txt # Install dependencies
├── pyproject.toml # Optional: modern Python packaging and dependency management (pip or build tools)
├── .gitignore
└── README.md
- message_queue/ – Core implementation logic for message handling
- message_queue/core.py:Consumer – Consumer class defining each consumer's individual processing behaviour
- message_queue/core.py:MessageQueue – Core class that manages the lifecycle of message processing. It uses a thread-safe queue and handles message publishing, consumer registration, retry logic, dependency resolution, and asynchronous dispatch
- app.py – The command-line interface and system entry point
- input.txt – A configurable file for simulating message production
Execution Steps
- Clone the repository.
- Ensure Python 3.8 or higher is installed.
- Navigate to the project directory.
- Install the dependencies: pip install -r requirements.txt
- Edit input.txt with test messages, one per line. These act as input for the message queue.
- Run python app.py to initiate the queue and begin processing.
Key Modules and Functional Overview
core.py
import threading
from collections import defaultdict, deque
from queue import Queue

from .utils import ensure_log_dirs  # helper from message_queue/utils.py

class Consumer:
    def __init__(self, name, callback, dependencies=None, filter_fn=None):
        self.name = name
        self.callback = callback  # callable(message, message_id) that processes the message
        self.dependencies = dependencies or []  # names of consumers that must finish first
        self.filter_fn = filter_fn or (lambda m: True)  # decides which messages this consumer accepts

class MessageQueue:
    def __init__(self, max_size=10):
        # queue.Queue is internally synchronized, so the queue itself needs no external Lock
        self.queue = Queue(maxsize=max_size)
        self.consumers = {}
        self.processed_messages = defaultdict(set)  # consumer_name -> set(message_ids)
        self.retry_counts = defaultdict(lambda: defaultdict(int))  # consumer -> message_id -> retries
        self.pending = deque()  # Holds (consumer, message_id, message) when deps not satisfied
        self.MAX_RETRIES = 3
        ensure_log_dirs()
        threading.Thread(target=self._dispatch_loop, daemon=True).start()
        threading.Thread(target=self._pending_loop, daemon=True).start()
- Defines the Consumer class, which encapsulates a name, a processing callback, a list of dependencies, and an optional message filter.
- Implements the MessageQueue class, which manages message queuing, consumer registration, retry logic, and dependency resolution. It starts two background daemon threads. One dispatches messages from the queue (_dispatch_loop). The other re-attempts pending items whose dependencies were not yet satisfied (_pending_loop).
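The excerpt starts _dispatch_loop and _pending_loop but does not show them. The class below is a rough, hypothetical stand-in for the dispatch side, not the project's code. It runs a daemon thread that blocks on queue.get() and hands each message to a handler. It then calls task_done() so that queue.join() can report when every published message has been handled. MiniDispatcher, wait_until_idle(), and the integer message ids are all assumptions of this sketch.

```python
import queue
import threading


class MiniDispatcher:
    """Hypothetical stand-in for a dispatch loop (not the project's _dispatch_loop)."""

    def __init__(self, handler, max_size=10):
        self.queue = queue.Queue(maxsize=max_size)
        self.handler = handler  # callable(message_id, message)
        self._next_id = 0
        self._id_lock = threading.Lock()  # publishers may call from several threads
        threading.Thread(target=self._dispatch_loop, daemon=True).start()

    def publish(self, message):
        """Assign an integer id and enqueue the message; blocks while the queue is full."""
        with self._id_lock:
            message_id = self._next_id
            self._next_id += 1
        self.queue.put((message_id, message))
        return message_id

    def _dispatch_loop(self):
        while True:
            message_id, message = self.queue.get()  # blocks until a message is available
            try:
                self.handler(message_id, message)
            except Exception as exc:  # keep the loop alive when one message fails
                print(f"[x] dispatch failed for message {message_id}: {exc}")
            finally:
                self.queue.task_done()  # lets queue.join() count this message as finished

    def wait_until_idle(self):
        """Block until every message published so far has been handled."""
        self.queue.join()


if __name__ == "__main__":
    dispatcher = MiniDispatcher(lambda mid, msg: print(f"handled {mid}: {msg}"), max_size=2)
    for text in ["apple", "banana", "cherry"]:
        dispatcher.publish(text)
    dispatcher.wait_until_idle()
```

Catching exceptions inside the loop matters here. Without the except clause, an exception escaping the handler would end the daemon thread. Python prints the traceback, but nothing restarts the thread, so the queue would stop draining.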
# Methods of MessageQueue in core.py (they use time.sleep and timestamped_log from utils.py)
def _process_message_chain(self, message_id, message):
    processed_consumers = set()
    remaining_consumers = set(self.consumers.keys())
    while remaining_consumers:
        progress = False
        for name in list(remaining_consumers):
            consumer = self.consumers[name]
            # If the consumer filters out the message or has already processed it,
            # mark it as processed to unblock dependents
            if not consumer.filter_fn(message) or message_id in self.processed_messages[name]:
                processed_consumers.add(name)  # mark consumer as processed
                remaining_consumers.remove(name)
                progress = True
                continue
            # Process the consumer only if all its dependencies have completed
            if all(dep in processed_consumers for dep in consumer.dependencies):
                self._process_message(consumer, message_id, message)
                # Marked done even if every retry failed, so dependents are not blocked
                processed_consumers.add(name)
                remaining_consumers.remove(name)
                progress = True
        if not progress:
            # No consumer could run in a full pass: circular or unresolvable dependencies
            raise RuntimeError("Circular dependency or unresolved consumer chain")

def _process_message(self, consumer, message_id, message):
    try:
        consumer.callback(message, message_id)
        self.processed_messages[consumer.name].add(message_id)
    except Exception as e:
        self.retry_counts[consumer.name][message_id] += 1
        count = self.retry_counts[consumer.name][message_id]
        err_msg = f"[x] Error in consumer {consumer.name}: {e} (Retry {count}/{self.MAX_RETRIES})"
        print(err_msg)
        timestamped_log(err_msg, "logs/error_log.txt")
        if count < self.MAX_RETRIES:
            retry_msg = f"[~] Retrying {consumer.name} for message {message_id} (attempt {count + 1})"
            print(retry_msg)
            timestamped_log(retry_msg, "logs/error_log.txt")
            time.sleep(1)  # fixed one-second pause before the next attempt
            self._process_message(consumer, message_id, message)  # recursive retry
- The _process_message_chain() method ensures that each message is processed by all applicable consumers in dependency order. A consumer that filters out the message, or has already processed it, is marked as completed immediately. A consumer whose dependencies have all completed is processed next. If a full pass makes no progress, the method raises a RuntimeError to signal a deadlock. This happens with a circular dependency or a dependency on an unregistered consumer.
- The _process_message() method executes the consumer's callback. On failure, it logs the error and retries after a one-second delay, up to MAX_RETRIES (three) attempts in total. Every error and retry is printed and logged to logs/error_log.txt. If the final attempt also fails, the message is not recorded as processed for that consumer. The chain still treats the consumer as completed, however, so its dependents are not blocked.
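The recursive retry above works for a small MAX_RETRIES, but the same policy reads naturally as a loop. A loop also makes it easy to replace the fixed one-second pause with exponential backoff. The call_with_retries() helper below is a hypothetical sketch, not the project's code. The doubling delay is a substitution for the original fixed delay, and the injectable sleep parameter exists only so the timing can be tested.

```python
import time


def call_with_retries(fn, *args, max_attempts=3, base_delay=1.0, sleep=time.sleep):
    """Hypothetical helper: call fn(*args) until it succeeds or max_attempts have failed.

    Returns (True, result) on success, or (False, last_exception) after the final
    failure. The wait doubles after each failure: base_delay, 2 * base_delay, ...
    """
    last_exc = None
    for attempt in range(1, max_attempts + 1):
        try:
            return True, fn(*args)
        except Exception as exc:
            last_exc = exc
            print(f"[x] attempt {attempt}/{max_attempts} failed: {exc}")
            if attempt < max_attempts:  # no pause after the last attempt
                sleep(base_delay * 2 ** (attempt - 1))
    return False, last_exc


if __name__ == "__main__":
    attempts = {"count": 0}

    def flaky(message):
        attempts["count"] += 1
        if attempts["count"] < 3:
            raise ValueError("transient failure")
        return message.upper()

    print(call_with_retries(flaky, "apple", base_delay=0.1))  # (True, 'APPLE')
```

With max_attempts=3 and base_delay=1.0, a call that keeps failing waits 1 s and then 2 s, and gives up after the third attempt. That matches the original's three attempts in total.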
Note: Dependencies between consumers are enforced only for messages that both consumers are expected to process. If a consumer filters out a message using its filter_fn, it is still treated as completed for that message within the dependency chain, although it is not added to processed_messages. This ensures that dependent consumers are not blocked by consumers that do not handle the current message.
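The same ordering rules can also be expressed with Kahn's topological-sort algorithm, which checks for cycles and unknown dependencies in a single pass. resolve_order() below is a hypothetical helper, and its (dependencies, filter_fn) tuple layout is an assumption chosen for brevity. There is one deliberate difference from the loop above. Here, a consumer that skips the message counts as completed only once its own dependencies have completed, so ordering stays transitive. In a chain A → B → C where B skips a message, C still runs after A.

```python
from collections import deque


def resolve_order(consumers, message):
    """Hypothetical helper: names of consumers that should handle `message`, in order.

    `consumers` maps name -> (dependencies, filter_fn). A consumer whose filter
    rejects the message does not run, but still counts as completed once its own
    dependencies have, so it never blocks its dependents. Raises ValueError for
    unknown dependencies or cycles.
    """
    for name, (deps, _) in consumers.items():
        unknown = [d for d in deps if d not in consumers]
        if unknown:
            raise ValueError(f"{name} depends on unregistered consumer(s): {unknown}")

    # Kahn's algorithm: a consumer becomes ready once all its dependencies are done
    waiting_on = {name: set(deps) for name, (deps, _) in consumers.items()}
    dependents = {name: [] for name in consumers}
    for name, (deps, _) in consumers.items():
        for dep in set(deps):  # set() so a repeated dependency is counted once
            dependents[dep].append(name)

    ready = deque(name for name, deps in waiting_on.items() if not deps)
    order, completed = [], 0
    while ready:
        name = ready.popleft()
        completed += 1
        if consumers[name][1](message):  # only consumers that accept the message run
            order.append(name)
        for child in dependents[name]:
            waiting_on[child].discard(name)
            if not waiting_on[child]:
                ready.append(child)

    if completed != len(consumers):  # leftovers are in, or wait on, a cycle
        raise ValueError("circular dependency among consumers")
    return order


if __name__ == "__main__":
    consumers = {
        "A": ([], lambda m: True),
        "B": (["A"], lambda m: False),  # B skips every message
        "C": (["B"], lambda m: True),
    }
    print(resolve_order(consumers, "apple"))  # ['A', 'C']
```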
app.py
# Excerpt: MessageQueue and Consumer come from message_queue/core.py and FileTracker from
# message_queue/watcher.py; INPUT_FILE, process_wrapper and run_dashboard are defined elsewhere.
def main(queue_size):
    mq = MessageQueue(max_size=queue_size)
    file_tracker = FileTracker(INPUT_FILE)
    mq.add_consumer(Consumer("A", process_wrapper("A"), dependencies=[],
                             filter_fn=lambda m: m.lower().startswith('a')))
    mq.add_consumer(Consumer("B", process_wrapper("B"), dependencies=["A"],
                             filter_fn=lambda m: m.lower().startswith('b')))
    mq.add_consumer(Consumer("C", process_wrapper("C"), dependencies=["B"],
                             filter_fn=lambda m: m.lower().startswith('c')))
    run_dashboard(mq)
- Serves as the application's main driver. It initializes the queue, registers three consumers (A, B, and C) with a dependency chain (B depends on A, C depends on B) and prefix-based filters, and sets up file tracking using a FileTracker from watcher.py.
- Launches a command-line (background) dashboard that prints the number of messages processed per consumer.
- (Optionally) starts a Flask server in a separate thread for HTTP-based interactions.
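A quick way to see how the filters registered in main() route messages is to evaluate them directly. The sample messages and the route() helper below are illustrative only.

```python
# The same prefix filters that main() registers for consumers A, B and C
FILTERS = {
    "A": lambda m: m.lower().startswith("a"),
    "B": lambda m: m.lower().startswith("b"),
    "C": lambda m: m.lower().startswith("c"),
}


def route(message):
    """Hypothetical helper: names of the consumers whose filter accepts `message`."""
    return [name for name, accepts in FILTERS.items() if accepts(message)]


if __name__ == "__main__":
    for text in ["Apple pie", "banana", "Cherry", "date"]:
        print(f"{text!r} -> {route(text)}")
```

The three prefixes are mutually exclusive, so every message is accepted by at most one consumer. With these particular filters, no consumer ever waits for another consumer to process the same message. To watch the dependency ordering take effect, give two consumers overlapping filters, for example by having both accept messages that start with "a".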
watcher.py
import os

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

class FileTracker:
    def __init__(self, filepath):
        self.filepath = filepath
        self.offset = 0  # position reached by the previous read

    def read_new_lines(self):
        if not os.path.exists(self.filepath):
            return []
        with open(self.filepath, "r") as f:
            f.seek(self.offset)  # skip everything already consumed
            new_lines = f.readlines()
            self.offset = f.tell()
        return [line.strip() for line in new_lines if line.strip()]  # drop blank lines

class FileChangeHandler(FileSystemEventHandler):
    def __init__(self, filepath, on_change):
        self.filepath = filepath
        self.on_change = on_change  # callback that ingests the new lines

    def on_modified(self, event):
        # The watched directory reports events for every file in it; react only to ours
        if event.src_path.endswith(self.filepath):
            print(f"[watcher] Detected change in: {event.src_path}")
            self.on_change()

def start_watcher(filepath, on_change):
    handler = FileChangeHandler(filepath, on_change)
    observer = Observer()
    # Watch the file's parent directory (non-recursively); "." if no directory is given
    observer.schedule(handler, path=os.path.dirname(filepath) or ".", recursive=False)
    observer.start()  # the observer runs in a background thread
    print(f"[watcher] Watching for changes in {filepath}...")
    return observer
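- Implements file-based message detection using the watchdog library.
- FileTracker efficiently reads only the lines newly appended to input.txt by remembering the file offset reached on the previous read.
- FileChangeHandler reacts to file modifications and invokes message ingestion (on_change()).
- start_watcher() schedules the handler on a watchdog Observer for the file's directory (non-recursive), starts it, and returns the observer.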
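There is one edge case worth knowing about. FileTracker advances its offset past whatever readlines() returns, including a final line that is still being written, so such a line can be delivered in two pieces. The hypothetical CompleteLineTracker below sketches one fix and is not the project's code. It reads bytes and advances the offset only past the last newline, leaving any partial line for the next call. It assumes input.txt is UTF-8 encoded.

```python
import os
import tempfile


class CompleteLineTracker:
    """Hypothetical FileTracker variant that only consumes newline-terminated lines."""

    def __init__(self, filepath):
        self.filepath = filepath
        self.offset = 0  # byte offset of the first unconsumed byte

    def read_new_lines(self):
        if not os.path.exists(self.filepath):
            return []
        with open(self.filepath, "rb") as f:
            f.seek(self.offset)
            data = f.read()
        end = data.rfind(b"\n")
        if end == -1:
            return []  # nothing new, or only a partial line so far
        self.offset += end + 1  # advance only past the last complete line
        lines = data[: end + 1].decode("utf-8").splitlines()
        return [line.strip() for line in lines if line.strip()]


if __name__ == "__main__":
    path = os.path.join(tempfile.mkdtemp(), "input.txt")
    tracker = CompleteLineTracker(path)
    with open(path, "w") as f:
        f.write("apple\nban")  # the second line is still being written
    print(tracker.read_new_lines())  # ['apple']
    with open(path, "a") as f:
        f.write("ana\n")
    print(tracker.read_new_lines())  # ['banana']
```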
utils.py
- Provides utility functions such as timestamped_log() for writing timestamped entries to log files and ensure_log_dirs() to set up the logging directory structure.
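The implementation of timestamped_log() is not shown. One alternative worth considering is Python's standard logging module, which already provides timestamping and per-handler locking, so threads in one process can safely share a file logger. make_file_logger() below is a hypothetical helper that illustrates this approach. It is not the project's utils.py.

```python
import logging
import os
import tempfile


def make_file_logger(name, path):
    """Hypothetical helper: a logger that appends "[timestamp] message" lines to `path`.

    logging.Handler.handle() holds the handler's lock while emitting, so
    several threads in one process can share this logger safely.
    """
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep these records out of the root logger
    if not logger.handlers:  # calling twice must not attach a second handler
        handler = logging.FileHandler(path, encoding="utf-8")
        handler.setFormatter(logging.Formatter("[%(asctime)s] %(message)s"))
        logger.addHandler(handler)
    return logger


if __name__ == "__main__":
    log_path = os.path.join(tempfile.mkdtemp(), "logs", "error_log.txt")
    errors = make_file_logger("mq.errors", log_path)
    errors.info("[x] Error in consumer A: boom (Retry 1/3)")
```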
This modular design provides maintainability and extensibility, ensuring that new consumers, logging mechanisms, or input channels can be added with minimal modifications.
Optional Extension: Web Dashboard with Flask
To add a web-based dashboard, the project includes an optional Flask server that exposes runtime system metrics. This interface lets you monitor queue activity and can be extended with a more user-friendly front-end.
dashboard_api.py
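from flask import Flask, jsonify, render_template

app = Flask(__name__)
message_queue = None  # set to the running MessageQueue at startup (attachment code not shown)

@app.route("/dashboard")
def dashboard():
    return render_template("dashboard.html")

@app.route("/")
def home():
    if message_queue is None:
        return jsonify({"error": "MessageQueue not attached"}), 503
    # list(...) copies each dict view up front, so the comprehensions do not
    # iterate live dicts that worker threads may be adding keys to
    return jsonify({
        "queue_size": message_queue.queue.qsize(),
        "queue_max": message_queue.queue.maxsize,
        "pending": len(message_queue.pending),
        "processed": {k: len(v) for k, v in list(message_queue.processed_messages.items())},
        "retries": {
            cname: sum(list(counts.values()))
            for cname, counts in list(message_queue.retry_counts.items())
        }
    })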
- Serves a basic dashboard UI at /dashboard.
- Provides real-time system stats (queue size, pending items, processed messages, and retry counts) as JSON at /, or a 503 error if no MessageQueue is attached.
- Easily extendable to support custom HTML/CSS, flowcharts, graphs, etc.
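The / handler reads processed_messages and retry_counts from the Flask thread while worker threads may be adding entries. Iterating a dict that another thread is resizing can raise RuntimeError ("dictionary changed size during iteration"). One way to keep reads consistent is to route every update and every read through a single lock. The Metrics class below is a hypothetical sketch of that idea, not the project's code. A handler would then return jsonify(metrics.snapshot()).

```python
import json
import threading
from collections import defaultdict


class Metrics:
    """Hypothetical metrics store: workers and the dashboard share one lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self._processed = defaultdict(set)  # consumer name -> ids of processed messages
        self._retries = defaultdict(int)  # consumer name -> total retry count

    def record_processed(self, consumer, message_id):
        with self._lock:
            self._processed[consumer].add(message_id)

    def record_retry(self, consumer):
        with self._lock:
            self._retries[consumer] += 1

    def snapshot(self):
        """Copy the counters into plain dicts while holding the lock."""
        with self._lock:
            return {
                "processed": {name: len(ids) for name, ids in self._processed.items()},
                "retries": dict(self._retries),
            }


if __name__ == "__main__":
    metrics = Metrics()
    metrics.record_processed("A", "msg-1")
    metrics.record_retry("B")
    print(json.dumps(metrics.snapshot()))  # {"processed": {"A": 1}, "retries": {"B": 1}}
```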
Complete Code
The complete implementation is available in the onlycodeblog/message-queue repository.
This implementation of the producer-consumer problem emphasises good programming practices such as separation of concerns, fault tolerance, and asynchronous design. It offers a practical context for understanding message queues, modular system design, and dependency handling.