LCOV - code coverage report
Current view: top level - /jenkins/workspace/boost-root/libs/capy/src/ex - thread_pool.cpp (source / functions) Coverage Total Hit
Test: coverage_remapped.info Lines: 100.0 % 139 139
Test Date: 2026-04-24 18:06:38 Functions: 100.0 % 29 29

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3                 : // Copyright (c) 2026 Michael Vandeberg
       4                 : //
       5                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       6                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       7                 : //
       8                 : // Official repository: https://github.com/boostorg/capy
       9                 : //
      10                 : 
      11                 : #include <boost/capy/ex/thread_pool.hpp>
      12                 : #include <boost/capy/continuation.hpp>
      13                 : #include <boost/capy/detail/thread_local_ptr.hpp>
      14                 : #include <boost/capy/ex/frame_allocator.hpp>
      15                 : #include <boost/capy/test/thread_name.hpp>
      16                 : #include <algorithm>
      17                 : #include <atomic>
      18                 : #include <condition_variable>
      19                 : #include <cstdio>
      20                 : #include <mutex>
      21                 : #include <thread>
      22                 : #include <vector>
      23                 : 
      24                 : /*
      25                 :     Thread pool implementation using a shared work queue.
      26                 : 
      27                 :     Work items are continuations linked via their intrusive next pointer,
      28                 :     stored in a single queue protected by a mutex. No per-post heap
      29                 :     allocation: the continuation is owned by the caller and linked
      30                 :     directly. Worker threads wait on a condition_variable until work
      31                 :     is available or stop is requested.
      32                 : 
      33                 :     Threads are started lazily on first post() via std::call_once to avoid
      34                 :     spawning threads for pools that are constructed but never used. Each
      35                 :     thread is named with a configurable prefix plus index for debugger
      36                 :     visibility.
      37                 : 
      38                 :     Work tracking: on_work_started/on_work_finished maintain an atomic
      39                 :     outstanding_work_ counter. join() blocks until this counter reaches
      40                 :     zero (or stop() is called), then workers are stopped and threads joined.
      41                 : 
      42                 :     Shutdown paths:
      43                 :     - join(): waits for outstanding work to drain, then stops workers.
      44                 :     - stop(): immediately signals workers to exit; queued work is abandoned.
      45                 :     - Destructor: stop(), join(), then drain_abandoned() (abandon queued work, wait for threads, destroy leftover frames).
      46                 : */
      47                 : 
      48                 : namespace boost {
      49                 : namespace capy {
      50                 : 
      51                 : //------------------------------------------------------------------------------
      52                 : 
      53                 : class thread_pool::impl
      54                 : {
      55                 :     // Identifies the pool owning the current worker thread, or
      56                 :     // nullptr if the calling thread is not a pool worker. Checked
      57                 :     // by dispatch() to decide between symmetric transfer (inline
      58                 :     // resume) and post.
      59                 :     static inline detail::thread_local_ptr<impl const> current_;
      60                 : 
      61                 :     // Intrusive queue of continuations via continuation::next.
      62                 :     // No per-post allocation: the continuation is owned by the caller.
      63                 :     continuation* head_ = nullptr;
      64                 :     continuation* tail_ = nullptr;
      65                 : 
      66 HIT         819 :     void push(continuation* c) noexcept
      67                 :     {
      68             819 :         c->next = nullptr;
      69             819 :         if(tail_)
      70             594 :             tail_->next = c;
      71                 :         else
      72             225 :             head_ = c;
      73             819 :         tail_ = c;
      74             819 :     }
      75                 : 
      76             980 :     continuation* pop() noexcept
      77                 :     {
      78             980 :         if(!head_)
      79             161 :             return nullptr;
      80             819 :         continuation* c = head_;
      81             819 :         head_ = head_->next;
      82             819 :         if(!head_)
      83             225 :             tail_ = nullptr;
      84             819 :         return c;
      85                 :     }
      86                 : 
      87            1038 :     bool empty() const noexcept
      88                 :     {
      89            1038 :         return head_ == nullptr;
      90                 :     }
      91                 : 
      92                 :     std::mutex mutex_;
      93                 :     std::condition_variable work_cv_;
      94                 :     std::condition_variable done_cv_;
      95                 :     std::vector<std::thread> threads_;
      96                 :     std::atomic<std::size_t> outstanding_work_{0};
      97                 :     bool stop_{false};
      98                 :     bool joined_{false};
      99                 :     std::size_t num_threads_;
     100                 :     char thread_name_prefix_[13]{};  // 12 chars max + null terminator
     101                 :     std::once_flag start_flag_;
     102                 : 
     103                 : public:
     104             161 :     ~impl() = default;
     105                 : 
     106                 :     bool
     107             355 :     running_in_this_thread() const noexcept
     108                 :     {
     109             355 :         return current_.get() == this;
     110                 :     }
     111                 : 
     112                 :     // Destroy abandoned coroutine frames. Must be called
     113                 :     // before execution_context::shutdown()/destroy() so
     114                 :     // that suspended-frame destructors (e.g. delay_awaitable
     115                 :     // calling timer_service::cancel()) run while services
     116                 :     // are still valid.
     117                 :     void
     118             161 :     drain_abandoned() noexcept
     119                 :     {
     120             365 :         while(auto* c = pop())
     121                 :         {
     122             204 :             auto h = c->h;
     123             204 :             if(h && h != std::noop_coroutine())
     124             153 :                 h.destroy();
     125             204 :         }
     126             161 :     }
     127                 : 
     128             161 :     impl(std::size_t num_threads, std::string_view thread_name_prefix)
     129             161 :         : num_threads_(num_threads)
     130                 :     {
     131             161 :         if(num_threads_ == 0)
     132               4 :             num_threads_ = std::max(
     133               2 :                 std::thread::hardware_concurrency(), 1u);
     134                 : 
     135                 :         // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
     136             161 :         auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
     137             161 :         thread_name_prefix_[n] = '\0';
     138             161 :     }
     139                 : 
     140                 :     void
     141             819 :     post(continuation& c)
     142                 :     {
     143             819 :         ensure_started();
     144                 :         {
     145             819 :             std::lock_guard<std::mutex> lock(mutex_);
     146             819 :             push(&c);
     147             819 :         }
     148             819 :         work_cv_.notify_one();
     149             819 :     }
     150                 : 
     151                 :     void
     152             347 :     on_work_started() noexcept
     153                 :     {
     154             347 :         outstanding_work_.fetch_add(1, std::memory_order_acq_rel);
     155             347 :     }
     156                 : 
     157                 :     void
     158             347 :     on_work_finished() noexcept
     159                 :     {
     160             347 :         if(outstanding_work_.fetch_sub(
     161             347 :             1, std::memory_order_acq_rel) == 1)
     162                 :         {
     163              87 :             std::lock_guard<std::mutex> lock(mutex_);
     164              87 :             if(joined_ && !stop_)
     165               4 :                 stop_ = true;
     166              87 :             done_cv_.notify_all();
     167              87 :             work_cv_.notify_all();
     168              87 :         }
     169             347 :     }
     170                 : 
     171                 :     void
     172             173 :     join() noexcept
     173                 :     {
     174                 :         {
     175             173 :             std::unique_lock<std::mutex> lock(mutex_);
     176             173 :             if(joined_)
     177              12 :                 return;
     178             161 :             joined_ = true;
     179                 : 
     180             161 :             if(outstanding_work_.load(
     181             161 :                 std::memory_order_acquire) == 0)
     182                 :             {
     183             103 :                 stop_ = true;
     184             103 :                 work_cv_.notify_all();
     185                 :             }
     186                 :             else
     187                 :             {
     188              58 :                 done_cv_.wait(lock, [this]{
     189              63 :                     return stop_;
     190                 :                 });
     191                 :             }
     192             173 :         }
     193                 : 
     194             345 :         for(auto& t : threads_)
     195             184 :             if(t.joinable())
     196             184 :                 t.join();
     197                 :     }
     198                 : 
     199                 :     void
     200             163 :     stop() noexcept
     201                 :     {
     202                 :         {
     203             163 :             std::lock_guard<std::mutex> lock(mutex_);
     204             163 :             stop_ = true;
     205             163 :         }
     206             163 :         work_cv_.notify_all();
     207             163 :         done_cv_.notify_all();
     208             163 :     }
     209                 : 
     210                 : private:
     211                 :     void
     212             819 :     ensure_started()
     213                 :     {
     214             819 :         std::call_once(start_flag_, [this]{
     215             105 :             threads_.reserve(num_threads_);
     216             289 :             for(std::size_t i = 0; i < num_threads_; ++i)
     217             368 :                 threads_.emplace_back([this, i]{ run(i); });
     218             105 :         });
     219             819 :     }
     220                 : 
     221                 :     void
     222             184 :     run(std::size_t index)
     223                 :     {
     224                 :         // Build prefix + index; snprintf clips it to the 16-byte buffer, and set_current_thread_name truncates to platform limits.
     225                 :         char name[16];
     226             184 :         std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
     227             184 :         set_current_thread_name(name);
     228                 : 
     229                 :         // Mark this thread as a worker of this pool so dispatch()
     230                 :         // can symmetric-transfer when called from within pool work.
     231                 :         struct scoped_pool
     232                 :         {
     233             184 :             scoped_pool(impl const* p) noexcept { current_.set(p); }
     234             184 :             ~scoped_pool() noexcept { current_.set(nullptr); }
     235             184 :         } guard(this);
     236                 : 
     237                 :         for(;;)
     238                 :         {
     239             799 :             continuation* c = nullptr;
     240                 :             {
     241             799 :                 std::unique_lock<std::mutex> lock(mutex_);
     242             799 :                 work_cv_.wait(lock, [this]{
     243            1358 :                     return !empty() ||
     244            1358 :                         stop_;
     245                 :                 });
     246             799 :                 if(stop_)
     247             368 :                     return;
     248             615 :                 c = pop();
     249             799 :             }
     250             615 :             if(c)
     251             615 :                 safe_resume(c->h);
     252             615 :         }
     253             184 :     }
     254                 : };
     255                 : 
     256                 : //------------------------------------------------------------------------------
     257                 : 
     258             161 : thread_pool::
     259                 : ~thread_pool()
     260                 : {
     261             161 :     impl_->stop();
     262             161 :     impl_->join();
     263             161 :     impl_->drain_abandoned();
     264             161 :     shutdown();
     265             161 :     destroy();
     266             161 :     delete impl_;
     267             161 : }
     268                 : 
     269             161 : thread_pool::
     270             161 : thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
     271             161 :     : impl_(new impl(num_threads, thread_name_prefix))
     272                 : {
     273             161 :     this->set_frame_allocator(std::allocator<void>{});
     274             161 : }
     275                 : 
     276                 : void
     277              12 : thread_pool::
     278                 : join() noexcept
     279                 : {
     280              12 :     impl_->join();
     281              12 : }
     282                 : 
     283                 : void
     284               2 : thread_pool::
     285                 : stop() noexcept
     286                 : {
     287               2 :     impl_->stop();
     288               2 : }
     289                 : 
     290                 : //------------------------------------------------------------------------------
     291                 : 
     292                 : thread_pool::executor_type
     293             167 : thread_pool::
     294                 : get_executor() const noexcept
     295                 : {
     296             167 :     return executor_type(
     297             167 :         const_cast<thread_pool&>(*this));
     298                 : }
     299                 : 
     300                 : void
     301             347 : thread_pool::executor_type::
     302                 : on_work_started() const noexcept
     303                 : {
     304             347 :     pool_->impl_->on_work_started();
     305             347 : }
     306                 : 
     307                 : void
     308             347 : thread_pool::executor_type::
     309                 : on_work_finished() const noexcept
     310                 : {
     311             347 :     pool_->impl_->on_work_finished();
     312             347 : }
     313                 : 
     314                 : void
     315             477 : thread_pool::executor_type::
     316                 : post(continuation& c) const
     317                 : {
     318             477 :     pool_->impl_->post(c);
     319             477 : }
     320                 : 
     321                 : std::coroutine_handle<>
     322             355 : thread_pool::executor_type::
     323                 : dispatch(continuation& c) const
     324                 : {
     325             355 :     if(pool_->impl_->running_in_this_thread())
     326              13 :         return c.h;
     327             342 :     pool_->impl_->post(c);
     328             342 :     return std::noop_coroutine();
     329                 : }
     330                 : 
     331                 : } // capy
     332                 : } // boost
        

Generated by: LCOV version 2.3