EventQueue#WaitForEvent(): re-add timeout

This commit is contained in:
Alexander A. Klimov 2019-04-03 09:53:45 +02:00
parent 28d46052b0
commit 4c5ee0dbbf
3 changed files with 23 additions and 10 deletions

View File

@ -5,6 +5,7 @@
#include "base/io-engine.hpp"
#include "base/singleton.hpp"
#include "base/logger.hpp"
#include "base/utility.hpp"
#include <boost/asio/spawn.hpp>
using namespace icinga;
@ -102,8 +103,10 @@ Dictionary::Ptr EventQueue::WaitForEvent(void *client, double timeout)
}
}
Dictionary::Ptr EventQueue::WaitForEvent(void *client, boost::asio::yield_context yc)
Dictionary::Ptr EventQueue::WaitForEvent(void *client, boost::asio::yield_context yc, double timeout)
{
double deadline = -1.0;
for (;;) {
{
boost::mutex::scoped_try_lock lock(m_Mutex);
@ -112,7 +115,13 @@ Dictionary::Ptr EventQueue::WaitForEvent(void *client, boost::asio::yield_contex
auto it = m_Events.find(client);
ASSERT(it != m_Events.end());
if (!it->second.empty()) {
if (it->second.empty()) {
if (deadline == -1.0) {
deadline = Utility::GetTime() + timeout;
} else if (Utility::GetTime() >= deadline) {
return nullptr;
}
} else {
Dictionary::Ptr result = *it->second.begin();
it->second.pop_front();
return result;

View File

@ -32,7 +32,7 @@ public:
void SetFilter(std::unique_ptr<Expression> filter);
Dictionary::Ptr WaitForEvent(void *client, double timeout = 5);
Dictionary::Ptr WaitForEvent(void *client, boost::asio::yield_context yc);
Dictionary::Ptr WaitForEvent(void *client, boost::asio::yield_context yc, double timeout = 5);
static std::vector<EventQueue::Ptr> GetQueuesForType(const String& type);
static void UnregisterIfUnused(const String& name, const EventQueue::Ptr& queue);

View File

@ -103,17 +103,21 @@ bool EventsHandler::HandleRequest(
asio::const_buffer newLine ("\n", 1);
for (;;) {
String body = JsonEncode(queue->WaitForEvent(&request, yc));
auto event (queue->WaitForEvent(&request, yc));
boost::algorithm::replace_all(body, "\n", "");
if (event) {
String body = JsonEncode(event);
asio::const_buffer payload (body.CStr(), body.GetLength());
boost::algorithm::replace_all(body, "\n", "");
IoBoundWorkSlot dontLockTheIoThreadWhileWriting (yc);
asio::const_buffer payload (body.CStr(), body.GetLength());
asio::async_write(stream, payload, yc);
asio::async_write(stream, newLine, yc);
stream.async_flush(yc);
IoBoundWorkSlot dontLockTheIoThreadWhileWriting (yc);
asio::async_write(stream, payload, yc);
asio::async_write(stream, newLine, yc);
stream.async_flush(yc);
}
}
}