Horizon Official Technical Documentation
WorkerThreadPool.hpp
Go to the documentation of this file.
1/***************************************************
2 * _ _ _ *
3 * | | | | (_) *
4 * | |_| | ___ _ __ _ _______ _ __ *
5 * | _ |/ _ \| '__| |_ / _ \| '_ \ *
6 * | | | | (_) | | | |/ / (_) | | | | *
7 * \_| |_/\___/|_| |_/___\___/|_| |_| *
8 ***************************************************
9 * This file is part of Horizon (c).
10 *
11 * Copyright (c) 2019 Sagun K. (sagunxp@gmail.com).
12 * Copyright (c) 2019 Horizon Dev Team.
13 *
14 * Base Author - Sagun K. (sagunxp@gmail.com)
15 *
16 * This library is free software; you can redistribute it and/or modify
17 * it under the terms of the GNU General Public License as published by
18 * the Free Software Foundation, either version 3 of the License, or
19 * (at your option) any later version.
20 *
21 * This library is distributed in the hope that it will be useful,
22 * but WITHOUT ANY WARRANTY; without even the implied warranty of
23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
24 * GNU General Public License for more details.
25 *
26 * You should have received a copy of the GNU General Public License
27 * along with this library. If not, see <http://www.gnu.org/licenses/>.
28 **************************************************/
29
30#ifndef HORIZON_CORE_MULTITHREADING_WORKERTHREADPOOL_HPP
31#define HORIZON_CORE_MULTITHREADING_WORKERTHREADPOOL_HPP
32
33#include "ThreadSafeQueue.hpp"
34#include <thread>
35#include <vector>
36#include <future>
37#include <type_traits>
38
40{
41 struct impl_base
42 {
43 virtual void call()=0;
44 virtual ~impl_base() {}
45 };
46 template<typename F>
48 {
49 F f;
50 impl_type(F &&f_): f(std::move(f_)) {}
51 void call() { f(); }
52 };
53public:
54 template<typename F>
56 : impl(new impl_type<F>(std::move(f)))
57 {}
58
59 void call() { impl->call(); }
60
62 : impl(std::move(other.impl))
63 {}
64
66 {
67 impl = std::move(other.impl);
68 return *this;
69 }
70
74
75private:
76 std::unique_ptr<impl_base> impl;
77};
78
80{
81public:
82 WorkerThreadPool(unsigned const thread_count = std::thread::hardware_concurrency())
83 : _done(false)
84 {
85 try {
86 for (unsigned i = 0; i < thread_count; ++i)
87 _threads.push_back(std::thread(&WorkerThreadPool::worker_thread, this));
88 } catch (...) {
89 _done = true;
90 throw;
91 }
92 }
93
95 {
96 _done = true;
97
98 for (unsigned long i = 0; i < _threads.size(); i++)
99 if (_threads.at(i).joinable())
100 _threads.at(i).join();
101 }
102
103 template<typename FunctionType>
104 std::future<typename std::invoke_result<FunctionType()>::type>
105 submit(FunctionType f)
106 {
107 typedef typename std::invoke_result<FunctionType()>::type result_type;
108
109 std::packaged_task<result_type()> task(std::move(f));
110 std::future<result_type> res(task.get_future());
111 _work_queue.push(std::move(task));
112 return res;
113 }
114
115private:
117 {
118 while (!_done) {
119 std::shared_ptr<FunctionWrapper> task;
120 if ((task = _work_queue.try_pop()))
121 task->call();
122 else
123 std::this_thread::yield();
124 }
125 }
126
127 std::vector<std::thread> _threads;
128 std::atomic_bool _done;
130};
131
132#endif /* HORIZON_CORE_MULTITHREADING_WORKERTHREADPOOL_HPP */
Definition: WorkerThreadPool.hpp:40
FunctionWrapper(FunctionWrapper &&other)
Definition: WorkerThreadPool.hpp:61
std::unique_ptr< impl_base > impl
Definition: WorkerThreadPool.hpp:76
FunctionWrapper(F &&f)
Definition: WorkerThreadPool.hpp:55
void call()
Definition: WorkerThreadPool.hpp:59
FunctionWrapper(const FunctionWrapper &)=delete
FunctionWrapper & operator=(FunctionWrapper &&other)
Definition: WorkerThreadPool.hpp:65
FunctionWrapper & operator=(const FunctionWrapper &)=delete
FunctionWrapper(FunctionWrapper &)=delete
void push(T &&new_value)
Definition: ThreadSafeQueue.hpp:90
std::shared_ptr< T > try_pop()
Definition: ThreadSafeQueue.hpp:84
Definition: WorkerThreadPool.hpp:80
WorkerThreadPool(unsigned const thread_count=std::thread::hardware_concurrency())
Definition: WorkerThreadPool.hpp:82
std::future< typename std::invoke_result< FunctionType()>::type > submit(FunctionType f)
Definition: WorkerThreadPool.hpp:105
std::atomic_bool _done
Definition: WorkerThreadPool.hpp:128
~WorkerThreadPool()
Definition: WorkerThreadPool.hpp:94
std::vector< std::thread > _threads
Definition: WorkerThreadPool.hpp:127
void worker_thread()
Definition: WorkerThreadPool.hpp:116
ThreadSafeQueue< FunctionWrapper > _work_queue
Definition: WorkerThreadPool.hpp:129
Definition: WorkerThreadPool.hpp:42
virtual ~impl_base()
Definition: WorkerThreadPool.hpp:44
Definition: WorkerThreadPool.hpp:48
F f
Definition: WorkerThreadPool.hpp:49
impl_type(F &&f_)
Definition: WorkerThreadPool.hpp:50
void call()
Definition: WorkerThreadPool.hpp:51