ra4_draw  4bd0201e3d922d42bd545d4b045ed44db33454a4
thread_pool.cpp
Go to the documentation of this file.
1 #include "core/thread_pool.hpp"
2 
3 #include "TThread.h"
4 
5 using namespace std;
6 
8  tasks_(),
9  threads_(),
10  stop_thread_now_(),
11  stop_at_empty_(),
12  mutex_(),
13  cv_(){
14  TThread::Initialize();
15  size_t num_threads = thread::hardware_concurrency();
16  if(num_threads > 2){
17  --num_threads;
18  }else{
19  num_threads = 1;
20  }
21  Resize(num_threads);
22 }
23 
24 ThreadPool::ThreadPool(std::size_t num_threads):
25  tasks_(),
26  threads_(),
29  mutex_(),
30  cv_(){
31  TThread::Initialize();
32  Resize(num_threads);
33 }
34 
36  stop_at_empty_ = true;
37  {
38  lock_guard<mutex> lock(mutex_);
39  cv_.notify_all();
40  }
41 
42  for(size_t ithread = 0; ithread < Size(); ++ithread){
43  if(threads_.at(ithread)->joinable()){
44  threads_.at(ithread)->join();
45  }
46  }
47 }
48 
49 size_t ThreadPool::Size() const{
50  return threads_.size();
51 }
52 
53 void ThreadPool::Resize(size_t num_threads){
54  size_t old_num_threads = Size();
55  if(num_threads > old_num_threads){
56  threads_.resize(num_threads);
57  stop_thread_now_.resize(num_threads);
58 
59  for(size_t ithread = old_num_threads; ithread < num_threads; ++ithread){
60  threads_.at(ithread).reset(new thread(&ThreadPool::DoTasksFromQueue, this, ithread));
61  }
62  }else if(num_threads < old_num_threads){
63  for(size_t ithread = num_threads; ithread < old_num_threads; ++ithread){
64  *stop_thread_now_.at(ithread) = true;
65  threads_.at(ithread)->detach();
66  }
67 
68  {
69  lock_guard<mutex> lock(mutex_);
70  cv_.notify_all();
71  }
72 
73  threads_.resize(num_threads);
74  stop_thread_now_.resize(num_threads);
75  }
76 }
77 
78 void ThreadPool::DoTasksFromQueue(size_t ithread){
79  unique_ptr<function<void()> > task = tasks_.Pop();
80  while(true){
81  while(task != nullptr){
82  (*task)();
83  if(stop_thread_now_.at(ithread)) return;
84  task = tasks_.Pop();
85  }
86 
87  unique_lock<mutex> lock(mutex_);
88  cv_.wait(lock, bind(&ThreadPool::ReadyToAct, this, ithread, ref(task)));
89  if(task == nullptr) return;
90  }
91 }
92 
93 bool ThreadPool::ReadyToAct(size_t ithread, unique_ptr<function<void()> > &task){
94  if(stop_thread_now_.at(ithread)){
95  return true;
96  }else{
97  task = tasks_.Pop();
98  return stop_at_empty_ || task != nullptr;
99  }
100 }
101 
103  lock_guard<mutex> lock(mutex_);
104  queue_.push(unique_ptr<function<void()> >());
105  queue_.back() = move(func);
106 }
107 
109  lock_guard<mutex> lock(mutex_);
110  if(queue_.empty()){
111  return FuncPtr();
112  }else{
113  Queue::FuncPtr func = move(queue_.front());
114  queue_.pop();
115  return func;
116  }
117 }
std::atomic< bool > stop_at_empty_
Definition: thread_pool.hpp:59
std::size_t Size() const
Definition: thread_pool.cpp:49
Queue tasks_
Definition: thread_pool.hpp:56
STL namespace.
void DoTasksFromQueue(size_t ithread)
Definition: thread_pool.cpp:78
std::mutex mutex_
Definition: thread_pool.hpp:61
std::vector< std::shared_ptr< std::atomic< bool > > > stop_thread_now_
Definition: thread_pool.hpp:58
bool ReadyToAct(size_t ithread, std::unique_ptr< std::function< void()> > &task)
Definition: thread_pool.cpp:93
std::vector< std::unique_ptr< std::thread > > threads_
Definition: thread_pool.hpp:57
void Resize(size_t num_threads)
Definition: thread_pool.cpp:53
std::unique_ptr< std::function< void()> > FuncPtr
Definition: thread_pool.hpp:38
void Push(FuncPtr &func)
std::condition_variable cv_
Definition: thread_pool.hpp:62