Eigen  3.4.90 (git rev 5a9f66fb35d03a4da9ef8976e67a61b30aa16dcf)
 
Loading...
Searching...
No Matches
CoreThreadPoolDevice.h
1// This file is part of Eigen, a lightweight C++ template library
2// for linear algebra.
3//
4// Copyright (C) 2023 Charlie Schlosser <[email protected]>
5//
6// This Source Code Form is subject to the terms of the Mozilla
7// Public License v. 2.0. If a copy of the MPL was not distributed
8// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
9
10#ifndef EIGEN_CORE_THREAD_POOL_DEVICE_H
11#define EIGEN_CORE_THREAD_POOL_DEVICE_H
12
13namespace Eigen {
14
15// CoreThreadPoolDevice provides an easy-to-understand Device for parallelizing Eigen Core expressions with
16// Threadpool. Expressions are recursively split evenly until the evaluation cost is less than the threshold for
17// delegating the task to a thread.
18
19// a
20// / \
21// / \
22// / \
23// / \
24// / \
25// / \
26// / \
27// a e
28// / \ / \
29// / \ / \
30// / \ / \
31// a c e g
32// / \ / \ / \ / \
33// / \ / \ / \ / \
34// a b c d e f g h
35
36// Each task descends the binary tree to the left, delegates the right task to a new thread, and continues to the
37// left. This ensures that work is evenly distributed to the thread pool as quickly as possible and minimizes the number
38// of tasks created during the evaluation. Consider an expression that is divided into 8 chunks. The
39// primary task 'a' creates tasks 'e' 'c' and 'b', and executes its portion of the expression at the bottom of the
40// tree. Likewise, task 'e' creates tasks 'g' and 'f', and executes its portion of the expression.
41
struct CoreThreadPoolDevice {
  using Task = std::function<void()>;

  // pool: thread pool used to execute delegated sub-tasks (held by reference; must outlive this device).
  // threadCostThreshold: non-negative scale factor mapping the total estimated evaluation cost of an
  // expression to an ideal number of threads (see calculateLevels()). Per the m_costFactor comment
  // below, the stored value is the reciprocal of the per-thread delegation cost so that
  // calculateLevels() can multiply instead of divide.
  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE CoreThreadPoolDevice(ThreadPool& pool, float threadCostThreshold = 3e-5f)
      : m_pool(pool) {
    eigen_assert(threadCostThreshold >= 0.0f && "threadCostThreshold must be non-negative");
    m_costFactor = threadCostThreshold;
  }

  // Computes the depth of the binary task tree used to evaluate an expression of `size`
  // coefficients, where `cost` is the estimated cost of one functor invocation (one packet).
  // The returned level L produces 2^L leaf tasks (see the Barrier construction in parallelFor).
  template <int PacketSize>
  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE int calculateLevels(Index size, float cost) const {
    eigen_assert(cost >= 0.0f && "cost must be non-negative");
    Index numOps = size / PacketSize;
    // Never use more threads than there are packet-sized operations, nor more than the pool has.
    int actualThreads = numOps < m_pool.NumThreads() ? static_cast<int>(numOps) : m_pool.NumThreads();
    float totalCost = static_cast<float>(numOps) * cost;
    float idealThreads = totalCost * m_costFactor;
    if (idealThreads < static_cast<float>(actualThreads)) {
      // The work is cheap relative to the delegation cost: use fewer threads (but at least one).
      idealThreads = numext::maxi(idealThreads, 1.0f);
      actualThreads = numext::mini(actualThreads, static_cast<int>(idealThreads));
    }
    // Express the thread count as a tree depth, rounding up to the next power of two.
    int maxLevel = internal::log2_ceil(actualThreads);
    return maxLevel;
  }

// MSVC does not like inlining parallelForImpl
#if EIGEN_COMP_MSVC && !EIGEN_COMP_CLANG
#define EIGEN_PARALLEL_FOR_INLINE
#else
#define EIGEN_PARALLEL_FOR_INLINE EIGEN_STRONG_INLINE
#endif

  // Recursive worker for the 1-D parallelFor. At each remaining level the right half [mid, end)
  // is scheduled on the pool and this call continues with the left half (see the tree diagram at
  // the top of the file). Each invocation notifies the barrier exactly once, so a call entered at
  // `level` accounts for 2^level notifications in total. Assumes (end - begin) is a multiple of
  // PacketSize.
  template <typename UnaryFunctor, int PacketSize>
  EIGEN_DEVICE_FUNC EIGEN_PARALLEL_FOR_INLINE void parallelForImpl(Index begin, Index end, UnaryFunctor& f,
                                                                   Barrier& barrier, int level) {
    while (level > 0) {
      level--;
      Index size = end - begin;
      eigen_assert(size % PacketSize == 0 && "this function assumes size is a multiple of PacketSize");
      // Split at a packet-aligned midpoint so both halves remain multiples of PacketSize.
      Index mid = begin + numext::round_down(size >> 1, PacketSize);
      Task right = [this, mid, end, &f, &barrier, level]() {
        parallelForImpl<UnaryFunctor, PacketSize>(mid, end, f, barrier, level);
      };
      m_pool.Schedule(std::move(right));
      end = mid;
    }
    // Leaf: evaluate this task's chunk one packet at a time.
    for (Index i = begin; i < end; i += PacketSize) f(i);
    barrier.Notify();
  }

  // Recursive worker for the 2-D (outer/inner) parallelFor. Splits along the outer dimension
  // first; once the outer range is down to a single unit it splits the inner range at
  // packet-aligned midpoints. Barrier accounting matches the unary overload above.
  template <typename BinaryFunctor, int PacketSize>
  EIGEN_DEVICE_FUNC EIGEN_PARALLEL_FOR_INLINE void parallelForImpl(Index outerBegin, Index outerEnd, Index innerBegin,
                                                                   Index innerEnd, BinaryFunctor& f, Barrier& barrier,
                                                                   int level) {
    while (level > 0) {
      level--;
      Index outerSize = outerEnd - outerBegin;
      if (outerSize > 1) {
        Index outerMid = outerBegin + (outerSize >> 1);
        Task right = [this, &f, &barrier, outerMid, outerEnd, innerBegin, innerEnd, level]() {
          parallelForImpl<BinaryFunctor, PacketSize>(outerMid, outerEnd, innerBegin, innerEnd, f, barrier, level);
        };
        m_pool.Schedule(std::move(right));
        outerEnd = outerMid;
      } else {
        Index innerSize = innerEnd - innerBegin;
        eigen_assert(innerSize % PacketSize == 0 && "this function assumes innerSize is a multiple of PacketSize");
        Index innerMid = innerBegin + numext::round_down(innerSize >> 1, PacketSize);
        Task right = [this, &f, &barrier, outerBegin, outerEnd, innerMid, innerEnd, level]() {
          parallelForImpl<BinaryFunctor, PacketSize>(outerBegin, outerEnd, innerMid, innerEnd, f, barrier, level);
        };
        m_pool.Schedule(std::move(right));
        innerEnd = innerMid;
      }
    }
    // Leaf: evaluate this task's tile, stepping the inner dimension by packets.
    for (Index outer = outerBegin; outer < outerEnd; outer++)
      for (Index inner = innerBegin; inner < innerEnd; inner += PacketSize) f(outer, inner);
    barrier.Notify();
  }

#undef EIGEN_PARALLEL_FOR_INLINE

  // Applies f(i) for i in [begin, end) stepping by PacketSize, distributing the work across the
  // pool. `cost` is the estimated cost of one call to f. Blocks until all chunks are done.
  template <typename UnaryFunctor, int PacketSize>
  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void parallelFor(Index begin, Index end, UnaryFunctor& f, float cost) {
    Index size = end - begin;
    int maxLevel = calculateLevels<PacketSize>(size, cost);
    // The task tree has exactly 2^maxLevel leaves, each of which notifies the barrier once.
    Barrier barrier(1 << maxLevel);
    parallelForImpl<UnaryFunctor, PacketSize>(begin, end, f, barrier, maxLevel);
    barrier.Wait();
  }

  // 2-D overload: applies f(outer, inner) over [outerBegin, outerEnd) x [innerBegin, innerEnd),
  // the inner index stepping by PacketSize. Blocks until all tiles are done.
  template <typename BinaryFunctor, int PacketSize>
  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void parallelFor(Index outerBegin, Index outerEnd, Index innerBegin,
                                                         Index innerEnd, BinaryFunctor& f, float cost) {
    Index outerSize = outerEnd - outerBegin;
    Index innerSize = innerEnd - innerBegin;
    Index size = outerSize * innerSize;
    int maxLevel = calculateLevels<PacketSize>(size, cost);
    Barrier barrier(1 << maxLevel);
    parallelForImpl<BinaryFunctor, PacketSize>(outerBegin, outerEnd, innerBegin, innerEnd, f, barrier, maxLevel);
    barrier.Wait();
  }

  ThreadPool& m_pool;
  // costFactor is the cost of delegating a task to a thread
  // the inverse is used to avoid a floating point division
  float m_costFactor;
};
148
149// specialization of coefficient-wise assignment loops for CoreThreadPoolDevice
150
151namespace internal {
152
// Compile-time estimate of the per-coefficient cost of an assignment kernel: the cost of
// evaluating the source expression plus the cost of writing to the destination. Used by the
// dense_assignment_loop_with_device specializations below to size the parallelFor cost argument.
template <typename Kernel>
struct cost_helper {
  using SrcEvaluatorType = typename Kernel::SrcEvaluatorType;
  using DstEvaluatorType = typename Kernel::DstEvaluatorType;
  using SrcXprType = typename SrcEvaluatorType::XprType;
  using DstXprType = typename DstEvaluatorType::XprType;
  // Combined cost: source evaluation + destination store.
  static constexpr Index Cost = functor_cost<SrcXprType>::Cost + functor_cost<DstXprType>::Cost;
};
161
162template <typename Kernel>
163struct dense_assignment_loop_with_device<Kernel, CoreThreadPoolDevice, DefaultTraversal, NoUnrolling> {
164 static constexpr Index XprEvaluationCost = cost_helper<Kernel>::Cost;
165 struct AssignmentFunctor : public Kernel {
166 EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE AssignmentFunctor(Kernel& kernel) : Kernel(kernel) {}
167 EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index outer, Index inner) {
168 this->assignCoeffByOuterInner(outer, inner);
169 }
170 };
171
172 static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel& kernel, CoreThreadPoolDevice& device) {
173 const Index innerSize = kernel.innerSize();
174 const Index outerSize = kernel.outerSize();
175 constexpr float cost = static_cast<float>(XprEvaluationCost);
176 AssignmentFunctor functor(kernel);
177 device.template parallelFor<AssignmentFunctor, 1>(0, outerSize, 0, innerSize, functor, cost);
178 }
179};
180
181template <typename Kernel>
182struct dense_assignment_loop_with_device<Kernel, CoreThreadPoolDevice, DefaultTraversal, InnerUnrolling> {
183 using DstXprType = typename Kernel::DstEvaluatorType::XprType;
184 static constexpr Index XprEvaluationCost = cost_helper<Kernel>::Cost, InnerSize = DstXprType::InnerSizeAtCompileTime;
185 struct AssignmentFunctor : public Kernel {
186 EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE AssignmentFunctor(Kernel& kernel) : Kernel(kernel) {}
187 EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index outer) {
188 copy_using_evaluator_DefaultTraversal_InnerUnrolling<Kernel, 0, InnerSize>::run(*this, outer);
189 }
190 };
191 static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel& kernel, CoreThreadPoolDevice& device) {
192 const Index outerSize = kernel.outerSize();
193 AssignmentFunctor functor(kernel);
194 constexpr float cost = static_cast<float>(XprEvaluationCost) * static_cast<float>(InnerSize);
195 device.template parallelFor<AssignmentFunctor, 1>(0, outerSize, functor, cost);
196 }
197};
198
199template <typename Kernel>
200struct dense_assignment_loop_with_device<Kernel, CoreThreadPoolDevice, InnerVectorizedTraversal, NoUnrolling> {
201 using PacketType = typename Kernel::PacketType;
202 static constexpr Index XprEvaluationCost = cost_helper<Kernel>::Cost, PacketSize = unpacket_traits<PacketType>::size,
203 SrcAlignment = Kernel::AssignmentTraits::SrcAlignment,
204 DstAlignment = Kernel::AssignmentTraits::DstAlignment;
205 struct AssignmentFunctor : public Kernel {
206 EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE AssignmentFunctor(Kernel& kernel) : Kernel(kernel) {}
207 EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index outer, Index inner) {
208 this->template assignPacketByOuterInner<Unaligned, Unaligned, PacketType>(outer, inner);
209 }
210 };
211 static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel& kernel, CoreThreadPoolDevice& device) {
212 const Index innerSize = kernel.innerSize();
213 const Index outerSize = kernel.outerSize();
214 const float cost = static_cast<float>(XprEvaluationCost) * static_cast<float>(innerSize);
215 AssignmentFunctor functor(kernel);
216 device.template parallelFor<AssignmentFunctor, PacketSize>(0, outerSize, 0, innerSize, functor, cost);
217 }
218};
219
220template <typename Kernel>
221struct dense_assignment_loop_with_device<Kernel, CoreThreadPoolDevice, InnerVectorizedTraversal, InnerUnrolling> {
222 using PacketType = typename Kernel::PacketType;
223 using DstXprType = typename Kernel::DstEvaluatorType::XprType;
224 static constexpr Index XprEvaluationCost = cost_helper<Kernel>::Cost, PacketSize = unpacket_traits<PacketType>::size,
225 SrcAlignment = Kernel::AssignmentTraits::SrcAlignment,
226 DstAlignment = Kernel::AssignmentTraits::DstAlignment,
227 InnerSize = DstXprType::InnerSizeAtCompileTime;
228 struct AssignmentFunctor : public Kernel {
229 EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE AssignmentFunctor(Kernel& kernel) : Kernel(kernel) {}
230 EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index outer) {
231 copy_using_evaluator_innervec_InnerUnrolling<Kernel, 0, InnerSize, SrcAlignment, DstAlignment>::run(*this, outer);
232 }
233 };
234 static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel& kernel, CoreThreadPoolDevice& device) {
235 const Index outerSize = kernel.outerSize();
236 constexpr float cost = static_cast<float>(XprEvaluationCost) * static_cast<float>(InnerSize);
237 AssignmentFunctor functor(kernel);
238 device.template parallelFor<AssignmentFunctor, PacketSize>(0, outerSize, functor, cost);
239 }
240};
241
242template <typename Kernel>
243struct dense_assignment_loop_with_device<Kernel, CoreThreadPoolDevice, SliceVectorizedTraversal, NoUnrolling> {
244 using Scalar = typename Kernel::Scalar;
245 using PacketType = typename Kernel::PacketType;
246 static constexpr Index XprEvaluationCost = cost_helper<Kernel>::Cost, PacketSize = unpacket_traits<PacketType>::size;
247 struct PacketAssignmentFunctor : public Kernel {
248 EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE PacketAssignmentFunctor(Kernel& kernel) : Kernel(kernel) {}
249 EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index outer, Index inner) {
250 this->template assignPacketByOuterInner<Unaligned, Unaligned, PacketType>(outer, inner);
251 }
252 };
253 struct ScalarAssignmentFunctor : public Kernel {
254 EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE ScalarAssignmentFunctor(Kernel& kernel) : Kernel(kernel) {}
255 EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index outer) {
256 const Index innerSize = this->innerSize();
257 const Index packetAccessSize = numext::round_down(innerSize, PacketSize);
258 for (Index inner = packetAccessSize; inner < innerSize; inner++) this->assignCoeffByOuterInner(outer, inner);
259 }
260 };
261 static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel& kernel, CoreThreadPoolDevice& device) {
262 const Index outerSize = kernel.outerSize();
263 const Index innerSize = kernel.innerSize();
264 const Index packetAccessSize = numext::round_down(innerSize, PacketSize);
265 constexpr float packetCost = static_cast<float>(XprEvaluationCost);
266 const float scalarCost = static_cast<float>(XprEvaluationCost) * static_cast<float>(innerSize - packetAccessSize);
267 PacketAssignmentFunctor packetFunctor(kernel);
268 ScalarAssignmentFunctor scalarFunctor(kernel);
269 device.template parallelFor<PacketAssignmentFunctor, PacketSize>(0, outerSize, 0, packetAccessSize, packetFunctor,
270 packetCost);
271 device.template parallelFor<ScalarAssignmentFunctor, 1>(0, outerSize, scalarFunctor, scalarCost);
272 };
273};
274
275template <typename Kernel>
276struct dense_assignment_loop_with_device<Kernel, CoreThreadPoolDevice, LinearTraversal, NoUnrolling> {
277 static constexpr Index XprEvaluationCost = cost_helper<Kernel>::Cost;
278 struct AssignmentFunctor : public Kernel {
279 EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE AssignmentFunctor(Kernel& kernel) : Kernel(kernel) {}
280 EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index index) { this->assignCoeff(index); }
281 };
282 static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel& kernel, CoreThreadPoolDevice& device) {
283 const Index size = kernel.size();
284 constexpr float cost = static_cast<float>(XprEvaluationCost);
285 AssignmentFunctor functor(kernel);
286 device.template parallelFor<AssignmentFunctor, 1>(0, size, functor, cost);
287 }
288};
289
290template <typename Kernel>
291struct dense_assignment_loop_with_device<Kernel, CoreThreadPoolDevice, LinearVectorizedTraversal, NoUnrolling> {
292 using Scalar = typename Kernel::Scalar;
293 using PacketType = typename Kernel::PacketType;
294 static constexpr Index XprEvaluationCost = cost_helper<Kernel>::Cost,
295 RequestedAlignment = Kernel::AssignmentTraits::LinearRequiredAlignment,
296 PacketSize = unpacket_traits<PacketType>::size,
297 DstIsAligned = Kernel::AssignmentTraits::DstAlignment >= RequestedAlignment,
298 DstAlignment = packet_traits<Scalar>::AlignedOnScalar ? RequestedAlignment
299 : Kernel::AssignmentTraits::DstAlignment,
300 SrcAlignment = Kernel::AssignmentTraits::JointAlignment;
301 struct AssignmentFunctor : public Kernel {
302 EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE AssignmentFunctor(Kernel& kernel) : Kernel(kernel) {}
303 EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index index) {
304 this->template assignPacket<DstAlignment, SrcAlignment, PacketType>(index);
305 }
306 };
307 static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel& kernel, CoreThreadPoolDevice& device) {
308 const Index size = kernel.size();
309 const Index alignedStart =
310 DstIsAligned ? 0 : internal::first_aligned<RequestedAlignment>(kernel.dstDataPtr(), size);
311 const Index alignedEnd = alignedStart + numext::round_down(size - alignedStart, PacketSize);
312
313 unaligned_dense_assignment_loop<DstIsAligned != 0>::run(kernel, 0, alignedStart);
314
315 constexpr float cost = static_cast<float>(XprEvaluationCost);
316 AssignmentFunctor functor(kernel);
317 device.template parallelFor<AssignmentFunctor, PacketSize>(alignedStart, alignedEnd, functor, cost);
318
319 unaligned_dense_assignment_loop<>::run(kernel, alignedEnd, size);
320 }
321};
322
323} // namespace internal
324
325} // namespace Eigen
326
327#endif // EIGEN_CORE_THREAD_POOL_DEVICE_H
Namespace containing all symbols from the Eigen library.
Definition Core:137