C++ Pipeline Pattern: Real-World Engineering Guide

Problem Solved

Break complex work into sequential stages, each running in a dedicated thread or thread pool, exploiting parallelism across stages.

How It Works

  • Stage 1 → Queue → Stage 2 → Queue → Stage 3 → …
  • Each stage processes items and passes to next
  • Stages run in parallel on different items

STL Usage

#include <queue>
#include <thread>
#include <vector>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <memory>
#include <atomic>
using namespace std;

template<typename T>
class PipelineStage {
private:
    queue<T> input_queue_;
    mutex mtx_;
    condition_variable cv_;
    function<T(T)> processor_;
    thread worker_;
    atomic<bool> stop_{false};

public:
    PipelineStage(function<T(T)> processor) : processor_(processor) {
        worker_ = thread([this]() {
            while (!stop_) {
                T item;
                {
                    unique_lock<mutex> lock(mtx_);
                    cv_.wait(lock, [this]() { return stop_ || !input_queue_.empty(); });
                    if (stop_ && input_queue_.empty()) break;
                    item = input_queue_.front();
                    input_queue_.pop();
                }
                
                T result = processor_(item);
                if (next_stage_) {
                    next_stage_->enqueue(result);
                }
            }
        });
    }

    void enqueue(const T& item) {
        {
            lock_guard<mutex> lock(mtx_);
            input_queue_.push(item);
        }
        cv_.notify_one();
    }

    PipelineStage<T>* next_stage_ = nullptr;

    ~PipelineStage() {
        stop_ = true;
        cv_.notify_one();
        worker_.join();
    }
};

template<typename T>
class Pipeline {
private:
    vector<unique_ptr<PipelineStage<T>>> stages_;

public:
    void addStage(function<T(T)> processor) {
        stages_.push_back(make_unique<PipelineStage<T>>(processor));
        if (stages_.size() > 1) {
            stages_[stages_.size() - 2]->next_stage_ = stages_.back().get();
        }
    }

    void process(const T& item) {
        if (!stages_.empty()) {
            stages_[0]->enqueue(item);
        }
    }
};

Example

#include <iostream>
#include <string>
using namespace std;

void pipelineExample() {
    Pipeline<string> pipeline;
    
    // Stage 1: Read
    pipeline.addStage([](string input) {
        return "Read: " + input;
    });
    
    // Stage 2: Process
    pipeline.addStage([](string input) {
        return "Processed: " + input;
    });
    
    // Stage 3: Write
    pipeline.addStage([](string input) {
        cout << "Output: " << input << endl;
        return input;
    });
    
    for (int i = 0; i < 10; ++i) {
        pipeline.process("Item " + to_string(i));
    }
    
    this_thread::sleep_for(chrono::seconds(2));
}

Use Cases

  • Image processing: Capture → process → compress → upload
  • Data pipelines: Extract → transform → load
  • Video processing: Decode → filter → encode
  • Log processing: Parse → filter → aggregate → store

Key Takeaways

  • Exploits parallelism across stages
  • Each stage can process different items simultaneously
  • Good for sequential processing pipelines
  • High throughput for streaming data

Things to Be Careful About

  • Backpressure: Slow stages can fill queues
  • Error handling: Errors in one stage affect pipeline
  • Shutdown: Ensure all items processed
  • Bottlenecks: Slowest stage limits throughput

Summary

Pipelines enable efficient parallel processing of sequential stages, maximizing throughput for streaming workloads.