diff --git a/lib/base/io-engine.cpp b/lib/base/io-engine.cpp index c93a4d87a..b70050552 100644 --- a/lib/base/io-engine.cpp +++ b/lib/base/io-engine.cpp @@ -27,6 +27,7 @@ #include #include #include +#include using namespace icinga; @@ -137,3 +138,25 @@ void IoEngine::RunEventLoop() } } } + +AsioConditionVariable::AsioConditionVariable(boost::asio::io_service& io, bool init) + : m_Timer(io) +{ + m_Timer.expires_at(init ? boost::posix_time::neg_infin : boost::posix_time::pos_infin); +} + +void AsioConditionVariable::Set() +{ + m_Timer.expires_at(boost::posix_time::neg_infin); +} + +void AsioConditionVariable::Clear() +{ + m_Timer.expires_at(boost::posix_time::pos_infin); +} + +void AsioConditionVariable::Wait(boost::asio::yield_context yc) +{ + boost::system::error_code ec; + m_Timer.async_wait(yc[ec]); +} diff --git a/lib/base/io-engine.hpp b/lib/base/io-engine.hpp index 65059db68..d08525081 100644 --- a/lib/base/io-engine.hpp +++ b/lib/base/io-engine.hpp @@ -109,4 +109,22 @@ class TerminateIoThread : public std::exception { }; +/** + * Condition variable which doesn't block I/O threads + * + * @ingroup base + */ +class AsioConditionVariable +{ +public: + AsioConditionVariable(boost::asio::io_service& io, bool init = false); + + void Set(); + void Clear(); + void Wait(boost::asio::yield_context yc); + +private: + boost::asio::deadline_timer m_Timer; +}; + #endif /* IO_ENGINE_H */ diff --git a/lib/remote/jsonrpcconnection-heartbeat.cpp b/lib/remote/jsonrpcconnection-heartbeat.cpp index 569ffbb1c..da6afe4b7 100644 --- a/lib/remote/jsonrpcconnection-heartbeat.cpp +++ b/lib/remote/jsonrpcconnection-heartbeat.cpp @@ -44,7 +44,7 @@ void JsonRpcConnection::HandleAndWriteHeartbeats(boost::asio::yield_context yc) }) } })); - m_OutgoingMessagesQueued.expires_at(boost::posix_time::neg_infin); + m_OutgoingMessagesQueued.Set(); } } diff --git a/lib/remote/jsonrpcconnection.cpp b/lib/remote/jsonrpcconnection.cpp index fa8b65ffc..a0f37f950 100644 --- a/lib/remote/jsonrpcconnection.cpp +++ b/lib/remote/jsonrpcconnection.cpp @@ -17,7 +17,6 @@ #include #include #include -#include #include using namespace icinga; @@ -35,9 +34,6 @@ JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated, { if (authenticated) m_Endpoint = Endpoint::GetByName(identity); - - m_OutgoingMessagesQueued.expires_at(boost::posix_time::pos_infin); - m_WriterDone.expires_at(boost::posix_time::pos_infin); } void JsonRpcConnection::Start() @@ -97,18 +93,15 @@ void JsonRpcConnection::WriteOutgoingMessages(boost::asio::yield_context yc) { Defer disconnect ([this]() { Disconnect(); }); - Defer signalWriterDone ([this]() { m_WriterDone.expires_at(boost::posix_time::neg_infin); }); + Defer signalWriterDone ([this]() { m_WriterDone.Set(); }); do { - try { - m_OutgoingMessagesQueued.async_wait(yc); - } catch (...) { - } + m_OutgoingMessagesQueued.Wait(yc); auto queue (std::move(m_OutgoingMessagesQueue)); m_OutgoingMessagesQueue.clear(); - m_OutgoingMessagesQueued.expires_at(boost::posix_time::pos_infin); + m_OutgoingMessagesQueued.Clear(); if (!queue.empty()) { try { @@ -169,7 +162,7 @@ void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message) { m_IoStrand.post([this, message]() { m_OutgoingMessagesQueue.emplace_back(message); - m_OutgoingMessagesQueued.expires_at(boost::posix_time::neg_infin); + m_OutgoingMessagesQueued.Set(); }); } @@ -186,12 +179,9 @@ void JsonRpcConnection::Disconnect() Log(LogWarning, "JsonRpcConnection") << "API client disconnected for identity '" << m_Identity << "'"; - m_OutgoingMessagesQueued.expires_at(boost::posix_time::neg_infin); + m_OutgoingMessagesQueued.Set(); - try { - m_WriterDone.async_wait(yc); - } catch (...) { - } + m_WriterDone.Wait(yc); try { m_Stream->next_layer().async_shutdown(yc); @@ -288,7 +278,7 @@ void JsonRpcConnection::MessageHandler(const String& jsonString) resultMessage->Set("id", message->Get("id")); m_OutgoingMessagesQueue.emplace_back(resultMessage); - m_OutgoingMessagesQueued.expires_at(boost::posix_time::neg_infin); + m_OutgoingMessagesQueued.Set(); } } diff --git a/lib/remote/jsonrpcconnection.hpp b/lib/remote/jsonrpcconnection.hpp index bc5fa398d..b0679d368 100644 --- a/lib/remote/jsonrpcconnection.hpp +++ b/lib/remote/jsonrpcconnection.hpp @@ -5,6 +5,7 @@ #include "remote/i2-remote.hpp" #include "remote/endpoint.hpp" +#include "base/io-engine.hpp" #include "base/tlsstream.hpp" #include "base/timer.hpp" #include "base/workqueue.hpp" @@ -12,7 +13,6 @@ #include #include #include -#include namespace icinga { @@ -73,8 +73,8 @@ private: double m_NextHeartbeat; boost::asio::io_service::strand m_IoStrand; std::vector m_OutgoingMessagesQueue; - boost::asio::deadline_timer m_OutgoingMessagesQueued; - boost::asio::deadline_timer m_WriterDone; + AsioConditionVariable m_OutgoingMessagesQueued; + AsioConditionVariable m_WriterDone; bool m_ShuttingDown; void HandleIncomingMessages(boost::asio::yield_context yc);