diff --git a/lib/base/io-engine.cpp b/lib/base/io-engine.cpp index a4a813d69..871981a82 100644 --- a/lib/base/io-engine.cpp +++ b/lib/base/io-engine.cpp @@ -16,55 +16,28 @@ using namespace icinga; -CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand& strand) +CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand&) : m_Done(false) { auto& ioEngine (IoEngine::Get()); - auto& sem (ioEngine.m_CpuBoundSemaphore); - std::unique_lock lock (sem.Mutex); - if (sem.FreeSlots) { - --sem.FreeSlots; - return; - } + for (;;) { + auto availableSlots (ioEngine.m_CpuBoundSemaphore.fetch_sub(1)); - auto cv (Shared::Make(ioEngine.GetIoContext())); - bool gotSlot = false; - auto pos (sem.Waiting.insert(sem.Waiting.end(), IoEngine::CpuBoundQueueItem{&strand, cv, &gotSlot})); - - lock.unlock(); - - try { - cv->Wait(yc); - } catch (...) { - std::unique_lock lock (sem.Mutex); - - if (gotSlot) { - lock.unlock(); - Done(); - } else { - sem.Waiting.erase(pos); + if (availableSlots < 1) { + ioEngine.m_CpuBoundSemaphore.fetch_add(1); + IoEngine::YieldCurrentCoroutine(yc); + continue; } - throw; + break; } } void CpuBoundWork::Done() { if (!m_Done) { - auto& sem (IoEngine::Get().m_CpuBoundSemaphore); - std::unique_lock lock (sem.Mutex); - - if (sem.Waiting.empty()) { - ++sem.FreeSlots; - } else { - auto next (sem.Waiting.front()); - - *next.GotSlot = true; - sem.Waiting.pop_front(); - boost::asio::post(*next.Strand, [cv = std::move(next.CV)]() { cv->Set(); }); - } + IoEngine::Get().m_CpuBoundSemaphore.fetch_add(1); m_Done = true; } @@ -85,11 +58,7 @@ boost::asio::io_context& IoEngine::GetIoContext() IoEngine::IoEngine() : m_IoContext(), m_KeepAlive(boost::asio::make_work_guard(m_IoContext)), m_Threads(decltype(m_Threads)::size_type(Configuration::Concurrency * 2u)), m_AlreadyExpiredTimer(m_IoContext) { m_AlreadyExpiredTimer.expires_at(boost::posix_time::neg_infin); - - { - std::unique_lock lock (m_CpuBoundSemaphore.Mutex); - m_CpuBoundSemaphore.FreeSlots = Configuration::Concurrency * 3u / 2u; - } + m_CpuBoundSemaphore.store(Configuration::Concurrency * 3u / 2u); for (auto& thread : m_Threads) { thread = std::thread(&IoEngine::RunEventLoop, this); diff --git a/lib/base/io-engine.hpp b/lib/base/io-engine.hpp index f370bde66..622a92dd0 100644 --- a/lib/base/io-engine.hpp +++ b/lib/base/io-engine.hpp @@ -6,14 +6,10 @@ #include "base/exception.hpp" #include "base/lazy-init.hpp" #include "base/logger.hpp" -#include "base/shared.hpp" #include "base/shared-object.hpp" #include -#include #include -#include #include -#include #include #include #include @@ -35,7 +31,7 @@ namespace icinga class CpuBoundWork { public: - CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand& strand); + CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand&); CpuBoundWork(const CpuBoundWork&) = delete; CpuBoundWork(CpuBoundWork&&) = delete; CpuBoundWork& operator=(const CpuBoundWork&) = delete; @@ -52,25 +48,6 @@ private: bool m_Done; }; - -/** - * Condition variable which doesn't block I/O threads - * - * @ingroup base - */ -class AsioConditionVariable -{ -public: - AsioConditionVariable(boost::asio::io_context& io, bool init = false); - - void Set(); - void Clear(); - void Wait(boost::asio::yield_context yc); - -private: - boost::asio::deadline_timer m_Timer; -}; - /** * Async I/O engine * @@ -133,13 +110,6 @@ public: } private: - struct CpuBoundQueueItem - { - boost::asio::io_context::strand* Strand; - Shared::Ptr CV; - bool* GotSlot; - }; - IoEngine(); void RunEventLoop(); @@ -150,18 +120,31 @@ private: boost::asio::executor_work_guard m_KeepAlive; std::vector m_Threads; boost::asio::deadline_timer m_AlreadyExpiredTimer; - - struct { - std::mutex Mutex; - uint_fast32_t FreeSlots; - std::list Waiting; - } m_CpuBoundSemaphore; + std::atomic_int_fast32_t m_CpuBoundSemaphore; }; class TerminateIoThread : public std::exception { }; +/** + * Condition variable which doesn't block I/O threads + * + * @ingroup base + */ +class AsioConditionVariable +{ +public: + AsioConditionVariable(boost::asio::io_context& io, bool init = false); + + void Set(); + void Clear(); + void Wait(boost::asio::yield_context yc); + +private: + boost::asio::deadline_timer m_Timer; +}; + /** * I/O timeout emulator *