diff --git a/lib/base/io-engine.cpp b/lib/base/io-engine.cpp index 94397204e..d3a362dde 100644 --- a/lib/base/io-engine.cpp +++ b/lib/base/io-engine.cpp @@ -16,30 +16,121 @@ using namespace icinga; -CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand&) +/** + * Acquires a slot for CPU-bound work. + * + * If and as long as the lock-free TryAcquireSlot() doesn't succeed, + * subscribes to the slow path by waiting on a condition variable. + * It is woken up by Done() which is called by the destructor. + * + * @param yc Needed to asynchronously wait for the condition variable. + * @param strand Where to post the wake-up of the condition variable. + */ +CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand& strand) : m_Done(false) { - auto& ioEngine (IoEngine::Get()); + VERIFY(strand.running_in_this_thread()); - for (;;) { - auto availableSlots (ioEngine.m_CpuBoundSemaphore.fetch_sub(1)); + auto& ie (IoEngine::Get()); + Shared::Ptr cv; - if (availableSlots < 1) { - ioEngine.m_CpuBoundSemaphore.fetch_add(1); - IoEngine::YieldCurrentCoroutine(yc); - continue; + while (!TryAcquireSlot()) { + if (!cv) { + cv = Shared::Make(ie.GetIoContext()); + + // The above line may take a little bit, so let's optimistically re-check + if (TryAcquireSlot()) { + break; + } } - break; + { + std::unique_lock lock (ie.m_CpuBoundWaitingMutex); + + // The above line may take even longer, so let's check again. + // Also mitigate lost wake-ups by re-checking during the lock: + // + // During our lock, Done() can't retrieve the subscribers to wake up, + // so any ongoing wake-up is either done at this point or has not started yet. + // If such a wake-up is done, it's a lost wake-up to us unless we re-check here + // whether the slot being freed (just before the wake-up) is still available. + if (TryAcquireSlot()) { + break; + } + + // If the (hypothetical) slot mentioned above was taken by another coroutine, + // there are no free slots again, just as if no wake-ups happened just now. + ie.m_CpuBoundWaiting.emplace_back(strand, cv); + } + + cv->Wait(yc); } } +/** + * Tries to acquire a slot for CPU-bound work. + * + * Specifically, decrements the number of free slots (semaphore) by one, + * but only if it's currently greater than zero. + * Not falling below zero requires an atomic#compare_exchange_weak() loop + * instead of a simple atomic#fetch_sub() call, but it's also atomic. + * + * @return Whether a slot was acquired. + */ +bool CpuBoundWork::TryAcquireSlot() +{ + auto& ie (IoEngine::Get()); + auto freeSlots (ie.m_CpuBoundSemaphore.load()); + + while (freeSlots > 0) { + // If ie.m_CpuBoundSemaphore was changed after the last load, + // compare_exchange_weak() will load its latest value into freeSlots for us to retry until... + if (ie.m_CpuBoundSemaphore.compare_exchange_weak(freeSlots, freeSlots - 1)) { + // ... either we successfully decrement ie.m_CpuBoundSemaphore by one, ... + return true; + } + } + + // ... or it becomes zero due to another coroutine. + return false; +} + +/** + * Releases the own slot acquired by the constructor (TryAcquireSlot()) if not already done. + * + * Precisely, increments the number of free slots (semaphore) by one. + * Also wakes up all waiting constructors (slow path) if necessary. + */ void CpuBoundWork::Done() { if (!m_Done) { - IoEngine::Get().m_CpuBoundSemaphore.fetch_add(1); - m_Done = true; + + auto& ie (IoEngine::Get()); + + // The constructor takes the slow path only if the semaphore is full, + // so we only have to wake up constructors if the semaphore was full. + // This works because after fetch_add(), TryAcquireSlot() (fast path) will succeed. + if (ie.m_CpuBoundSemaphore.fetch_add(1) < 1) { + // So now there are only slow path subscribers from just before the fetch_add() to be woken up. + // Precisely, only subscribers from just before the fetch_add() which turned 0 to 1. + + decltype(ie.m_CpuBoundWaiting) subscribers; + + { + // Locking after fetch_add() is safe because a delayed wake-up is fine. + // Wake-up of constructors which subscribed after the fetch_add() is also not a problem. + // In worst case, they will just re-subscribe to the slow path. + // Lost wake-ups are mitigated by the constructor, see its implementation comments. + std::unique_lock lock (ie.m_CpuBoundWaitingMutex); + std::swap(subscribers, ie.m_CpuBoundWaiting); + } + + // Again, a delayed wake-up is fine, hence unlocked. + for (auto& [strand, cv] : subscribers) { + boost::asio::post(strand, [cv = std::move(cv)] { cv->NotifyOne(); }); + } + } } } diff --git a/lib/base/io-engine.hpp b/lib/base/io-engine.hpp index c51aa7d2b..23be1a146 100644 --- a/lib/base/io-engine.hpp +++ b/lib/base/io-engine.hpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -52,6 +53,8 @@ public: void Done(); private: + static bool TryAcquireSlot(); + bool m_Done; }; @@ -153,7 +156,10 @@ private: boost::asio::executor_work_guard m_KeepAlive; std::vector m_Threads; boost::asio::deadline_timer m_AlreadyExpiredTimer; + std::atomic_int_fast32_t m_CpuBoundSemaphore; + std::mutex m_CpuBoundWaitingMutex; + std::vector::Ptr>> m_CpuBoundWaiting; }; class TerminateIoThread : public std::exception