C++ Gate + Thread Pool + Callback Queue: Advanced Multi-Thread Pattern
C++ Gate + Thread Pool + Callback Queue: Advanced Multi-Thread Pattern
This pattern combines three powerful concepts: a gate (control mechanism), a thread pool (task execution), and a callback queue (result handling). This enables controlled asynchronous execution with result callbacks.
Table of Contents
- Overview
- Gate Implementation
- Thread Pool Integration
- Callback Queue
- Complete Implementation
- Example 1: Basic Usage
- Example 2: Controlled Execution
- Example 3: Async Operations with Callbacks
- Best Practices
Overview
Components
- Gate: Controls when tasks can be executed (enabled/disabled)
- Thread Pool: Executes tasks asynchronously
- Callback Queue: Queues callbacks to be executed after task completion
Use Cases
- Rate limiting: Control execution rate
- Resource management: Enable/disable based on resource availability
- State-dependent execution: Only execute when system is ready
- Async operations: Execute tasks and handle results via callbacks
Gate Implementation
#include <mutex>
#include <condition_variable>
#include <atomic>
using namespace std;
class Gate {
private:
mutex mtx_;
condition_variable cv_;
atomic<bool> open_{true};
public:
void open() {
{
lock_guard<mutex> lock(mtx_);
open_ = true;
}
cv_.notify_all();
}
void close() {
lock_guard<mutex> lock(mtx_);
open_ = false;
}
void waitUntilOpen() {
unique_lock<mutex> lock(mtx_);
cv_.wait(lock, [this]() { return open_.load(); });
}
bool isOpen() const {
return open_.load();
}
};
Thread Pool Integration
#include <thread>
#include <queue>
#include <functional>
#include <vector>
using namespace std;
class ThreadPool {
private:
vector<thread> workers_;
queue<function<void()>> tasks_;
mutex queue_mutex_;
condition_variable condition_;
atomic<bool> stop_{false};
public:
explicit ThreadPool(size_t num_threads) {
for (size_t i = 0; i < num_threads; ++i) {
workers_.emplace_back([this]() {
while (true) {
function<void()> task;
{
unique_lock<mutex> lock(queue_mutex_);
condition_.wait(lock, [this]() {
return stop_ || !tasks_.empty();
});
if (stop_ && tasks_.empty()) {
return;
}
task = tasks_.front();
tasks_.pop();
}
task();
}
});
}
}
template<class F>
void enqueue(F&& f) {
{
lock_guard<mutex> lock(queue_mutex_);
if (stop_) return;
tasks_.emplace(forward<F>(f));
}
condition_.notify_one();
}
~ThreadPool() {
{
lock_guard<mutex> lock(queue_mutex_);
stop_ = true;
}
condition_.notify_all();
for (auto& worker : workers_) {
worker.join();
}
}
};
Callback Queue
#include <queue>
#include <mutex>
#include <condition_variable>
#include <thread>
using namespace std;
template<typename Result>
class CallbackQueue {
private:
queue<function<void(Result)>> callbacks_;
queue<Result> results_;
mutex mtx_;
condition_variable cv_;
atomic<bool> stop_{false};
thread callback_thread_;
public:
CallbackQueue() : callback_thread_([this]() {
while (true) {
function<void(Result)> callback;
Result result;
{
unique_lock<mutex> lock(mtx_);
cv_.wait(lock, [this]() {
return stop_ || (!callbacks_.empty() && !results_.empty());
});
if (stop_ && (callbacks_.empty() || results_.empty())) {
return;
}
callback = callbacks_.front();
callbacks_.pop();
result = results_.front();
results_.pop();
}
callback(result);
}
}) {}
void enqueueCallback(function<void(Result)> callback, Result result) {
{
lock_guard<mutex> lock(mtx_);
callbacks_.push(callback);
results_.push(result);
}
cv_.notify_one();
}
~CallbackQueue() {
{
lock_guard<mutex> lock(mtx_);
stop_ = true;
}
cv_.notify_one();
callback_thread_.join();
}
};
Complete Implementation
#include <iostream>
#include <future>
#include <atomic>
using namespace std;
template<typename Result>
class GatedThreadPoolWithCallbacks {
private:
Gate gate_;
ThreadPool thread_pool_;
CallbackQueue<Result> callback_queue_;
atomic<int> pending_tasks_{0};
public:
GatedThreadPoolWithCallbacks(size_t num_threads)
: thread_pool_(num_threads) {}
template<class TaskFunc, class CallbackFunc>
void executeAsync(TaskFunc&& task, CallbackFunc&& callback) {
// Wait for gate to be open
gate_.waitUntilOpen();
pending_tasks_++;
// Execute task in thread pool
thread_pool_.enqueue([this, task, callback]() {
try {
Result result = task();
pending_tasks_--;
// Queue callback
callback_queue_.enqueueCallback(callback, result);
} catch (...) {
pending_tasks_--;
// Handle error callback if needed
}
});
}
void openGate() {
gate_.open();
}
void closeGate() {
gate_.close();
}
bool isGateOpen() const {
return gate_.isOpen();
}
int getPendingTasks() const {
return pending_tasks_.load();
}
};
// Usage example
void example() {
GatedThreadPoolWithCallbacks<int> system(4);
// Close gate initially
system.closeGate();
// Submit tasks (will wait for gate to open)
for (int i = 0; i < 10; ++i) {
system.executeAsync(
[i]() -> int {
cout << "Task " << i << " executing" << endl;
this_thread::sleep_for(chrono::milliseconds(100));
return i * i;
},
[i](int result) {
cout << "Callback for task " << i << ": result = " << result << endl;
}
);
}
// Open gate - all tasks can now execute
system.openGate();
this_thread::sleep_for(chrono::seconds(2));
}
Example 1: Basic Usage
void basicUsage() {
GatedThreadPoolWithCallbacks<string> system(4);
system.executeAsync(
[]() -> string {
return "Hello from async task";
},
[](const string& result) {
cout << "Received: " << result << endl;
}
);
this_thread::sleep_for(chrono::milliseconds(500));
}
Example 2: Controlled Execution
void controlledExecution() {
GatedThreadPoolWithCallbacks<int> system(4);
// Close gate - tasks will queue
system.closeGate();
// Submit multiple tasks
for (int i = 0; i < 5; ++i) {
system.executeAsync(
[i]() -> int {
cout << "Task " << i << " running" << endl;
return i;
},
[i](int result) {
cout << "Task " << i << " completed: " << result << endl;
}
);
}
cout << "Tasks queued, waiting..." << endl;
this_thread::sleep_for(chrono::milliseconds(500));
// Open gate - tasks execute
cout << "Opening gate..." << endl;
system.openGate();
this_thread::sleep_for(chrono::seconds(1));
}
Example 3: Async Operations with Callbacks
void asyncOperations() {
GatedThreadPoolWithCallbacks<vector<int>> system(4);
system.executeAsync(
[]() -> vector<int> {
vector<int> data;
for (int i = 0; i < 10; ++i) {
data.push_back(i * i);
}
return data;
},
[](const vector<int>& result) {
cout << "Processed " << result.size() << " items" << endl;
for (int val : result) {
cout << val << " ";
}
cout << endl;
}
);
this_thread::sleep_for(chrono::milliseconds(500));
}
Best Practices
- Gate Control: Use gates to control when execution should happen
- Error Handling: Handle exceptions in tasks and callbacks
- Resource Management: Monitor pending tasks to prevent overload
- Shutdown: Properly shutdown all components
- Thread Safety: Ensure all operations are thread-safe
Summary
The Gate + Thread Pool + Callback Queue pattern provides:
- Controlled execution: Gate controls when tasks run
- Efficient processing: Thread pool executes tasks
- Result handling: Callback queue processes results
- Flexibility: Can enable/disable execution dynamically
This pattern is useful for systems that need controlled async execution with result callbacks.