From c8e170bd1a71faa2f3a117b653829d1c8c6173eb Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Tue, 9 Sep 2025 11:01:31 +0200 Subject: [PATCH] quick & dirty proof of concept --- lib/base/io-engine.cpp | 21 +++------------- lib/base/io-engine.hpp | 56 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 18 deletions(-) diff --git a/lib/base/io-engine.cpp b/lib/base/io-engine.cpp index 0792be5cc..dff8793dc 100644 --- a/lib/base/io-engine.cpp +++ b/lib/base/io-engine.cpp @@ -19,33 +19,18 @@ using namespace icinga; CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc) : m_Done(false) { - auto& ioEngine (IoEngine::Get()); - - for (;;) { - auto availableSlots (ioEngine.m_CpuBoundSemaphore.fetch_sub(1)); - - if (availableSlots < 1) { - ioEngine.m_CpuBoundSemaphore.fetch_add(1); - IoEngine::YieldCurrentCoroutine(yc); - continue; - } - - break; - } + AsyncLock(yc); } CpuBoundWork::~CpuBoundWork() { - if (!m_Done) { - IoEngine::Get().m_CpuBoundSemaphore.fetch_add(1); - } + Done(); } void CpuBoundWork::Done() { if (!m_Done) { - IoEngine::Get().m_CpuBoundSemaphore.fetch_add(1); - + Unlock(); m_Done = true; } } diff --git a/lib/base/io-engine.hpp b/lib/base/io-engine.hpp index 0883d7810..8506f59d2 100644 --- a/lib/base/io-engine.hpp +++ b/lib/base/io-engine.hpp @@ -12,14 +12,17 @@ #include #include #include +#include #include #include #include #include #include #include +#include #include #include +#include #include #if BOOST_VERSION >= 108700 @@ -47,6 +50,59 @@ public: void Done(); private: + template + auto AsyncLock(CompletionToken&& token) + { + // implementation inspired by https://github.com/boostorg/asio/blob/boost-1.89.0/example/cpp14/operations/callback_wrapper.cpp + auto init = [](auto handler) { + auto work = boost::asio::make_work_guard(handler); + + // With C++23, using std::move_only_function as value type for m_Waiting should be possible instead of using std::shared_ptr here for making the lambda copyable. + auto callback = [handler = std::make_shared(std::move(handler)), work = std::make_shared(std::move(work))]() { + auto alloc = boost::asio::get_associated_allocator(*handler, boost::asio::recycling_allocator()); + boost::asio::dispatch(work->get_executor(), + boost::asio::bind_allocator(alloc, + [handler = std::move(*handler)]() mutable { + Log(LogInformation, "CpuBoundWork") << "calling handler"; + std::move(handler)(); + })); + }; + + std::unique_lock lock(m_Mutex); + if (m_FreeSlots > 0) { + Log(LogInformation, "CpuBoundWork") << "immediate acquire"; + m_FreeSlots--; + lock.unlock(); + callback(); + } else { + Log(LogInformation, "CpuBoundWork") << "wait acquire"; + m_Waiting.emplace(std::move(callback)); + } + }; + + return boost::asio::async_initiate(init, token); + } + + void Unlock() + { + std::unique_lock lock(m_Mutex); + + if (!m_Waiting.empty()) { + Log(LogInformation, "CpuBoundWork") << "wake next"; + auto callback = std::move(m_Waiting.front()); + m_Waiting.pop(); + lock.unlock(); + callback(); + } else { + Log(LogInformation, "CpuBoundWork") << "release"; + m_FreeSlots++; + } + } + + static inline std::mutex m_Mutex; + static inline std::size_t m_FreeSlots = 1; + static inline std::queue> m_Waiting; + bool m_Done; };