/v1/events: use new event queue

This commit is contained in:
Alexander A. Klimov 2019-04-05 17:34:46 +02:00 committed by Michael Friedrich
parent 90d9cd9257
commit 81713d0509
3 changed files with 37 additions and 52 deletions

View File

@ -109,36 +109,6 @@ Dictionary::Ptr EventQueue::WaitForEvent(void *client, double timeout)
}
}
Dictionary::Ptr EventQueue::WaitForEvent(void *client, boost::asio::yield_context yc, double timeout)
{
double deadline = -1.0;
for (;;) {
{
boost::mutex::scoped_try_lock lock(m_Mutex);
if (lock.owns_lock()) {
auto it = m_Events.find(client);
ASSERT(it != m_Events.end());
if (it->second.empty()) {
if (deadline == -1.0) {
deadline = Utility::GetTime() + timeout;
} else if (Utility::GetTime() >= deadline) {
return nullptr;
}
} else {
Dictionary::Ptr result = *it->second.begin();
it->second.pop_front();
return result;
}
}
}
IoBoundWorkSlot dontLockTheIoThreadWhileWaiting (yc);
}
}
std::vector<EventQueue::Ptr> EventQueue::GetQueuesForType(const String& type)
{
EventQueueRegistry::ItemMap queues = EventQueueRegistry::GetInstance()->GetItems();
@ -274,6 +244,11 @@ EventsSubscriber::~EventsSubscriber()
EventsRouter::GetInstance().Unsubscribe(m_Types, m_Inbox);
}
const EventsInbox::Ptr& EventsSubscriber::GetInbox()
{
return m_Inbox;
}
EventsFilter::EventsFilter(std::map<std::shared_ptr<Expression>, std::set<EventsInbox::Ptr>> inboxes)
: m_Inboxes(std::move(inboxes))
{

View File

@ -37,7 +37,6 @@ public:
void SetFilter(std::unique_ptr<Expression> filter);
Dictionary::Ptr WaitForEvent(void *client, double timeout = 5);
Dictionary::Ptr WaitForEvent(void *client, boost::asio::yield_context yc, double timeout = 5);
static std::vector<EventQueue::Ptr> GetQueuesForType(const String& type);
static void UnregisterIfUnused(const String& name, const EventQueue::Ptr& queue);
@ -128,6 +127,8 @@ public:
EventsSubscriber& operator=(EventsSubscriber&&) = delete;
~EventsSubscriber();
const EventsInbox::Ptr& GetInbox();
private:
std::set<EventType> m_Types;
EventsInbox::Ptr m_Inbox;

View File

@ -12,11 +12,30 @@
#include <boost/asio/buffer.hpp>
#include <boost/asio/write.hpp>
#include <boost/algorithm/string/replace.hpp>
#include <map>
#include <set>
using namespace icinga;
REGISTER_URLHANDLER("/v1/events", EventsHandler);
const std::map<String, EventType> l_EventTypes ({
{"AcknowledgementCleared", EventType::AcknowledgementCleared},
{"AcknowledgementSet", EventType::AcknowledgementSet},
{"CheckResult", EventType::CheckResult},
{"CommentAdded", EventType::CommentAdded},
{"CommentRemoved", EventType::CommentRemoved},
{"DowntimeAdded", EventType::DowntimeAdded},
{"DowntimeRemoved", EventType::DowntimeRemoved},
{"DowntimeStarted", EventType::DowntimeStarted},
{"DowntimeTriggered", EventType::DowntimeTriggered},
{"Flapping", EventType::Flapping},
{"Notification", EventType::Notification},
{"StateChange", EventType::StateChange}
});
const String l_ApiQuery ("<API query>");
bool EventsHandler::HandleRequest(
AsioTlsStream& stream,
const ApiUser::Ptr& user,
@ -63,30 +82,20 @@ bool EventsHandler::HandleRequest(
return true;
}
String filter = HttpUtility::GetLastParameter(params, "filter");
std::set<EventType> eventTypes;
std::unique_ptr<Expression> ufilter;
{
ObjectLock olock(types);
for (const String& type : types) {
auto typeId (l_EventTypes.find(type));
if (!filter.IsEmpty())
ufilter = ConfigCompiler::CompileText("<API query>", filter);
/* create a new queue or update an existing one */
EventQueue::Ptr queue = EventQueue::GetByName(queueName);
if (!queue) {
queue = new EventQueue(queueName);
EventQueue::Register(queueName, queue);
if (typeId != l_EventTypes.end()) {
eventTypes.emplace(typeId->second);
}
}
}
queue->SetTypes(types->ToSet<String>());
queue->SetFilter(std::move(ufilter));
queue->AddClient(&request);
Defer removeClient ([&queue, &request, &queueName]() {
queue->RemoveClient(&request);
EventQueue::UnregisterIfUnused(queueName, queue);
});
EventsSubscriber subscriber (std::move(eventTypes), HttpUtility::GetLastParameter(params, "filter"), l_ApiQuery);
server.StartStreaming();
@ -104,7 +113,7 @@ bool EventsHandler::HandleRequest(
AsioConditionVariable dontLockOwnStrand (stream.get_io_service(), true);
for (;;) {
auto event (queue->WaitForEvent(&request, yc));
auto event (subscriber.GetInbox()->Shift(yc));
if (event) {
String body = JsonEncode(event);