Eigen 3.4.90 (git rev 5a9f66fb35d03a4da9ef8976e67a61b30aa16dcf)
NonBlockingThreadPool.h
// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2016 Dmitry Vyukov <[email protected]>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

#ifndef EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
#define EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H

// IWYU pragma: private
#include "./InternalHeaderCheck.h"

namespace Eigen {

template <typename Environment>
class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
 public:
  typedef typename Environment::Task Task;
  typedef RunQueue<Task, 1024> Queue;

  ThreadPoolTempl(int num_threads, Environment env = Environment()) : ThreadPoolTempl(num_threads, true, env) {}

  ThreadPoolTempl(int num_threads, bool allow_spinning, Environment env = Environment())
      : env_(env),
        num_threads_(num_threads),
        allow_spinning_(allow_spinning),
        thread_data_(num_threads),
        all_coprimes_(num_threads),
        waiters_(num_threads),
        global_steal_partition_(EncodePartition(0, num_threads_)),
        blocked_(0),
        spinning_(0),
        done_(false),
        cancelled_(false),
        ec_(waiters_) {
    waiters_.resize(num_threads_);
    // Calculate coprimes of all numbers [1, num_threads].
    // Coprimes are used for random walks over all threads in Steal
    // and NonEmptyQueueIndex. Iteration is based on the fact that if we take
    // a random starting thread index t and calculate num_threads - 1 subsequent
    // indices as (t + coprime) % num_threads, we will cover all threads without
    // repetitions (effectively getting a pseudo-random permutation of thread
    // indices).
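    // Illustrative example (not part of the implementation): with
    // num_threads = 10, the coprimes of 10 are {1, 3, 7, 9}. Picking
    // coprime = 3 and a random start t = 7, the walk
    //   7, 0, 3, 6, 9, 2, 5, 8, 1, 4
    // visits every thread index exactly once before repeating, so a stealing
    // thread probes all potential victims in a pseudo-random order.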
    eigen_plain_assert(num_threads_ < kMaxThreads);
    for (int i = 1; i <= num_threads_; ++i) {
      all_coprimes_.emplace_back(i);
      ComputeCoprimes(i, &all_coprimes_.back());
    }
#ifndef EIGEN_THREAD_LOCAL
    init_barrier_.reset(new Barrier(num_threads_));
#endif
    thread_data_.resize(num_threads_);
    for (int i = 0; i < num_threads_; i++) {
      SetStealPartition(i, EncodePartition(0, num_threads_));
      thread_data_[i].thread.reset(env_.CreateThread([this, i]() { WorkerLoop(i); }));
    }
#ifndef EIGEN_THREAD_LOCAL
    // Wait for workers to initialize per_thread_map_. Otherwise we might race
    // with them in Schedule or CurrentThreadId.
    init_barrier_->Wait();
#endif
  }

  ~ThreadPoolTempl() {
    done_ = true;

    // Now if all threads block without work, they will start exiting.
    // But note that threads can continue to work arbitrarily long,
    // block, submit new work, unblock and otherwise live a full life.
    if (!cancelled_) {
      ec_.Notify(true);
    } else {
      // Since we were cancelled, there might be entries in the queues.
      // Empty them to prevent their destructor from asserting.
      for (size_t i = 0; i < thread_data_.size(); i++) {
        thread_data_[i].queue.Flush();
      }
    }
    // Join threads explicitly (by destroying) to avoid destruction order
    // issues within this class.
    for (size_t i = 0; i < thread_data_.size(); ++i) thread_data_[i].thread.reset();
  }

  void SetStealPartitions(const std::vector<std::pair<unsigned, unsigned>>& partitions) {
    eigen_plain_assert(partitions.size() == static_cast<std::size_t>(num_threads_));

    // Pass this information to each thread queue.
    for (int i = 0; i < num_threads_; i++) {
      const auto& pair = partitions[i];
      unsigned start = pair.first, end = pair.second;
      AssertBounds(start, end);
      unsigned val = EncodePartition(start, end);
      SetStealPartition(i, val);
    }
  }

  void Schedule(std::function<void()> fn) EIGEN_OVERRIDE { ScheduleWithHint(std::move(fn), 0, num_threads_); }

  void ScheduleWithHint(std::function<void()> fn, int start, int limit) override {
    Task t = env_.CreateTask(std::move(fn));
    PerThread* pt = GetPerThread();
    if (pt->pool == this) {
      // Worker thread of this pool, push onto the thread's queue.
      Queue& q = thread_data_[pt->thread_id].queue;
      t = q.PushFront(std::move(t));
    } else {
      // A free-standing thread (or worker of another pool), push onto a random
      // queue.
      eigen_plain_assert(start < limit);
      eigen_plain_assert(limit <= num_threads_);
      int num_queues = limit - start;
      int rnd = Rand(&pt->rand) % num_queues;
      eigen_plain_assert(start + rnd < limit);
      Queue& q = thread_data_[start + rnd].queue;
      t = q.PushBack(std::move(t));
    }
    // Note: below we touch `this` after making the task available to worker
    // threads. Strictly speaking, this can lead to a racy use-after-free.
    // Consider that Schedule is called from a thread that is neither the main
    // thread nor a worker thread of this pool. Then, execution of the task
    // directly or indirectly completes the overall computation, which in turn
    // leads to the destruction of `this`. We expect that such a scenario is
    // prevented by the program, that is, `this` is kept alive while any thread
    // can potentially be in Schedule.
    if (!t.f) {
      ec_.Notify(false);
    } else {
      env_.ExecuteTask(t);  // Push failed, execute directly.
    }
  }
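  // Illustrative usage sketch (not part of this header): a caller that is not
  // a worker of this pool can bias where work is initially enqueued by passing
  // a hint range, using the ThreadPool typedef defined at the end of this file:
  //
  //   Eigen::ThreadPool pool(8);
  //   // Enqueue onto one of the first two worker queues only.
  //   pool.ScheduleWithHint([] { /* work */ }, /*start=*/0, /*limit=*/2);
  //
  // Workers outside [0, 2) can still steal the task later; the hint only
  // affects the initial queue choice.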

  void Cancel() EIGEN_OVERRIDE {
    cancelled_ = true;
    done_ = true;

    // Let each thread know it's been cancelled.
#ifdef EIGEN_THREAD_ENV_SUPPORTS_CANCELLATION
    for (size_t i = 0; i < thread_data_.size(); i++) {
      thread_data_[i].thread->OnCancel();
    }
#endif

    // Wake up the threads without work to let them exit on their own.
    ec_.Notify(true);
  }

  int NumThreads() const EIGEN_FINAL { return num_threads_; }

  int CurrentThreadId() const EIGEN_FINAL {
    const PerThread* pt = const_cast<ThreadPoolTempl*>(this)->GetPerThread();
    if (pt->pool == this) {
      return pt->thread_id;
    } else {
      return -1;
    }
  }

 private:
  // Create a single atomic<unsigned> that encodes start and limit information
  // for each thread.
  // We expect num_threads_ < 65536, so we can store them in a single
  // std::atomic<unsigned>.
  // Exposed publicly as static functions so that external callers can reuse
  // this encode/decode logic for maintaining their own thread-safe copies of
  // scheduling and steal domain(s).
  static const int kMaxPartitionBits = 16;
  static const int kMaxThreads = 1 << kMaxPartitionBits;

  inline unsigned EncodePartition(unsigned start, unsigned limit) { return (start << kMaxPartitionBits) | limit; }

  inline void DecodePartition(unsigned val, unsigned* start, unsigned* limit) {
    *limit = val & (kMaxThreads - 1);
    val >>= kMaxPartitionBits;
    *start = val;
  }
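  // Worked example (illustrative): EncodePartition(2, 5) yields
  // (2 << 16) | 5 == 0x00020005. DecodePartition(0x00020005, &s, &l)
  // recovers s == 2 and l == 5, i.e. the steal range [2, 5).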

  void AssertBounds(int start, int end) {
    eigen_plain_assert(start >= 0);
    eigen_plain_assert(start < end);  // non-zero sized partition
    eigen_plain_assert(end <= num_threads_);
  }

  inline void SetStealPartition(size_t i, unsigned val) {
    thread_data_[i].steal_partition.store(val, std::memory_order_relaxed);
  }

  inline unsigned GetStealPartition(int i) { return thread_data_[i].steal_partition.load(std::memory_order_relaxed); }

  void ComputeCoprimes(int N, MaxSizeVector<unsigned>* coprimes) {
    for (int i = 1; i <= N; i++) {
      unsigned a = i;
      unsigned b = N;
      // If GCD(a, b) == 1, then a and b are coprime.
      while (b != 0) {
        unsigned tmp = a;
        a = b;
        b = tmp % b;
      }
      if (a == 1) {
        coprimes->push_back(i);
      }
    }
  }
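  // Worked example (illustrative): ComputeCoprimes(12, &v) fills v with
  // {1, 5, 7, 11}, the numbers in [1, 12] whose GCD with 12 is 1. These are
  // exactly the increments that generate full-cycle walks modulo 12.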

  typedef typename Environment::EnvThread Thread;

  struct PerThread {
    constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) {}
    ThreadPoolTempl* pool;  // Parent pool, or null for normal threads.
    uint64_t rand;          // Random generator state.
    int thread_id;          // Worker thread index in pool.
#ifndef EIGEN_THREAD_LOCAL
    // Prevent false sharing.
    char pad_[128];
#endif
  };

  struct ThreadData {
    constexpr ThreadData() : thread(), steal_partition(0), queue() {}
    std::unique_ptr<Thread> thread;
    std::atomic<unsigned> steal_partition;
    Queue queue;
  };

  Environment env_;
  const int num_threads_;
  const bool allow_spinning_;
  MaxSizeVector<ThreadData> thread_data_;
  MaxSizeVector<MaxSizeVector<unsigned>> all_coprimes_;
  MaxSizeVector<EventCount::Waiter> waiters_;
  unsigned global_steal_partition_;
  std::atomic<unsigned> blocked_;
  std::atomic<bool> spinning_;
  std::atomic<bool> done_;
  std::atomic<bool> cancelled_;
  EventCount ec_;
#ifndef EIGEN_THREAD_LOCAL
  std::unique_ptr<Barrier> init_barrier_;
  EIGEN_MUTEX per_thread_map_mutex_;  // Protects per_thread_map_.
  std::unordered_map<uint64_t, std::unique_ptr<PerThread>> per_thread_map_;
#endif

  // Main worker thread loop.
  void WorkerLoop(int thread_id) {
#ifndef EIGEN_THREAD_LOCAL
    std::unique_ptr<PerThread> new_pt(new PerThread());
    per_thread_map_mutex_.lock();
    bool insertOK = per_thread_map_.emplace(GlobalThreadIdHash(), std::move(new_pt)).second;
    eigen_plain_assert(insertOK);
    EIGEN_UNUSED_VARIABLE(insertOK);
    per_thread_map_mutex_.unlock();
    init_barrier_->Notify();
    init_barrier_->Wait();
#endif
    PerThread* pt = GetPerThread();
    pt->pool = this;
    pt->rand = GlobalThreadIdHash();
    pt->thread_id = thread_id;
    Queue& q = thread_data_[thread_id].queue;
    EventCount::Waiter* waiter = &waiters_[thread_id];
    // TODO(dvyukov,rmlarsen): The time spent in NonEmptyQueueIndex() is
    // proportional to num_threads_ and we assume that new work is scheduled at
    // a constant rate, so we set spin_count to 5000 / num_threads_. The
    // constant was picked based on a fair dice roll, tune it.
    const int spin_count = allow_spinning_ && num_threads_ > 0 ? 5000 / num_threads_ : 0;
    if (num_threads_ == 1) {
      // For num_threads_ == 1 there is no point in going through the expensive
      // steal loop. Moreover, since NonEmptyQueueIndex() calls PopBack() on the
      // victim queues it might reverse the order in which ops are executed
      // compared to the order in which they are scheduled, which tends to be
      // counter-productive for the types of I/O workloads that single-threaded
      // pools tend to be used for.
      while (!cancelled_) {
        Task t = q.PopFront();
        for (int i = 0; i < spin_count && !t.f; i++) {
          if (!cancelled_.load(std::memory_order_relaxed)) {
            t = q.PopFront();
          }
        }
        if (!t.f) {
          if (!WaitForWork(waiter, &t)) {
            return;
          }
        }
        if (t.f) {
          env_.ExecuteTask(t);
        }
      }
    } else {
      while (!cancelled_) {
        Task t = q.PopFront();
        if (!t.f) {
          t = LocalSteal();
          if (!t.f) {
            t = GlobalSteal();
            if (!t.f) {
              // Leave one thread spinning. This reduces latency.
              if (allow_spinning_ && !spinning_ && !spinning_.exchange(true)) {
                for (int i = 0; i < spin_count && !t.f; i++) {
                  if (!cancelled_.load(std::memory_order_relaxed)) {
                    t = GlobalSteal();
                  } else {
                    return;
                  }
                }
                spinning_ = false;
              }
              if (!t.f) {
                if (!WaitForWork(waiter, &t)) {
                  return;
                }
              }
            }
          }
        }
        if (t.f) {
          env_.ExecuteTask(t);
        }
      }
    }
  }

  // Steal tries to steal work from other worker threads in the range [start,
  // limit) in a best-effort manner.
  Task Steal(unsigned start, unsigned limit) {
    PerThread* pt = GetPerThread();
    const size_t size = limit - start;
    unsigned r = Rand(&pt->rand);
    // Reduce r into the [0, size) range; this uses the trick from
    // https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
    eigen_plain_assert(all_coprimes_[size - 1].size() < (1 << 30));
    unsigned victim = ((uint64_t)r * (uint64_t)size) >> 32;
    unsigned index = ((uint64_t)all_coprimes_[size - 1].size() * (uint64_t)r) >> 32;
    unsigned inc = all_coprimes_[size - 1][index];

    for (unsigned i = 0; i < size; i++) {
      eigen_plain_assert(start + victim < limit);
      Task t = thread_data_[start + victim].queue.PopBack();
      if (t.f) {
        return t;
      }
      victim += inc;
      if (victim >= size) {
        victim -= static_cast<unsigned int>(size);
      }
    }
    return Task();
  }
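  // Illustrative note (not part of the implementation): the multiply-shift
  // reduction above maps a uniform 32-bit r into [0, size) without a modulo.
  // For example, with size = 6 and r = 0x80000000 (i.e. r / 2^32 = 0.5),
  //   ((uint64_t)r * 6) >> 32 == 3,
  // which is the same bucket that 0.5 * 6 would fall into.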

  // Steals work within the threads belonging to the partition.
  Task LocalSteal() {
    PerThread* pt = GetPerThread();
    unsigned partition = GetStealPartition(pt->thread_id);
    // If the thread's steal partition is the same as the global partition,
    // there is no need to go through the steal loop twice.
    if (global_steal_partition_ == partition) return Task();
    unsigned start, limit;
    DecodePartition(partition, &start, &limit);
    AssertBounds(start, limit);

    return Steal(start, limit);
  }

  // Steals work from any other thread in the pool.
  Task GlobalSteal() { return Steal(0, num_threads_); }

  // WaitForWork blocks until new work is available (returns true), or if it is
  // time to exit (returns false). It can optionally return a task to execute
  // in t (in which case t.f != nullptr on return).
  bool WaitForWork(EventCount::Waiter* waiter, Task* t) {
    eigen_plain_assert(!t->f);
    // We already did a best-effort emptiness check in Steal, so prepare for
    // blocking.
    ec_.Prewait();
    // Now do a reliable emptiness check.
    int victim = NonEmptyQueueIndex();
    if (victim != -1) {
      ec_.CancelWait();
      if (cancelled_) {
        return false;
      } else {
        *t = thread_data_[victim].queue.PopBack();
        return true;
      }
    }
    // The number of blocked threads is used as the termination condition:
    // if we are shutting down and all worker threads are blocked without work,
    // we are done.
    blocked_++;
    // TODO is blocked_ required to be unsigned?
    if (done_ && blocked_ == static_cast<unsigned>(num_threads_)) {
      ec_.CancelWait();
      // Almost done, but we need to re-check the queues.
      // Consider that all queues are empty and all worker threads are preempted
      // right after incrementing blocked_ above. Now a free-standing thread
      // submits work and calls the destructor (which sets done_). If we don't
      // re-check the queues, we will exit leaving the work unexecuted.
      if (NonEmptyQueueIndex() != -1) {
        // Note: we must not pop from queues before we decrement blocked_,
        // otherwise the following scenario is possible. Consider that instead
        // of checking for emptiness we popped the only element from the queues.
        // Now other worker threads can start exiting, which is bad if the
        // work item submits other work. So we just check emptiness here,
        // which ensures that all worker threads exit at the same time.
        blocked_--;
        return true;
      }
      // Reached stable termination state.
      ec_.Notify(true);
      return false;
    }
    ec_.CommitWait(waiter);
    blocked_--;
    return true;
  }

  int NonEmptyQueueIndex() {
    PerThread* pt = GetPerThread();
    // We intentionally design NonEmptyQueueIndex to steal work from
    // anywhere in the pool so that threads don't block in WaitForWork() forever
    // when all threads in their partition go to sleep. Steal is still local.
    const size_t size = thread_data_.size();
    unsigned r = Rand(&pt->rand);
    unsigned inc = all_coprimes_[size - 1][r % all_coprimes_[size - 1].size()];
    unsigned victim = r % size;
    for (unsigned i = 0; i < size; i++) {
      if (!thread_data_[victim].queue.Empty()) {
        return victim;
      }
      victim += inc;
      if (victim >= size) {
        victim -= static_cast<unsigned int>(size);
      }
    }
    return -1;
  }

  static EIGEN_STRONG_INLINE uint64_t GlobalThreadIdHash() {
    return std::hash<std::thread::id>()(std::this_thread::get_id());
  }

  EIGEN_STRONG_INLINE PerThread* GetPerThread() {
#ifndef EIGEN_THREAD_LOCAL
    static PerThread dummy;
    auto it = per_thread_map_.find(GlobalThreadIdHash());
    if (it == per_thread_map_.end()) {
      return &dummy;
    } else {
      return it->second.get();
    }
#else
    EIGEN_THREAD_LOCAL PerThread per_thread_;
    PerThread* pt = &per_thread_;
    return pt;
#endif
  }

  static EIGEN_STRONG_INLINE unsigned Rand(uint64_t* state) {
    uint64_t current = *state;
    // Update the internal state.
    *state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL;
    // Generate the random output (using the PCG-XSH-RS scheme).
    return static_cast<unsigned>((current ^ (current >> 22)) >> (22 + (current >> 61)));
  }
};

typedef ThreadPoolTempl<StlThreadEnvironment> ThreadPool;
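// Minimal usage sketch (illustrative only, not part of this header). It
// assumes the ThreadPool module's Barrier helper is available alongside this
// typedef:
//
//   Eigen::ThreadPool pool(/*num_threads=*/4);
//   Eigen::Barrier barrier(/*count=*/8);
//   for (int i = 0; i < 8; ++i) {
//     pool.Schedule([&barrier]() {
//       // ... do some work ...
//       barrier.Notify();
//     });
//   }
//   barrier.Wait();  // Blocks until all eight tasks have run.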

}  // namespace Eigen

#endif  // EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H