quick & dirty proof of concept

This commit is contained in:
Julian Brost 2025-09-09 11:01:31 +02:00
parent 87df80d322
commit c8e170bd1a
2 changed files with 59 additions and 18 deletions

View File

@ -19,33 +19,18 @@ using namespace icinga;
CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc) CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc)
: m_Done(false) : m_Done(false)
{ {
auto& ioEngine (IoEngine::Get()); AsyncLock(yc);
for (;;) {
auto availableSlots (ioEngine.m_CpuBoundSemaphore.fetch_sub(1));
if (availableSlots < 1) {
ioEngine.m_CpuBoundSemaphore.fetch_add(1);
IoEngine::YieldCurrentCoroutine(yc);
continue;
}
break;
}
} }
CpuBoundWork::~CpuBoundWork() CpuBoundWork::~CpuBoundWork()
{ {
if (!m_Done) { Done();
IoEngine::Get().m_CpuBoundSemaphore.fetch_add(1);
}
} }
void CpuBoundWork::Done() void CpuBoundWork::Done()
{ {
if (!m_Done) { if (!m_Done) {
IoEngine::Get().m_CpuBoundSemaphore.fetch_add(1); Unlock();
m_Done = true; m_Done = true;
} }
} }

View File

@ -12,14 +12,17 @@
#include <atomic> #include <atomic>
#include <exception> #include <exception>
#include <memory> #include <memory>
#include <queue>
#include <thread> #include <thread>
#include <utility> #include <utility>
#include <vector> #include <vector>
#include <stdexcept> #include <stdexcept>
#include <boost/context/fixedsize_stack.hpp> #include <boost/context/fixedsize_stack.hpp>
#include <boost/exception/all.hpp> #include <boost/exception/all.hpp>
#include <boost/asio/bind_allocator.hpp>
#include <boost/asio/deadline_timer.hpp> #include <boost/asio/deadline_timer.hpp>
#include <boost/asio/io_context.hpp> #include <boost/asio/io_context.hpp>
#include <boost/asio/recycling_allocator.hpp>
#include <boost/asio/spawn.hpp> #include <boost/asio/spawn.hpp>
#if BOOST_VERSION >= 108700 #if BOOST_VERSION >= 108700
@ -47,6 +50,59 @@ public:
void Done(); void Done();
private: private:
template <typename CompletionToken>
auto AsyncLock(CompletionToken&& token)
{
// implementation inspired by https://github.com/boostorg/asio/blob/boost-1.89.0/example/cpp14/operations/callback_wrapper.cpp
auto init = [](auto handler) {
auto work = boost::asio::make_work_guard(handler);
// With C++23, using std::move_only_function as value type for m_Waiting should be possible instead of using std::shared_ptr here for making the lambda copyable.
auto callback = [handler = std::make_shared<decltype(handler)>(std::move(handler)), work = std::make_shared<decltype(work)>(std::move(work))]() {
auto alloc = boost::asio::get_associated_allocator(*handler, boost::asio::recycling_allocator<void>());
boost::asio::dispatch(work->get_executor(),
boost::asio::bind_allocator(alloc,
[handler = std::move(*handler)]() mutable {
Log(LogInformation, "CpuBoundWork") << "calling handler";
std::move(handler)();
}));
};
std::unique_lock lock(m_Mutex);
if (m_FreeSlots > 0) {
Log(LogInformation, "CpuBoundWork") << "immediate acquire";
m_FreeSlots--;
lock.unlock();
callback();
} else {
Log(LogInformation, "CpuBoundWork") << "wait acquire";
m_Waiting.emplace(std::move(callback));
}
};
return boost::asio::async_initiate<CompletionToken, void()>(init, token);
}
void Unlock()
{
std::unique_lock lock(m_Mutex);
if (!m_Waiting.empty()) {
Log(LogInformation, "CpuBoundWork") << "wake next";
auto callback = std::move(m_Waiting.front());
m_Waiting.pop();
lock.unlock();
callback();
} else {
Log(LogInformation, "CpuBoundWork") << "release";
m_FreeSlots++;
}
}
static inline std::mutex m_Mutex;
static inline std::size_t m_FreeSlots = 1;
static inline std::queue<std::function<void()>> m_Waiting;
bool m_Done; bool m_Done;
}; };