Revert "CpuBoundWork#CpuBoundWork(): don't spin on atomic int to acquire slot"

This reverts commit 7c4b70f8998b2081bee3610cb03b9b8b5cf04e7d.
This commit is contained in:
Alexander A. Klimov 2024-12-05 10:38:27 +01:00
parent 7c4b70f899
commit 21a0dcbfcb
2 changed files with 30 additions and 78 deletions

View File

@ -16,55 +16,28 @@
using namespace icinga; using namespace icinga;
CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand& strand) CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand&)
: m_Done(false) : m_Done(false)
{ {
auto& ioEngine (IoEngine::Get()); auto& ioEngine (IoEngine::Get());
auto& sem (ioEngine.m_CpuBoundSemaphore);
std::unique_lock<std::mutex> lock (sem.Mutex);
if (sem.FreeSlots) { for (;;) {
--sem.FreeSlots; auto availableSlots (ioEngine.m_CpuBoundSemaphore.fetch_sub(1));
return;
if (availableSlots < 1) {
ioEngine.m_CpuBoundSemaphore.fetch_add(1);
IoEngine::YieldCurrentCoroutine(yc);
continue;
} }
auto cv (Shared<AsioConditionVariable>::Make(ioEngine.GetIoContext())); break;
bool gotSlot = false;
auto pos (sem.Waiting.insert(sem.Waiting.end(), IoEngine::CpuBoundQueueItem{&strand, cv, &gotSlot}));
lock.unlock();
try {
cv->Wait(yc);
} catch (...) {
std::unique_lock<std::mutex> lock (sem.Mutex);
if (gotSlot) {
lock.unlock();
Done();
} else {
sem.Waiting.erase(pos);
}
throw;
} }
} }
void CpuBoundWork::Done() void CpuBoundWork::Done()
{ {
if (!m_Done) { if (!m_Done) {
auto& sem (IoEngine::Get().m_CpuBoundSemaphore); IoEngine::Get().m_CpuBoundSemaphore.fetch_add(1);
std::unique_lock<std::mutex> lock (sem.Mutex);
if (sem.Waiting.empty()) {
++sem.FreeSlots;
} else {
auto next (sem.Waiting.front());
*next.GotSlot = true;
sem.Waiting.pop_front();
boost::asio::post(*next.Strand, [cv = std::move(next.CV)]() { cv->Set(); });
}
m_Done = true; m_Done = true;
} }
@ -85,11 +58,7 @@ boost::asio::io_context& IoEngine::GetIoContext()
IoEngine::IoEngine() : m_IoContext(), m_KeepAlive(boost::asio::make_work_guard(m_IoContext)), m_Threads(decltype(m_Threads)::size_type(Configuration::Concurrency * 2u)), m_AlreadyExpiredTimer(m_IoContext) IoEngine::IoEngine() : m_IoContext(), m_KeepAlive(boost::asio::make_work_guard(m_IoContext)), m_Threads(decltype(m_Threads)::size_type(Configuration::Concurrency * 2u)), m_AlreadyExpiredTimer(m_IoContext)
{ {
m_AlreadyExpiredTimer.expires_at(boost::posix_time::neg_infin); m_AlreadyExpiredTimer.expires_at(boost::posix_time::neg_infin);
m_CpuBoundSemaphore.store(Configuration::Concurrency * 3u / 2u);
{
std::unique_lock<std::mutex> lock (m_CpuBoundSemaphore.Mutex);
m_CpuBoundSemaphore.FreeSlots = Configuration::Concurrency * 3u / 2u;
}
for (auto& thread : m_Threads) { for (auto& thread : m_Threads) {
thread = std::thread(&IoEngine::RunEventLoop, this); thread = std::thread(&IoEngine::RunEventLoop, this);

View File

@ -6,14 +6,10 @@
#include "base/exception.hpp" #include "base/exception.hpp"
#include "base/lazy-init.hpp" #include "base/lazy-init.hpp"
#include "base/logger.hpp" #include "base/logger.hpp"
#include "base/shared.hpp"
#include "base/shared-object.hpp" #include "base/shared-object.hpp"
#include <atomic> #include <atomic>
#include <cstdint>
#include <exception> #include <exception>
#include <list>
#include <memory> #include <memory>
#include <mutex>
#include <thread> #include <thread>
#include <utility> #include <utility>
#include <vector> #include <vector>
@ -35,7 +31,7 @@ namespace icinga
class CpuBoundWork class CpuBoundWork
{ {
public: public:
CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand& strand); CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand&);
CpuBoundWork(const CpuBoundWork&) = delete; CpuBoundWork(const CpuBoundWork&) = delete;
CpuBoundWork(CpuBoundWork&&) = delete; CpuBoundWork(CpuBoundWork&&) = delete;
CpuBoundWork& operator=(const CpuBoundWork&) = delete; CpuBoundWork& operator=(const CpuBoundWork&) = delete;
@ -52,25 +48,6 @@ private:
bool m_Done; bool m_Done;
}; };
/**
* Condition variable which doesn't block I/O threads
*
* @ingroup base
*/
class AsioConditionVariable
{
public:
AsioConditionVariable(boost::asio::io_context& io, bool init = false);
void Set();
void Clear();
void Wait(boost::asio::yield_context yc);
private:
boost::asio::deadline_timer m_Timer;
};
/** /**
* Async I/O engine * Async I/O engine
* *
@ -133,13 +110,6 @@ public:
} }
private: private:
struct CpuBoundQueueItem
{
boost::asio::io_context::strand* Strand;
Shared<AsioConditionVariable>::Ptr CV;
bool* GotSlot;
};
IoEngine(); IoEngine();
void RunEventLoop(); void RunEventLoop();
@ -150,18 +120,31 @@ private:
boost::asio::executor_work_guard<boost::asio::io_context::executor_type> m_KeepAlive; boost::asio::executor_work_guard<boost::asio::io_context::executor_type> m_KeepAlive;
std::vector<std::thread> m_Threads; std::vector<std::thread> m_Threads;
boost::asio::deadline_timer m_AlreadyExpiredTimer; boost::asio::deadline_timer m_AlreadyExpiredTimer;
std::atomic_int_fast32_t m_CpuBoundSemaphore;
struct {
std::mutex Mutex;
uint_fast32_t FreeSlots;
std::list<CpuBoundQueueItem> Waiting;
} m_CpuBoundSemaphore;
}; };
class TerminateIoThread : public std::exception class TerminateIoThread : public std::exception
{ {
}; };
/**
* Condition variable which doesn't block I/O threads
*
* @ingroup base
*/
class AsioConditionVariable
{
public:
AsioConditionVariable(boost::asio::io_context& io, bool init = false);
void Set();
void Clear();
void Wait(boost::asio::yield_context yc);
private:
boost::asio::deadline_timer m_Timer;
};
/** /**
* I/O timeout emulator * I/O timeout emulator
* *