ra4_stats  0341147a0dc35f80f4e12c6003afb76a38e2ed6e
thread_pool.cpp
Go to the documentation of this file.
1 #include "thread_pool.hpp"
2 
3 using namespace std;
4 
6  tasks_(),
7  threads_(),
8  stop_thread_now_(),
9  stop_at_empty_(),
10  mutex_(),
11  cv_(){
12  size_t num_threads = thread::hardware_concurrency();
13  if(num_threads > 2){
14  --num_threads;
15  }else{
16  num_threads = 1;
17  }
18  Resize(num_threads);
19 }
20 
21 ThreadPool::ThreadPool(size_t num_threads):
22  tasks_(),
23  threads_(),
26  mutex_(),
27  cv_(){
28  Resize(num_threads);
29 }
30 
32  stop_at_empty_ = true;
33  {
34  lock_guard<mutex> lock(mutex_);
35  cv_.notify_all();
36  }
37 
38  for(size_t ithread = 0; ithread < Size(); ++ithread){
39  if(threads_.at(ithread)->joinable()){
40  threads_.at(ithread)->join();
41  }
42  }
43 }
44 
45 size_t ThreadPool::Size() const{
46  return threads_.size();
47 }
48 
49 void ThreadPool::Resize(size_t num_threads){
50  size_t old_num_threads = Size();
51  if(num_threads > old_num_threads){
52  threads_.resize(num_threads);
53  stop_thread_now_.resize(num_threads);
54 
55  for(size_t ithread = old_num_threads; ithread < num_threads; ++ithread){
56  threads_.at(ithread).reset(new thread(&ThreadPool::DoTasksFromQueue, this, ithread));
57  }
58  }else if(num_threads < old_num_threads){
59  for(size_t ithread = num_threads; ithread < old_num_threads; ++ithread){
60  *stop_thread_now_.at(ithread) = true;
61  threads_.at(ithread)->detach();
62  }
63 
64  {
65  lock_guard<mutex> lock(mutex_);
66  cv_.notify_all();
67  }
68 
69  threads_.resize(num_threads);
70  stop_thread_now_.resize(num_threads);
71  }
72 }
73 
74 void ThreadPool::DoTasksFromQueue(size_t ithread){
75  unique_ptr<function<void()> > task = tasks_.Pop();
76  while(true){
77  while(task != nullptr){
78  (*task)();
79  if(stop_thread_now_.at(ithread)) return;
80  task = tasks_.Pop();
81  }
82 
83  unique_lock<mutex> lock(mutex_);
84  cv_.wait(lock, bind(&ThreadPool::ReadyToAct, this, ithread, ref(task)));
85  if(task == nullptr) return;
86  }
87 }
88 
89 bool ThreadPool::ReadyToAct(size_t ithread, unique_ptr<function<void()> > &task){
90  if(stop_thread_now_.at(ithread)){
91  return true;
92  }else{
93  task = tasks_.Pop();
94  return stop_at_empty_ || task != nullptr;
95  }
96 }
97 
99  lock_guard<mutex> lock(mutex_);
100  queue_.push(unique_ptr<function<void()> >());
101  queue_.back() = move(func);
102 }
103 
105  lock_guard<mutex> lock(mutex_);
106  if(queue_.empty()){
107  return FuncPtr();
108  }else{
109  Queue::FuncPtr func = move(queue_.front());
110  queue_.pop();
111  return func;
112  }
113 }
std::atomic< bool > stop_at_empty_
Definition: thread_pool.hpp:59
std::size_t Size() const
Definition: thread_pool.cpp:45
Queue tasks_
Definition: thread_pool.hpp:56
STL namespace.
void DoTasksFromQueue(size_t ithread)
Definition: thread_pool.cpp:74
std::mutex mutex_
Definition: thread_pool.hpp:61
std::vector< std::shared_ptr< std::atomic< bool > > > stop_thread_now_
Definition: thread_pool.hpp:58
bool ReadyToAct(size_t ithread, std::unique_ptr< std::function< void()> > &task)
Definition: thread_pool.cpp:89
std::vector< std::unique_ptr< std::thread > > threads_
Definition: thread_pool.hpp:57
void Resize(size_t num_threads)
Definition: thread_pool.cpp:49
std::unique_ptr< std::function< void()> > FuncPtr
Definition: thread_pool.hpp:38
void Push(FuncPtr &func)
Definition: thread_pool.cpp:98
std::condition_variable cv_
Definition: thread_pool.hpp:62