TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : // Copyright (c) 2026 Michael Vandeberg
4 : //
5 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 : //
8 : // Official repository: https://github.com/boostorg/capy
9 : //
10 :
11 : #include <boost/capy/ex/thread_pool.hpp>
12 : #include <boost/capy/continuation.hpp>
13 : #include <boost/capy/detail/thread_local_ptr.hpp>
14 : #include <boost/capy/ex/frame_allocator.hpp>
15 : #include <boost/capy/test/thread_name.hpp>
16 : #include <algorithm>
17 : #include <atomic>
18 : #include <condition_variable>
19 : #include <cstdio>
20 : #include <mutex>
21 : #include <thread>
22 : #include <vector>
23 :
24 : /*
25 : Thread pool implementation using a shared work queue.
26 :
27 : Work items are continuations linked via their intrusive next pointer,
28 : stored in a single queue protected by a mutex. No per-post heap
29 : allocation: the continuation is owned by the caller and linked
30 : directly. Worker threads wait on a condition_variable until work
31 : is available or stop is requested.
32 :
33 : Threads are started lazily on first post() via std::call_once to avoid
34 : spawning threads for pools that are constructed but never used. Each
35 : thread is named with a configurable prefix plus index for debugger
36 : visibility.
37 :
38 : Work tracking: on_work_started/on_work_finished maintain an atomic
39 : outstanding_work_ counter. join() blocks until this counter reaches
40 : zero, then signals workers to stop and joins threads.
41 :
    Shutdown paths:
    - join(): waits for outstanding work to drain, then stops workers.
    - stop(): immediately signals workers to exit; queued work is abandoned.
    - Destructor: stop() then join() (abandon + wait for threads).
46 : */
47 :
48 : namespace boost {
49 : namespace capy {
50 :
51 : //------------------------------------------------------------------------------
52 :
class thread_pool::impl
{
    // Identifies the pool owning the current worker thread, or
    // nullptr if the calling thread is not a pool worker. Checked
    // by dispatch() to decide between symmetric transfer (inline
    // resume) and post.
    static inline detail::thread_local_ptr<impl const> current_;

    // Intrusive queue of continuations via continuation::next.
    // No per-post allocation: the continuation is owned by the caller.
    continuation* head_ = nullptr;
    continuation* tail_ = nullptr;

    // Append c to the tail of the queue.
    // Precondition: caller holds mutex_.
    void push(continuation* c) noexcept
    {
        c->next = nullptr;
        if(tail_)
            tail_->next = c;
        else
            head_ = c;          // queue was empty; c becomes both ends
        tail_ = c;
    }

    // Detach and return the head of the queue, or nullptr if empty.
    // Precondition: caller holds mutex_ (or no workers are running,
    // as in drain_abandoned()).
    continuation* pop() noexcept
    {
        if(!head_)
            return nullptr;
        continuation* c = head_;
        head_ = head_->next;
        if(!head_)
            tail_ = nullptr;    // queue drained; keep tail_ consistent
        return c;
    }

    // True when no work is queued. Precondition: caller holds mutex_.
    bool empty() const noexcept
    {
        return head_ == nullptr;
    }

    std::mutex mutex_;                  // guards head_/tail_, stop_, joined_
    std::condition_variable work_cv_;   // signaled on new work or stop
    std::condition_variable done_cv_;   // signaled when outstanding work drains
    std::vector<std::thread> threads_;
    std::atomic<std::size_t> outstanding_work_{0};
    bool stop_{false};                  // workers must exit; protected by mutex_
    bool joined_{false};                // join() has been entered; protected by mutex_
    std::size_t num_threads_;
    char thread_name_prefix_[13]{}; // 12 chars max + null terminator
    std::once_flag start_flag_;     // one-time lazy thread startup

public:
    ~impl() = default;

    // True if the calling thread is one of this pool's workers.
    bool
    running_in_this_thread() const noexcept
    {
        return current_.get() == this;
    }

    // Destroy abandoned coroutine frames. Must be called
    // before execution_context::shutdown()/destroy() so
    // that suspended-frame destructors (e.g. delay_awaitable
    // calling timer_service::cancel()) run while services
    // are still valid.
    void
    drain_abandoned() noexcept
    {
        while(auto* c = pop())
        {
            auto h = c->h;
            // noop_coroutine() has no frame to destroy.
            if(h && h != std::noop_coroutine())
                h.destroy();
        }
    }

    // num_threads == 0 means "use hardware concurrency" (at least 1).
    // thread_name_prefix is truncated to fit thread_name_prefix_.
    impl(std::size_t num_threads, std::string_view thread_name_prefix)
        : num_threads_(num_threads)
    {
        if(num_threads_ == 0)
            num_threads_ = std::max(
                std::thread::hardware_concurrency(), 1u);

        // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
        auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
        thread_name_prefix_[n] = '\0';
    }

    // Enqueue c and wake one worker. Starts the worker
    // threads on first call (lazy startup).
    void
    post(continuation& c)
    {
        ensure_started();
        {
            std::lock_guard<std::mutex> lock(mutex_);
            push(&c);
        }
        // Notify outside the lock to avoid waking a worker
        // that immediately blocks on mutex_.
        work_cv_.notify_one();
    }

    void
    on_work_started() noexcept
    {
        outstanding_work_.fetch_add(1, std::memory_order_acq_rel);
    }

    // When the last unit of outstanding work finishes and join()
    // is already waiting, flip stop_ and wake both the joiner
    // (done_cv_) and the workers (work_cv_) so shutdown proceeds.
    void
    on_work_finished() noexcept
    {
        if(outstanding_work_.fetch_sub(
            1, std::memory_order_acq_rel) == 1)
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if(joined_ && !stop_)
                stop_ = true;
            done_cv_.notify_all();
            work_cv_.notify_all();
        }
    }

    // Wait for outstanding work to reach zero, then stop and join
    // all worker threads. Only the first caller performs the join;
    // subsequent calls return immediately.
    void
    join() noexcept
    {
        {
            std::unique_lock<std::mutex> lock(mutex_);
            if(joined_)
                return;
            joined_ = true;

            if(outstanding_work_.load(
                std::memory_order_acquire) == 0)
            {
                // Nothing pending: signal workers to exit now.
                stop_ = true;
                work_cv_.notify_all();
            }
            else
            {
                // on_work_finished() sets stop_ (because joined_ is
                // now true) when the counter reaches zero.
                done_cv_.wait(lock, [this]{
                    return stop_;
                });
            }
        }

        for(auto& t : threads_)
            if(t.joinable())
                t.join();
    }

    // Signal workers to exit immediately; queued work is abandoned
    // (recoverable later via drain_abandoned()). Does not join.
    void
    stop() noexcept
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = true;
        }
        work_cv_.notify_all();
        done_cv_.notify_all();
    }

