mirror of
https://github.com/Icinga/icinga2.git
synced 2025-07-30 17:14:25 +02:00
Implement new event queue for ASIO consumers
This commit is contained in:
parent
56894bea17
commit
7688994601
@ -1,5 +1,6 @@
|
||||
/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
|
||||
|
||||
#include "config/configcompiler.hpp"
|
||||
#include "remote/eventqueue.hpp"
|
||||
#include "remote/filterutility.hpp"
|
||||
#include "base/io-engine.hpp"
|
||||
@ -7,6 +8,10 @@
|
||||
#include "base/logger.hpp"
|
||||
#include "base/utility.hpp"
|
||||
#include <boost/asio/spawn.hpp>
|
||||
#include <boost/date_time/posix_time/posix_time_duration.hpp>
|
||||
#include <boost/date_time/posix_time/ptime.hpp>
|
||||
#include <boost/system/error_code.hpp>
|
||||
#include <utility>
|
||||
|
||||
using namespace icinga;
|
||||
|
||||
@ -168,3 +173,201 @@ EventQueueRegistry *EventQueueRegistry::GetInstance()
|
||||
{
|
||||
return Singleton<EventQueueRegistry>::GetInstance();
|
||||
}
|
||||
|
||||
std::mutex EventsInbox::m_FiltersMutex;
|
||||
std::map<String, EventsInbox::Filter> EventsInbox::m_Filters;
|
||||
|
||||
EventsRouter EventsRouter::m_Instance;
|
||||
|
||||
EventsInbox::EventsInbox(String filter, const String& filterSource)
|
||||
: m_Timer(IoEngine::Get().GetIoService())
|
||||
{
|
||||
std::unique_lock<std::mutex> lock (m_FiltersMutex);
|
||||
m_Filter = m_Filters.find(filter);
|
||||
|
||||
if (m_Filter == m_Filters.end()) {
|
||||
lock.unlock();
|
||||
|
||||
auto expr (ConfigCompiler::CompileText(filterSource, filter));
|
||||
|
||||
lock.lock();
|
||||
|
||||
m_Filter = m_Filters.find(filter);
|
||||
|
||||
if (m_Filter == m_Filters.end()) {
|
||||
m_Filter = m_Filters.emplace(std::move(filter), Filter{1, std::shared_ptr<Expression>(expr.release())}).first;
|
||||
} else {
|
||||
++m_Filter->second.Refs;
|
||||
}
|
||||
} else {
|
||||
++m_Filter->second.Refs;
|
||||
}
|
||||
}
|
||||
|
||||
EventsInbox::~EventsInbox()
|
||||
{
|
||||
std::unique_lock<std::mutex> lock (m_FiltersMutex);
|
||||
|
||||
if (!--m_Filter->second.Refs) {
|
||||
m_Filters.erase(m_Filter);
|
||||
}
|
||||
}
|
||||
|
||||
const std::shared_ptr<Expression>& EventsInbox::GetFilter()
|
||||
{
|
||||
return m_Filter->second.Expr;
|
||||
}
|
||||
|
||||
void EventsInbox::Push(Dictionary::Ptr event)
|
||||
{
|
||||
std::unique_lock<std::mutex> lock (m_Mutex);
|
||||
|
||||
m_Queue.emplace(std::move(event));
|
||||
m_Timer.expires_at(boost::posix_time::neg_infin);
|
||||
}
|
||||
|
||||
Dictionary::Ptr EventsInbox::Shift(boost::asio::yield_context yc, double timeout)
|
||||
{
|
||||
std::unique_lock<std::mutex> lock (m_Mutex, std::defer_lock);
|
||||
|
||||
m_Timer.expires_at(boost::posix_time::neg_infin);
|
||||
|
||||
{
|
||||
boost::system::error_code ec;
|
||||
|
||||
while (!lock.try_lock()) {
|
||||
m_Timer.async_wait(yc[ec]);
|
||||
}
|
||||
}
|
||||
|
||||
if (m_Queue.empty()) {
|
||||
m_Timer.expires_from_now(boost::posix_time::milliseconds((unsigned long)(timeout * 1000.0)));
|
||||
lock.unlock();
|
||||
|
||||
{
|
||||
boost::system::error_code ec;
|
||||
m_Timer.async_wait(yc[ec]);
|
||||
|
||||
while (!lock.try_lock()) {
|
||||
m_Timer.async_wait(yc[ec]);
|
||||
}
|
||||
}
|
||||
|
||||
if (m_Queue.empty()) {
|
||||
return nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
auto event (std::move(m_Queue.front()));
|
||||
m_Queue.pop();
|
||||
return std::move(event);
|
||||
}
|
||||
|
||||
EventsSubscriber::EventsSubscriber(std::set<EventType> types, String filter, const String& filterSource)
|
||||
: m_Types(std::move(types)), m_Inbox(new EventsInbox(std::move(filter), filterSource))
|
||||
{
|
||||
EventsRouter::GetInstance().Subscribe(m_Types, m_Inbox);
|
||||
}
|
||||
|
||||
EventsSubscriber::~EventsSubscriber()
|
||||
{
|
||||
EventsRouter::GetInstance().Unsubscribe(m_Types, m_Inbox);
|
||||
}
|
||||
|
||||
EventsFilter::EventsFilter(std::map<std::shared_ptr<Expression>, std::set<EventsInbox::Ptr>> inboxes)
|
||||
: m_Inboxes(std::move(inboxes))
|
||||
{
|
||||
}
|
||||
|
||||
EventsFilter::operator bool()
|
||||
{
|
||||
return !m_Inboxes.empty();
|
||||
}
|
||||
|
||||
void EventsFilter::Push(Dictionary::Ptr event)
|
||||
{
|
||||
for (auto& perFilter : m_Inboxes) {
|
||||
ScriptFrame frame(true);
|
||||
frame.Sandboxed = true;
|
||||
|
||||
try {
|
||||
if (!FilterUtility::EvaluateFilter(frame, perFilter.first.get(), event, "event")) {
|
||||
continue;
|
||||
}
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogWarning, "EventQueue")
|
||||
<< "Error occurred while evaluating event filter for queue: " << DiagnosticInformation(ex);
|
||||
continue;
|
||||
}
|
||||
|
||||
for (auto& inbox : perFilter.second) {
|
||||
inbox->Push(event);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
EventsRouter& EventsRouter::GetInstance()
|
||||
{
|
||||
return m_Instance;
|
||||
}
|
||||
|
||||
void EventsRouter::Subscribe(const std::set<EventType>& types, const EventsInbox::Ptr& inbox)
|
||||
{
|
||||
const auto& filter (inbox->GetFilter());
|
||||
std::unique_lock<std::mutex> lock (m_Mutex);
|
||||
|
||||
for (auto type : types) {
|
||||
auto perType (m_Subscribers.find(type));
|
||||
|
||||
if (perType == m_Subscribers.end()) {
|
||||
perType = m_Subscribers.emplace(type, decltype(perType->second)()).first;
|
||||
}
|
||||
|
||||
auto perFilter (perType->second.find(filter));
|
||||
|
||||
if (perFilter == perType->second.end()) {
|
||||
perFilter = perType->second.emplace(filter, decltype(perFilter->second)()).first;
|
||||
}
|
||||
|
||||
perFilter->second.emplace(inbox);
|
||||
}
|
||||
}
|
||||
|
||||
void EventsRouter::Unsubscribe(const std::set<EventType>& types, const EventsInbox::Ptr& inbox)
|
||||
{
|
||||
const auto& filter (inbox->GetFilter());
|
||||
std::unique_lock<std::mutex> lock (m_Mutex);
|
||||
|
||||
for (auto type : types) {
|
||||
auto perType (m_Subscribers.find(type));
|
||||
|
||||
if (perType != m_Subscribers.end()) {
|
||||
auto perFilter (perType->second.find(filter));
|
||||
|
||||
if (perFilter != perType->second.end()) {
|
||||
perFilter->second.erase(inbox);
|
||||
|
||||
if (perFilter->second.empty()) {
|
||||
perType->second.erase(perFilter);
|
||||
}
|
||||
}
|
||||
|
||||
if (perType->second.empty()) {
|
||||
m_Subscribers.erase(perType);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
EventsFilter EventsRouter::GetInboxes(EventType type)
|
||||
{
|
||||
std::unique_lock<std::mutex> lock (m_Mutex);
|
||||
|
||||
auto perType (m_Subscribers.find(type));
|
||||
|
||||
if (perType == m_Subscribers.end()) {
|
||||
return EventsFilter({});
|
||||
}
|
||||
|
||||
return EventsFilter(perType->second);
|
||||
}
|
||||
|
@ -6,12 +6,17 @@
|
||||
#include "remote/httphandler.hpp"
|
||||
#include "base/object.hpp"
|
||||
#include "config/expression.hpp"
|
||||
#include <boost/asio/deadline_timer.hpp>
|
||||
#include <boost/asio/spawn.hpp>
|
||||
#include <boost/thread/mutex.hpp>
|
||||
#include <boost/thread/condition_variable.hpp>
|
||||
#include <cstddef>
|
||||
#include <cstdint>
|
||||
#include <mutex>
|
||||
#include <set>
|
||||
#include <map>
|
||||
#include <deque>
|
||||
#include <queue>
|
||||
|
||||
namespace icinga
|
||||
{
|
||||
@ -64,6 +69,106 @@ public:
|
||||
static EventQueueRegistry *GetInstance();
|
||||
};
|
||||
|
||||
enum class EventType : uint_fast8_t
|
||||
{
|
||||
AcknowledgementCleared,
|
||||
AcknowledgementSet,
|
||||
CheckResult,
|
||||
CommentAdded,
|
||||
CommentRemoved,
|
||||
DowntimeAdded,
|
||||
DowntimeRemoved,
|
||||
DowntimeStarted,
|
||||
DowntimeTriggered,
|
||||
Flapping,
|
||||
Notification,
|
||||
StateChange
|
||||
};
|
||||
|
||||
class EventsInbox : public Object
|
||||
{
|
||||
public:
|
||||
DECLARE_PTR_TYPEDEFS(EventsInbox);
|
||||
|
||||
EventsInbox(String filter, const String& filterSource);
|
||||
EventsInbox(const EventsInbox&) = delete;
|
||||
EventsInbox(EventsInbox&&) = delete;
|
||||
EventsInbox& operator=(const EventsInbox&) = delete;
|
||||
EventsInbox& operator=(EventsInbox&&) = delete;
|
||||
~EventsInbox();
|
||||
|
||||
const std::shared_ptr<Expression>& GetFilter();
|
||||
|
||||
void Push(Dictionary::Ptr event);
|
||||
Dictionary::Ptr Shift(boost::asio::yield_context yc, double timeout = 5);
|
||||
|
||||
private:
|
||||
struct Filter
|
||||
{
|
||||
std::size_t Refs;
|
||||
std::shared_ptr<Expression> Expr;
|
||||
};
|
||||
|
||||
static std::mutex m_FiltersMutex;
|
||||
static std::map<String, Filter> m_Filters;
|
||||
|
||||
std::mutex m_Mutex;
|
||||
decltype(m_Filters.begin()) m_Filter;
|
||||
std::queue<Dictionary::Ptr> m_Queue;
|
||||
boost::asio::deadline_timer m_Timer;
|
||||
};
|
||||
|
||||
class EventsSubscriber
|
||||
{
|
||||
public:
|
||||
EventsSubscriber(std::set<EventType> types, String filter, const String& filterSource);
|
||||
EventsSubscriber(const EventsSubscriber&) = delete;
|
||||
EventsSubscriber(EventsSubscriber&&) = delete;
|
||||
EventsSubscriber& operator=(const EventsSubscriber&) = delete;
|
||||
EventsSubscriber& operator=(EventsSubscriber&&) = delete;
|
||||
~EventsSubscriber();
|
||||
|
||||
private:
|
||||
std::set<EventType> m_Types;
|
||||
EventsInbox::Ptr m_Inbox;
|
||||
};
|
||||
|
||||
class EventsFilter
|
||||
{
|
||||
public:
|
||||
EventsFilter(std::map<std::shared_ptr<Expression>, std::set<EventsInbox::Ptr>> inboxes);
|
||||
|
||||
operator bool();
|
||||
|
||||
void Push(Dictionary::Ptr event);
|
||||
|
||||
private:
|
||||
std::map<std::shared_ptr<Expression>, std::set<EventsInbox::Ptr>> m_Inboxes;
|
||||
};
|
||||
|
||||
class EventsRouter
|
||||
{
|
||||
public:
|
||||
static EventsRouter& GetInstance();
|
||||
|
||||
void Subscribe(const std::set<EventType>& types, const EventsInbox::Ptr& inbox);
|
||||
void Unsubscribe(const std::set<EventType>& types, const EventsInbox::Ptr& inbox);
|
||||
EventsFilter GetInboxes(EventType type);
|
||||
|
||||
private:
|
||||
static EventsRouter m_Instance;
|
||||
|
||||
EventsRouter() = default;
|
||||
EventsRouter(const EventsRouter&) = delete;
|
||||
EventsRouter(EventsRouter&&) = delete;
|
||||
EventsRouter& operator=(const EventsRouter&) = delete;
|
||||
EventsRouter& operator=(EventsRouter&&) = delete;
|
||||
~EventsRouter() = default;
|
||||
|
||||
std::mutex m_Mutex;
|
||||
std::map<EventType, std::map<std::shared_ptr<Expression>, std::set<EventsInbox::Ptr>>> m_Subscribers;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#endif /* EVENTQUEUE_H */
|
||||
|
Loading…
x
Reference in New Issue
Block a user