CpuBoundWork#CpuBoundWork(): don't spin on atomic int to acquire slot

This is inefficient and can cause unpredictably long waiting durations on busy nodes. Instead, use AsioConditionVariable#Wait() if there are no free slots. It is notified by other coroutines' CpuBoundWork#~CpuBoundWork() once they finish.
This commit is contained in:
Alexander A. Klimov 2025-01-24 17:58:07 +01:00
parent 1e24adf247
commit cb18ed0133
2 changed files with 108 additions and 11 deletions

View File

@ -16,30 +16,121 @@
using namespace icinga;
/**
 * Acquires a slot for CPU-bound work.
 *
 * If and as long as the lock-free TryAcquireSlot() doesn't succeed,
 * subscribes to the slow path by waiting on a condition variable.
 * It is woken up by Done() which is called by the destructor.
 *
 * @param yc Needed to asynchronously wait for the condition variable.
 * @param strand Where to post the wake-up of the condition variable.
 */
CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand& strand)
	: m_Done(false)
{
	// The strand is stored as a wake-up target below; we must already be running on it.
	VERIFY(strand.running_in_this_thread());

	auto& ie (IoEngine::Get());
	Shared<AsioConditionVariable>::Ptr cv;

	while (!TryAcquireSlot()) {
		// Allocate the condition variable lazily, only once the fast path has failed.
		if (!cv) {
			cv = Shared<AsioConditionVariable>::Make(ie.GetIoContext());

			// The above line may take a little bit, so let's optimistically re-check
			if (TryAcquireSlot()) {
				break;
			}
		}

		{
			std::unique_lock lock (ie.m_CpuBoundWaitingMutex);

			// The above line may take even longer, so let's check again.
			// Also mitigate lost wake-ups by re-checking during the lock:
			//
			// During our lock, Done() can't retrieve the subscribers to wake up,
			// so any ongoing wake-up is either done at this point or has not started yet.
			// If such a wake-up is done, it's a lost wake-up to us unless we re-check here
			// whether the slot being freed (just before the wake-up) is still available.
			if (TryAcquireSlot()) {
				break;
			}

			// If the (hypothetical) slot mentioned above was taken by another coroutine,
			// there are no free slots again, just as if no wake-ups happened just now.
			ie.m_CpuBoundWaiting.emplace_back(strand, cv);
		}

		// Suspend this coroutine until Done() posts a notification to our strand.
		cv->Wait(yc);
	}
}
/**
 * Attempts to claim one slot for CPU-bound work without blocking.
 *
 * Decrements the number of free slots (semaphore) by one,
 * but only while it is still greater than zero.
 * Never falling below zero requires an atomic#compare_exchange_weak() loop
 * rather than a plain atomic#fetch_sub(), yet remains fully atomic.
 *
 * @return Whether a slot was acquired.
 */
bool CpuBoundWork::TryAcquireSlot()
{
	auto& ioEngine (IoEngine::Get());

	// On failure, compare_exchange_weak() refreshes `expected` with the latest
	// semaphore value, so each iteration retries against the current state...
	for (auto expected (ioEngine.m_CpuBoundSemaphore.load()); expected > 0;) {
		if (ioEngine.m_CpuBoundSemaphore.compare_exchange_weak(expected, expected - 1)) {
			// ... until we successfully take one slot, ...
			return true;
		}
	}

	// ... or other coroutines have drained the semaphore to zero.
	return false;
}
/**
 * Releases the own slot acquired by the constructor (TryAcquireSlot()) if not already done.
 *
 * Precisely, increments the number of free slots (semaphore) by one.
 * Also wakes up all waiting constructors (slow path) if necessary.
 */
void CpuBoundWork::Done()
{
	// m_Done makes this idempotent: the destructor and explicit callers may both invoke it.
	if (!m_Done) {
		m_Done = true;

		auto& ie (IoEngine::Get());

		// The constructor takes the slow path only if the semaphore is full,
		// so we only have to wake up constructors if the semaphore was full.
		// This works because after fetch_add(), TryAcquireSlot() (fast path) will succeed.
		if (ie.m_CpuBoundSemaphore.fetch_add(1) < 1) {
			// So now there are only slow path subscribers from just before the fetch_add() to be woken up.
			// Precisely, only subscribers from just before the fetch_add() which turned 0 to 1.
			decltype(ie.m_CpuBoundWaiting) subscribers;

			{
				// Locking after fetch_add() is safe because a delayed wake-up is fine.
				// Wake-up of constructors which subscribed after the fetch_add() is also not a problem.
				// In worst case, they will just re-subscribe to the slow path.
				// Lost wake-ups are mitigated by the constructor, see its implementation comments.
				std::unique_lock lock (ie.m_CpuBoundWaitingMutex);
				std::swap(subscribers, ie.m_CpuBoundWaiting);
			}

			// Again, a delayed wake-up is fine, hence unlocked.
			for (auto& [strand, cv] : subscribers) {
				boost::asio::post(strand, [cv = std::move(cv)] { cv->NotifyOne(); });
			}
		}
	}
}

View File

@ -12,6 +12,7 @@
#include <atomic>
#include <exception>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>
@ -52,6 +53,8 @@ public:
// Releases the acquired slot; safe to call more than once (guarded by m_Done).
void Done();

private:
// Lock-free fast path: decrements the free-slot semaphore unless it is already zero.
static bool TryAcquireSlot();

// Whether Done() has already released the slot, making repeated calls no-ops.
bool m_Done;
};
@ -153,7 +156,10 @@ private:
boost::asio::executor_work_guard<boost::asio::io_context::executor_type> m_KeepAlive;
std::vector<std::thread> m_Threads;
boost::asio::deadline_timer m_AlreadyExpiredTimer;

// Number of currently free CPU-bound work slots;
// decremented by CpuBoundWork::TryAcquireSlot(), incremented by CpuBoundWork::Done().
std::atomic_int_fast32_t m_CpuBoundSemaphore;

// Protects m_CpuBoundWaiting against concurrent subscription (CpuBoundWork constructor)
// and retrieval for wake-up (CpuBoundWork::Done()).
std::mutex m_CpuBoundWaitingMutex;

// Slow-path subscribers waiting for a free slot: for each, the strand to post the
// wake-up on and the condition variable to notify (see CpuBoundWork::Done()).
std::vector<std::pair<boost::asio::io_context::strand, Shared<AsioConditionVariable>::Ptr>> m_CpuBoundWaiting;
};
class TerminateIoThread : public std::exception