RunQueue.h
// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2016 Dmitry Vyukov <[email protected]>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

#ifndef EIGEN_CXX11_THREADPOOL_RUNQUEUE_H
#define EIGEN_CXX11_THREADPOOL_RUNQUEUE_H

// IWYU pragma: private
#include "./InternalHeaderCheck.h"

namespace Eigen {

// RunQueue is a fixed-size, partially non-blocking deque of Work items.
// Operations on the front of the queue must be done by a single thread (the owner);
// operations on the back of the queue can be done by multiple threads concurrently.
//
// Algorithm outline:
// All remote threads operating on the queue back are serialized by a mutex.
// This ensures that at most two threads access state: the owner and one remote
// thread (Size aside). The algorithm ensures that the occupied region of the
// underlying array is logically contiguous (it can wrap around, but there are no
// stray occupied elements). The owner operates on one end of this region, the
// remote thread operates on the other end. Synchronization between these threads
// (potential consumption of the last element and claiming of the last empty
// element) happens by means of a state variable in each element. The states are:
// empty, busy (in the process of insertion or removal) and ready. Threads claim
// elements (empty->busy and ready->busy transitions) by means of a CAS
// operation. The finishing transitions (busy->empty and busy->ready) are done
// with a plain store, as the element is exclusively owned by the current thread.
//
// Note: we could permit only pointers as elements; then we would not need a
// separate state variable, as a null/non-null pointer value would serve as the
// state. But that would require a malloc/free per operation for large, complex
// values (and this is designed to store std::function<()>).
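//
// A minimal usage sketch (illustration only, not part of this header). Work is
// assumed to be std::function<void()> and the capacity of 1024 is arbitrary;
// the owner thread uses the front operations, any other thread steals from the
// back:
//
//   RunQueue<std::function<void()>, 1024> q;
//
//   // Owner thread: push local work. A non-empty return value means the
//   // queue was full and the task was rejected.
//   std::function<void()> rejected = q.PushFront([] { /* do work */ });
//   if (rejected) rejected();  // queue full: run the task inline instead
//
//   // Owner thread: drain local work (most recently pushed first).
//   while (std::function<void()> w = q.PopFront()) w();
//
//   // Any other thread: steal the oldest item from the back
//   // (serialized by the internal mutex).
//   if (std::function<void()> stolen = q.PopBack()) stolen();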
template <typename Work, unsigned kSize>
class RunQueue {
 public:
  RunQueue() : front_(0), back_(0) {
    // require power-of-two for fast masking
    eigen_plain_assert((kSize & (kSize - 1)) == 0);
    eigen_plain_assert(kSize > 2);            // why would you do this?
    eigen_plain_assert(kSize <= (64 << 10));  // leave enough space for counter
    for (unsigned i = 0; i < kSize; i++) array_[i].state.store(kEmpty, std::memory_order_relaxed);
  }

  ~RunQueue() { eigen_plain_assert(Size() == 0); }

  // PushFront inserts w at the beginning of the queue.
  // If queue is full returns w, otherwise returns default-constructed Work.
  Work PushFront(Work w) {
    unsigned front = front_.load(std::memory_order_relaxed);
    Elem* e = &array_[front & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kEmpty || !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire)) return w;
    front_.store(front + 1 + (kSize << 1), std::memory_order_relaxed);
    e->w = std::move(w);
    e->state.store(kReady, std::memory_order_release);
    return Work();
  }

  // PopFront removes and returns the first element in the queue.
  // If the queue was empty returns default-constructed Work.
  Work PopFront() {
    unsigned front = front_.load(std::memory_order_relaxed);
    Elem* e = &array_[(front - 1) & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kReady || !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire)) return Work();
    Work w = std::move(e->w);
    e->state.store(kEmpty, std::memory_order_release);
    front = ((front - 1) & kMask2) | (front & ~kMask2);
    front_.store(front, std::memory_order_relaxed);
    return w;
  }

  // PushBack adds w at the end of the queue.
  // If queue is full returns w, otherwise returns default-constructed Work.
  Work PushBack(Work w) {
    EIGEN_MUTEX_LOCK lock(mutex_);
    unsigned back = back_.load(std::memory_order_relaxed);
    Elem* e = &array_[(back - 1) & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kEmpty || !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire)) return w;
    back = ((back - 1) & kMask2) | (back & ~kMask2);
    back_.store(back, std::memory_order_relaxed);
    e->w = std::move(w);
    e->state.store(kReady, std::memory_order_release);
    return Work();
  }

  // PopBack removes and returns the last element in the queue.
  Work PopBack() {
    if (Empty()) return Work();
    EIGEN_MUTEX_LOCK lock(mutex_);
    unsigned back = back_.load(std::memory_order_relaxed);
    Elem* e = &array_[back & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kReady || !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire)) return Work();
    Work w = std::move(e->w);
    e->state.store(kEmpty, std::memory_order_release);
    back_.store(back + 1 + (kSize << 1), std::memory_order_relaxed);
    return w;
  }

  // PopBackHalf removes and returns the last half of the elements in the queue.
  // Returns the number of elements removed.
  unsigned PopBackHalf(std::vector<Work>* result) {
    if (Empty()) return 0;
    EIGEN_MUTEX_LOCK lock(mutex_);
    unsigned back = back_.load(std::memory_order_relaxed);
    unsigned size = Size();
    unsigned mid = back;
    if (size > 1) mid = back + (size - 1) / 2;
    unsigned n = 0;
    unsigned start = 0;
    for (; static_cast<int>(mid - back) >= 0; mid--) {
      Elem* e = &array_[mid & kMask];
      uint8_t s = e->state.load(std::memory_order_relaxed);
      if (n == 0) {
        if (s != kReady || !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire)) continue;
        start = mid;
      } else {
        // Note: no need to store a temporary kBusy state, we exclusively own
        // these elements.
        eigen_plain_assert(s == kReady);
      }
      result->push_back(std::move(e->w));
      e->state.store(kEmpty, std::memory_order_release);
      n++;
    }
    if (n != 0) back_.store(start + 1 + (kSize << 1), std::memory_order_relaxed);
    return n;
  }
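
  // A hypothetical stealing loop (illustration only, not part of this class):
  // a thread whose own queue is empty can grab roughly half of a victim's
  // pending work in a single mutex acquisition. The names victim and
  // local_work are assumptions made for this sketch.
  //
  //   std::vector<Work> local_work;
  //   if (victim.PopBackHalf(&local_work) > 0) {
  //     for (Work& w : local_work) { /* execute or re-enqueue w */ }
  //   }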

  // Size returns current queue size.
  // Can be called by any thread at any time.
  unsigned Size() const { return SizeOrNotEmpty<true>(); }

  // Empty tests whether container is empty.
  // Can be called by any thread at any time.
  bool Empty() const { return SizeOrNotEmpty<false>() == 0; }

  // Delete all the elements from the queue.
  void Flush() {
    while (!Empty()) {
      PopFront();
    }
  }

 private:
  static const unsigned kMask = kSize - 1;
  static const unsigned kMask2 = (kSize << 1) - 1;
  struct Elem {
    std::atomic<uint8_t> state;
    Work w;
  };
  enum {
    kEmpty,
    kBusy,
    kReady,
  };
  EIGEN_MUTEX mutex_;
  // The low log(kSize) + 1 bits in front_ and back_ contain a rolling index of
  // the front/back, respectively. The remaining bits contain modification
  // counters that are incremented on Push operations. This allows us to
  // (1) distinguish between empty and full conditions (if we used only
  // log(kSize) bits for the position, these conditions would be
  // indistinguishable); and (2) obtain a consistent snapshot of front_/back_
  // for the Size operation using the modification counters.
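  //
  // A concrete illustration (assuming kSize = 4, so kMask = 3 and kMask2 = 7):
  // the index part of front_/back_ lives in the low 3 bits and counts modulo
  // 2 * kSize = 8, while array slots are addressed modulo kSize = 4. After four
  // PushFront calls, front_ & kMask2 == 4 and back_ & kMask2 == 0, so
  // (front_ ^ back_) & kMask2 != 0 and the full queue is not mistaken for an
  // empty one, even though front_ & kMask == back_ & kMask.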
  std::atomic<unsigned> front_;
  std::atomic<unsigned> back_;
  Elem array_[kSize];

  // SizeOrNotEmpty returns the current queue size; if NeedSizeEstimate is
  // false, only whether the size is 0 is guaranteed to be correct.
  // Can be called by any thread at any time.
  template <bool NeedSizeEstimate>
  unsigned SizeOrNotEmpty() const {
    // Emptiness plays a critical role in thread pool blocking. So we go to
    // great lengths to never report a non-empty queue as empty.
    unsigned front = front_.load(std::memory_order_acquire);
    for (;;) {
      // Capture a consistent snapshot of front/back.
      unsigned back = back_.load(std::memory_order_acquire);
      unsigned front1 = front_.load(std::memory_order_relaxed);
      if (front != front1) {
        front = front1;
        std::atomic_thread_fence(std::memory_order_acquire);
        continue;
      }
      if (NeedSizeEstimate) {
        return CalculateSize(front, back);
      } else {
        // This value will be 0 if the queue is empty, and undefined otherwise.
        unsigned maybe_zero = ((front ^ back) & kMask2);
        // The queue size estimate must agree with the maybe_zero check on the
        // queue empty/non-empty state.
        eigen_assert((CalculateSize(front, back) == 0) == (maybe_zero == 0));
        return maybe_zero;
      }
    }
  }

  EIGEN_ALWAYS_INLINE unsigned CalculateSize(unsigned front, unsigned back) const {
    int size = (front & kMask2) - (back & kMask2);
    // Fix overflow.
    if (size < 0) size += 2 * kSize;
    // Order of modification in push/pop is crafted to make the queue look
    // larger than it is during concurrent modifications. E.g. push can
    // increment size before the corresponding pop has decremented it.
    // So the computed size can be up to kSize + 1, fix it.
    if (size > static_cast<int>(kSize)) size = kSize;
    return static_cast<unsigned>(size);
  }
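
  // Worked example (assuming kSize = 1024, so kMask2 = 2047): if
  // front & kMask2 == 5 and back & kMask2 == 2040, the raw difference is
  // 5 - 2040 = -2035; adding 2 * kSize = 2048 recovers the true size of 13.
  // A concurrent push/pop pair can transiently make the result kSize + 1,
  // which is why it is clamped to kSize.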

  RunQueue(const RunQueue&) = delete;
  void operator=(const RunQueue&) = delete;
};

}  // namespace Eigen

#endif  // EIGEN_CXX11_THREADPOOL_RUNQUEUE_H