C++ Producer-Consumer Patterns: Single/Multiple Producer and Consumer Examples
The producer-consumer pattern is a fundamental concurrency design pattern where one or more threads (producers) generate data and one or more threads (consumers) process that data. This guide covers all four combinations: single producer-single consumer, multiple producer-single consumer, single producer-multiple consumer, and multiple producer-multiple consumer.
Table of Contents
- Overview
- Thread-Safe Queue Implementation
- Single Producer & Single Consumer
- Multiple Producer & Single Consumer
- Single Producer & Multiple Consumer
- Multiple Producer & Multiple Consumer
- Best Practices
- Common Pitfalls
Overview
The producer-consumer pattern solves the problem of coordinating work between threads that generate data and threads that process it. The pattern uses a shared buffer (typically a queue) to decouple producers from consumers.
Key Components
- Queue/Buffer: Shared data structure to hold items
- Mutex: Protects the queue from race conditions
- Condition Variable: Signals when data is available or space is available
- Producer Thread(s): Generate and enqueue data
- Consumer Thread(s): Dequeue and process data
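The components listed above fit together roughly as follows. This is a minimal sketch rather than the queue used in the rest of this guide; the function name `sumViaHandoff` and the `done` flag are illustrative assumptions.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical helper: a producer pushes 1..n, a consumer sums what it receives.
int sumViaHandoff(int n) {
    std::queue<int> buffer;         // shared buffer
    std::mutex mtx;                 // protects buffer and done
    std::condition_variable ready;  // signals "data available or producer finished"
    bool done = false;
    int sum = 0;

    std::thread producer([&] {
        for (int i = 1; i <= n; ++i) {
            {
                std::lock_guard<std::mutex> lock(mtx);
                buffer.push(i);
            }
            ready.notify_one();
        }
        {
            std::lock_guard<std::mutex> lock(mtx);
            done = true;
        }
        ready.notify_one();
    });

    std::thread consumer([&] {
        std::unique_lock<std::mutex> lock(mtx);
        for (;;) {
            // The predicate is rechecked after every wake-up, so spurious
            // wake-ups and early notifications are both handled.
            ready.wait(lock, [&] { return !buffer.empty() || done; });
            if (buffer.empty()) break;  // only reachable once done is set
            sum += buffer.front();
            buffer.pop();
        }
    });

    producer.join();
    consumer.join();
    return sum;
}
```

Calling `sumViaHandoff(100)` hands 100 integers from one thread to the other and returns their sum. The sketch is unbounded; the queue in the next section adds a size limit and a second condition variable for "space available".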
Pattern Variations
- 1 Producer, 1 Consumer (SPSC): Simplest case, minimal contention
- N Producers, 1 Consumer (MPSC): Multiple sources, single processor
- 1 Producer, N Consumers (SPMC): Single source, parallel processing
- N Producers, M Consumers (MPMC): Most general case, maximum parallelism
Thread-Safe Queue Implementation
First, let’s implement a thread-safe queue that will be used in all examples:
#include <queue>
#include <mutex>
#include <condition_variable>
#include <optional>
using namespace std;
template<typename T>
class ThreadSafeQueue {
private:
queue<T> queue_;
mutable mutex mtx_; // mutable so that size() const can lock it
condition_variable not_empty_;
condition_variable not_full_;
size_t max_size_;
bool shutdown_;
public:
explicit ThreadSafeQueue(size_t max_size = 1000)
: max_size_(max_size), shutdown_(false) {}
void push(const T& item) {
unique_lock<mutex> lock(mtx_);
// Wait until there's space or shutdown
not_full_.wait(lock, [this]() {
return queue_.size() < max_size_ || shutdown_;
});
if (shutdown_) {
return;
}
queue_.push(item);
not_empty_.notify_one();
}
optional<T> pop() {
unique_lock<mutex> lock(mtx_);
// Wait until there's data or shutdown
not_empty_.wait(lock, [this]() {
return !queue_.empty() || shutdown_;
});
if (shutdown_ && queue_.empty()) {
return nullopt;
}
T item = queue_.front();
queue_.pop();
not_full_.notify_one();
return item;
}
void shutdown() {
lock_guard<mutex> lock(mtx_);
shutdown_ = true;
not_empty_.notify_all();
not_full_.notify_all();
}
size_t size() const {
lock_guard<mutex> lock(mtx_);
return queue_.size();
}
};
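The shutdown rules of this queue are easy to misread: after `shutdown()`, items already in the queue can still be popped, and `pop()` returns `nullopt` only once the queue is both shut down and empty. The sketch below illustrates this with a compact stand-in class. `MiniQueue` and `drainAfterShutdown` are hypothetical names, and unlike the original, this `push` returns a `bool` so a caller can see when an item was rejected.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <optional>
#include <queue>
#include <vector>

// Compact stand-in for ThreadSafeQueue<int> with the same shutdown rules.
class MiniQueue {
public:
    explicit MiniQueue(std::size_t max_size) : max_size_(max_size) {}

    // Returns false if the queue was shut down before the item could be added.
    bool push(int item) {
        std::unique_lock<std::mutex> lock(mtx_);
        not_full_.wait(lock, [this] { return queue_.size() < max_size_ || shutdown_; });
        if (shutdown_) return false;
        queue_.push(item);
        not_empty_.notify_one();
        return true;
    }

    std::optional<int> pop() {
        std::unique_lock<std::mutex> lock(mtx_);
        not_empty_.wait(lock, [this] { return !queue_.empty() || shutdown_; });
        if (queue_.empty()) return std::nullopt;  // shut down and fully drained
        int item = queue_.front();
        queue_.pop();
        not_full_.notify_one();
        return item;
    }

    void shutdown() {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            shutdown_ = true;
        }
        not_empty_.notify_all();
        not_full_.notify_all();
    }

private:
    std::queue<int> queue_;
    std::mutex mtx_;
    std::condition_variable not_empty_, not_full_;
    std::size_t max_size_;
    bool shutdown_ = false;
};

// Pushes 1, 2, 3, shuts the queue down, then pops until nullopt.
std::vector<int> drainAfterShutdown() {
    MiniQueue q(10);
    for (int i = 1; i <= 3; ++i) q.push(i);
    q.shutdown();
    std::vector<int> drained;
    while (auto item = q.pop()) drained.push_back(*item);
    return drained;
}
```

A consumer can therefore loop on `pop()` until it sees `nullopt` and be sure it has processed everything that was accepted before shutdown.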
Single Producer & Single Consumer
The simplest case: one producer thread generates data, one consumer thread processes it.
Use Cases
- Logging systems
- Data streaming pipelines
- Simple task queues
- File processing
Example Implementation
#include <iostream>
#include <thread>
#include <chrono>
#include <atomic>
using namespace std;
void singleProducerSingleConsumer() {
ThreadSafeQueue<int> queue(10);
atomic<bool> done(false);
// Producer thread
thread producer([&queue, &done]() {
for (int i = 1; i <= 20; ++i) {
queue.push(i);
cout << "Producer: pushed " << i << endl;
this_thread::sleep_for(chrono::milliseconds(100));
}
done = true;
queue.shutdown();
});
// Consumer thread
thread consumer([&queue, &done]() {
while (!done || queue.size() > 0) {
auto item = queue.pop();
if (item.has_value()) {
int value = item.value();
cout << "Consumer: processed " << value << endl;
// Simulate processing time
this_thread::sleep_for(chrono::milliseconds(150));
}
}
});
producer.join();
consumer.join();
cout << "SPSC completed" << endl;
}
Characteristics
- Low contention: Only two threads ever compete for the queue's mutex
- Simple synchronization: Minimal locking overhead
- Predictable ordering: Items are consumed in exactly the order they were produced
- No load balancing needed: Single consumer handles all work
Multiple Producer & Single Consumer
Multiple producer threads generate data, one consumer thread processes it.
Use Cases
- Event aggregation systems
- Log collection from multiple sources
- Sensor data collection
- Request queuing in web servers
Example Implementation
#include <string>
#include <thread>
#include <vector>
#include <atomic>
using namespace std;
void multipleProducerSingleConsumer() {
ThreadSafeQueue<string> queue(50);
atomic<int> active_producers(3); // Must match the number of producer threads below
// Multiple producer threads
vector<thread> producers;
for (int p = 0; p < 3; ++p) {
producers.emplace_back([&queue, p, &active_producers]() {
for (int i = 1; i <= 10; ++i) {
string msg = "Producer-" + to_string(p) + ": item-" + to_string(i);
queue.push(msg);
cout << msg << endl;
this_thread::sleep_for(chrono::milliseconds(50 + p * 20));
}
// Decrement and test in one atomic step so exactly one producer calls shutdown()
if (--active_producers == 0) {
queue.shutdown();
}
});
}
// Single consumer thread
thread consumer([&queue, &active_producers]() {
int processed = 0;
while (active_producers > 0 || queue.size() > 0) {
auto item = queue.pop();
if (item.has_value()) {
cout << "Consumer: processed [" << item.value() << "]" << endl;
processed++;
this_thread::sleep_for(chrono::milliseconds(100));
}
}
cout << "Consumer processed " << processed << " items" << endl;
});
for (auto& t : producers) {
t.join();
}
consumer.join();
cout << "MPSC completed" << endl;
}
Characteristics
- Producer contention: Multiple threads compete to push
- Consumer bottleneck: Single thread may become overwhelmed
- Load balancing: Not needed (single consumer)
- Throughput: Limited by consumer processing speed
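With several producers finishing at about the same time, the "last producer shuts the queue down" step needs care: if the decrement and the check for zero are two separate operations, more than one thread can observe zero. One way to make the hand-off unambiguous is to use the return value of `fetch_sub`. The sketch below is illustrative only; `countLastFinishers` is a hypothetical name, and the counter increment stands in for a call to `queue.shutdown()`.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Returns how many of n threads observed themselves as "the last to finish".
int countLastFinishers(int n) {
    std::atomic<int> remaining(n);
    std::atomic<int> last_seen(0);
    std::vector<std::thread> workers;
    for (int i = 0; i < n; ++i) {
        workers.emplace_back([&] {
            // fetch_sub returns the value held before the decrement,
            // so exactly one thread sees 1 here.
            if (remaining.fetch_sub(1) == 1) {
                last_seen.fetch_add(1);  // in a real producer: queue.shutdown()
            }
        });
    }
    for (auto& t : workers) t.join();
    return last_seen.load();
}
```

Whatever the thread count, the function returns 1, which is the property the shutdown logic relies on.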
Optimization Tips
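// Batch processing for better throughput.
// processBatch() is a placeholder for application code; it is not defined in this guide.
void processBatch(const vector<int>& batch);
void optimizedMPSC() {
ThreadSafeQueue<int> queue(100);
const int BATCH_SIZE = 10;
// Producers (omitted here) push items and call queue.shutdown() when finished
thread consumer([&queue]() {
vector<int> batch;
while (true) {
// Collect up to BATCH_SIZE items; pop() blocks, so a partial batch
// waits for more data or for shutdown
for (int i = 0; i < BATCH_SIZE; ++i) {
auto item = queue.pop();
if (!item.has_value()) {
break; // Queue shut down and drained
}
batch.push_back(item.value());
}
if (batch.empty()) {
break;
}
// Process batch
processBatch(batch);
batch.clear();
}
});
consumer.join(); // Destroying a joinable thread without join() calls terminate()
}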
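An alternative to popping items one at a time is to take several items under a single lock acquisition, which reduces the number of lock and unlock operations per item. The following is a sketch under stated assumptions: `BatchQueue` and `popBatch` are hypothetical names, not part of `ThreadSafeQueue`, and the queue is left unbounded to keep the example short.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <vector>

// Hypothetical queue with a batch pop (unbounded for brevity).
class BatchQueue {
public:
    void push(int item) {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            queue_.push(item);
        }
        not_empty_.notify_one();
    }

    // Blocks until at least one item is available (or shutdown), then takes
    // up to max_items in one critical section. An empty result means the
    // queue was shut down and is fully drained.
    std::vector<int> popBatch(std::size_t max_items) {
        std::unique_lock<std::mutex> lock(mtx_);
        not_empty_.wait(lock, [this] { return !queue_.empty() || shutdown_; });
        std::vector<int> batch;
        while (!queue_.empty() && batch.size() < max_items) {
            batch.push_back(queue_.front());
            queue_.pop();
        }
        return batch;
    }

    void shutdown() {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            shutdown_ = true;
        }
        not_empty_.notify_all();
    }

private:
    std::queue<int> queue_;
    std::mutex mtx_;
    std::condition_variable not_empty_;
    bool shutdown_ = false;
};
```

Unlike a loop of blocking single pops, `popBatch` never waits for a batch to fill: it returns whatever is available, up to the limit, as soon as there is at least one item.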
Single Producer & Multiple Consumer
One producer thread generates data, multiple consumer threads process it in parallel.
Use Cases
- Task distribution systems
- Parallel data processing
- Load balancing
- Worker thread pools
Example Implementation
#include <atomic>
using namespace std;
void singleProducerMultipleConsumer() {
ThreadSafeQueue<int> queue(100);
atomic<int> items_produced(0);
atomic<int> items_consumed(0);
const int TOTAL_ITEMS = 100;
const int NUM_CONSUMERS = 4;
// Single producer thread
thread producer([&queue, &items_produced]() {
for (int i = 1; i <= TOTAL_ITEMS; ++i) {
queue.push(i);
items_produced++;
cout << "Producer: pushed " << i << endl;
this_thread::sleep_for(chrono::milliseconds(10));
}
queue.shutdown();
});
// Multiple consumer threads
vector<thread> consumers;
for (int c = 0; c < NUM_CONSUMERS; ++c) {
consumers.emplace_back([&queue, c, &items_consumed]() {
int local_count = 0;
while (true) {
auto item = queue.pop();
if (!item.has_value()) {
break;
}
int value = item.value();
cout << "Consumer-" << c << ": processed " << value << endl;
items_consumed++;
local_count++;
// Simulate variable processing time
this_thread::sleep_for(chrono::milliseconds(20 + (value % 50)));
}
cout << "Consumer-" << c << " finished, processed " << local_count << " items" << endl;
});
}
producer.join();
for (auto& t : consumers) {
t.join();
}
cout << "SPMC completed: produced=" << items_produced
<< ", consumed=" << items_consumed << endl;
}
Characteristics
- Load balancing: Work distributed across consumers
- Parallel processing: Multiple items processed simultaneously
- Consumer contention: Multiple threads compete to pop
- Throughput: Scales with the number of consumers until the producer's rate or contention on the queue's mutex becomes the limit
Load Balancing Example
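// Per-consumer local queues with a shared overflow queue.
// This is a simplified overflow scheme, not full work stealing:
// consumers never take items from each other's local queues.
class WorkStealingQueue {
private:
ThreadSafeQueue<int> shared_queue_;
vector<queue<int>> local_queues_;
vector<mutex> local_mtx_; // One mutex per local queue
public:
explicit WorkStealingQueue(int num_consumers)
: local_queues_(num_consumers), local_mtx_(num_consumers) {}
void push(int item, int consumer_id) {
{
lock_guard<mutex> lock(local_mtx_[consumer_id]);
if (local_queues_[consumer_id].size() < 10) {
local_queues_[consumer_id].push(item);
return;
}
}
// Local queue is full: route the item to the shared queue instead,
// so it is stored exactly once
shared_queue_.push(item);
}
optional<int> pop(int consumer_id) {
{
// Try local queue first
lock_guard<mutex> lock(local_mtx_[consumer_id]);
if (!local_queues_[consumer_id].empty()) {
int item = local_queues_[consumer_id].front();
local_queues_[consumer_id].pop();
return item;
}
}
// Fall back to the shared queue; this blocks until an item arrives
// there or shutdown() is called
return shared_queue_.pop();
}
void shutdown() {
shared_queue_.shutdown();
}
};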
Multiple Producer & Multiple Consumer
The most general case: multiple producers and multiple consumers.
Use Cases
- High-throughput message queues
- Distributed task processing
- Real-time data processing systems
- Microservices communication
Example Implementation
void multipleProducerMultipleConsumer() {
ThreadSafeQueue<string> queue(200);
const int NUM_PRODUCERS = 5;
const int NUM_CONSUMERS = 3;
const int ITEMS_PER_PRODUCER = 20;
atomic<int> active_producers(NUM_PRODUCERS); // Must match the number of producer threads
atomic<int> total_produced(0);
atomic<int> total_consumed(0);
// Multiple producer threads
vector<thread> producers;
for (int p = 0; p < NUM_PRODUCERS; ++p) {
producers.emplace_back([&queue, p, &active_producers, &total_produced]() {
for (int i = 1; i <= ITEMS_PER_PRODUCER; ++i) {
string msg = "P" + to_string(p) + "-item" + to_string(i);
queue.push(msg);
total_produced++;
this_thread::sleep_for(chrono::milliseconds(30 + (p * 5)));
}
// Decrement and test in one atomic step so exactly one producer calls shutdown()
if (--active_producers == 0) {
queue.shutdown();
}
});
}
// Multiple consumer threads
vector<thread> consumers;
for (int c = 0; c < NUM_CONSUMERS; ++c) {
consumers.emplace_back([&queue, c, &active_producers, &total_consumed]() {
int local_count = 0;
while (active_producers > 0 || queue.size() > 0) {
auto item = queue.pop();
if (item.has_value()) {
cout << "C" << c << ": [" << item.value() << "]" << endl;
total_consumed++;
local_count++;
this_thread::sleep_for(chrono::milliseconds(50 + (c * 10)));
}
}
cout << "Consumer-" << c << " finished: " << local_count << " items" << endl;
});
}
// Wait for all producers
for (auto& t : producers) {
t.join();
}
// Wait for all consumers
for (auto& t : consumers) {
t.join();
}
cout << "MPMC completed: produced=" << total_produced
<< ", consumed=" << total_consumed << endl;
}
Characteristics
- Maximum parallelism: Both production and consumption parallelized
- High contention: Multiple threads compete on both ends
- Complex synchronization: Requires careful design
- Scalability: Throughput grows with thread count until contention on the single queue mutex dominates
Advanced MPMC with Priority Queue
#include <queue>
#include <vector>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <optional>
using namespace std;
template<typename T>
class PriorityThreadSafeQueue {
private:
priority_queue<T, vector<T>, greater<T>> queue_;
mutex mtx_;
condition_variable not_empty_;
bool shutdown_ = false; // Must be initialized: the class has no constructor
public:
void push(const T& item) {
lock_guard<mutex> lock(mtx_);
if (shutdown_) return;
queue_.push(item);
not_empty_.notify_one();
}
optional<T> pop() {
unique_lock<mutex> lock(mtx_);
not_empty_.wait(lock, [this]() {
return !queue_.empty() || shutdown_;
});
if (shutdown_ && queue_.empty()) {
return nullopt;
}
T item = queue_.top();
queue_.pop();
return item;
}
void shutdown() {
lock_guard<mutex> lock(mtx_);
shutdown_ = true;
not_empty_.notify_all();
}
};
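Because the queue above is declared with `greater<T>`, the underlying `priority_queue` behaves as a min-heap: the smallest element is popped first. For a user-defined type this means providing `operator>`. The sketch below shows the resulting order without any threading; the `Task` struct, its fields, and `serviceOrder` are hypothetical names chosen for illustration.

```cpp
#include <functional>
#include <queue>
#include <string>
#include <vector>

// Hypothetical task type: lower `priority` values are served first.
struct Task {
    int priority;
    std::string name;
};

// std::greater<Task> requires operator> on Task.
inline bool operator>(const Task& a, const Task& b) {
    return a.priority > b.priority;
}

// Returns task names in the order a greater<>-ordered priority_queue yields them.
std::vector<std::string> serviceOrder(const std::vector<Task>& tasks) {
    std::priority_queue<Task, std::vector<Task>, std::greater<Task>> pq;
    for (const Task& t : tasks) pq.push(t);

    std::vector<std::string> order;
    while (!pq.empty()) {
        order.push_back(pq.top().name);  // top() is the smallest priority value
        pq.pop();
    }
    return order;
}
```

Note that tasks with equal priority come out in an unspecified relative order, since `priority_queue` is not stable. If first-in-first-out order within a priority level matters, a sequence number can be added to the comparison.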
Best Practices
1. Proper Shutdown Handling
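class GracefulShutdown {
private:
ThreadSafeQueue<int> queue_;
vector<thread> consumers_; // Consumer threads, started elsewhere (not shown)
atomic<bool> shutdown_requested_{false};
public:
void shutdown() {
shutdown_requested_ = true;
queue_.shutdown();
// Wait for consumers to drain the queue and exit.
// A fixed sleep cannot guarantee that they have finished.
for (auto& t : consumers_) {
if (t.joinable()) {
t.join();
}
}
}
};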
2. Bounded Queue Size
A bounded queue applies backpressure: when consumers fall behind, producers block in push() instead of growing the queue until memory runs out:
ThreadSafeQueue<int> queue(1000); // Limit to 1000 items
3. Error Handling
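// Member of ThreadSafeQueue<T>: returns false on timeout or shutdown
// instead of blocking indefinitely
bool tryPush(const T& item, chrono::milliseconds timeout) {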
unique_lock<mutex> lock(mtx_);
if (not_full_.wait_for(lock, timeout, [this]() {
return queue_.size() < max_size_ || shutdown_;
})) {
if (shutdown_) return false;
queue_.push(item);
not_empty_.notify_one();
return true;
}
return false; // Timeout
}
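The same timeout idea applies on the consumer side, where it lets a consumer wake up periodically to check a stop flag or report that it is idle. The following is a minimal sketch; `TimedQueue` and `tryPop` are hypothetical names, and the queue is unbounded to keep the example focused on the timed wait.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>

// Hypothetical queue with a timed pop (unbounded for brevity).
class TimedQueue {
public:
    void push(int item) {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            queue_.push(item);
        }
        not_empty_.notify_one();
    }

    // Waits up to `timeout` for an item; returns nullopt if none arrived.
    std::optional<int> tryPop(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mtx_);
        // wait_for with a predicate returns the predicate's final value,
        // so false here means the timeout expired with the queue still empty.
        if (!not_empty_.wait_for(lock, timeout, [this] { return !queue_.empty(); })) {
            return std::nullopt;
        }
        int item = queue_.front();
        queue_.pop();
        return item;
    }

private:
    std::queue<int> queue_;
    std::mutex mtx_;
    std::condition_variable not_empty_;
};
```

With this interface `nullopt` means "nothing arrived in time" rather than "the queue was shut down", so a caller that needs both signals would have to distinguish them, for example with a separate status value.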
4. Performance Monitoring
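// Simplified: unbounded and without condition variables, to focus on the counters
template<typename T>
class MonitoredQueue {
private:
queue<T> queue_;
mutex mtx_;
atomic<size_t> total_pushed_{0};
atomic<size_t> total_popped_{0};
atomic<size_t> max_size_{0};
public:
void push(const T& item) {
size_t current;
{
lock_guard<mutex> lock(mtx_);
queue_.push(item);
current = queue_.size(); // Read the size while still holding the lock
}
total_pushed_++;
// Raise the recorded high-water mark. On failure,
// compare_exchange_weak reloads max with the current value.
size_t max = max_size_.load();
while (current > max && !max_size_.compare_exchange_weak(max, current)) {
}
}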
void printStats() {
cout << "Pushed: " << total_pushed_
<< ", Popped: " << total_popped_
<< ", Max size: " << max_size_ << endl;
}
};
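The compare-and-swap loop that tracks the maximum size can be isolated into a small helper, which makes it easier to test on its own. This is a sketch; `updateMax` is a hypothetical name and not part of any standard library.

```cpp
#include <atomic>
#include <cstddef>

// Raises `maximum` to `candidate` if candidate is larger.
// Safe to call from several threads at once.
void updateMax(std::atomic<std::size_t>& maximum, std::size_t candidate) {
    std::size_t seen = maximum.load();
    // When compare_exchange_weak fails, it writes the current value of
    // `maximum` into `seen`, so the loop re-tests against the latest value
    // without an explicit reload.
    while (candidate > seen && !maximum.compare_exchange_weak(seen, candidate)) {
    }
}
```

The loop exits either because the stored maximum is already at least `candidate` or because the exchange succeeded, so the stored value never decreases.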
5. Use Lock-Free for High Performance
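When profiling shows that the queue's mutex is the bottleneck, consider a lock-free queue. Lock-free code is hard to get right, so treat the following as an illustration of the idea rather than production code: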
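#include <atomic>
// Push side of a lock-free linked queue (safe for multiple producers).
// pop() and node reclamation are omitted, so this sketch leaks its nodes.
template<typename T>
class LockFreeQueue {
private:
struct Node {
atomic<T*> data{nullptr};
atomic<Node*> next{nullptr};
};
atomic<Node*> head_;
atomic<Node*> tail_;
public:
// Start with a dummy node so that tail_ is never null
LockFreeQueue() {
Node* dummy = new Node;
head_.store(dummy);
tail_.store(dummy);
}
void push(const T& item) {
Node* new_node = new Node;
new_node->data.store(new T(item));
// Atomically claim the tail, then link the previous tail to the new node
Node* prev_tail = tail_.exchange(new_node);
prev_tail->next.store(new_node);
}
};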
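The linked queue above shows only the push side. For the single-producer, single-consumer case, a fixed-size ring buffer is a simpler lock-free structure that can be shown in full. The sketch below is one common formulation, not a tuned implementation; `SpscRing` is a hypothetical name, and it assumes `T` is default-constructible and copyable, and that exactly one thread calls `push` while exactly one other thread calls `pop`.

```cpp
#include <atomic>
#include <cstddef>
#include <optional>
#include <vector>

// Single-producer/single-consumer ring buffer.
// Only one thread may call push, and only one (other) thread may call pop.
template <typename T>
class SpscRing {
public:
    // One slot is always left unused to tell "full" from "empty".
    explicit SpscRing(std::size_t capacity) : slots_(capacity + 1) {}

    // Returns false if the buffer is full.
    bool push(const T& item) {
        const std::size_t tail = tail_.load(std::memory_order_relaxed);
        const std::size_t next = (tail + 1) % slots_.size();
        if (next == head_.load(std::memory_order_acquire)) return false;
        slots_[tail] = item;
        tail_.store(next, std::memory_order_release);  // publish the slot
        return true;
    }

    // Returns nullopt if the buffer is empty.
    std::optional<T> pop() {
        const std::size_t head = head_.load(std::memory_order_relaxed);
        if (head == tail_.load(std::memory_order_acquire)) return std::nullopt;
        T item = slots_[head];
        head_.store((head + 1) % slots_.size(), std::memory_order_release);
        return item;
    }

private:
    std::vector<T> slots_;
    std::atomic<std::size_t> head_{0};  // next slot to read; written only by the consumer
    std::atomic<std::size_t> tail_{0};  // next slot to write; written only by the producer
};
```

Neither operation blocks: a full buffer makes `push` return `false` and an empty one makes `pop` return `nullopt`, so the caller decides whether to retry, yield, or drop the item. With more than one producer or consumer this structure is no longer correct.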
Common Pitfalls
1. Deadlock from Multiple Locks
// BAD: Potential deadlock
void badExample() {
mutex mtx1, mtx2;
thread t1([&]() {
lock_guard<mutex> l1(mtx1);
lock_guard<mutex> l2(mtx2); // Locks mtx1, then mtx2
});
thread t2([&]() {
lock_guard<mutex> l2(mtx2);
lock_guard<mutex> l1(mtx1); // Opposite order: mtx2, then mtx1
});
t1.join();
t2.join();
}
// GOOD: Acquire both locks together (or always lock in the same order)
void goodExample() {
mutex mtx1, mtx2;
auto work = [&]() {
lock(mtx1, mtx2); // std::lock acquires both without deadlock
lock_guard<mutex> l1(mtx1, adopt_lock);
lock_guard<mutex> l2(mtx2, adopt_lock);
};
thread t1(work);
thread t2(work);
t1.join();
t2.join();
}
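Since C++17, `std::scoped_lock` combines the lock-both-then-adopt steps into a single declaration. The sketch below applies it to two threads that take the same pair of mutexes in opposite argument order; `Account`, `transfer`, and `runOpposingTransfers` are hypothetical names used only for this illustration.

```cpp
#include <mutex>
#include <thread>

// Hypothetical resource guarded by its own mutex.
struct Account {
    std::mutex mtx;
    int balance = 0;
};

// Locks both accounts with std::scoped_lock, which acquires multiple mutexes
// using a deadlock-avoidance algorithm regardless of argument order.
// Precondition: `from` and `to` are different accounts.
void transfer(Account& from, Account& to, int amount) {
    std::scoped_lock lock(from.mtx, to.mtx);
    from.balance -= amount;
    to.balance += amount;
}

// Runs transfers in opposite directions concurrently and returns the combined balance.
int runOpposingTransfers(int rounds) {
    Account a, b;
    a.balance = 100;
    b.balance = 100;
    std::thread t1([&] { for (int i = 0; i < rounds; ++i) transfer(a, b, 1); });
    std::thread t2([&] { for (int i = 0; i < rounds; ++i) transfer(b, a, 1); });
    t1.join();
    t2.join();
    return a.balance + b.balance;
}
```

The combined balance is unchanged after the run, and the two threads cannot deadlock even though they name the mutexes in opposite order.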
2. Lost Wake-ups
// BAD: Lost wake-up
void badWait() {
unique_lock<mutex> lock(mtx_);
if (queue_.empty()) {
lock.unlock();
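// Another thread may push and notify here, before this thread is waiting
lock.lock();
not_empty_.wait(lock); // No predicate: the missed notification is never rechecked, so this may wait forever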
}
}
// GOOD: Use predicate
void goodWait() {
unique_lock<mutex> lock(mtx_);
not_empty_.wait(lock, [this]() {
return !queue_.empty() || shutdown_;
});
}
3. Race Condition in Shutdown
// BAD: Race condition
void badShutdown() {
shutdown_ = true; // Written without holding mtx_: races with waiters that read it in their predicate
not_empty_.notify_all();
}
// GOOD: Proper synchronization
void goodShutdown() {
{
lock_guard<mutex> lock(mtx_);
shutdown_ = true;
}
not_empty_.notify_all();
not_full_.notify_all();
}
4. Unbounded Queue Growth
// BAD: No size limit
queue<int> unbounded_queue; // Can grow indefinitely
// GOOD: Bounded queue
ThreadSafeQueue<int> bounded_queue(1000);
5. Ignoring Return Values
// BAD: Ignoring optional
queue.pop(); // Result discarded: the popped item is lost, and a nullopt (shutdown) goes unnoticed
// GOOD: Check return value
auto item = queue.pop();
if (item.has_value()) {
process(item.value());
}
Summary
The producer-consumer pattern is essential for concurrent programming in C++. Choose the right variant based on your needs:
- SPSC: Simple, low overhead, deterministic
- MPSC: Multiple sources, single processor
- SPMC: Single source, parallel processing, load balancing
- MPMC: Maximum parallelism, high throughput
Key takeaways:
- Always use bounded queues
- Implement proper shutdown mechanisms
- Use condition variables with predicates
- Monitor performance and contention
- Consider lock-free implementations for extreme performance
By understanding these patterns and following best practices, you can build robust, scalable concurrent systems in C++.