Introduce AsioConditionVariable

This commit is contained in:
Alexander A. Klimov 2019-02-22 16:13:28 +01:00
parent d3392d1579
commit 8b3efe5759
5 changed files with 52 additions and 21 deletions

View File

@ -27,6 +27,7 @@
#include <boost/asio/io_service.hpp> #include <boost/asio/io_service.hpp>
#include <boost/asio/spawn.hpp> #include <boost/asio/spawn.hpp>
#include <boost/date_time/posix_time/ptime.hpp> #include <boost/date_time/posix_time/ptime.hpp>
#include <boost/system/error_code.hpp>
using namespace icinga; using namespace icinga;
@ -137,3 +138,25 @@ void IoEngine::RunEventLoop()
} }
} }
} }
AsioConditionVariable::AsioConditionVariable(boost::asio::io_service& io, bool init)
: m_Timer(io)
{
m_Timer.expires_at(init ? boost::posix_time::neg_infin : boost::posix_time::pos_infin);
}
void AsioConditionVariable::Set()
{
m_Timer.expires_at(boost::posix_time::neg_infin);
}
void AsioConditionVariable::Clear()
{
m_Timer.expires_at(boost::posix_time::pos_infin);
}
void AsioConditionVariable::Wait(boost::asio::yield_context yc)
{
boost::system::error_code ec;
m_Timer.async_wait(yc[ec]);
}

View File

@ -109,4 +109,22 @@ class TerminateIoThread : public std::exception
{ {
}; };
/**
* Condition variable which doesn't block I/O threads
*
* @ingroup base
*/
class AsioConditionVariable
{
public:
AsioConditionVariable(boost::asio::io_service& io, bool init = false);
void Set();
void Clear();
void Wait(boost::asio::yield_context yc);
private:
boost::asio::deadline_timer m_Timer;
};
#endif /* IO_ENGINE_H */ #endif /* IO_ENGINE_H */

View File

@ -44,7 +44,7 @@ void JsonRpcConnection::HandleAndWriteHeartbeats(boost::asio::yield_context yc)
}) } }) }
})); }));
m_OutgoingMessagesQueued.expires_at(boost::posix_time::neg_infin); m_OutgoingMessagesQueued.Set();
} }
} }

View File

@ -17,7 +17,6 @@
#include <boost/asio/deadline_timer.hpp> #include <boost/asio/deadline_timer.hpp>
#include <boost/asio/spawn.hpp> #include <boost/asio/spawn.hpp>
#include <boost/date_time/posix_time/posix_time_duration.hpp> #include <boost/date_time/posix_time/posix_time_duration.hpp>
#include <boost/date_time/posix_time/ptime.hpp>
#include <boost/thread/once.hpp> #include <boost/thread/once.hpp>
using namespace icinga; using namespace icinga;
@ -35,9 +34,6 @@ JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated,
{ {
if (authenticated) if (authenticated)
m_Endpoint = Endpoint::GetByName(identity); m_Endpoint = Endpoint::GetByName(identity);
m_OutgoingMessagesQueued.expires_at(boost::posix_time::pos_infin);
m_WriterDone.expires_at(boost::posix_time::pos_infin);
} }
void JsonRpcConnection::Start() void JsonRpcConnection::Start()
@ -97,18 +93,15 @@ void JsonRpcConnection::WriteOutgoingMessages(boost::asio::yield_context yc)
{ {
Defer disconnect ([this]() { Disconnect(); }); Defer disconnect ([this]() { Disconnect(); });
Defer signalWriterDone ([this]() { m_WriterDone.expires_at(boost::posix_time::neg_infin); }); Defer signalWriterDone ([this]() { m_WriterDone.Set(); });
do { do {
try { m_OutgoingMessagesQueued.Wait(yc);
m_OutgoingMessagesQueued.async_wait(yc);
} catch (...) {
}
auto queue (std::move(m_OutgoingMessagesQueue)); auto queue (std::move(m_OutgoingMessagesQueue));
m_OutgoingMessagesQueue.clear(); m_OutgoingMessagesQueue.clear();
m_OutgoingMessagesQueued.expires_at(boost::posix_time::pos_infin); m_OutgoingMessagesQueued.Clear();
if (!queue.empty()) { if (!queue.empty()) {
try { try {
@ -169,7 +162,7 @@ void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message)
{ {
m_IoStrand.post([this, message]() { m_IoStrand.post([this, message]() {
m_OutgoingMessagesQueue.emplace_back(message); m_OutgoingMessagesQueue.emplace_back(message);
m_OutgoingMessagesQueued.expires_at(boost::posix_time::neg_infin); m_OutgoingMessagesQueued.Set();
}); });
} }
@ -186,12 +179,9 @@ void JsonRpcConnection::Disconnect()
Log(LogWarning, "JsonRpcConnection") Log(LogWarning, "JsonRpcConnection")
<< "API client disconnected for identity '" << m_Identity << "'"; << "API client disconnected for identity '" << m_Identity << "'";
m_OutgoingMessagesQueued.expires_at(boost::posix_time::neg_infin); m_OutgoingMessagesQueued.Set();
try { m_WriterDone.Wait(yc);
m_WriterDone.async_wait(yc);
} catch (...) {
}
try { try {
m_Stream->next_layer().async_shutdown(yc); m_Stream->next_layer().async_shutdown(yc);
@ -288,7 +278,7 @@ void JsonRpcConnection::MessageHandler(const String& jsonString)
resultMessage->Set("id", message->Get("id")); resultMessage->Set("id", message->Get("id"));
m_OutgoingMessagesQueue.emplace_back(resultMessage); m_OutgoingMessagesQueue.emplace_back(resultMessage);
m_OutgoingMessagesQueued.expires_at(boost::posix_time::neg_infin); m_OutgoingMessagesQueued.Set();
} }
} }

View File

@ -5,6 +5,7 @@
#include "remote/i2-remote.hpp" #include "remote/i2-remote.hpp"
#include "remote/endpoint.hpp" #include "remote/endpoint.hpp"
#include "base/io-engine.hpp"
#include "base/tlsstream.hpp" #include "base/tlsstream.hpp"
#include "base/timer.hpp" #include "base/timer.hpp"
#include "base/workqueue.hpp" #include "base/workqueue.hpp"
@ -12,7 +13,6 @@
#include <vector> #include <vector>
#include <boost/asio/io_service_strand.hpp> #include <boost/asio/io_service_strand.hpp>
#include <boost/asio/spawn.hpp> #include <boost/asio/spawn.hpp>
#include <boost/asio/deadline_timer.hpp>
namespace icinga namespace icinga
{ {
@ -73,8 +73,8 @@ private:
double m_NextHeartbeat; double m_NextHeartbeat;
boost::asio::io_service::strand m_IoStrand; boost::asio::io_service::strand m_IoStrand;
std::vector<Dictionary::Ptr> m_OutgoingMessagesQueue; std::vector<Dictionary::Ptr> m_OutgoingMessagesQueue;
boost::asio::deadline_timer m_OutgoingMessagesQueued; AsioConditionVariable m_OutgoingMessagesQueued;
boost::asio::deadline_timer m_WriterDone; AsioConditionVariable m_WriterDone;
bool m_ShuttingDown; bool m_ShuttingDown;
void HandleIncomingMessages(boost::asio::yield_context yc); void HandleIncomingMessages(boost::asio::yield_context yc);