src/ex/thread_pool.cpp

100.0% Lines (139/139) 100.0% List of functions (29/29)
thread_pool.cpp
f(x) Functions (29)
Function Calls Lines Blocks
boost::capy::thread_pool::impl::push(boost::capy::continuation*) :66 819x 100.0% 100.0% boost::capy::thread_pool::impl::pop() :76 980x 100.0% 100.0% boost::capy::thread_pool::impl::empty() const :87 1038x 100.0% 100.0% boost::capy::thread_pool::impl::~impl() :104 161x 100.0% 100.0% boost::capy::thread_pool::impl::running_in_this_thread() const :107 355x 100.0% 100.0% boost::capy::thread_pool::impl::drain_abandoned() :118 161x 100.0% 100.0% boost::capy::thread_pool::impl::impl(unsigned long, std::basic_string_view<char, std::char_traits<char> >) :128 161x 100.0% 72.0% boost::capy::thread_pool::impl::post(boost::capy::continuation&) :141 819x 100.0% 100.0% boost::capy::thread_pool::impl::on_work_started() :152 347x 100.0% 100.0% boost::capy::thread_pool::impl::on_work_finished() :158 347x 100.0% 100.0% boost::capy::thread_pool::impl::join() :172 173x 100.0% 85.0% boost::capy::thread_pool::impl::join()::{lambda()#1}::operator()() const :188 63x 100.0% 100.0% boost::capy::thread_pool::impl::stop() :200 163x 100.0% 100.0% boost::capy::thread_pool::impl::ensure_started() :212 819x 100.0% 100.0% boost::capy::thread_pool::impl::ensure_started()::{lambda()#1}::operator()() const :214 105x 100.0% 100.0% boost::capy::thread_pool::impl::ensure_started()::{lambda()#1}::operator()() const::{lambda()#1}::operator()() const :217 184x 100.0% 100.0% boost::capy::thread_pool::impl::run(unsigned long) :222 184x 100.0% 78.0% boost::capy::thread_pool::impl::run(unsigned long)::scoped_pool::scoped_pool(boost::capy::thread_pool::impl const*) :233 184x 100.0% 100.0% boost::capy::thread_pool::impl::run(unsigned long)::scoped_pool::~scoped_pool() :234 184x 100.0% 100.0% boost::capy::thread_pool::impl::run(unsigned long)::{lambda()#1}::operator()() const :242 1038x 100.0% 100.0% boost::capy::thread_pool::~thread_pool() :258 161x 100.0% 100.0% boost::capy::thread_pool::thread_pool(unsigned long, std::basic_string_view<char, std::char_traits<char> >) :269 161x 100.0% 55.0% 
boost::capy::thread_pool::join() :277 12x 100.0% 100.0% boost::capy::thread_pool::stop() :284 2x 100.0% 100.0% boost::capy::thread_pool::get_executor() const :293 167x 100.0% 100.0% boost::capy::thread_pool::executor_type::on_work_started() const :301 347x 100.0% 100.0% boost::capy::thread_pool::executor_type::on_work_finished() const :308 347x 100.0% 100.0% boost::capy::thread_pool::executor_type::post(boost::capy::continuation&) const :315 477x 100.0% 100.0% boost::capy::thread_pool::executor_type::dispatch(boost::capy::continuation&) const :322 355x 100.0% 100.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 // Copyright (c) 2026 Michael Vandeberg
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/boostorg/capy
9 //
10
11 #include <boost/capy/ex/thread_pool.hpp>
12 #include <boost/capy/continuation.hpp>
13 #include <boost/capy/detail/thread_local_ptr.hpp>
14 #include <boost/capy/ex/frame_allocator.hpp>
15 #include <boost/capy/test/thread_name.hpp>
16 #include <algorithm>
17 #include <atomic>
18 #include <condition_variable>
19 #include <cstdio>
20 #include <mutex>
21 #include <thread>
22 #include <vector>
23
24 /*
25 Thread pool implementation using a shared work queue.
26
27 Work items are continuations linked via their intrusive next pointer,
28 stored in a single queue protected by a mutex. No per-post heap
29 allocation: the continuation is owned by the caller and linked
30 directly. Worker threads wait on a condition_variable until work
31 is available or stop is requested.
32
33 Threads are started lazily on first post() via std::call_once to avoid
34 spawning threads for pools that are constructed but never used. Each
35 thread is named with a configurable prefix plus index for debugger
36 visibility.
37
38 Work tracking: on_work_started/on_work_finished maintain an atomic
39 outstanding_work_ counter. join() blocks until this counter reaches
40 zero, then signals workers to stop and joins threads.
41
42 Two shutdown paths, plus the destructor, which combines them:
43 - join(): waits for outstanding work to drain, then stops workers.
44 - stop(): immediately signals workers to exit; queued work is abandoned.
45 - Destructor: stop() then join() (abandon + wait for threads).
46 */
47
48 namespace boost {
49 namespace capy {
50
51 //------------------------------------------------------------------------------
52
53 class thread_pool::impl
54 {
55 // Identifies the pool owning the current worker thread, or
56 // nullptr if the calling thread is not a pool worker. Checked
57 // by dispatch() to decide between symmetric transfer (inline
58 // resume) and post.
59 static inline detail::thread_local_ptr<impl const> current_;
60
61 // Intrusive queue of continuations via continuation::next.
62 // No per-post allocation: the continuation is owned by the caller.
63 continuation* head_ = nullptr;
64 continuation* tail_ = nullptr;
65
66 819x void push(continuation* c) noexcept
67 {
68 819x c->next = nullptr;
69 819x if(tail_)
70 594x tail_->next = c;
71 else
72 225x head_ = c;
73 819x tail_ = c;
74 819x }
75
76 980x continuation* pop() noexcept
77 {
78 980x if(!head_)
79 161x return nullptr;
80 819x continuation* c = head_;
81 819x head_ = head_->next;
82 819x if(!head_)
83 225x tail_ = nullptr;
84 819x return c;
85 }
86
87 1038x bool empty() const noexcept
88 {
89 1038x return head_ == nullptr;
90 }
91
92 std::mutex mutex_;
93 std::condition_variable work_cv_;
94 std::condition_variable done_cv_;
95 std::vector<std::thread> threads_;
96 std::atomic<std::size_t> outstanding_work_{0};
97 bool stop_{false};
98 bool joined_{false};
99 std::size_t num_threads_;
100 char thread_name_prefix_[13]{}; // 12 chars max + null terminator
101 std::once_flag start_flag_;
102
103 public:
104 161x ~impl() = default;
105
106 bool
107 355x running_in_this_thread() const noexcept
108 {
109 355x return current_.get() == this;
110 }
111
112 // Destroy abandoned coroutine frames. Must be called
113 // before execution_context::shutdown()/destroy() so
114 // that suspended-frame destructors (e.g. delay_awaitable
115 // calling timer_service::cancel()) run while services
116 // are still valid.
117 void
118 161x drain_abandoned() noexcept
119 {
120 365x while(auto* c = pop())
121 {
122 204x auto h = c->h;
123 204x if(h && h != std::noop_coroutine())
124 153x h.destroy();
125 204x }
126 161x }
127
128 161x impl(std::size_t num_threads, std::string_view thread_name_prefix)
129 161x : num_threads_(num_threads)
130 {
131 161x if(num_threads_ == 0)
132 4x num_threads_ = std::max(
133 2x std::thread::hardware_concurrency(), 1u);
134
135 // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
136 161x auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
137 161x thread_name_prefix_[n] = '\0';
138 161x }
139
140 void
141 819x post(continuation& c)
142 {
143 819x ensure_started();
144 {
145 819x std::lock_guard<std::mutex> lock(mutex_);
146 819x push(&c);
147 819x }
148 819x work_cv_.notify_one();
149 819x }
150
151 void
152 347x on_work_started() noexcept
153 {
154 347x outstanding_work_.fetch_add(1, std::memory_order_acq_rel);
155 347x }
156
157 void
158 347x on_work_finished() noexcept
159 {
160 347x if(outstanding_work_.fetch_sub(
161 347x 1, std::memory_order_acq_rel) == 1)
162 {
163 87x std::lock_guard<std::mutex> lock(mutex_);
164 87x if(joined_ && !stop_)
165 4x stop_ = true;
166 87x done_cv_.notify_all();
167 87x work_cv_.notify_all();
168 87x }
169 347x }
170
171 void
172 173x join() noexcept
173 {
174 {
175 173x std::unique_lock<std::mutex> lock(mutex_);
176 173x if(joined_)
177 12x return;
178 161x joined_ = true;
179
180 161x if(outstanding_work_.load(
181 161x std::memory_order_acquire) == 0)
182 {
183 103x stop_ = true;
184 103x work_cv_.notify_all();
185 }
186 else
187 {
188 58x done_cv_.wait(lock, [this]{
189 63x return stop_;
190 });
191 }
192 173x }
193
194 345x for(auto& t : threads_)
195 184x if(t.joinable())
196 184x t.join();
197 }
198
199 void
200 163x stop() noexcept
201 {
202 {
203 163x std::lock_guard<std::mutex> lock(mutex_);
204 163x stop_ = true;
205 163x }
206 163x work_cv_.notify_all();
207 163x done_cv_.notify_all();
208 163x }
209
210 private:
211 void
212 819x ensure_started()
213 {
214 819x std::call_once(start_flag_, [this]{
215 105x threads_.reserve(num_threads_);
216 289x for(std::size_t i = 0; i < num_threads_; ++i)
217 368x threads_.emplace_back([this, i]{ run(i); });
218 105x });
219 819x }
220
221 void
222 184x run(std::size_t index)
223 {
224 // Build name; set_current_thread_name truncates to platform limits.
225 char name[16];
226 184x std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
227 184x set_current_thread_name(name);
228
229 // Mark this thread as a worker of this pool so dispatch()
230 // can symmetric-transfer when called from within pool work.
231 struct scoped_pool
232 {
233 184x scoped_pool(impl const* p) noexcept { current_.set(p); }
234 184x ~scoped_pool() noexcept { current_.set(nullptr); }
235 184x } guard(this);
236
237 for(;;)
238 {
239 799x continuation* c = nullptr;
240 {
241 799x std::unique_lock<std::mutex> lock(mutex_);
242 799x work_cv_.wait(lock, [this]{
243 1358x return !empty() ||
244 1358x stop_;
245 });
246 799x if(stop_)
247 368x return;
248 615x c = pop();
249 799x }
250 615x if(c)
251 615x safe_resume(c->h);
252 615x }
253 184x }
254 };
255
256 //------------------------------------------------------------------------------
257
258 161x thread_pool::
259 ~thread_pool()
260 {
261 161x impl_->stop();
262 161x impl_->join();
263 161x impl_->drain_abandoned();
264 161x shutdown();
265 161x destroy();
266 161x delete impl_;
267 161x }
268
269 161x thread_pool::
270 161x thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
271 161x : impl_(new impl(num_threads, thread_name_prefix))
272 {
273 161x this->set_frame_allocator(std::allocator<void>{});
274 161x }
275
276 void
277 12x thread_pool::
278 join() noexcept
279 {
280 12x impl_->join();
281 12x }
282
283 void
284 2x thread_pool::
285 stop() noexcept
286 {
287 2x impl_->stop();
288 2x }
289
290 //------------------------------------------------------------------------------
291
292 thread_pool::executor_type
293 167x thread_pool::
294 get_executor() const noexcept
295 {
296 167x return executor_type(
297 167x const_cast<thread_pool&>(*this));
298 }
299
300 void
301 347x thread_pool::executor_type::
302 on_work_started() const noexcept
303 {
304 347x pool_->impl_->on_work_started();
305 347x }
306
307 void
308 347x thread_pool::executor_type::
309 on_work_finished() const noexcept
310 {
311 347x pool_->impl_->on_work_finished();
312 347x }
313
314 void
315 477x thread_pool::executor_type::
316 post(continuation& c) const
317 {
318 477x pool_->impl_->post(c);
319 477x }
320
321 std::coroutine_handle<>
322 355x thread_pool::executor_type::
323 dispatch(continuation& c) const
324 {
325 355x if(pool_->impl_->running_in_this_thread())
326 13x return c.h;
327 342x pool_->impl_->post(c);
328 342x return std::noop_coroutine();
329 }
330
331 } // capy
332 } // boost
333