private:
    // Spawn the worker threads exactly once, on first post().
    void
    ensure_started()
    {
        std::call_once(start_flag_, [this]{
            threads_.reserve(num_threads_);
            for(std::size_t i = 0; i < num_threads_; ++i)
                threads_.emplace_back([this, i]{ run(i); });
        });
    }

    // Worker thread main loop: name the thread, register it as a
    // pool worker, then repeatedly dequeue and resume continuations
    // until stop_ is observed.
    void
    run(std::size_t index)
    {
        // Build name; set_current_thread_name truncates to platform limits.
        char name[16];
        std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
        set_current_thread_name(name);

        // Mark this thread as a worker of this pool so dispatch()
        // can symmetric-transfer when called from within pool work.
        struct scoped_pool
        {
            scoped_pool(impl const* p) noexcept { current_.set(p); }
            ~scoped_pool() noexcept { current_.set(nullptr); }
        } guard(this);

        for(;;)
        {
            continuation* c = nullptr;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                work_cv_.wait(lock, [this]{
                    return !empty() ||
                        stop_;
                });
                // stop_ takes priority over queued work: remaining
                // entries are left for drain_abandoned().
                if(stop_)
                    return;
                c = pop();
            }
            // Resume outside the lock so the continuation may post
            // further work without deadlocking.
            if(c)
                safe_resume(c->h);
        }
    }
};
255 :
256 : //------------------------------------------------------------------------------
257 :
thread_pool::
~thread_pool()
{
    // Order matters here:
    //  1. stop()  - signal workers to exit, abandoning queued work
    //  2. join()  - wait for worker threads to terminate
    //  3. drain_abandoned() - destroy abandoned coroutine frames while
    //     execution-context services are still valid (their destructors
    //     may call into services, e.g. timer cancellation)
    //  4. shutdown()/destroy() - tear down the execution context
    impl_->stop();
    impl_->join();
    impl_->drain_abandoned();
    shutdown();
    destroy();
    delete impl_;
}
268 :
thread_pool::
thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
    : impl_(new impl(num_threads, thread_name_prefix))
{
    // Default frame allocator: plain global operator new/delete.
    this->set_frame_allocator(std::allocator<void>{});
}
275 :
276 : void
277 12 : thread_pool::
278 : join() noexcept
279 : {
280 12 : impl_->join();
281 12 : }
282 :
283 : void
284 2 : thread_pool::
285 : stop() noexcept
286 : {
287 2 : impl_->stop();
288 2 : }
289 :
290 : //------------------------------------------------------------------------------
291 :
292 : thread_pool::executor_type
293 167 : thread_pool::
294 : get_executor() const noexcept
295 : {
296 167 : return executor_type(
297 167 : const_cast<thread_pool&>(*this));
298 : }
299 :
300 : void
301 347 : thread_pool::executor_type::
302 : on_work_started() const noexcept
303 : {
304 347 : pool_->impl_->on_work_started();
305 347 : }
306 :
307 : void
308 347 : thread_pool::executor_type::
309 : on_work_finished() const noexcept
310 : {
311 347 : pool_->impl_->on_work_finished();
312 347 : }
313 :
314 : void
315 477 : thread_pool::executor_type::
316 : post(continuation& c) const
317 : {
318 477 : pool_->impl_->post(c);
319 477 : }
320 :
321 : std::coroutine_handle<>
322 355 : thread_pool::executor_type::
323 : dispatch(continuation& c) const
324 : {
325 355 : if(pool_->impl_->running_in_this_thread())
326 13 : return c.h;
327 342 : pool_->impl_->post(c);
328 342 : return std::noop_coroutine();
329 : }
330 :
331 : } // capy
332 : } // boost
|