/v1/events: don't lock I/O thread

This commit is contained in:
Alexander A. Klimov 2019-02-15 12:32:22 +01:00
parent d7b465ce74
commit 7681ec10a4
3 changed files with 34 additions and 8 deletions

View File

@ -2,8 +2,10 @@
#include "remote/eventqueue.hpp"
#include "remote/filterutility.hpp"
#include "base/io-engine.hpp"
#include "base/singleton.hpp"
#include "base/logger.hpp"
#include <boost/asio/spawn.hpp>
using namespace icinga;
@ -100,6 +102,26 @@ Dictionary::Ptr EventQueue::WaitForEvent(void *client, double timeout)
}
}
Dictionary::Ptr EventQueue::WaitForEvent(void *client, boost::asio::yield_context yc)
{
for (;;) {
{
boost::mutex::scoped_lock lock(m_Mutex);
auto it = m_Events.find(client);
ASSERT(it != m_Events.end());
if (!it->second.empty()) {
Dictionary::Ptr result = *it->second.begin();
it->second.pop_front();
return result;
}
}
IoBoundWorkSlot dontLockTheIoThreadWhileWaiting (yc);
}
}
std::vector<EventQueue::Ptr> EventQueue::GetQueuesForType(const String& type)
{
EventQueueRegistry::ItemMap queues = EventQueueRegistry::GetInstance()->GetItems();

View File

@ -6,6 +6,7 @@
#include "remote/httphandler.hpp"
#include "base/object.hpp"
#include "config/expression.hpp"
#include <boost/asio/spawn.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
#include <set>
@ -31,6 +32,7 @@ public:
void SetFilter(std::unique_ptr<Expression> filter);
Dictionary::Ptr WaitForEvent(void *client, double timeout = 5);
Dictionary::Ptr WaitForEvent(void *client, boost::asio::yield_context yc);
static std::vector<EventQueue::Ptr> GetQueuesForType(const String& type);
static void UnregisterIfUnused(const String& name, const EventQueue::Ptr& queue);

View File

@ -6,6 +6,7 @@
#include "config/configcompiler.hpp"
#include "config/expression.hpp"
#include "base/defer.hpp"
#include "base/io-engine.hpp"
#include "base/objectlock.hpp"
#include "base/json.hpp"
#include <boost/asio/buffer.hpp>
@ -91,23 +92,24 @@ bool EventsHandler::HandleRequest(
response.result(http::status::ok);
response.set(http::field::content_type, "application/json");
{
IoBoundWorkSlot dontLockTheIoThreadWhileWriting (yc);
http::async_write(stream, response, yc);
stream.async_flush(yc);
}
asio::const_buffer newLine ("\n", 1);
for (;;) {
Dictionary::Ptr result = queue->WaitForEvent(&request);
if (!result)
continue;
String body = JsonEncode(result);
String body = JsonEncode(queue->WaitForEvent(&request, yc));
boost::algorithm::replace_all(body, "\n", "");
asio::const_buffer payload (body.CStr(), body.GetLength());
IoBoundWorkSlot dontLockTheIoThreadWhileWriting (yc);
stream.async_write_some(payload, yc);
stream.async_write_some(newLine, yc);
stream.async_flush(yc);