mirror of
https://github.com/Icinga/icinga2.git
synced 2025-07-25 22:54:57 +02:00
Merge pull request #7088 from Icinga/feature/asio-event-queue
Implement new event queue for ASIO consumers
This commit is contained in:
commit
37de1a919b
@ -35,8 +35,9 @@ void ApiEvents::StaticInitialize()
|
|||||||
void ApiEvents::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr& origin)
|
void ApiEvents::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr& origin)
|
||||||
{
|
{
|
||||||
std::vector<EventQueue::Ptr> queues = EventQueue::GetQueuesForType("CheckResult");
|
std::vector<EventQueue::Ptr> queues = EventQueue::GetQueuesForType("CheckResult");
|
||||||
|
auto inboxes (EventsRouter::GetInstance().GetInboxes(EventType::CheckResult));
|
||||||
|
|
||||||
if (queues.empty())
|
if (queues.empty() && !inboxes)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
Log(LogDebug, "ApiEvents", "Processing event type 'CheckResult'.");
|
Log(LogDebug, "ApiEvents", "Processing event type 'CheckResult'.");
|
||||||
@ -58,13 +59,16 @@ void ApiEvents::CheckResultHandler(const Checkable::Ptr& checkable, const CheckR
|
|||||||
for (const EventQueue::Ptr& queue : queues) {
|
for (const EventQueue::Ptr& queue : queues) {
|
||||||
queue->ProcessEvent(result);
|
queue->ProcessEvent(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inboxes.Push(std::move(result));
|
||||||
}
|
}
|
||||||
|
|
||||||
void ApiEvents::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type, const MessageOrigin::Ptr& origin)
|
void ApiEvents::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type, const MessageOrigin::Ptr& origin)
|
||||||
{
|
{
|
||||||
std::vector<EventQueue::Ptr> queues = EventQueue::GetQueuesForType("StateChange");
|
std::vector<EventQueue::Ptr> queues = EventQueue::GetQueuesForType("StateChange");
|
||||||
|
auto inboxes (EventsRouter::GetInstance().GetInboxes(EventType::StateChange));
|
||||||
|
|
||||||
if (queues.empty())
|
if (queues.empty() && !inboxes)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
Log(LogDebug, "ApiEvents", "Processing event type 'StateChange'.");
|
Log(LogDebug, "ApiEvents", "Processing event type 'StateChange'.");
|
||||||
@ -88,6 +92,8 @@ void ApiEvents::StateChangeHandler(const Checkable::Ptr& checkable, const CheckR
|
|||||||
for (const EventQueue::Ptr& queue : queues) {
|
for (const EventQueue::Ptr& queue : queues) {
|
||||||
queue->ProcessEvent(result);
|
queue->ProcessEvent(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inboxes.Push(std::move(result));
|
||||||
}
|
}
|
||||||
|
|
||||||
void ApiEvents::NotificationSentToAllUsersHandler(const Notification::Ptr& notification,
|
void ApiEvents::NotificationSentToAllUsersHandler(const Notification::Ptr& notification,
|
||||||
@ -95,8 +101,9 @@ void ApiEvents::NotificationSentToAllUsersHandler(const Notification::Ptr& notif
|
|||||||
const CheckResult::Ptr& cr, const String& author, const String& text, const MessageOrigin::Ptr& origin)
|
const CheckResult::Ptr& cr, const String& author, const String& text, const MessageOrigin::Ptr& origin)
|
||||||
{
|
{
|
||||||
std::vector<EventQueue::Ptr> queues = EventQueue::GetQueuesForType("Notification");
|
std::vector<EventQueue::Ptr> queues = EventQueue::GetQueuesForType("Notification");
|
||||||
|
auto inboxes (EventsRouter::GetInstance().GetInboxes(EventType::Notification));
|
||||||
|
|
||||||
if (queues.empty())
|
if (queues.empty() && !inboxes)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
Log(LogDebug, "ApiEvents", "Processing event type 'Notification'.");
|
Log(LogDebug, "ApiEvents", "Processing event type 'Notification'.");
|
||||||
@ -133,13 +140,16 @@ void ApiEvents::NotificationSentToAllUsersHandler(const Notification::Ptr& notif
|
|||||||
for (const EventQueue::Ptr& queue : queues) {
|
for (const EventQueue::Ptr& queue : queues) {
|
||||||
queue->ProcessEvent(result);
|
queue->ProcessEvent(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inboxes.Push(std::move(result));
|
||||||
}
|
}
|
||||||
|
|
||||||
void ApiEvents::FlappingChangedHandler(const Checkable::Ptr& checkable, const MessageOrigin::Ptr& origin)
|
void ApiEvents::FlappingChangedHandler(const Checkable::Ptr& checkable, const MessageOrigin::Ptr& origin)
|
||||||
{
|
{
|
||||||
std::vector<EventQueue::Ptr> queues = EventQueue::GetQueuesForType("Flapping");
|
std::vector<EventQueue::Ptr> queues = EventQueue::GetQueuesForType("Flapping");
|
||||||
|
auto inboxes (EventsRouter::GetInstance().GetInboxes(EventType::Flapping));
|
||||||
|
|
||||||
if (queues.empty())
|
if (queues.empty() && !inboxes)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
Log(LogDebug, "ApiEvents", "Processing event type 'Flapping'.");
|
Log(LogDebug, "ApiEvents", "Processing event type 'Flapping'.");
|
||||||
@ -166,6 +176,8 @@ void ApiEvents::FlappingChangedHandler(const Checkable::Ptr& checkable, const Me
|
|||||||
for (const EventQueue::Ptr& queue : queues) {
|
for (const EventQueue::Ptr& queue : queues) {
|
||||||
queue->ProcessEvent(result);
|
queue->ProcessEvent(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inboxes.Push(std::move(result));
|
||||||
}
|
}
|
||||||
|
|
||||||
void ApiEvents::AcknowledgementSetHandler(const Checkable::Ptr& checkable,
|
void ApiEvents::AcknowledgementSetHandler(const Checkable::Ptr& checkable,
|
||||||
@ -173,8 +185,9 @@ void ApiEvents::AcknowledgementSetHandler(const Checkable::Ptr& checkable,
|
|||||||
bool notify, bool persistent, double expiry, const MessageOrigin::Ptr& origin)
|
bool notify, bool persistent, double expiry, const MessageOrigin::Ptr& origin)
|
||||||
{
|
{
|
||||||
std::vector<EventQueue::Ptr> queues = EventQueue::GetQueuesForType("AcknowledgementSet");
|
std::vector<EventQueue::Ptr> queues = EventQueue::GetQueuesForType("AcknowledgementSet");
|
||||||
|
auto inboxes (EventsRouter::GetInstance().GetInboxes(EventType::AcknowledgementSet));
|
||||||
|
|
||||||
if (queues.empty())
|
if (queues.empty() && !inboxes)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
Log(LogDebug, "ApiEvents", "Processing event type 'AcknowledgementSet'.");
|
Log(LogDebug, "ApiEvents", "Processing event type 'AcknowledgementSet'.");
|
||||||
@ -204,13 +217,16 @@ void ApiEvents::AcknowledgementSetHandler(const Checkable::Ptr& checkable,
|
|||||||
for (const EventQueue::Ptr& queue : queues) {
|
for (const EventQueue::Ptr& queue : queues) {
|
||||||
queue->ProcessEvent(result);
|
queue->ProcessEvent(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inboxes.Push(std::move(result));
|
||||||
}
|
}
|
||||||
|
|
||||||
void ApiEvents::AcknowledgementClearedHandler(const Checkable::Ptr& checkable, const MessageOrigin::Ptr& origin)
|
void ApiEvents::AcknowledgementClearedHandler(const Checkable::Ptr& checkable, const MessageOrigin::Ptr& origin)
|
||||||
{
|
{
|
||||||
std::vector<EventQueue::Ptr> queues = EventQueue::GetQueuesForType("AcknowledgementCleared");
|
std::vector<EventQueue::Ptr> queues = EventQueue::GetQueuesForType("AcknowledgementCleared");
|
||||||
|
auto inboxes (EventsRouter::GetInstance().GetInboxes(EventType::AcknowledgementCleared));
|
||||||
|
|
||||||
if (queues.empty())
|
if (queues.empty() && !inboxes)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
Log(LogDebug, "ApiEvents", "Processing event type 'AcknowledgementCleared'.");
|
Log(LogDebug, "ApiEvents", "Processing event type 'AcknowledgementCleared'.");
|
||||||
@ -235,13 +251,16 @@ void ApiEvents::AcknowledgementClearedHandler(const Checkable::Ptr& checkable, c
|
|||||||
}
|
}
|
||||||
|
|
||||||
result->Set("acknowledgement_type", AcknowledgementNone);
|
result->Set("acknowledgement_type", AcknowledgementNone);
|
||||||
|
|
||||||
|
inboxes.Push(std::move(result));
|
||||||
}
|
}
|
||||||
|
|
||||||
void ApiEvents::CommentAddedHandler(const Comment::Ptr& comment)
|
void ApiEvents::CommentAddedHandler(const Comment::Ptr& comment)
|
||||||
{
|
{
|
||||||
std::vector<EventQueue::Ptr> queues = EventQueue::GetQueuesForType("CommentAdded");
|
std::vector<EventQueue::Ptr> queues = EventQueue::GetQueuesForType("CommentAdded");
|
||||||
|
auto inboxes (EventsRouter::GetInstance().GetInboxes(EventType::CommentAdded));
|
||||||
|
|
||||||
if (queues.empty())
|
if (queues.empty() && !inboxes)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
Log(LogDebug, "ApiEvents", "Processing event type 'CommentAdded'.");
|
Log(LogDebug, "ApiEvents", "Processing event type 'CommentAdded'.");
|
||||||
@ -255,13 +274,16 @@ void ApiEvents::CommentAddedHandler(const Comment::Ptr& comment)
|
|||||||
for (const EventQueue::Ptr& queue : queues) {
|
for (const EventQueue::Ptr& queue : queues) {
|
||||||
queue->ProcessEvent(result);
|
queue->ProcessEvent(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inboxes.Push(std::move(result));
|
||||||
}
|
}
|
||||||
|
|
||||||
void ApiEvents::CommentRemovedHandler(const Comment::Ptr& comment)
|
void ApiEvents::CommentRemovedHandler(const Comment::Ptr& comment)
|
||||||
{
|
{
|
||||||
std::vector<EventQueue::Ptr> queues = EventQueue::GetQueuesForType("CommentRemoved");
|
std::vector<EventQueue::Ptr> queues = EventQueue::GetQueuesForType("CommentRemoved");
|
||||||
|
auto inboxes (EventsRouter::GetInstance().GetInboxes(EventType::CommentRemoved));
|
||||||
|
|
||||||
if (queues.empty())
|
if (queues.empty() && !inboxes)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
Log(LogDebug, "ApiEvents", "Processing event type 'CommentRemoved'.");
|
Log(LogDebug, "ApiEvents", "Processing event type 'CommentRemoved'.");
|
||||||
@ -275,13 +297,16 @@ void ApiEvents::CommentRemovedHandler(const Comment::Ptr& comment)
|
|||||||
for (const EventQueue::Ptr& queue : queues) {
|
for (const EventQueue::Ptr& queue : queues) {
|
||||||
queue->ProcessEvent(result);
|
queue->ProcessEvent(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inboxes.Push(std::move(result));
|
||||||
}
|
}
|
||||||
|
|
||||||
void ApiEvents::DowntimeAddedHandler(const Downtime::Ptr& downtime)
|
void ApiEvents::DowntimeAddedHandler(const Downtime::Ptr& downtime)
|
||||||
{
|
{
|
||||||
std::vector<EventQueue::Ptr> queues = EventQueue::GetQueuesForType("DowntimeAdded");
|
std::vector<EventQueue::Ptr> queues = EventQueue::GetQueuesForType("DowntimeAdded");
|
||||||
|
auto inboxes (EventsRouter::GetInstance().GetInboxes(EventType::DowntimeAdded));
|
||||||
|
|
||||||
if (queues.empty())
|
if (queues.empty() && !inboxes)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
Log(LogDebug, "ApiEvents", "Processing event type 'DowntimeAdded'.");
|
Log(LogDebug, "ApiEvents", "Processing event type 'DowntimeAdded'.");
|
||||||
@ -295,13 +320,16 @@ void ApiEvents::DowntimeAddedHandler(const Downtime::Ptr& downtime)
|
|||||||
for (const EventQueue::Ptr& queue : queues) {
|
for (const EventQueue::Ptr& queue : queues) {
|
||||||
queue->ProcessEvent(result);
|
queue->ProcessEvent(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inboxes.Push(std::move(result));
|
||||||
}
|
}
|
||||||
|
|
||||||
void ApiEvents::DowntimeRemovedHandler(const Downtime::Ptr& downtime)
|
void ApiEvents::DowntimeRemovedHandler(const Downtime::Ptr& downtime)
|
||||||
{
|
{
|
||||||
std::vector<EventQueue::Ptr> queues = EventQueue::GetQueuesForType("DowntimeRemoved");
|
std::vector<EventQueue::Ptr> queues = EventQueue::GetQueuesForType("DowntimeRemoved");
|
||||||
|
auto inboxes (EventsRouter::GetInstance().GetInboxes(EventType::DowntimeRemoved));
|
||||||
|
|
||||||
if (queues.empty())
|
if (queues.empty() && !inboxes)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
Log(LogDebug, "ApiEvents", "Processing event type 'DowntimeRemoved'.");
|
Log(LogDebug, "ApiEvents", "Processing event type 'DowntimeRemoved'.");
|
||||||
@ -315,13 +343,16 @@ void ApiEvents::DowntimeRemovedHandler(const Downtime::Ptr& downtime)
|
|||||||
for (const EventQueue::Ptr& queue : queues) {
|
for (const EventQueue::Ptr& queue : queues) {
|
||||||
queue->ProcessEvent(result);
|
queue->ProcessEvent(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inboxes.Push(std::move(result));
|
||||||
}
|
}
|
||||||
|
|
||||||
void ApiEvents::DowntimeStartedHandler(const Downtime::Ptr& downtime)
|
void ApiEvents::DowntimeStartedHandler(const Downtime::Ptr& downtime)
|
||||||
{
|
{
|
||||||
std::vector<EventQueue::Ptr> queues = EventQueue::GetQueuesForType("DowntimeStarted");
|
std::vector<EventQueue::Ptr> queues = EventQueue::GetQueuesForType("DowntimeStarted");
|
||||||
|
auto inboxes (EventsRouter::GetInstance().GetInboxes(EventType::DowntimeStarted));
|
||||||
|
|
||||||
if (queues.empty())
|
if (queues.empty() && !inboxes)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
Log(LogDebug, "ApiEvents", "Processing event type 'DowntimeStarted'.");
|
Log(LogDebug, "ApiEvents", "Processing event type 'DowntimeStarted'.");
|
||||||
@ -335,13 +366,16 @@ void ApiEvents::DowntimeStartedHandler(const Downtime::Ptr& downtime)
|
|||||||
for (const EventQueue::Ptr& queue : queues) {
|
for (const EventQueue::Ptr& queue : queues) {
|
||||||
queue->ProcessEvent(result);
|
queue->ProcessEvent(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inboxes.Push(std::move(result));
|
||||||
}
|
}
|
||||||
|
|
||||||
void ApiEvents::DowntimeTriggeredHandler(const Downtime::Ptr& downtime)
|
void ApiEvents::DowntimeTriggeredHandler(const Downtime::Ptr& downtime)
|
||||||
{
|
{
|
||||||
std::vector<EventQueue::Ptr> queues = EventQueue::GetQueuesForType("DowntimeTriggered");
|
std::vector<EventQueue::Ptr> queues = EventQueue::GetQueuesForType("DowntimeTriggered");
|
||||||
|
auto inboxes (EventsRouter::GetInstance().GetInboxes(EventType::DowntimeTriggered));
|
||||||
|
|
||||||
if (queues.empty())
|
if (queues.empty() && !inboxes)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
Log(LogDebug, "ApiEvents", "Processing event type 'DowntimeTriggered'.");
|
Log(LogDebug, "ApiEvents", "Processing event type 'DowntimeTriggered'.");
|
||||||
@ -355,4 +389,6 @@ void ApiEvents::DowntimeTriggeredHandler(const Downtime::Ptr& downtime)
|
|||||||
for (const EventQueue::Ptr& queue : queues) {
|
for (const EventQueue::Ptr& queue : queues) {
|
||||||
queue->ProcessEvent(result);
|
queue->ProcessEvent(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inboxes.Push(std::move(result));
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
|
/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
|
||||||
|
|
||||||
|
#include "config/configcompiler.hpp"
|
||||||
#include "remote/eventqueue.hpp"
|
#include "remote/eventqueue.hpp"
|
||||||
#include "remote/filterutility.hpp"
|
#include "remote/filterutility.hpp"
|
||||||
#include "base/io-engine.hpp"
|
#include "base/io-engine.hpp"
|
||||||
@ -7,6 +8,10 @@
|
|||||||
#include "base/logger.hpp"
|
#include "base/logger.hpp"
|
||||||
#include "base/utility.hpp"
|
#include "base/utility.hpp"
|
||||||
#include <boost/asio/spawn.hpp>
|
#include <boost/asio/spawn.hpp>
|
||||||
|
#include <boost/date_time/posix_time/posix_time_duration.hpp>
|
||||||
|
#include <boost/date_time/posix_time/ptime.hpp>
|
||||||
|
#include <boost/system/error_code.hpp>
|
||||||
|
#include <utility>
|
||||||
|
|
||||||
using namespace icinga;
|
using namespace icinga;
|
||||||
|
|
||||||
@ -85,55 +90,6 @@ void EventQueue::SetFilter(std::unique_ptr<Expression> filter)
|
|||||||
m_Filter.swap(filter);
|
m_Filter.swap(filter);
|
||||||
}
|
}
|
||||||
|
|
||||||
Dictionary::Ptr EventQueue::WaitForEvent(void *client, double timeout)
|
|
||||||
{
|
|
||||||
boost::mutex::scoped_lock lock(m_Mutex);
|
|
||||||
|
|
||||||
for (;;) {
|
|
||||||
auto it = m_Events.find(client);
|
|
||||||
ASSERT(it != m_Events.end());
|
|
||||||
|
|
||||||
if (!it->second.empty()) {
|
|
||||||
Dictionary::Ptr result = *it->second.begin();
|
|
||||||
it->second.pop_front();
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!m_CV.timed_wait(lock, boost::posix_time::milliseconds(long(timeout * 1000))))
|
|
||||||
return nullptr;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Dictionary::Ptr EventQueue::WaitForEvent(void *client, boost::asio::yield_context yc, double timeout)
|
|
||||||
{
|
|
||||||
double deadline = -1.0;
|
|
||||||
|
|
||||||
for (;;) {
|
|
||||||
{
|
|
||||||
boost::mutex::scoped_try_lock lock(m_Mutex);
|
|
||||||
|
|
||||||
if (lock.owns_lock()) {
|
|
||||||
auto it = m_Events.find(client);
|
|
||||||
ASSERT(it != m_Events.end());
|
|
||||||
|
|
||||||
if (it->second.empty()) {
|
|
||||||
if (deadline == -1.0) {
|
|
||||||
deadline = Utility::GetTime() + timeout;
|
|
||||||
} else if (Utility::GetTime() >= deadline) {
|
|
||||||
return nullptr;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
Dictionary::Ptr result = *it->second.begin();
|
|
||||||
it->second.pop_front();
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
IoBoundWorkSlot dontLockTheIoThreadWhileWaiting (yc);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
std::vector<EventQueue::Ptr> EventQueue::GetQueuesForType(const String& type)
|
std::vector<EventQueue::Ptr> EventQueue::GetQueuesForType(const String& type)
|
||||||
{
|
{
|
||||||
EventQueueRegistry::ItemMap queues = EventQueueRegistry::GetInstance()->GetItems();
|
EventQueueRegistry::ItemMap queues = EventQueueRegistry::GetInstance()->GetItems();
|
||||||
@ -168,3 +124,208 @@ EventQueueRegistry *EventQueueRegistry::GetInstance()
|
|||||||
{
|
{
|
||||||
return Singleton<EventQueueRegistry>::GetInstance();
|
return Singleton<EventQueueRegistry>::GetInstance();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::mutex EventsInbox::m_FiltersMutex;
|
||||||
|
std::map<String, EventsInbox::Filter> EventsInbox::m_Filters ({{"", EventsInbox::Filter{1, nullptr}}});
|
||||||
|
|
||||||
|
EventsRouter EventsRouter::m_Instance;
|
||||||
|
|
||||||
|
EventsInbox::EventsInbox(String filter, const String& filterSource)
|
||||||
|
: m_Timer(IoEngine::Get().GetIoService())
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lock (m_FiltersMutex);
|
||||||
|
m_Filter = m_Filters.find(filter);
|
||||||
|
|
||||||
|
if (m_Filter == m_Filters.end()) {
|
||||||
|
lock.unlock();
|
||||||
|
|
||||||
|
auto expr (ConfigCompiler::CompileText(filterSource, filter));
|
||||||
|
|
||||||
|
lock.lock();
|
||||||
|
|
||||||
|
m_Filter = m_Filters.find(filter);
|
||||||
|
|
||||||
|
if (m_Filter == m_Filters.end()) {
|
||||||
|
m_Filter = m_Filters.emplace(std::move(filter), Filter{1, std::shared_ptr<Expression>(expr.release())}).first;
|
||||||
|
} else {
|
||||||
|
++m_Filter->second.Refs;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
++m_Filter->second.Refs;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
EventsInbox::~EventsInbox()
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lock (m_FiltersMutex);
|
||||||
|
|
||||||
|
if (!--m_Filter->second.Refs) {
|
||||||
|
m_Filters.erase(m_Filter);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const std::shared_ptr<Expression>& EventsInbox::GetFilter()
|
||||||
|
{
|
||||||
|
return m_Filter->second.Expr;
|
||||||
|
}
|
||||||
|
|
||||||
|
void EventsInbox::Push(Dictionary::Ptr event)
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lock (m_Mutex);
|
||||||
|
|
||||||
|
m_Queue.emplace(std::move(event));
|
||||||
|
m_Timer.expires_at(boost::posix_time::neg_infin);
|
||||||
|
}
|
||||||
|
|
||||||
|
Dictionary::Ptr EventsInbox::Shift(boost::asio::yield_context yc, double timeout)
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lock (m_Mutex, std::defer_lock);
|
||||||
|
|
||||||
|
m_Timer.expires_at(boost::posix_time::neg_infin);
|
||||||
|
|
||||||
|
{
|
||||||
|
boost::system::error_code ec;
|
||||||
|
|
||||||
|
while (!lock.try_lock()) {
|
||||||
|
m_Timer.async_wait(yc[ec]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (m_Queue.empty()) {
|
||||||
|
m_Timer.expires_from_now(boost::posix_time::milliseconds((unsigned long)(timeout * 1000.0)));
|
||||||
|
lock.unlock();
|
||||||
|
|
||||||
|
{
|
||||||
|
boost::system::error_code ec;
|
||||||
|
m_Timer.async_wait(yc[ec]);
|
||||||
|
|
||||||
|
while (!lock.try_lock()) {
|
||||||
|
m_Timer.async_wait(yc[ec]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (m_Queue.empty()) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
auto event (std::move(m_Queue.front()));
|
||||||
|
m_Queue.pop();
|
||||||
|
return std::move(event);
|
||||||
|
}
|
||||||
|
|
||||||
|
EventsSubscriber::EventsSubscriber(std::set<EventType> types, String filter, const String& filterSource)
|
||||||
|
: m_Types(std::move(types)), m_Inbox(new EventsInbox(std::move(filter), filterSource))
|
||||||
|
{
|
||||||
|
EventsRouter::GetInstance().Subscribe(m_Types, m_Inbox);
|
||||||
|
}
|
||||||
|
|
||||||
|
EventsSubscriber::~EventsSubscriber()
|
||||||
|
{
|
||||||
|
EventsRouter::GetInstance().Unsubscribe(m_Types, m_Inbox);
|
||||||
|
}
|
||||||
|
|
||||||
|
const EventsInbox::Ptr& EventsSubscriber::GetInbox()
|
||||||
|
{
|
||||||
|
return m_Inbox;
|
||||||
|
}
|
||||||
|
|
||||||
|
EventsFilter::EventsFilter(std::map<std::shared_ptr<Expression>, std::set<EventsInbox::Ptr>> inboxes)
|
||||||
|
: m_Inboxes(std::move(inboxes))
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
EventsFilter::operator bool()
|
||||||
|
{
|
||||||
|
return !m_Inboxes.empty();
|
||||||
|
}
|
||||||
|
|
||||||
|
void EventsFilter::Push(Dictionary::Ptr event)
|
||||||
|
{
|
||||||
|
for (auto& perFilter : m_Inboxes) {
|
||||||
|
if (perFilter.first) {
|
||||||
|
ScriptFrame frame(true, new Namespace());
|
||||||
|
frame.Sandboxed = true;
|
||||||
|
|
||||||
|
try {
|
||||||
|
if (!FilterUtility::EvaluateFilter(frame, perFilter.first.get(), event, "event")) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
} catch (const std::exception& ex) {
|
||||||
|
Log(LogWarning, "EventQueue")
|
||||||
|
<< "Error occurred while evaluating event filter for queue: " << DiagnosticInformation(ex);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (auto& inbox : perFilter.second) {
|
||||||
|
inbox->Push(event);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
EventsRouter& EventsRouter::GetInstance()
|
||||||
|
{
|
||||||
|
return m_Instance;
|
||||||
|
}
|
||||||
|
|
||||||
|
void EventsRouter::Subscribe(const std::set<EventType>& types, const EventsInbox::Ptr& inbox)
|
||||||
|
{
|
||||||
|
const auto& filter (inbox->GetFilter());
|
||||||
|
std::unique_lock<std::mutex> lock (m_Mutex);
|
||||||
|
|
||||||
|
for (auto type : types) {
|
||||||
|
auto perType (m_Subscribers.find(type));
|
||||||
|
|
||||||
|
if (perType == m_Subscribers.end()) {
|
||||||
|
perType = m_Subscribers.emplace(type, decltype(perType->second)()).first;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto perFilter (perType->second.find(filter));
|
||||||
|
|
||||||
|
if (perFilter == perType->second.end()) {
|
||||||
|
perFilter = perType->second.emplace(filter, decltype(perFilter->second)()).first;
|
||||||
|
}
|
||||||
|
|
||||||
|
perFilter->second.emplace(inbox);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void EventsRouter::Unsubscribe(const std::set<EventType>& types, const EventsInbox::Ptr& inbox)
|
||||||
|
{
|
||||||
|
const auto& filter (inbox->GetFilter());
|
||||||
|
std::unique_lock<std::mutex> lock (m_Mutex);
|
||||||
|
|
||||||
|
for (auto type : types) {
|
||||||
|
auto perType (m_Subscribers.find(type));
|
||||||
|
|
||||||
|
if (perType != m_Subscribers.end()) {
|
||||||
|
auto perFilter (perType->second.find(filter));
|
||||||
|
|
||||||
|
if (perFilter != perType->second.end()) {
|
||||||
|
perFilter->second.erase(inbox);
|
||||||
|
|
||||||
|
if (perFilter->second.empty()) {
|
||||||
|
perType->second.erase(perFilter);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (perType->second.empty()) {
|
||||||
|
m_Subscribers.erase(perType);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
EventsFilter EventsRouter::GetInboxes(EventType type)
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lock (m_Mutex);
|
||||||
|
|
||||||
|
auto perType (m_Subscribers.find(type));
|
||||||
|
|
||||||
|
if (perType == m_Subscribers.end()) {
|
||||||
|
return EventsFilter({});
|
||||||
|
}
|
||||||
|
|
||||||
|
return EventsFilter(perType->second);
|
||||||
|
}
|
||||||
|
@ -6,12 +6,17 @@
|
|||||||
#include "remote/httphandler.hpp"
|
#include "remote/httphandler.hpp"
|
||||||
#include "base/object.hpp"
|
#include "base/object.hpp"
|
||||||
#include "config/expression.hpp"
|
#include "config/expression.hpp"
|
||||||
|
#include <boost/asio/deadline_timer.hpp>
|
||||||
#include <boost/asio/spawn.hpp>
|
#include <boost/asio/spawn.hpp>
|
||||||
#include <boost/thread/mutex.hpp>
|
#include <boost/thread/mutex.hpp>
|
||||||
#include <boost/thread/condition_variable.hpp>
|
#include <boost/thread/condition_variable.hpp>
|
||||||
|
#include <cstddef>
|
||||||
|
#include <cstdint>
|
||||||
|
#include <mutex>
|
||||||
#include <set>
|
#include <set>
|
||||||
#include <map>
|
#include <map>
|
||||||
#include <deque>
|
#include <deque>
|
||||||
|
#include <queue>
|
||||||
|
|
||||||
namespace icinga
|
namespace icinga
|
||||||
{
|
{
|
||||||
@ -31,9 +36,6 @@ public:
|
|||||||
void SetTypes(const std::set<String>& types);
|
void SetTypes(const std::set<String>& types);
|
||||||
void SetFilter(std::unique_ptr<Expression> filter);
|
void SetFilter(std::unique_ptr<Expression> filter);
|
||||||
|
|
||||||
Dictionary::Ptr WaitForEvent(void *client, double timeout = 5);
|
|
||||||
Dictionary::Ptr WaitForEvent(void *client, boost::asio::yield_context yc, double timeout = 5);
|
|
||||||
|
|
||||||
static std::vector<EventQueue::Ptr> GetQueuesForType(const String& type);
|
static std::vector<EventQueue::Ptr> GetQueuesForType(const String& type);
|
||||||
static void UnregisterIfUnused(const String& name, const EventQueue::Ptr& queue);
|
static void UnregisterIfUnused(const String& name, const EventQueue::Ptr& queue);
|
||||||
|
|
||||||
@ -64,6 +66,108 @@ public:
|
|||||||
static EventQueueRegistry *GetInstance();
|
static EventQueueRegistry *GetInstance();
|
||||||
};
|
};
|
||||||
|
|
||||||
|
enum class EventType : uint_fast8_t
|
||||||
|
{
|
||||||
|
AcknowledgementCleared,
|
||||||
|
AcknowledgementSet,
|
||||||
|
CheckResult,
|
||||||
|
CommentAdded,
|
||||||
|
CommentRemoved,
|
||||||
|
DowntimeAdded,
|
||||||
|
DowntimeRemoved,
|
||||||
|
DowntimeStarted,
|
||||||
|
DowntimeTriggered,
|
||||||
|
Flapping,
|
||||||
|
Notification,
|
||||||
|
StateChange
|
||||||
|
};
|
||||||
|
|
||||||
|
class EventsInbox : public Object
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
DECLARE_PTR_TYPEDEFS(EventsInbox);
|
||||||
|
|
||||||
|
EventsInbox(String filter, const String& filterSource);
|
||||||
|
EventsInbox(const EventsInbox&) = delete;
|
||||||
|
EventsInbox(EventsInbox&&) = delete;
|
||||||
|
EventsInbox& operator=(const EventsInbox&) = delete;
|
||||||
|
EventsInbox& operator=(EventsInbox&&) = delete;
|
||||||
|
~EventsInbox();
|
||||||
|
|
||||||
|
const std::shared_ptr<Expression>& GetFilter();
|
||||||
|
|
||||||
|
void Push(Dictionary::Ptr event);
|
||||||
|
Dictionary::Ptr Shift(boost::asio::yield_context yc, double timeout = 5);
|
||||||
|
|
||||||
|
private:
|
||||||
|
struct Filter
|
||||||
|
{
|
||||||
|
std::size_t Refs;
|
||||||
|
std::shared_ptr<Expression> Expr;
|
||||||
|
};
|
||||||
|
|
||||||
|
static std::mutex m_FiltersMutex;
|
||||||
|
static std::map<String, Filter> m_Filters;
|
||||||
|
|
||||||
|
std::mutex m_Mutex;
|
||||||
|
decltype(m_Filters.begin()) m_Filter;
|
||||||
|
std::queue<Dictionary::Ptr> m_Queue;
|
||||||
|
boost::asio::deadline_timer m_Timer;
|
||||||
|
};
|
||||||
|
|
||||||
|
class EventsSubscriber
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
EventsSubscriber(std::set<EventType> types, String filter, const String& filterSource);
|
||||||
|
EventsSubscriber(const EventsSubscriber&) = delete;
|
||||||
|
EventsSubscriber(EventsSubscriber&&) = delete;
|
||||||
|
EventsSubscriber& operator=(const EventsSubscriber&) = delete;
|
||||||
|
EventsSubscriber& operator=(EventsSubscriber&&) = delete;
|
||||||
|
~EventsSubscriber();
|
||||||
|
|
||||||
|
const EventsInbox::Ptr& GetInbox();
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::set<EventType> m_Types;
|
||||||
|
EventsInbox::Ptr m_Inbox;
|
||||||
|
};
|
||||||
|
|
||||||
|
class EventsFilter
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
EventsFilter(std::map<std::shared_ptr<Expression>, std::set<EventsInbox::Ptr>> inboxes);
|
||||||
|
|
||||||
|
operator bool();
|
||||||
|
|
||||||
|
void Push(Dictionary::Ptr event);
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::map<std::shared_ptr<Expression>, std::set<EventsInbox::Ptr>> m_Inboxes;
|
||||||
|
};
|
||||||
|
|
||||||
|
class EventsRouter
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
static EventsRouter& GetInstance();
|
||||||
|
|
||||||
|
void Subscribe(const std::set<EventType>& types, const EventsInbox::Ptr& inbox);
|
||||||
|
void Unsubscribe(const std::set<EventType>& types, const EventsInbox::Ptr& inbox);
|
||||||
|
EventsFilter GetInboxes(EventType type);
|
||||||
|
|
||||||
|
private:
|
||||||
|
static EventsRouter m_Instance;
|
||||||
|
|
||||||
|
EventsRouter() = default;
|
||||||
|
EventsRouter(const EventsRouter&) = delete;
|
||||||
|
EventsRouter(EventsRouter&&) = delete;
|
||||||
|
EventsRouter& operator=(const EventsRouter&) = delete;
|
||||||
|
EventsRouter& operator=(EventsRouter&&) = delete;
|
||||||
|
~EventsRouter() = default;
|
||||||
|
|
||||||
|
std::mutex m_Mutex;
|
||||||
|
std::map<EventType, std::map<std::shared_ptr<Expression>, std::set<EventsInbox::Ptr>>> m_Subscribers;
|
||||||
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#endif /* EVENTQUEUE_H */
|
#endif /* EVENTQUEUE_H */
|
||||||
|
@ -12,11 +12,30 @@
|
|||||||
#include <boost/asio/buffer.hpp>
|
#include <boost/asio/buffer.hpp>
|
||||||
#include <boost/asio/write.hpp>
|
#include <boost/asio/write.hpp>
|
||||||
#include <boost/algorithm/string/replace.hpp>
|
#include <boost/algorithm/string/replace.hpp>
|
||||||
|
#include <map>
|
||||||
|
#include <set>
|
||||||
|
|
||||||
using namespace icinga;
|
using namespace icinga;
|
||||||
|
|
||||||
REGISTER_URLHANDLER("/v1/events", EventsHandler);
|
REGISTER_URLHANDLER("/v1/events", EventsHandler);
|
||||||
|
|
||||||
|
const std::map<String, EventType> l_EventTypes ({
|
||||||
|
{"AcknowledgementCleared", EventType::AcknowledgementCleared},
|
||||||
|
{"AcknowledgementSet", EventType::AcknowledgementSet},
|
||||||
|
{"CheckResult", EventType::CheckResult},
|
||||||
|
{"CommentAdded", EventType::CommentAdded},
|
||||||
|
{"CommentRemoved", EventType::CommentRemoved},
|
||||||
|
{"DowntimeAdded", EventType::DowntimeAdded},
|
||||||
|
{"DowntimeRemoved", EventType::DowntimeRemoved},
|
||||||
|
{"DowntimeStarted", EventType::DowntimeStarted},
|
||||||
|
{"DowntimeTriggered", EventType::DowntimeTriggered},
|
||||||
|
{"Flapping", EventType::Flapping},
|
||||||
|
{"Notification", EventType::Notification},
|
||||||
|
{"StateChange", EventType::StateChange}
|
||||||
|
});
|
||||||
|
|
||||||
|
const String l_ApiQuery ("<API query>");
|
||||||
|
|
||||||
bool EventsHandler::HandleRequest(
|
bool EventsHandler::HandleRequest(
|
||||||
AsioTlsStream& stream,
|
AsioTlsStream& stream,
|
||||||
const ApiUser::Ptr& user,
|
const ApiUser::Ptr& user,
|
||||||
@ -63,65 +82,52 @@ bool EventsHandler::HandleRequest(
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
String filter = HttpUtility::GetLastParameter(params, "filter");
|
std::set<EventType> eventTypes;
|
||||||
|
|
||||||
std::unique_ptr<Expression> ufilter;
|
{
|
||||||
|
ObjectLock olock(types);
|
||||||
|
for (const String& type : types) {
|
||||||
|
auto typeId (l_EventTypes.find(type));
|
||||||
|
|
||||||
if (!filter.IsEmpty())
|
if (typeId != l_EventTypes.end()) {
|
||||||
ufilter = ConfigCompiler::CompileText("<API query>", filter);
|
eventTypes.emplace(typeId->second);
|
||||||
|
}
|
||||||
/* create a new queue or update an existing one */
|
}
|
||||||
EventQueue::Ptr queue = EventQueue::GetByName(queueName);
|
|
||||||
|
|
||||||
if (!queue) {
|
|
||||||
queue = new EventQueue(queueName);
|
|
||||||
EventQueue::Register(queueName, queue);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
queue->SetTypes(types->ToSet<String>());
|
EventsSubscriber subscriber (std::move(eventTypes), HttpUtility::GetLastParameter(params, "filter"), l_ApiQuery);
|
||||||
queue->SetFilter(std::move(ufilter));
|
|
||||||
|
|
||||||
queue->AddClient(&request);
|
|
||||||
|
|
||||||
Defer removeClient ([&queue, &request, &queueName]() {
|
|
||||||
queue->RemoveClient(&request);
|
|
||||||
EventQueue::UnregisterIfUnused(queueName, queue);
|
|
||||||
});
|
|
||||||
|
|
||||||
server.StartStreaming();
|
server.StartStreaming();
|
||||||
|
|
||||||
response.result(http::status::ok);
|
response.result(http::status::ok);
|
||||||
response.set(http::field::content_type, "application/json");
|
response.set(http::field::content_type, "application/json");
|
||||||
|
|
||||||
{
|
IoBoundWorkSlot dontLockTheIoThread (yc);
|
||||||
IoBoundWorkSlot dontLockTheIoThreadWhileWriting (yc);
|
|
||||||
|
|
||||||
http::async_write(stream, response, yc);
|
http::async_write(stream, response, yc);
|
||||||
stream.async_flush(yc);
|
stream.async_flush(yc);
|
||||||
}
|
|
||||||
|
|
||||||
asio::const_buffer newLine ("\n", 1);
|
asio::const_buffer newLine ("\n", 1);
|
||||||
AsioConditionVariable dontLockOwnStrand (stream.get_io_service(), true);
|
|
||||||
|
|
||||||
for (;;) {
|
for (;;) {
|
||||||
auto event (queue->WaitForEvent(&request, yc));
|
auto event (subscriber.GetInbox()->Shift(yc));
|
||||||
|
|
||||||
if (event) {
|
if (event) {
|
||||||
|
CpuBoundWork buildingResponse (yc);
|
||||||
|
|
||||||
String body = JsonEncode(event);
|
String body = JsonEncode(event);
|
||||||
|
|
||||||
boost::algorithm::replace_all(body, "\n", "");
|
boost::algorithm::replace_all(body, "\n", "");
|
||||||
|
|
||||||
asio::const_buffer payload (body.CStr(), body.GetLength());
|
asio::const_buffer payload (body.CStr(), body.GetLength());
|
||||||
|
|
||||||
IoBoundWorkSlot dontLockTheIoThreadWhileWriting (yc);
|
buildingResponse.Done();
|
||||||
|
|
||||||
asio::async_write(stream, payload, yc);
|
asio::async_write(stream, payload, yc);
|
||||||
asio::async_write(stream, newLine, yc);
|
asio::async_write(stream, newLine, yc);
|
||||||
stream.async_flush(yc);
|
stream.async_flush(yc);
|
||||||
} else if (server.Disconnected()) {
|
} else if (server.Disconnected()) {
|
||||||
return true;
|
return true;
|
||||||
} else {
|
|
||||||
dontLockOwnStrand.Wait(yc);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user