Merge pull request #10265 from Icinga/RedisConnection-spinlock

RedisConnection#Connect(): get rid of spin lock
This commit is contained in:
Julian Brost 2025-05-14 15:06:58 +02:00 committed by GitHub
commit 1a386ad55d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 62 additions and 20 deletions

View File

@ -124,28 +124,55 @@ void IoEngine::RunEventLoop()
} }
} }
AsioConditionVariable::AsioConditionVariable(boost::asio::io_context& io, bool init) AsioEvent::AsioEvent(boost::asio::io_context& io, bool init)
: m_Timer(io) : m_Timer(io)
{ {
m_Timer.expires_at(init ? boost::posix_time::neg_infin : boost::posix_time::pos_infin); m_Timer.expires_at(init ? boost::posix_time::neg_infin : boost::posix_time::pos_infin);
} }
void AsioConditionVariable::Set() void AsioEvent::Set()
{ {
m_Timer.expires_at(boost::posix_time::neg_infin); m_Timer.expires_at(boost::posix_time::neg_infin);
} }
void AsioConditionVariable::Clear() void AsioEvent::Clear()
{ {
m_Timer.expires_at(boost::posix_time::pos_infin); m_Timer.expires_at(boost::posix_time::pos_infin);
} }
void AsioConditionVariable::Wait(boost::asio::yield_context yc) void AsioEvent::Wait(boost::asio::yield_context yc)
{ {
boost::system::error_code ec; boost::system::error_code ec;
m_Timer.async_wait(yc[ec]); m_Timer.async_wait(yc[ec]);
} }
AsioDualEvent::AsioDualEvent(boost::asio::io_context& io, bool init)
: m_IsTrue(io, init), m_IsFalse(io, !init)
{
}
void AsioDualEvent::Set()
{
m_IsTrue.Set();
m_IsFalse.Clear();
}
void AsioDualEvent::Clear()
{
m_IsTrue.Clear();
m_IsFalse.Set();
}
void AsioDualEvent::WaitForSet(boost::asio::yield_context yc)
{
m_IsTrue.Wait(std::move(yc));
}
void AsioDualEvent::WaitForClear(boost::asio::yield_context yc)
{
m_IsFalse.Wait(std::move(yc));
}
/** /**
* Cancels any pending timeout callback. * Cancels any pending timeout callback.
* *

View File

@ -158,14 +158,14 @@ class TerminateIoThread : public std::exception
}; };
/** /**
* Condition variable which doesn't block I/O threads * Awaitable flag which doesn't block I/O threads, inspired by threading.Event from Python
* *
* @ingroup base * @ingroup base
*/ */
class AsioConditionVariable class AsioEvent
{ {
public: public:
AsioConditionVariable(boost::asio::io_context& io, bool init = false); AsioEvent(boost::asio::io_context& io, bool init = false);
void Set(); void Set();
void Clear(); void Clear();
@ -175,6 +175,26 @@ private:
boost::asio::deadline_timer m_Timer; boost::asio::deadline_timer m_Timer;
}; };
/**
* Like AsioEvent, which only allows waiting for an event to be set, but additionally supports waiting for clearing
*
* @ingroup base
*/
class AsioDualEvent
{
public:
AsioDualEvent(boost::asio::io_context& io, bool init = false);
void Set();
void Clear();
void WaitForSet(boost::asio::yield_context yc);
void WaitForClear(boost::asio::yield_context yc);
private:
AsioEvent m_IsTrue, m_IsFalse;
};
/** /**
* I/O timeout emulator * I/O timeout emulator
* *

View File

@ -302,12 +302,6 @@ void RedisConnection::Connect(asio::yield_context& yc)
boost::asio::deadline_timer timer (m_Strand.context()); boost::asio::deadline_timer timer (m_Strand.context());
auto waitForReadLoop ([this, &yc]() {
while (!m_Queues.FutureResponseActions.empty()) {
IoEngine::YieldCurrentCoroutine(yc);
}
});
for (;;) { for (;;) {
try { try {
if (m_Path.IsEmpty()) { if (m_Path.IsEmpty()) {
@ -339,7 +333,7 @@ void RedisConnection::Connect(asio::yield_context& yc)
} }
Handshake(conn, yc); Handshake(conn, yc);
waitForReadLoop(); m_QueuedReads.WaitForClear(yc);
m_TlsConn = std::move(conn); m_TlsConn = std::move(conn);
} else { } else {
Log(m_Parent ? LogNotice : LogInformation, "IcingaDB") Log(m_Parent ? LogNotice : LogInformation, "IcingaDB")
@ -350,7 +344,7 @@ void RedisConnection::Connect(asio::yield_context& yc)
icinga::Connect(conn->next_layer(), m_Host, Convert::ToString(m_Port), yc); icinga::Connect(conn->next_layer(), m_Host, Convert::ToString(m_Port), yc);
Handshake(conn, yc); Handshake(conn, yc);
waitForReadLoop(); m_QueuedReads.WaitForClear(yc);
m_TcpConn = std::move(conn); m_TcpConn = std::move(conn);
} }
} else { } else {
@ -362,7 +356,7 @@ void RedisConnection::Connect(asio::yield_context& yc)
conn->next_layer().async_connect(Unix::endpoint(m_Path.CStr()), yc); conn->next_layer().async_connect(Unix::endpoint(m_Path.CStr()), yc);
Handshake(conn, yc); Handshake(conn, yc);
waitForReadLoop(); m_QueuedReads.WaitForClear(yc);
m_UnixConn = std::move(conn); m_UnixConn = std::move(conn);
} }
@ -394,7 +388,7 @@ void RedisConnection::Connect(asio::yield_context& yc)
void RedisConnection::ReadLoop(asio::yield_context& yc) void RedisConnection::ReadLoop(asio::yield_context& yc)
{ {
for (;;) { for (;;) {
m_QueuedReads.Wait(yc); m_QueuedReads.WaitForSet(yc);
while (!m_Queues.FutureResponseActions.empty()) { while (!m_Queues.FutureResponseActions.empty()) {
auto item (std::move(m_Queues.FutureResponseActions.front())); auto item (std::move(m_Queues.FutureResponseActions.front()));

View File

@ -262,7 +262,8 @@ namespace icinga
std::set<QueryPriority> m_SuppressedQueryKinds; std::set<QueryPriority> m_SuppressedQueryKinds;
// Indicate that there's something to send/receive // Indicate that there's something to send/receive
AsioConditionVariable m_QueuedWrites, m_QueuedReads; AsioEvent m_QueuedWrites;
AsioDualEvent m_QueuedReads;
std::function<void(boost::asio::yield_context& yc)> m_ConnectedCallback; std::function<void(boost::asio::yield_context& yc)> m_ConnectedCallback;

View File

@ -75,8 +75,8 @@ private:
double m_Seen; double m_Seen;
boost::asio::io_context::strand m_IoStrand; boost::asio::io_context::strand m_IoStrand;
std::vector<String> m_OutgoingMessagesQueue; std::vector<String> m_OutgoingMessagesQueue;
AsioConditionVariable m_OutgoingMessagesQueued; AsioEvent m_OutgoingMessagesQueued;
AsioConditionVariable m_WriterDone; AsioEvent m_WriterDone;
Atomic<bool> m_ShuttingDown; Atomic<bool> m_ShuttingDown;
boost::asio::deadline_timer m_CheckLivenessTimer, m_HeartbeatTimer; boost::asio::deadline_timer m_CheckLivenessTimer, m_HeartbeatTimer;