From 331ba1f661bb0e812534bf7c224d6c89dfec3a5c Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Thu, 5 Dec 2024 10:54:32 +0100 Subject: [PATCH 1/3] Rename AsioConditionVariable to AsioEvent The current implementation is rather similar to Python's threading.Event, than to a CV. --- lib/base/io-engine.cpp | 8 ++++---- lib/base/io-engine.hpp | 6 +++--- lib/icingadb/redisconnection.hpp | 2 +- lib/remote/jsonrpcconnection.hpp | 4 ++-- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/lib/base/io-engine.cpp b/lib/base/io-engine.cpp index 3190ed03d..30e2512d4 100644 --- a/lib/base/io-engine.cpp +++ b/lib/base/io-engine.cpp @@ -124,23 +124,23 @@ void IoEngine::RunEventLoop() } } -AsioConditionVariable::AsioConditionVariable(boost::asio::io_context& io, bool init) +AsioEvent::AsioEvent(boost::asio::io_context& io, bool init) : m_Timer(io) { m_Timer.expires_at(init ? boost::posix_time::neg_infin : boost::posix_time::pos_infin); } -void AsioConditionVariable::Set() +void AsioEvent::Set() { m_Timer.expires_at(boost::posix_time::neg_infin); } -void AsioConditionVariable::Clear() +void AsioEvent::Clear() { m_Timer.expires_at(boost::posix_time::pos_infin); } -void AsioConditionVariable::Wait(boost::asio::yield_context yc) +void AsioEvent::Wait(boost::asio::yield_context yc) { boost::system::error_code ec; m_Timer.async_wait(yc[ec]); diff --git a/lib/base/io-engine.hpp b/lib/base/io-engine.hpp index 64831ff8c..049523e52 100644 --- a/lib/base/io-engine.hpp +++ b/lib/base/io-engine.hpp @@ -158,14 +158,14 @@ class TerminateIoThread : public std::exception }; /** - * Condition variable which doesn't block I/O threads + * Awaitable flag which doesn't block I/O threads, inspired by threading.Event from Python * * @ingroup base */ -class AsioConditionVariable +class AsioEvent { public: - AsioConditionVariable(boost::asio::io_context& io, bool init = false); + AsioEvent(boost::asio::io_context& io, bool init = false); void Set(); void Clear(); diff --git a/lib/icingadb/redisconnection.hpp b/lib/icingadb/redisconnection.hpp index acc6e4381..2038f797c 100644 --- a/lib/icingadb/redisconnection.hpp +++ b/lib/icingadb/redisconnection.hpp @@ -262,7 +262,7 @@ namespace icinga std::set m_SuppressedQueryKinds; // Indicate that there's something to send/receive - AsioConditionVariable m_QueuedWrites, m_QueuedReads; + AsioEvent m_QueuedWrites, m_QueuedReads; std::function m_ConnectedCallback; diff --git a/lib/remote/jsonrpcconnection.hpp b/lib/remote/jsonrpcconnection.hpp index ef83dce1b..826d3b46a 100644 --- a/lib/remote/jsonrpcconnection.hpp +++ b/lib/remote/jsonrpcconnection.hpp @@ -75,8 +75,8 @@ private: double m_Seen; boost::asio::io_context::strand m_IoStrand; std::vector m_OutgoingMessagesQueue; - AsioConditionVariable m_OutgoingMessagesQueued; - AsioConditionVariable m_WriterDone; + AsioEvent m_OutgoingMessagesQueued; + AsioEvent m_WriterDone; Atomic m_ShuttingDown; boost::asio::deadline_timer m_CheckLivenessTimer, m_HeartbeatTimer; From 060d8b185e318bbe9544132b8111c6e559a9cd7a Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Wed, 14 May 2025 12:24:28 +0200 Subject: [PATCH 2/3] Introduce AsioDualEvent --- lib/base/io-engine.cpp | 27 +++++++++++++++++++++++++++ lib/base/io-engine.hpp | 20 ++++++++++++++++++++ 2 files changed, 47 insertions(+) diff --git a/lib/base/io-engine.cpp b/lib/base/io-engine.cpp index 30e2512d4..0792be5cc 100644 --- a/lib/base/io-engine.cpp +++ b/lib/base/io-engine.cpp @@ -146,6 +146,33 @@ void AsioEvent::Wait(boost::asio::yield_context yc) m_Timer.async_wait(yc[ec]); } +AsioDualEvent::AsioDualEvent(boost::asio::io_context& io, bool init) + : m_IsTrue(io, init), m_IsFalse(io, !init) +{ +} + +void AsioDualEvent::Set() +{ + m_IsTrue.Set(); + m_IsFalse.Clear(); +} + +void AsioDualEvent::Clear() +{ + m_IsTrue.Clear(); + m_IsFalse.Set(); +} + +void AsioDualEvent::WaitForSet(boost::asio::yield_context yc) +{ + m_IsTrue.Wait(std::move(yc)); +} + +void AsioDualEvent::WaitForClear(boost::asio::yield_context yc) +{ + m_IsFalse.Wait(std::move(yc)); +} + /** * Cancels any pending timeout callback. * diff --git a/lib/base/io-engine.hpp b/lib/base/io-engine.hpp index 049523e52..0883d7810 100644 --- a/lib/base/io-engine.hpp +++ b/lib/base/io-engine.hpp @@ -175,6 +175,26 @@ private: boost::asio::deadline_timer m_Timer; }; +/** + * Like AsioEvent, which only allows waiting for an event to be set, but additionally supports waiting for clearing + * + * @ingroup base + */ +class AsioDualEvent +{ +public: + AsioDualEvent(boost::asio::io_context& io, bool init = false); + + void Set(); + void Clear(); + + void WaitForSet(boost::asio::yield_context yc); + void WaitForClear(boost::asio::yield_context yc); + +private: + AsioEvent m_IsTrue, m_IsFalse; +}; + /** * I/O timeout emulator * From 2739f7f18954c5d60a7c42e055b98e6d4b77dae5 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Wed, 14 May 2025 12:28:11 +0200 Subject: [PATCH 3/3] RedisConnection#Connect(): get rid of spin lock Instead of IoEngine::YieldCurrentCoroutine(yc) until m_Queues.FutureResponseActions.empty(), async-wait a CV which is updated along with m_Queues.FutureResponseActions. --- lib/icingadb/redisconnection.cpp | 14 ++++---------- lib/icingadb/redisconnection.hpp | 3 ++- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/lib/icingadb/redisconnection.cpp b/lib/icingadb/redisconnection.cpp index c1f73f5a0..e0b026239 100644 --- a/lib/icingadb/redisconnection.cpp +++ b/lib/icingadb/redisconnection.cpp @@ -302,12 +302,6 @@ void RedisConnection::Connect(asio::yield_context& yc) boost::asio::deadline_timer timer (m_Strand.context()); - auto waitForReadLoop ([this, &yc]() { - while (!m_Queues.FutureResponseActions.empty()) { - IoEngine::YieldCurrentCoroutine(yc); - } - }); - for (;;) { try { if (m_Path.IsEmpty()) { @@ -339,7 +333,7 @@ void RedisConnection::Connect(asio::yield_context& yc) } Handshake(conn, yc); - waitForReadLoop(); + m_QueuedReads.WaitForClear(yc); m_TlsConn = std::move(conn); } else { Log(m_Parent ? LogNotice : LogInformation, "IcingaDB") @@ -350,7 +344,7 @@ void RedisConnection::Connect(asio::yield_context& yc) icinga::Connect(conn->next_layer(), m_Host, Convert::ToString(m_Port), yc); Handshake(conn, yc); - waitForReadLoop(); + m_QueuedReads.WaitForClear(yc); m_TcpConn = std::move(conn); } } else { @@ -362,7 +356,7 @@ void RedisConnection::Connect(asio::yield_context& yc) conn->next_layer().async_connect(Unix::endpoint(m_Path.CStr()), yc); Handshake(conn, yc); - waitForReadLoop(); + m_QueuedReads.WaitForClear(yc); m_UnixConn = std::move(conn); } @@ -394,7 +388,7 @@ void RedisConnection::Connect(asio::yield_context& yc) void RedisConnection::ReadLoop(asio::yield_context& yc) { for (;;) { - m_QueuedReads.Wait(yc); + m_QueuedReads.WaitForSet(yc); while (!m_Queues.FutureResponseActions.empty()) { auto item (std::move(m_Queues.FutureResponseActions.front())); diff --git a/lib/icingadb/redisconnection.hpp b/lib/icingadb/redisconnection.hpp index 2038f797c..308cf36d4 100644 --- a/lib/icingadb/redisconnection.hpp +++ b/lib/icingadb/redisconnection.hpp @@ -262,7 +262,8 @@ namespace icinga std::set m_SuppressedQueryKinds; // Indicate that there's something to send/receive - AsioEvent m_QueuedWrites, m_QueuedReads; + AsioEvent m_QueuedWrites; + AsioDualEvent m_QueuedReads; std::function m_ConnectedCallback;