icinga2/lib/remote/eventqueue.cpp

349 lines
7.9 KiB
C++
Raw Normal View History

/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
#include "config/configcompiler.hpp"
#include "remote/eventqueue.hpp"
#include "remote/filterutility.hpp"
2019-02-15 12:32:22 +01:00
#include "base/io-engine.hpp"
#include "base/singleton.hpp"
2016-09-04 16:53:24 +02:00
#include "base/logger.hpp"
#include "base/utility.hpp"
2019-02-15 12:32:22 +01:00
#include <boost/asio/spawn.hpp>
#include <boost/date_time/posix_time/posix_time_duration.hpp>
#include <boost/date_time/posix_time/ptime.hpp>
#include <boost/system/error_code.hpp>
#include <utility>
using namespace icinga;
EventQueue::EventQueue(String name)
: m_Name(std::move(name))
{ }
bool EventQueue::CanProcessEvent(const String& type) const
{
boost::mutex::scoped_lock lock(m_Mutex);
return m_Types.find(type) != m_Types.end();
}
void EventQueue::ProcessEvent(const Dictionary::Ptr& event)
{
Namespace::Ptr frameNS = new Namespace();
ScriptFrame frame(true, frameNS);
frame.Sandboxed = true;
try {
2018-02-07 14:24:06 +01:00
if (!FilterUtility::EvaluateFilter(frame, m_Filter.get(), event, "event"))
return;
} catch (const std::exception& ex) {
Log(LogWarning, "EventQueue")
<< "Error occurred while evaluating event filter for queue '" << m_Name << "': " << DiagnosticInformation(ex);
return;
}
boost::mutex::scoped_lock lock(m_Mutex);
typedef std::pair<void *const, std::deque<Dictionary::Ptr> > kv_pair;
for (kv_pair& kv : m_Events) {
kv.second.push_back(event);
}
m_CV.notify_all();
}
void EventQueue::AddClient(void *client)
{
boost::mutex::scoped_lock lock(m_Mutex);
auto result = m_Events.insert(std::make_pair(client, std::deque<Dictionary::Ptr>()));
ASSERT(result.second);
2019-03-08 14:07:29 +01:00
#ifndef I2_DEBUG
(void)result;
#endif /* I2_DEBUG */
}
void EventQueue::RemoveClient(void *client)
{
boost::mutex::scoped_lock lock(m_Mutex);
m_Events.erase(client);
}
void EventQueue::UnregisterIfUnused(const String& name, const EventQueue::Ptr& queue)
{
boost::mutex::scoped_lock lock(queue->m_Mutex);
if (queue->m_Events.empty())
Unregister(name);
}
void EventQueue::SetTypes(const std::set<String>& types)
{
boost::mutex::scoped_lock lock(m_Mutex);
m_Types = types;
}
void EventQueue::SetFilter(std::unique_ptr<Expression> filter)
{
boost::mutex::scoped_lock lock(m_Mutex);
m_Filter.swap(filter);
}
Dictionary::Ptr EventQueue::WaitForEvent(void *client, double timeout)
{
boost::mutex::scoped_lock lock(m_Mutex);
for (;;) {
auto it = m_Events.find(client);
ASSERT(it != m_Events.end());
if (!it->second.empty()) {
Dictionary::Ptr result = *it->second.begin();
it->second.pop_front();
return result;
}
Explicitly use long with boost::posix_time In file included from lib/base/base_unity.cpp:61: lib/base/timer.cpp:295:31: error: no matching conversion for functional-style cast from 'double' to 'boost::posix_time::milliseconds' (aka 'subsecond_duration<boost::posix_time::time_duration, 1000>') l_TimerCV.timed_wait(lock, boost::posix_time::milliseconds(wait * 1000)); ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ In file included from lib/remote/remote_unity.cpp:19: lib/remote/eventqueue.cpp:111:30: error: no matching conversion for functional-style cast from 'double' to 'boost::posix_time::milliseconds' (aka 'subsecond_duration<boost::posix_time::time_duration, 1000>') if (!m_CV.timed_wait(lock, boost::posix_time::milliseconds(timeout * 1000))) ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ In file included from lib/checker/checker_unity.cpp:1: lib/checker/checkercomponent.cpp:128:26: error: no matching conversion for functional-style cast from 'double' to 'boost::posix_time::milliseconds' (aka 'subsecond_duration<boost::posix_time::time_duration, 1000>') m_CV.timed_wait(lock, boost::posix_time::milliseconds(wait * 1000)); ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ /usr/local/include/boost/date_time/time_duration.hpp:270:30: note: candidate constructor (the implicit copy constructor) not viable: no known conversion from 'double' to 'const boost::date_time::subsecond_duration<boost::posix_time::time_duration, 1000>' for 1st argument class BOOST_SYMBOL_VISIBLE subsecond_duration : public base_duration ^ /usr/local/include/boost/date_time/time_duration.hpp:270:30: note: candidate constructor (the implicit move constructor) not viable: no known conversion from 'double' to 'boost::date_time::subsecond_duration<boost::posix_time::time_duration, 1000>' for 1st argument /usr/local/include/boost/date_time/time_duration.hpp:286:59: note: candidate template ignored: disabled by 'enable_if' [with T = double] typename boost::enable_if<boost::is_integral<T>, void>::type* = 0) : ^
2018-04-15 06:02:42 +02:00
if (!m_CV.timed_wait(lock, boost::posix_time::milliseconds(long(timeout * 1000))))
2017-11-30 08:36:35 +01:00
return nullptr;
}
}
std::vector<EventQueue::Ptr> EventQueue::GetQueuesForType(const String& type)
{
EventQueueRegistry::ItemMap queues = EventQueueRegistry::GetInstance()->GetItems();
std::vector<EventQueue::Ptr> availQueues;
typedef std::pair<String, EventQueue::Ptr> kv_pair;
for (const kv_pair& kv : queues) {
if (kv.second->CanProcessEvent(type))
availQueues.push_back(kv.second);
}
return availQueues;
}
EventQueue::Ptr EventQueue::GetByName(const String& name)
{
return EventQueueRegistry::GetInstance()->GetItem(name);
}
void EventQueue::Register(const String& name, const EventQueue::Ptr& function)
{
EventQueueRegistry::GetInstance()->Register(name, function);
}
void EventQueue::Unregister(const String& name)
{
EventQueueRegistry::GetInstance()->Unregister(name);
}
EventQueueRegistry *EventQueueRegistry::GetInstance()
{
return Singleton<EventQueueRegistry>::GetInstance();
}
std::mutex EventsInbox::m_FiltersMutex;
std::map<String, EventsInbox::Filter> EventsInbox::m_Filters;
EventsRouter EventsRouter::m_Instance;
EventsInbox::EventsInbox(String filter, const String& filterSource)
: m_Timer(IoEngine::Get().GetIoService())
{
std::unique_lock<std::mutex> lock (m_FiltersMutex);
m_Filter = m_Filters.find(filter);
if (m_Filter == m_Filters.end()) {
lock.unlock();
auto expr (ConfigCompiler::CompileText(filterSource, filter));
lock.lock();
m_Filter = m_Filters.find(filter);
if (m_Filter == m_Filters.end()) {
m_Filter = m_Filters.emplace(std::move(filter), Filter{1, std::shared_ptr<Expression>(expr.release())}).first;
} else {
++m_Filter->second.Refs;
}
} else {
++m_Filter->second.Refs;
}
}
EventsInbox::~EventsInbox()
{
std::unique_lock<std::mutex> lock (m_FiltersMutex);
if (!--m_Filter->second.Refs) {
m_Filters.erase(m_Filter);
}
}
const std::shared_ptr<Expression>& EventsInbox::GetFilter()
{
return m_Filter->second.Expr;
}
void EventsInbox::Push(Dictionary::Ptr event)
{
std::unique_lock<std::mutex> lock (m_Mutex);
m_Queue.emplace(std::move(event));
m_Timer.expires_at(boost::posix_time::neg_infin);
}
Dictionary::Ptr EventsInbox::Shift(boost::asio::yield_context yc, double timeout)
{
std::unique_lock<std::mutex> lock (m_Mutex, std::defer_lock);
m_Timer.expires_at(boost::posix_time::neg_infin);
{
boost::system::error_code ec;
while (!lock.try_lock()) {
m_Timer.async_wait(yc[ec]);
}
}
if (m_Queue.empty()) {
m_Timer.expires_from_now(boost::posix_time::milliseconds((unsigned long)(timeout * 1000.0)));
lock.unlock();
{
boost::system::error_code ec;
m_Timer.async_wait(yc[ec]);
while (!lock.try_lock()) {
m_Timer.async_wait(yc[ec]);
}
}
if (m_Queue.empty()) {
return nullptr;
}
}
auto event (std::move(m_Queue.front()));
m_Queue.pop();
return std::move(event);
}
EventsSubscriber::EventsSubscriber(std::set<EventType> types, String filter, const String& filterSource)
: m_Types(std::move(types)), m_Inbox(new EventsInbox(std::move(filter), filterSource))
{
EventsRouter::GetInstance().Subscribe(m_Types, m_Inbox);
}
EventsSubscriber::~EventsSubscriber()
{
EventsRouter::GetInstance().Unsubscribe(m_Types, m_Inbox);
}
2019-04-05 17:34:46 +02:00
const EventsInbox::Ptr& EventsSubscriber::GetInbox()
{
return m_Inbox;
}
EventsFilter::EventsFilter(std::map<std::shared_ptr<Expression>, std::set<EventsInbox::Ptr>> inboxes)
: m_Inboxes(std::move(inboxes))
{
}
EventsFilter::operator bool()
{
return !m_Inboxes.empty();
}
void EventsFilter::Push(Dictionary::Ptr event)
{
for (auto& perFilter : m_Inboxes) {
ScriptFrame frame(true);
frame.Sandboxed = true;
try {
if (!FilterUtility::EvaluateFilter(frame, perFilter.first.get(), event, "event")) {
continue;
}
} catch (const std::exception& ex) {
Log(LogWarning, "EventQueue")
<< "Error occurred while evaluating event filter for queue: " << DiagnosticInformation(ex);
continue;
}
for (auto& inbox : perFilter.second) {
inbox->Push(event);
}
}
}
EventsRouter& EventsRouter::GetInstance()
{
return m_Instance;
}
void EventsRouter::Subscribe(const std::set<EventType>& types, const EventsInbox::Ptr& inbox)
{
const auto& filter (inbox->GetFilter());
std::unique_lock<std::mutex> lock (m_Mutex);
for (auto type : types) {
auto perType (m_Subscribers.find(type));
if (perType == m_Subscribers.end()) {
perType = m_Subscribers.emplace(type, decltype(perType->second)()).first;
}
auto perFilter (perType->second.find(filter));
if (perFilter == perType->second.end()) {
perFilter = perType->second.emplace(filter, decltype(perFilter->second)()).first;
}
perFilter->second.emplace(inbox);
}
}
void EventsRouter::Unsubscribe(const std::set<EventType>& types, const EventsInbox::Ptr& inbox)
{
const auto& filter (inbox->GetFilter());
std::unique_lock<std::mutex> lock (m_Mutex);
for (auto type : types) {
auto perType (m_Subscribers.find(type));
if (perType != m_Subscribers.end()) {
auto perFilter (perType->second.find(filter));
if (perFilter != perType->second.end()) {
perFilter->second.erase(inbox);
if (perFilter->second.empty()) {
perType->second.erase(perFilter);
}
}
if (perType->second.empty()) {
m_Subscribers.erase(perType);
}
}
}
}
EventsFilter EventsRouter::GetInboxes(EventType type)
{
std::unique_lock<std::mutex> lock (m_Mutex);
auto perType (m_Subscribers.find(type));
if (perType == m_Subscribers.end()) {
return EventsFilter({});
}
return EventsFilter(perType->second);
}