A Simple Thread Pool Implementation in C++

Note that this implementation assumes that the jobs sent to the thread pool do not need to communicate with each other. If this is not the case, you need a more complicated solution.

Constructor and members

template<typename ReturnType>
class ThreadPool {
public:
  explicit ThreadPool(int num_threads) {
    for (int i = 0; i < num_threads; ++i) {
      // Start all the threads with the worker_loop
      workers_.emplace_back([this] {this->worker_loop();});
    }
  }
private:
  std::vector<std::thread> workers_;
  std::queue<std::packaged_task<ReturnType()>> jobs_;
  std::mutex mutex_;
  std::condition_variable cond_var_;
  bool shutting_down_ = false;
};

Most of the members of the ThreadPool class should be straightforward:

  1. workers_: The worker threads. Instead of spawning a thread for every task, we reuse these threads.
  2. jobs_: These are the jobs submitted to the thread pool. We use std::packaged_task to package each job, and return an std::future when the job is submitted.
  3. mutex_: For coordinating the threads.
  4. cond_var_: For waking worker threads when a job arrives or the pool shuts down.
  5. shutting_down_: A flag indicating if we are shutting down the thread pool.
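
To make the role of std::packaged_task concrete, here is a minimal sketch, independent of the pool, of how a task hands its result to a std::future. The function name run_packaged_sum is hypothetical and used only for illustration.

```cpp
#include <future>
#include <thread>
#include <utility>

// Hypothetical helper: wraps a computation in a packaged_task, runs it on
// another thread, and returns the result obtained through the future.
int run_packaged_sum(int a, int b) {
  std::packaged_task<int()> task([a, b] { return a + b; });

  // Take the future before the task is moved into the thread.
  std::future<int> result = task.get_future();

  // packaged_task is move-only, so it is moved rather than copied.
  std::thread worker(std::move(task));
  worker.join();

  // get() blocks until the task has run, then yields its return value.
  return result.get();
}
```

The pool follows the same idea, except that the task is moved into jobs_ and later invoked by whichever worker picks it up.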

In the constructor, we create the worker threads and start the worker_loop() method (defined below) on each of them.

shutdown() and wait() methods

  void shutdown() {
    {
      std::scoped_lock lock(mutex_);
      shutting_down_ = true;
    }
    cond_var_.notify_all();
  }

  void wait() {
    // Prevents new jobs being submitted
    shutdown();
    for (auto& worker : workers_) {
      // Skip threads that were already joined, so a second wait() is safe
      if (worker.joinable()) {
        worker.join();
      }
    }
  }

In the shutdown() method, we simply set the flag shutting_down_ to true, and notify all the threads. A std::scoped_lock is used here.

The wait() method simply calls shutdown() and joins all the worker threads.
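
The flag-plus-notification pattern used by shutdown() can be tried in isolation. The sketch below assumes a hypothetical StopSignal type and a hypothetical stop_wakes_waiter() function, neither of which is part of the pool; its members mirror shutting_down_, mutex_ and cond_var_.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper type: a stop flag guarded by a mutex.
struct StopSignal {
  std::mutex mutex;
  std::condition_variable cond_var;
  bool stopping = false;

  void request_stop() {
    {
      // Write the flag under the lock so a waiter cannot miss the update
      // between checking the predicate and going to sleep.
      std::scoped_lock lock(mutex);
      stopping = true;
    }
    // Notify after the lock is released so woken threads can take it.
    cond_var.notify_all();
  }

  void wait_for_stop() {
    std::unique_lock lock(mutex);
    // The predicate covers spurious wakeups and the case where
    // request_stop() ran before this thread started waiting.
    cond_var.wait(lock, [this] { return stopping; });
  }
};

// Starts one waiting thread, requests a stop, and joins the thread.
// Returns true once the waiter has observed the flag and exited.
bool stop_wakes_waiter() {
  StopSignal signal;
  bool observed = false;
  std::thread waiter([&] {
    signal.wait_for_stop();
    observed = true;
  });
  signal.request_stop();
  waiter.join();  // join() synchronizes, so reading observed here is safe
  return observed;
}
```

Using notify_all() rather than notify_one() matters for the pool: every idle worker has to wake up and see the flag, not just one of them.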

The submit() method

  template<typename FuncType, typename ...ArgTypes>
  std::future<ReturnType> submit(FuncType&& func, ArgTypes&& ...args) {
    {
      // Cannot submit if we are shutting down
      std::scoped_lock lock(mutex_);
      if (shutting_down_) {
        throw std::runtime_error("Cannot submit jobs after shutdown.");
      }
    }

    // Forward the job into a packaged task
    auto task = std::packaged_task<ReturnType()>(
      [
        func = std::forward<FuncType>(func),
        args = std::make_tuple(std::forward<ArgTypes>(args)...)
      ]() {
        return std::apply(func, std::move(args));
      }
    );

    auto future = task.get_future();

    {
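      // Re-check under the same lock as the push: shutdown() may have run
      // since the first check, and a job queued after the workers exit
      // would never execute.
      std::scoped_lock lock(mutex_);
      if (shutting_down_) {
        throw std::runtime_error("Cannot submit jobs after shutdown.");
      }
      jobs_.emplace(std::move(task));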
    }

    cond_var_.notify_one();
    return future;
  }

The submit() method is a bit more involved. It is a variadic template. After checking that the thread pool is still active, it wraps the function in a std::packaged_task. The arguments are captured in a tuple inside the task and passed to the function with std::apply when the task runs.

We then get the future from the packaged task, push the task onto the job queue while holding the lock, and notify one waiting worker. Finally, we return the future.
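
The packaging step can be isolated as follows. This is a sketch under assumptions: bind_job and repeat_via_task are hypothetical names, and the task is run in place rather than by a worker thread.

```cpp
#include <future>
#include <string>
#include <tuple>
#include <utility>

// Hypothetical helper mirroring the packaging step of submit(): it stores
// the callable and its arguments in a lambda and returns a task that takes
// no parameters.
template <typename ReturnType, typename FuncType, typename... ArgTypes>
std::packaged_task<ReturnType()> bind_job(FuncType&& func, ArgTypes&&... args) {
  return std::packaged_task<ReturnType()>(
      [func = std::forward<FuncType>(func),
       args = std::make_tuple(std::forward<ArgTypes>(args)...)]() {
        // std::apply unpacks the tuple into the callable's parameter list.
        return std::apply(func, args);
      });
}

// Packages a two-argument callable, runs it in place, and reads the result.
std::string repeat_via_task(const std::string& text, int times) {
  auto task = bind_job<std::string>(
      [](const std::string& s, int n) {
        std::string out;
        for (int i = 0; i < n; ++i) out += s;
        return out;
      },
      text, times);
  auto future = task.get_future();
  task();  // in the pool, a worker thread makes this call
  return future.get();
}
```

Because std::make_tuple stores decayed copies, the arguments are copied or moved into the task. A caller who wants the job to see the original object has to wrap the argument in std::ref.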

The worker loop

  void worker_loop() {
    while (true) {
      // Get a job from the queue
      std::unique_lock lock(mutex_);
      cond_var_.wait(lock, [this] {return !jobs_.empty() || shutting_down_;});

      if (shutting_down_ && jobs_.empty()) {
        // lock will be released as it goes out of scope after break.
        break;
      }

      auto task {std::move(jobs_.front())};
      jobs_.pop();
      lock.unlock();

      // Actual job execution
      task();
    }
  }

The worker_loop() is the actual method that’s keeping all the worker threads alive.

In the while loop, the worker waits until there is a new job or until the thread pool is shut down. If there is a job, the worker takes it from the queue, releases the lock, and executes it. When the job is finished, the worker waits for the next one. On shutdown, a worker exits only once the queue is empty, so jobs that were already submitted still run.
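
The same wait, check, pop, unlock sequence can be exercised without the rest of the class. The sketch below uses a hypothetical sum_with_worker() function with a single worker and a queue of plain integers instead of packaged tasks.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical demo: one worker thread sums integers taken from a queue.
int sum_with_worker(const std::vector<int>& values) {
  std::queue<int> jobs;
  std::mutex mutex;
  std::condition_variable cond_var;
  bool shutting_down = false;
  int total = 0;  // touched only by the worker until join()

  std::thread worker([&] {
    while (true) {
      std::unique_lock lock(mutex);
      cond_var.wait(lock, [&] { return !jobs.empty() || shutting_down; });

      // Exit only when the queue is drained, so no queued job is lost.
      if (shutting_down && jobs.empty()) {
        break;
      }

      int value = jobs.front();
      jobs.pop();
      lock.unlock();  // do the work without holding the lock

      total += value;
    }
  });

  // Producer side: push each job under the lock, then wake the worker.
  for (int value : values) {
    {
      std::scoped_lock lock(mutex);
      jobs.push(value);
    }
    cond_var.notify_one();
  }

  // Same steps as shutdown() followed by wait().
  {
    std::scoped_lock lock(mutex);
    shutting_down = true;
  }
  cond_var.notify_all();
  worker.join();
  return total;
}
```

Unlocking before the work is done is what lets several workers run jobs in parallel; holding the lock during execution would serialize them.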

Full code

#include <iostream>
#include <thread>
#include <vector>
#include <queue>
#include <future>
#include <mutex>
#include <condition_variable>
#include <stdexcept>
#include <tuple>
#include <utility>


template<typename ReturnType>
class ThreadPool {
public:
  explicit ThreadPool(int num_threads) {
    for (int i = 0; i < num_threads; ++i) {
      // Start all the threads with the worker_loop
      workers_.emplace_back([this] {this->worker_loop();});
    }
  }

  void shutdown() {
    {
      std::scoped_lock lock(mutex_);
      shutting_down_ = true;
    }
    cond_var_.notify_all();
  }

  void wait() {
    // Prevents new jobs being submitted
    shutdown();
    for (auto& worker : workers_) {
      // Skip threads that were already joined, so a second wait() is safe
      if (worker.joinable()) {
        worker.join();
      }
    }
  }

  template<typename FuncType, typename ...ArgTypes>
  std::future<ReturnType> submit(FuncType&& func, ArgTypes&& ...args) {
    {
      // Cannot submit if we are shutting down
      std::scoped_lock lock(mutex_);
      if (shutting_down_) {
        throw std::runtime_error("Cannot submit jobs after shutdown.");
      }
    }

    // Forward the job into a packaged task
    auto task = std::packaged_task<ReturnType()>(
      [
        func = std::forward<FuncType>(func),
        args = std::make_tuple(std::forward<ArgTypes>(args)...)
      ]() {
        return std::apply(func, std::move(args));
      }
    );

    auto future = task.get_future();

    {
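      // Re-check under the same lock as the push: shutdown() may have run
      // since the first check, and a job queued after the workers exit
      // would never execute.
      std::scoped_lock lock(mutex_);
      if (shutting_down_) {
        throw std::runtime_error("Cannot submit jobs after shutdown.");
      }
      jobs_.emplace(std::move(task));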
    }

    cond_var_.notify_one();
    return future;
  }

private:
  std::vector<std::thread> workers_;
  std::queue<std::packaged_task<ReturnType()>> jobs_;
  std::mutex mutex_;
  std::condition_variable cond_var_;
  bool shutting_down_ = false;

  // Loop run by each worker thread until the pool is shutting down and the
  // job queue is empty
  void worker_loop() {
    while (true) {
      // Get a job from the queue
      std::unique_lock lock(mutex_);
      cond_var_.wait(lock, [this] {return !jobs_.empty() || shutting_down_;});

      if (shutting_down_ && jobs_.empty()) {
        // lock will be released as it goes out of scope after break.
        break;
      }

      auto task {std::move(jobs_.front())};
      jobs_.pop();
      lock.unlock();

      // Actual job execution
      task();
    }
  }
};

The full code and an example using the thread pool can be found here.