A Simple Thread Pool Implementation in C++
Note that this implementation assumes that the jobs sent to the thread pool do not need to communicate with each other. If this is not the case, you need a more complicated solution.
Constructor and members
template<typename ReturnType>
class ThreadPool {
public:
    explicit ThreadPool(int num_threads) {
        for (int i = 0; i < num_threads; ++i) {
            // Start all the threads with the worker_loop
            workers_.emplace_back([this] { this->worker_loop(); });
        }
    }

private:
    std::vector<std::thread> workers_;
    std::queue<std::packaged_task<ReturnType()>> jobs_;
    std::mutex mutex_;
    std::condition_variable cond_var_;
    bool shutting_down_ = false;
};
Most of the members of the ThreadPool class should be straightforward:
workers_: The worker threads. Instead of spawning a thread for every task, we reuse these threads.
jobs_: The jobs submitted to the thread pool. We use std::packaged_task to package each job, and return a std::future when the job is submitted.
mutex_: Protects jobs_ and shutting_down_, which are shared between the threads.
cond_var_: Wakes up waiting workers when a job arrives or the pool shuts down.
shutting_down_: A flag indicating that we are shutting down the thread pool.
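To make the role of std::packaged_task and std::future concrete, here is a minimal sketch outside the pool. The function name run_packaged is hypothetical and only serves as an illustration: it packages a job, hands it to a single thread, and reads the result back through the future.

```cpp
#include <future>
#include <thread>
#include <utility>

// Hypothetical helper for illustration only: squares `x` on a freshly
// spawned thread and returns the result through a future.
int run_packaged(int x) {
    // Wrap a callable; its return value is stored in the task's shared state.
    std::packaged_task<int()> task([x] { return x * x; });

    // The future has to be obtained before the task is moved away.
    std::future<int> result = task.get_future();

    // packaged_task is move-only, so it is moved into the thread.
    std::thread worker(std::move(task));
    worker.join();

    // get() returns the value produced by the task.
    return result.get();
}
```

The thread pool does the same thing, except that the task is pushed onto jobs_ and picked up by an already running worker instead of a new thread.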
In the constructor, we create the worker threads and start the worker_loop() method (defined below) on each of them.
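The constructor leaves the choice of num_threads to the caller. One common option is to size the pool from std::thread::hardware_concurrency(). The helper below is a hypothetical sketch, not part of the class in this post.

```cpp
#include <thread>

// Hypothetical helper, not part of the ThreadPool class in this post.
unsigned default_thread_count() {
    // hardware_concurrency() is only a hint and may return 0 when the
    // value cannot be determined, so fall back to a single thread.
    const unsigned hint = std::thread::hardware_concurrency();
    return hint == 0 ? 1u : hint;
}
```

Assuming this helper, a pool could be created with ThreadPool<int> pool(static_cast<int>(default_thread_count()));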
shutdown() and wait() methods
void shutdown() {
    {
        std::scoped_lock lock(mutex_);
        shutting_down_ = true;
    }
    cond_var_.notify_all();
}

void wait() {
    // Prevents new jobs being submitted
    shutdown();
    for (auto& worker : workers_) {
        worker.join();
    }
}
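In the shutdown() method, we simply set the shutting_down_ flag to true and notify all the threads. The flag is set while holding a std::scoped_lock, so a worker cannot miss the change between checking the flag and going to sleep.
The wait() method calls shutdown() and then joins all the worker threads. Workers finish the jobs that are still in the queue before they exit, so wait() returns only after every submitted job has run. Since the class has no destructor, wait() has to be called before the pool is destroyed: destroying a std::thread that is still joinable calls std::terminate.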
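The flag, notify_all() and join() pattern can be tried in isolation. The sketch below uses a hypothetical function wake_all_and_join with local variables instead of class members. It also adds a joinable() check, which the wait() method in this post does not have: calling join() on a thread that has already been joined throws std::system_error, so the check is one way to make a second call harmless.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical demo: starts `n` threads that sleep on a condition variable
// until a shared flag is set, then returns how many of them woke up.
int wake_all_and_join(int n) {
    std::mutex mutex;
    std::condition_variable cond_var;
    bool shutting_down = false;
    int woken = 0;

    std::vector<std::thread> workers;
    for (int i = 0; i < n; ++i) {
        workers.emplace_back([&] {
            std::unique_lock lock(mutex);
            // The predicate guards against spurious wakeups and against
            // the flag being set before this thread starts waiting.
            cond_var.wait(lock, [&] { return shutting_down; });
            ++woken;  // the lock is held again at this point
        });
    }

    {
        // Set the flag under the lock so that no waiter can miss it.
        std::scoped_lock lock(mutex);
        shutting_down = true;
    }
    cond_var.notify_all();

    for (auto& worker : workers) {
        // Assumption of this sketch: skip threads that were already joined.
        if (worker.joinable()) {
            worker.join();
        }
    }
    return woken;
}
```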
The submit() method
template<typename FuncType, typename ...ArgTypes>
std::future<ReturnType> submit(FuncType&& func, ArgTypes&& ...args) {
    // Forward the job into a packaged task
    auto task = std::packaged_task<ReturnType()>(
        [
            func = std::forward<FuncType>(func),
            args = std::make_tuple(std::forward<ArgTypes>(args)...)
        ]() mutable {
            // mutable, so that the stored arguments can be moved out
            return std::apply(func, std::move(args));
        }
    );
    auto future = task.get_future();
    {
        std::scoped_lock lock(mutex_);
        // Cannot submit if we are shutting down. The check and the push
        // share one lock, so a job is never queued after shutdown().
        if (shutting_down_) {
            throw std::runtime_error("Cannot submit jobs after shutdown.");
        }
        jobs_.emplace(std::move(task));
    }
    cond_var_.notify_one();
    return future;
}
The submit() method is a bit more involved. It is a variadic template: the function and its arguments are captured in a lambda, which is wrapped in a std::packaged_task. Jobs are rejected with a std::runtime_error once the thread pool is shutting down.
We get the future from the packaged task, push the task onto the queue, and notify one worker. Finally, we return the future.
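The capture inside submit() can be looked at on its own. The sketch below uses a hypothetical helper bind_job that turns a function and its arguments into a callable taking no parameters, which is the shape std::packaged_task<ReturnType()> needs. The lambda is marked mutable here so that the stored arguments can be moved out when the job runs; that is an assumption of this sketch and is what allows move-only arguments such as std::unique_ptr.

```cpp
#include <memory>
#include <tuple>
#include <utility>

// Hypothetical helper mirroring the capture used in submit(): it stores
// decayed copies of the function and its arguments and returns a callable
// that takes no parameters.
template <typename FuncType, typename... ArgTypes>
auto bind_job(FuncType&& func, ArgTypes&&... args) {
    return [func = std::forward<FuncType>(func),
            args = std::make_tuple(std::forward<ArgTypes>(args)...)]() mutable {
        // Unpack the tuple into the function call. Moving the tuple lets
        // each stored argument be passed on as an rvalue.
        return std::apply(func, std::move(args));
    };
}

// Example jobs, also hypothetical.
int add(int a, int b) { return a + b; }

int deref(std::unique_ptr<int> p) { return *p; }
```

Because std::make_tuple stores copies, an argument is only passed by reference if the caller wraps it in std::ref. A job that moves its arguments out, like deref above, can only be called once, which matches how the pool runs each task.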
The worker loop
void worker_loop() {
    while (true) {
        // Get a job from the queue
        std::unique_lock lock(mutex_);
        cond_var_.wait(lock, [this] { return !jobs_.empty() || shutting_down_; });
        if (shutting_down_ && jobs_.empty()) {
            // lock will be released as it goes out of scope after break.
            break;
        }
        auto task {std::move(jobs_.front())};
        jobs_.pop();
        lock.unlock();
        // Actual job execution
        task();
    }
}
The worker_loop() method is what keeps all the worker threads alive.
In the while loop, the worker waits until there is a new job or until we shut down the thread pool. If there is a job, the worker takes it from the queue, releases the lock, and executes the job. When the job is finished, the worker waits for the next one. A worker only leaves the loop when the pool is shutting down and the queue is empty, so jobs that are already queued still run after shutdown().
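The drain-on-shutdown behaviour can be checked with a reduced version of the loop. This is only a sketch: drain_on_shutdown is a hypothetical function, it uses a single worker, and it stores plain std::function<void()> jobs instead of packaged tasks.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical demo: one worker drains a queue of `n` jobs that were all
// queued before shutdown was requested, and returns how many of them ran.
int drain_on_shutdown(int n) {
    std::mutex mutex;
    std::condition_variable cond_var;
    std::queue<std::function<void()>> jobs;
    bool shutting_down = false;
    int executed = 0;  // written only by the worker, read after join()

    std::thread worker([&] {
        while (true) {
            std::unique_lock lock(mutex);
            cond_var.wait(lock, [&] { return !jobs.empty() || shutting_down; });
            if (shutting_down && jobs.empty()) {
                break;  // exit only once the queue is empty
            }
            auto job = std::move(jobs.front());
            jobs.pop();
            lock.unlock();  // run the job without holding the lock
            job();
        }
    });

    {
        std::scoped_lock lock(mutex);
        for (int i = 0; i < n; ++i) {
            jobs.emplace([&executed] { ++executed; });
        }
        shutting_down = true;  // requested while jobs are still pending
    }
    cond_var.notify_all();
    worker.join();
    return executed;
}
```

Unlocking before job() matters in the real pool: if the lock were held while a job runs, the other workers could not take jobs from the queue and the pool would effectively run one job at a time.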
Full code
#include <condition_variable>
#include <future>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>

template<typename ReturnType>
class ThreadPool {
public:
    explicit ThreadPool(int num_threads) {
        for (int i = 0; i < num_threads; ++i) {
            // Start all the threads with the worker_loop
            workers_.emplace_back([this] { this->worker_loop(); });
        }
    }

    void shutdown() {
        {
            std::scoped_lock lock(mutex_);
            shutting_down_ = true;
        }
        cond_var_.notify_all();
    }

    void wait() {
        // Prevents new jobs being submitted
        shutdown();
        for (auto& worker : workers_) {
            worker.join();
        }
    }

    template<typename FuncType, typename ...ArgTypes>
    std::future<ReturnType> submit(FuncType&& func, ArgTypes&& ...args) {
        // Forward the job into a packaged task
        auto task = std::packaged_task<ReturnType()>(
            [
                func = std::forward<FuncType>(func),
                args = std::make_tuple(std::forward<ArgTypes>(args)...)
            ]() mutable {
                // mutable, so that the stored arguments can be moved out
                return std::apply(func, std::move(args));
            }
        );
        auto future = task.get_future();
        {
            std::scoped_lock lock(mutex_);
            // Cannot submit if we are shutting down. The check and the push
            // share one lock, so a job is never queued after shutdown().
            if (shutting_down_) {
                throw std::runtime_error("Cannot submit jobs after shutdown.");
            }
            jobs_.emplace(std::move(task));
        }
        cond_var_.notify_one();
        return future;
    }

private:
    std::vector<std::thread> workers_;
    std::queue<std::packaged_task<ReturnType()>> jobs_;
    std::mutex mutex_;
    std::condition_variable cond_var_;
    bool shutting_down_ = false;

    // The loop each worker thread runs until the pool is shutting down
    // and the job queue is empty
    void worker_loop() {
        while (true) {
            // Get a job from the queue
            std::unique_lock lock(mutex_);
            cond_var_.wait(lock, [this] { return !jobs_.empty() || shutting_down_; });
            if (shutting_down_ && jobs_.empty()) {
                // lock will be released as it goes out of scope after break.
                break;
            }
            auto task {std::move(jobs_.front())};
            jobs_.pop();
            lock.unlock();
            // Actual job execution
            task();
        }
    }
};
The full code and an example using the thread pool can be found here.