diff --git a/lib/remote/eventqueue.cpp b/lib/remote/eventqueue.cpp index 20c4af688..6defc2114 100644 --- a/lib/remote/eventqueue.cpp +++ b/lib/remote/eventqueue.cpp @@ -2,8 +2,10 @@ #include "remote/eventqueue.hpp" #include "remote/filterutility.hpp" +#include "base/io-engine.hpp" #include "base/singleton.hpp" #include "base/logger.hpp" +#include using namespace icinga; @@ -100,6 +102,26 @@ Dictionary::Ptr EventQueue::WaitForEvent(void *client, double timeout) } } +Dictionary::Ptr EventQueue::WaitForEvent(void *client, boost::asio::yield_context yc) +{ + for (;;) { + { + boost::mutex::scoped_lock lock(m_Mutex); + + auto it = m_Events.find(client); + ASSERT(it != m_Events.end()); + + if (!it->second.empty()) { + Dictionary::Ptr result = *it->second.begin(); + it->second.pop_front(); + return result; + } + } + + IoBoundWorkSlot dontLockTheIoThreadWhileWaiting (yc); + } +} + std::vector EventQueue::GetQueuesForType(const String& type) { EventQueueRegistry::ItemMap queues = EventQueueRegistry::GetInstance()->GetItems(); diff --git a/lib/remote/eventqueue.hpp b/lib/remote/eventqueue.hpp index 45d23af23..8f6a76c0b 100644 --- a/lib/remote/eventqueue.hpp +++ b/lib/remote/eventqueue.hpp @@ -6,6 +6,7 @@ #include "remote/httphandler.hpp" #include "base/object.hpp" #include "config/expression.hpp" +#include #include #include #include @@ -31,6 +32,7 @@ public: void SetFilter(std::unique_ptr filter); Dictionary::Ptr WaitForEvent(void *client, double timeout = 5); + Dictionary::Ptr WaitForEvent(void *client, boost::asio::yield_context yc); static std::vector GetQueuesForType(const String& type); static void UnregisterIfUnused(const String& name, const EventQueue::Ptr& queue); diff --git a/lib/remote/eventshandler.cpp b/lib/remote/eventshandler.cpp index 99843d481..cf07d6305 100644 --- a/lib/remote/eventshandler.cpp +++ b/lib/remote/eventshandler.cpp @@ -6,6 +6,7 @@ #include "config/configcompiler.hpp" #include "config/expression.hpp" #include "base/defer.hpp" +#include "base/io-engine.hpp" #include "base/objectlock.hpp" #include "base/json.hpp" #include @@ -91,23 +92,24 @@ bool EventsHandler::HandleRequest( response.result(http::status::ok); response.set(http::field::content_type, "application/json"); - http::async_write(stream, response, yc); - stream.async_flush(yc); + { + IoBoundWorkSlot dontLockTheIoThreadWhileWriting (yc); + + http::async_write(stream, response, yc); + stream.async_flush(yc); + } asio::const_buffer newLine ("\n", 1); for (;;) { - Dictionary::Ptr result = queue->WaitForEvent(&request); - - if (!result) - continue; - - String body = JsonEncode(result); + String body = JsonEncode(queue->WaitForEvent(&request, yc)); boost::algorithm::replace_all(body, "\n", ""); asio::const_buffer payload (body.CStr(), body.GetLength()); + IoBoundWorkSlot dontLockTheIoThreadWhileWriting (yc); + stream.async_write_some(payload, yc); stream.async_write_some(newLine, yc); stream.async_flush(yc);