100.00% Lines (139/139) 100.00% Functions (29/29)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) 2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3   // Copyright (c) 2026 Michael Vandeberg 3   // Copyright (c) 2026 Michael Vandeberg
4   // 4   //
5   // Distributed under the Boost Software License, Version 1.0. (See accompanying 5   // Distributed under the Boost Software License, Version 1.0. (See accompanying
6   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 6   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7   // 7   //
8   // Official repository: https://github.com/boostorg/capy 8   // Official repository: https://github.com/boostorg/capy
9   // 9   //
10   10  
11   #include <boost/capy/ex/thread_pool.hpp> 11   #include <boost/capy/ex/thread_pool.hpp>
12   #include <boost/capy/continuation.hpp> 12   #include <boost/capy/continuation.hpp>
  13 + #include <boost/capy/detail/thread_local_ptr.hpp>
13   #include <boost/capy/ex/frame_allocator.hpp> 14   #include <boost/capy/ex/frame_allocator.hpp>
14   #include <boost/capy/test/thread_name.hpp> 15   #include <boost/capy/test/thread_name.hpp>
15   #include <algorithm> 16   #include <algorithm>
16   #include <atomic> 17   #include <atomic>
17   #include <condition_variable> 18   #include <condition_variable>
18   #include <cstdio> 19   #include <cstdio>
19   #include <mutex> 20   #include <mutex>
20   #include <thread> 21   #include <thread>
21   #include <vector> 22   #include <vector>
22   23  
23   /* 24   /*
24   Thread pool implementation using a shared work queue. 25   Thread pool implementation using a shared work queue.
25   26  
26   Work items are continuations linked via their intrusive next pointer, 27   Work items are continuations linked via their intrusive next pointer,
27   stored in a single queue protected by a mutex. No per-post heap 28   stored in a single queue protected by a mutex. No per-post heap
28   allocation: the continuation is owned by the caller and linked 29   allocation: the continuation is owned by the caller and linked
29   directly. Worker threads wait on a condition_variable until work 30   directly. Worker threads wait on a condition_variable until work
30   is available or stop is requested. 31   is available or stop is requested.
31   32  
32   Threads are started lazily on first post() via std::call_once to avoid 33   Threads are started lazily on first post() via std::call_once to avoid
33   spawning threads for pools that are constructed but never used. Each 34   spawning threads for pools that are constructed but never used. Each
34   thread is named with a configurable prefix plus index for debugger 35   thread is named with a configurable prefix plus index for debugger
35   visibility. 36   visibility.
36   37  
37   Work tracking: on_work_started/on_work_finished maintain an atomic 38   Work tracking: on_work_started/on_work_finished maintain an atomic
38   outstanding_work_ counter. join() blocks until this counter reaches 39   outstanding_work_ counter. join() blocks until this counter reaches
39   zero, then signals workers to stop and joins threads. 40   zero, then signals workers to stop and joins threads.
40   41  
Shutdown paths (join and stop, plus the destructor, which combines them):
42   - join(): waits for outstanding work to drain, then stops workers. 43   - join(): waits for outstanding work to drain, then stops workers.
43   - stop(): immediately signals workers to exit; queued work is abandoned. 44   - stop(): immediately signals workers to exit; queued work is abandoned.
44   - Destructor: stop() then join() (abandon + wait for threads). 45   - Destructor: stop() then join() (abandon + wait for threads).
45   */ 46   */
46   47  
47   namespace boost { 48   namespace boost {
48   namespace capy { 49   namespace capy {
49   50  
50   //------------------------------------------------------------------------------ 51   //------------------------------------------------------------------------------
51   52  
52   class thread_pool::impl 53   class thread_pool::impl
53   { 54   {
  55 + // Identifies the pool owning the current worker thread, or
  56 + // nullptr if the calling thread is not a pool worker. Checked
  57 + // by dispatch() to decide between symmetric transfer (inline
  58 + // resume) and post.
  59 + static inline detail::thread_local_ptr<impl const> current_;
  60 +
54   // Intrusive queue of continuations via continuation::next. 61   // Intrusive queue of continuations via continuation::next.
55   // No per-post allocation: the continuation is owned by the caller. 62   // No per-post allocation: the continuation is owned by the caller.
56   continuation* head_ = nullptr; 63   continuation* head_ = nullptr;
57   continuation* tail_ = nullptr; 64   continuation* tail_ = nullptr;
58   65  
HITCBC 59   831 void push(continuation* c) noexcept 66   819 void push(continuation* c) noexcept
60   { 67   {
HITCBC 61   831 c->next = nullptr; 68   819 c->next = nullptr;
HITCBC 62   831 if(tail_) 69   819 if(tail_)
HITCBC 63   609 tail_->next = c; 70   594 tail_->next = c;
64   else 71   else
HITCBC 65   222 head_ = c; 72   225 head_ = c;
HITCBC 66   831 tail_ = c; 73   819 tail_ = c;
HITCBC 67   831 } 74   819 }
68   75  
HITCBC 69   989 continuation* pop() noexcept 76   980 continuation* pop() noexcept
70   { 77   {
HITCBC 71   989 if(!head_) 78   980 if(!head_)
HITCBC 72   158 return nullptr; 79   161 return nullptr;
HITCBC 73   831 continuation* c = head_; 80   819 continuation* c = head_;
HITCBC 74   831 head_ = head_->next; 81   819 head_ = head_->next;
HITCBC 75   831 if(!head_) 82   819 if(!head_)
HITCBC 76   222 tail_ = nullptr; 83   225 tail_ = nullptr;
HITCBC 77   831 return c; 84   819 return c;
78   } 85   }
79   86  
HITCBC 80   1020 bool empty() const noexcept 87   1038 bool empty() const noexcept
81   { 88   {
HITCBC 82   1020 return head_ == nullptr; 89   1038 return head_ == nullptr;
83   } 90   }
84   91  
85   std::mutex mutex_; 92   std::mutex mutex_;
86   std::condition_variable work_cv_; 93   std::condition_variable work_cv_;
87   std::condition_variable done_cv_; 94   std::condition_variable done_cv_;
88   std::vector<std::thread> threads_; 95   std::vector<std::thread> threads_;
89   std::atomic<std::size_t> outstanding_work_{0}; 96   std::atomic<std::size_t> outstanding_work_{0};
90   bool stop_{false}; 97   bool stop_{false};
91   bool joined_{false}; 98   bool joined_{false};
92   std::size_t num_threads_; 99   std::size_t num_threads_;
93   char thread_name_prefix_[13]{}; // 12 chars max + null terminator 100   char thread_name_prefix_[13]{}; // 12 chars max + null terminator
94   std::once_flag start_flag_; 101   std::once_flag start_flag_;
95   102  
96   public: 103   public:
HITCBC 97   158 ~impl() = default; 104   161 ~impl() = default;
98   105  
  106 + bool
HITGNC   107 + 355 running_in_this_thread() const noexcept
  108 + {
HITGNC   109 + 355 return current_.get() == this;
  110 + }
  111 +
99   // Destroy abandoned coroutine frames. Must be called 112   // Destroy abandoned coroutine frames. Must be called
100   // before execution_context::shutdown()/destroy() so 113   // before execution_context::shutdown()/destroy() so
101   // that suspended-frame destructors (e.g. delay_awaitable 114   // that suspended-frame destructors (e.g. delay_awaitable
102   // calling timer_service::cancel()) run while services 115   // calling timer_service::cancel()) run while services
103   // are still valid. 116   // are still valid.
104   void 117   void
HITCBC 105   158 drain_abandoned() noexcept 118   161 drain_abandoned() noexcept
106   { 119   {
HITCBC 107   367 while(auto* c = pop()) 120   365 while(auto* c = pop())
108   { 121   {
HITCBC 109   209 auto h = c->h; 122   204 auto h = c->h;
HITCBC 110   209 if(h && h != std::noop_coroutine()) 123   204 if(h && h != std::noop_coroutine())
HITCBC 111   157 h.destroy(); 124   153 h.destroy();
HITCBC 112   209 } 125   204 }
HITCBC 113   158 } 126   161 }
114   127  
HITCBC 115   158 impl(std::size_t num_threads, std::string_view thread_name_prefix) 128   161 impl(std::size_t num_threads, std::string_view thread_name_prefix)
HITCBC 116   158 : num_threads_(num_threads) 129   161 : num_threads_(num_threads)
117   { 130   {
HITCBC 118   158 if(num_threads_ == 0) 131   161 if(num_threads_ == 0)
HITCBC 119   4 num_threads_ = std::max( 132   4 num_threads_ = std::max(
HITCBC 120   2 std::thread::hardware_concurrency(), 1u); 133   2 std::thread::hardware_concurrency(), 1u);
121   134  
122   // Truncate prefix to 12 chars, leaving room for up to 3-digit index. 135   // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
HITCBC 123   158 auto n = thread_name_prefix.copy(thread_name_prefix_, 12); 136   161 auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
HITCBC 124   158 thread_name_prefix_[n] = '\0'; 137   161 thread_name_prefix_[n] = '\0';
HITCBC 125   158 } 138   161 }
126   139  
127   void 140   void
HITCBC 128   831 post(continuation& c) 141   819 post(continuation& c)
129   { 142   {
HITCBC 130   831 ensure_started(); 143   819 ensure_started();
131   { 144   {
HITCBC 132   831 std::lock_guard<std::mutex> lock(mutex_); 145   819 std::lock_guard<std::mutex> lock(mutex_);
HITCBC 133   831 push(&c); 146   819 push(&c);
HITCBC 134   831 } 147   819 }
HITCBC 135   831 work_cv_.notify_one(); 148   819 work_cv_.notify_one();
HITCBC 136   831 } 149   819 }
137   150  
138   void 151   void
HITCBC 139   345 on_work_started() noexcept 152   347 on_work_started() noexcept
140   { 153   {
HITCBC 141   345 outstanding_work_.fetch_add(1, std::memory_order_acq_rel); 154   347 outstanding_work_.fetch_add(1, std::memory_order_acq_rel);
HITCBC 142   345 } 155   347 }
143   156  
144   void 157   void
HITCBC 145   345 on_work_finished() noexcept 158   347 on_work_finished() noexcept
146   { 159   {
HITCBC 147   345 if(outstanding_work_.fetch_sub( 160   347 if(outstanding_work_.fetch_sub(
HITCBC 148   345 1, std::memory_order_acq_rel) == 1) 161   347 1, std::memory_order_acq_rel) == 1)
149   { 162   {
HITCBC 150   85 std::lock_guard<std::mutex> lock(mutex_); 163   87 std::lock_guard<std::mutex> lock(mutex_);
HITCBC 151   85 if(joined_ && !stop_) 164   87 if(joined_ && !stop_)
HITCBC 152   4 stop_ = true; 165   4 stop_ = true;
HITCBC 153   85 done_cv_.notify_all(); 166   87 done_cv_.notify_all();
HITCBC 154   85 work_cv_.notify_all(); 167   87 work_cv_.notify_all();
HITCBC 155   85 } 168   87 }
HITCBC 156   345 } 169   347 }
157   170  
158   void 171   void
HITCBC 159   170 join() noexcept 172   173 join() noexcept
160   { 173   {
161   { 174   {
HITCBC 162   170 std::unique_lock<std::mutex> lock(mutex_); 175   173 std::unique_lock<std::mutex> lock(mutex_);
HITCBC 163   170 if(joined_) 176   173 if(joined_)
HITCBC 164   12 return; 177   12 return;
HITCBC 165   158 joined_ = true; 178   161 joined_ = true;
166   179  
HITCBC 167   158 if(outstanding_work_.load( 180   161 if(outstanding_work_.load(
HITCBC 168   158 std::memory_order_acquire) == 0) 181   161 std::memory_order_acquire) == 0)
169   { 182   {
HITCBC 170   97 stop_ = true; 183   103 stop_ = true;
HITCBC 171   97 work_cv_.notify_all(); 184   103 work_cv_.notify_all();
172   } 185   }
173   else 186   else
174   { 187   {
HITCBC 175   61 done_cv_.wait(lock, [this]{ 188   58 done_cv_.wait(lock, [this]{
HITCBC 176   66 return stop_; 189   63 return stop_;
177   }); 190   });
178   } 191   }
HITCBC 179   170 } 192   173 }
180   193  
HITCBC 181   339 for(auto& t : threads_) 194   345 for(auto& t : threads_)
HITCBC 182   181 if(t.joinable()) 195   184 if(t.joinable())
HITCBC 183   181 t.join(); 196   184 t.join();
184   } 197   }
185   198  
186   void 199   void
HITCBC 187   160 stop() noexcept 200   163 stop() noexcept
188   { 201   {
189   { 202   {
HITCBC 190   160 std::lock_guard<std::mutex> lock(mutex_); 203   163 std::lock_guard<std::mutex> lock(mutex_);
HITCBC 191   160 stop_ = true; 204   163 stop_ = true;
HITCBC 192   160 } 205   163 }
HITCBC 193   160 work_cv_.notify_all(); 206   163 work_cv_.notify_all();
HITCBC 194   160 done_cv_.notify_all(); 207   163 done_cv_.notify_all();
HITCBC 195   160 } 208   163 }
196   209  
197   private: 210   private:
198   void 211   void
HITCBC 199   831 ensure_started() 212   819 ensure_started()
200   { 213   {
HITCBC 201   831 std::call_once(start_flag_, [this]{ 214   819 std::call_once(start_flag_, [this]{
HITCBC 202   102 threads_.reserve(num_threads_); 215   105 threads_.reserve(num_threads_);
HITCBC 203   283 for(std::size_t i = 0; i < num_threads_; ++i) 216   289 for(std::size_t i = 0; i < num_threads_; ++i)
HITCBC 204   362 threads_.emplace_back([this, i]{ run(i); }); 217   368 threads_.emplace_back([this, i]{ run(i); });
HITCBC 205   102 }); 218   105 });
HITCBC 206   831 } 219   819 }
207   220  
208   void 221   void
HITCBC 209   181 run(std::size_t index) 222   184 run(std::size_t index)
210   { 223   {
211   // Build name; set_current_thread_name truncates to platform limits. 224   // Build name; set_current_thread_name truncates to platform limits.
212   char name[16]; 225   char name[16];
HITCBC 213   181 std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index); 226   184 std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
HITCBC 214   181 set_current_thread_name(name); 227   184 set_current_thread_name(name);
215   228  
  229 + // Mark this thread as a worker of this pool so dispatch()
  230 + // can symmetric-transfer when called from within pool work.
  231 + struct scoped_pool
  232 + {
HITGNC   233 + 184 scoped_pool(impl const* p) noexcept { current_.set(p); }
HITGNC   234 + 184 ~scoped_pool() noexcept { current_.set(nullptr); }
HITGNC   235 + 184 } guard(this);
  236 +
216   for(;;) 237   for(;;)
217   { 238   {
HITCBC 218   803 continuation* c = nullptr; 239   799 continuation* c = nullptr;
219   { 240   {
HITCBC 220   803 std::unique_lock<std::mutex> lock(mutex_); 241   799 std::unique_lock<std::mutex> lock(mutex_);
HITCBC 221   803 work_cv_.wait(lock, [this]{ 242   799 work_cv_.wait(lock, [this]{
HITCBC 222   1318 return !empty() || 243   1358 return !empty() ||
HITCBC 223   1318 stop_; 244   1358 stop_;
224   }); 245   });
HITCBC 225   803 if(stop_) 246   799 if(stop_)
HITCBC 226   362 return; 247   368 return;
HITCBC 227   622 c = pop(); 248   615 c = pop();
HITCBC 228   803 } 249   799 }
HITCBC 229   622 if(c) 250   615 if(c)
HITCBC 230   622 safe_resume(c->h); 251   615 safe_resume(c->h);
HITCBC 231   622 } 252   615 }
HITGIC 232   } 253   184 }
233   }; 254   };
234   255  
235   //------------------------------------------------------------------------------ 256   //------------------------------------------------------------------------------
236   257  
HITCBC 237   158 thread_pool:: 258   161 thread_pool::
238   ~thread_pool() 259   ~thread_pool()
239   { 260   {
HITCBC 240   158 impl_->stop(); 261   161 impl_->stop();
HITCBC 241   158 impl_->join(); 262   161 impl_->join();
HITCBC 242   158 impl_->drain_abandoned(); 263   161 impl_->drain_abandoned();
HITCBC 243   158 shutdown(); 264   161 shutdown();
HITCBC 244   158 destroy(); 265   161 destroy();
HITCBC 245   158 delete impl_; 266   161 delete impl_;
HITCBC 246   158 } 267   161 }
247   268  
HITCBC 248   158 thread_pool:: 269   161 thread_pool::
HITCBC 249   158 thread_pool(std::size_t num_threads, std::string_view thread_name_prefix) 270   161 thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
HITCBC 250   158 : impl_(new impl(num_threads, thread_name_prefix)) 271   161 : impl_(new impl(num_threads, thread_name_prefix))
251   { 272   {
HITCBC 252   158 this->set_frame_allocator(std::allocator<void>{}); 273   161 this->set_frame_allocator(std::allocator<void>{});
HITCBC 253   158 } 274   161 }
254   275  
255   void 276   void
HITCBC 256   12 thread_pool:: 277   12 thread_pool::
257   join() noexcept 278   join() noexcept
258   { 279   {
HITCBC 259   12 impl_->join(); 280   12 impl_->join();
HITCBC 260   12 } 281   12 }
261   282  
262   void 283   void
HITCBC 263   2 thread_pool:: 284   2 thread_pool::
264   stop() noexcept 285   stop() noexcept
265   { 286   {
HITCBC 266   2 impl_->stop(); 287   2 impl_->stop();
HITCBC 267   2 } 288   2 }
268   289  
269   //------------------------------------------------------------------------------ 290   //------------------------------------------------------------------------------
270   291  
271   thread_pool::executor_type 292   thread_pool::executor_type
HITCBC 272   164 thread_pool:: 293   167 thread_pool::
273   get_executor() const noexcept 294   get_executor() const noexcept
274   { 295   {
HITCBC 275   164 return executor_type( 296   167 return executor_type(
HITCBC 276   164 const_cast<thread_pool&>(*this)); 297   167 const_cast<thread_pool&>(*this));
277   } 298   }
278   299  
279   void 300   void
HITCBC 280   345 thread_pool::executor_type:: 301   347 thread_pool::executor_type::
281   on_work_started() const noexcept 302   on_work_started() const noexcept
282   { 303   {
HITCBC 283   345 pool_->impl_->on_work_started(); 304   347 pool_->impl_->on_work_started();
HITCBC 284   345 } 305   347 }
285   306  
286   void 307   void
HITCBC 287   345 thread_pool::executor_type:: 308   347 thread_pool::executor_type::
288   on_work_finished() const noexcept 309   on_work_finished() const noexcept
289   { 310   {
HITCBC 290   345 pool_->impl_->on_work_finished(); 311   347 pool_->impl_->on_work_finished();
HITCBC 291   345 } 312   347 }
292   313  
293   void 314   void
HITCBC 294   831 thread_pool::executor_type:: 315   477 thread_pool::executor_type::
295   post(continuation& c) const 316   post(continuation& c) const
296   { 317   {
HITCBC 297   831 pool_->impl_->post(c); 318   477 pool_->impl_->post(c);
HITGNC   319 + 477 }
  320 +
  321 + std::coroutine_handle<>
HITGNC   322 + 355 thread_pool::executor_type::
  323 + dispatch(continuation& c) const
  324 + {
HITGNC   325 + 355 if(pool_->impl_->running_in_this_thread())
HITGNC   326 + 13 return c.h;
HITGNC   327 + 342 pool_->impl_->post(c);
HITGNC   328 + 342 return std::noop_coroutine();
ECB 298   831 } 329   }
299   330  
300   } // capy 331   } // capy
301   } // boost 332   } // boost