#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <future>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// A pool of worker threads that pull tasks from a shared FIFO queue.
//
// Workers are launched through std::async(std::launch::async, ...); the
// futures of those launches are kept in `finished`, and destroying them is
// what joins the workers. A default-constructed (invalid) packaged_task placed
// in the queue is the "stop" message for exactly one worker.
//
// The data members are public, as in the original interface; `work` must only
// be touched while holding `mutex`.
struct thread_pool {
    // Starts `size` worker threads.
    explicit thread_pool(std::size_t size) {
        // Reserve before starting so that registering the workers below does
        // not have to grow the vector.
        finished.reserve(1024);
        start(size);
    }

    std::mutex mutex;                              // guards `work`
    std::condition_variable condition;             // signalled when `work` gains an entry
    std::deque<std::packaged_task<void()>> work;   // pending tasks, oldest first
    std::vector<std::future<void>> finished;       // one future per running worker

    // queue( lambda ) enqueues the callable for execution on a worker thread.
    //
    // F is any callable invocable with no arguments; R is its result type.
    // Returns a future that yields the callable's result, or rethrows the
    // exception it exited with. If the task is discarded before it starts
    // (see cancel_pending()), the future reports a broken promise instead.
    template <class F, class R = decltype(std::declval<F &>()())>
    std::future<R> queue(F &&f) {
        // Wrap the function object into a packaged task, splitting
        // execution from the return value.
        std::packaged_task<R()> p(std::forward<F>(f));
        auto r = p.get_future();
        {
            std::unique_lock<std::mutex> l(mutex);
            // For non-void R this wraps `p` inside a packaged_task<void()>,
            // which simply discards the inner call's (void) return.
            work.emplace_back(std::move(p));
        }
        condition.notify_one();
        return r; // the future result of the task
    }

    // Starts threads_num additional worker threads in the pool.
    void start(std::size_t threads_num = 1) {
        for (std::size_t i = 0; i < threads_num; ++i) {
            finished.push_back(
                std::async(std::launch::async, [this] { thread_task(); }));
        }
    }

    // abort() cancels all non-started tasks, tells every worker thread to
    // stop running, and waits for them to finish up.
    void abort() {
        cancel_pending();
        finish();
    }

    // cancel_pending() merely cancels all non-started tasks.
    void cancel_pending() {
        std::unique_lock<std::mutex> l(mutex);
        work.clear();
    }

    // finish() enqueues a "stop the thread" message for every worker, then
    // waits for them. Tasks queued before the call still run, because the
    // stop messages are appended behind them.
    void finish() {
        {
            std::unique_lock<std::mutex> l(mutex);
            for (std::size_t i = 0; i < finished.size(); ++i) {
                work.emplace_back(); // invalid task == stop message
            }
        }
        condition.notify_all();
        // Futures obtained from std::async(std::launch::async, ...) block in
        // their destructor until the launched function returns, so clearing
        // the vector joins every worker.
        finished.clear();
    }

    ~thread_pool() { finish(); }

private:
    // Worker loop: repeatedly take the oldest task and run it, until a stop
    // message (an invalid task) is received.
    void thread_task() {
        while (true) {
            std::packaged_task<void()> f;
            {
                std::unique_lock<std::mutex> l(mutex);
                // The predicate overload checks the condition before blocking
                // and re-checks it after every wakeup.
                condition.wait(l, [this] { return !work.empty(); });
                f = std::move(work.front());
                work.pop_front();
            }
            // If the task is invalid, it means we are asked to stop.
            if (!f.valid()) return;
            f(); // runs outside the lock; exceptions are captured by the task
        }
    }
};

// Queues `f` on `pool` without keeping the resulting future: the caller cannot
// observe completion or any exception thrown by `f`.
inline void dispatch(thread_pool &pool, std::function<void()> f) {
    pool.queue(std::move(f));
}