/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */

/*
 * NOTE(review): in the revision this was rewritten from, the targets of the
 * five system includes and every template argument list were missing. They
 * have been restored from how each name is used in this file; confirm them
 * against remote/eventqueue.hpp before merging.
 */

#include "config/configcompiler.hpp"
#include "remote/eventqueue.hpp"
#include "remote/filterutility.hpp"
#include "base/io-engine.hpp"
#include "base/singleton.hpp"
#include "base/logger.hpp"
#include "base/utility.hpp"
#include <boost/asio/spawn.hpp>
#include <boost/date_time/posix_time/posix_time_duration.hpp>
#include <boost/date_time/posix_time/ptime.hpp>
#include <boost/system/error_code.hpp>
#include <utility>

using namespace icinga;

/**
 * Creates an event queue.
 *
 * @param name Name of the queue; only used in log messages within this file.
 */
EventQueue::EventQueue(String name)
	: m_Name(std::move(name))
{ }

/**
 * Checks whether the queue has been configured (via SetTypes()) for the given event type.
 *
 * @param type Event type name.
 * @returns true if the type is part of the queue's type set.
 */
bool EventQueue::CanProcessEvent(const String& type) const
{
	boost::mutex::scoped_lock lock(m_Mutex);

	return m_Types.find(type) != m_Types.end();
}

/**
 * Evaluates the queue's filter against the event in a sandboxed script frame and,
 * if it matches, appends the event to the backlog of every registered client.
 *
 * An exception thrown by the filter is logged as a warning and the event is dropped.
 *
 * @param event The event to enqueue.
 */
void EventQueue::ProcessEvent(const Dictionary::Ptr& event)
{
	Namespace::Ptr frameNS = new Namespace();
	ScriptFrame frame(true, frameNS);
	frame.Sandboxed = true;

	/* NOTE(review): m_Filter is read here without holding m_Mutex, whereas SetFilter()
	 * swaps it under the lock. Confirm the filter is never replaced while events are
	 * being processed. */
	try {
		if (!FilterUtility::EvaluateFilter(frame, m_Filter.get(), event, "event"))
			return;
	} catch (const std::exception& ex) {
		Log(LogWarning, "EventQueue")
			<< "Error occurred while evaluating event filter for queue '" << m_Name << "': " << DiagnosticInformation(ex);
		return;
	}

	boost::mutex::scoped_lock lock(m_Mutex);

	/* auto& binds to the map's own value_type, so no per-client pair is copied. */
	for (auto& kv : m_Events) {
		kv.second.push_back(event);
	}

	m_CV.notify_all();
}

/**
 * Registers a client with an initially empty backlog.
 * Registering the same client twice trips the ASSERT below.
 *
 * @param client Opaque pointer identifying the client.
 */
void EventQueue::AddClient(void *client)
{
	boost::mutex::scoped_lock lock(m_Mutex);

	auto result = m_Events.insert(std::make_pair(client, std::deque<Dictionary::Ptr>()));
	ASSERT(result.second);

#ifndef I2_DEBUG
	/* Silences the unused-variable warning when I2_DEBUG is not defined. */
	(void)result;
#endif /* I2_DEBUG */
}

/**
 * Removes a client together with its backlog.
 *
 * @param client Opaque pointer previously passed to AddClient().
 */
void EventQueue::RemoveClient(void *client)
{
	boost::mutex::scoped_lock lock(m_Mutex);

	m_Events.erase(client);
}

/**
 * Unregisters the queue from the registry if it has no clients left.
 *
 * @param name Name the queue is registered under.
 * @param queue The queue to check.
 */
void EventQueue::UnregisterIfUnused(const String& name, const EventQueue::Ptr& queue)
{
	boost::mutex::scoped_lock lock(queue->m_Mutex);

	if (queue->m_Events.empty())
		Unregister(name);
}

/**
 * Replaces the set of event types this queue accepts.
 *
 * @param types The new type set.
 */
void EventQueue::SetTypes(const std::set<String>& types)
{
	boost::mutex::scoped_lock lock(m_Mutex);

	m_Types = types;
}

/**
 * Replaces the queue's filter expression.
 * The previous filter ends up in the by-value parameter and is destroyed with it.
 *
 * @param filter The new filter expression.
 */
void EventQueue::SetFilter(std::unique_ptr<Expression> filter)
{
	boost::mutex::scoped_lock lock(m_Mutex);

	m_Filter.swap(filter);
}

/**
 * Collects all registered queues that accept the given event type.
 *
 * @param type Event type name.
 * @returns The matching queues.
 */
std::vector<EventQueue::Ptr> EventQueue::GetQueuesForType(const String& type)
{
	EventQueueRegistry::ItemMap queues = EventQueueRegistry::GetInstance()->GetItems();

	std::vector<EventQueue::Ptr> availQueues;

	/* const auto& avoids converting (and thereby copying) every map entry. */
	for (const auto& kv : queues) {
		if (kv.second->CanProcessEvent(type))
			availQueues.push_back(kv.second);
	}

	return availQueues;
}

/**
 * Looks up a queue in the registry.
 *
 * @param name Name the queue was registered under.
 * @returns Whatever EventQueueRegistry::GetItem() yields for that name.
 */
EventQueue::Ptr EventQueue::GetByName(const String& name)
{
	return EventQueueRegistry::GetInstance()->GetItem(name);
}

/**
 * Adds a queue to the registry.
 *
 * @param name Name to register the queue under.
 * @param function The queue to register.
 */
void EventQueue::Register(const String& name, const EventQueue::Ptr& function)
{
	EventQueueRegistry::GetInstance()->Register(name, function);
}

/**
 * Removes a queue from the registry.
 *
 * @param name Name the queue was registered under.
 */
void EventQueue::Unregister(const String& name)
{
	EventQueueRegistry::GetInstance()->Unregister(name);
}

/**
 * @returns The process-wide registry instance.
 */
EventQueueRegistry *EventQueueRegistry::GetInstance()
{
	return Singleton<EventQueueRegistry>::GetInstance();
}

std::mutex EventsInbox::m_FiltersMutex;

/* Pre-seeded with the empty filter string mapped to a null expression. Its reference
 * count starts at 1, so the increments/decrements done by inboxes never bring it to zero. */
std::map<String, EventsInbox::Filter> EventsInbox::m_Filters ({{"", EventsInbox::Filter{1, nullptr}}});

EventsRouter EventsRouter::m_Instance;

/**
 * Creates an inbox which refers to a reference-counted entry of the shared filter table,
 * compiling the filter if no entry for it exists yet.
 *
 * @param filter Filter text; used as the key of the filter table.
 * @param filterSource Passed to ConfigCompiler::CompileText() as its first argument.
 */
EventsInbox::EventsInbox(String filter, const String& filterSource)
	: m_Timer(IoEngine::Get().GetIoService())
{
	std::unique_lock<std::mutex> lock (m_FiltersMutex);

	m_Filter = m_Filters.find(filter);

	if (m_Filter == m_Filters.end()) {
		/* Compile without holding the table lock ... */
		lock.unlock();

		auto expr (ConfigCompiler::CompileText(filterSource, filter));

		lock.lock();

		/* ... and look again, as an entry may have been added while it was released. */
		m_Filter = m_Filters.find(filter);

		if (m_Filter == m_Filters.end()) {
			m_Filter = m_Filters.emplace(std::move(filter), Filter{1, std::shared_ptr<Expression>(expr.release())}).first;
		} else {
			++m_Filter->second.Refs;
		}
	} else {
		++m_Filter->second.Refs;
	}
}

/**
 * Drops this inbox's reference to its filter table entry and erases the entry
 * once no references are left.
 */
EventsInbox::~EventsInbox()
{
	std::unique_lock<std::mutex> lock (m_FiltersMutex);

	if (!--m_Filter->second.Refs) {
		m_Filters.erase(m_Filter);
	}
}

/**
 * @returns The expression stored in this inbox's filter table entry (null for the pre-seeded empty filter).
 */
const std::shared_ptr<Expression>& EventsInbox::GetFilter()
{
	return m_Filter->second.Expr;
}

/**
 * Appends an event to the inbox and re-arms the timer with an expiry in the infinite past.
 * NOTE(review): per the Boost.Asio documentation, setting a new expiry cancels pending
 * waits, which is presumably what wakes up a coroutine suspended in Shift().
 *
 * @param event The event to enqueue.
 */
void EventsInbox::Push(Dictionary::Ptr event)
{
	std::unique_lock<std::mutex> lock (m_Mutex);

	m_Queue.emplace(std::move(event));
	m_Timer.expires_at(boost::posix_time::neg_infin);
}

/**
 * Takes the oldest event out of the inbox, suspending the calling coroutine
 * for up to the given timeout if the inbox is empty.
 *
 * @param yc Yield context of the calling coroutine.
 * @param timeout Maximum time to wait, in seconds (converted to whole milliseconds).
 * @returns The oldest event, or nullptr if the inbox is still empty after waiting.
 */
Dictionary::Ptr EventsInbox::Shift(boost::asio::yield_context yc, double timeout)
{
	std::unique_lock<std::mutex> lock (m_Mutex, std::defer_lock);

	m_Timer.expires_at(boost::posix_time::neg_infin);

	{
		boost::system::error_code ec;

		/* Never block on the mutex: yield through the timer between attempts. */
		while (!lock.try_lock()) {
			m_Timer.async_wait(yc[ec]);
		}
	}

	if (m_Queue.empty()) {
		m_Timer.expires_from_now(boost::posix_time::milliseconds(static_cast<unsigned long>(timeout * 1000.0)));
		lock.unlock();

		{
			boost::system::error_code ec;

			/* Errors are collected in ec and deliberately ignored: the queue is re-checked below either way. */
			m_Timer.async_wait(yc[ec]);

			while (!lock.try_lock()) {
				m_Timer.async_wait(yc[ec]);
			}
		}

		if (m_Queue.empty()) {
			return nullptr;
		}
	}

	auto event (std::move(m_Queue.front()));
	m_Queue.pop();

	/* Returned by name: wrapping a local in std::move() would only inhibit copy elision. */
	return event;
}

/**
 * Creates an inbox for the given filter and subscribes it to the given event types.
 *
 * @param types Event types to subscribe to.
 * @param filter Filter text, forwarded to EventsInbox.
 * @param filterSource Forwarded to EventsInbox.
 */
EventsSubscriber::EventsSubscriber(std::set<EventType> types, String filter, const String& filterSource)
	: m_Types(std::move(types)), m_Inbox(new EventsInbox(std::move(filter), filterSource))
{
	EventsRouter::GetInstance().Subscribe(m_Types, m_Inbox);
}

/**
 * Unsubscribes the inbox from all event types it was subscribed to.
 */
EventsSubscriber::~EventsSubscriber()
{
	EventsRouter::GetInstance().Unsubscribe(m_Types, m_Inbox);
}

/**
 * @returns The inbox owned by this subscriber.
 */
const EventsInbox::Ptr& EventsSubscriber::GetInbox()
{
	return m_Inbox;
}

/**
 * Wraps a set of inboxes grouped by their filter expression.
 *
 * @param inboxes Inboxes per filter expression.
 */
EventsFilter::EventsFilter(std::map<std::shared_ptr<Expression>, std::set<EventsInbox::Ptr>> inboxes)
	: m_Inboxes(std::move(inboxes))
{
}

/**
 * @returns true if there is at least one filter group, i.e. somebody to push events to.
 */
EventsFilter::operator bool()
{
	return !m_Inboxes.empty();
}

/**
 * Delivers the event to every inbox whose filter matches it. Each filter is evaluated
 * once per group; a null filter matches everything. A group whose filter throws is
 * skipped after logging a warning.
 *
 * @param event The event to deliver.
 */
void EventsFilter::Push(Dictionary::Ptr event)
{
	for (auto& perFilter : m_Inboxes) {
		if (perFilter.first) {
			ScriptFrame frame(true, new Namespace());
			frame.Sandboxed = true;

			try {
				if (!FilterUtility::EvaluateFilter(frame, perFilter.first.get(), event, "event")) {
					continue;
				}
			} catch (const std::exception& ex) {
				Log(LogWarning, "EventQueue")
					<< "Error occurred while evaluating event filter for queue: " << DiagnosticInformation(ex);
				continue;
			}
		}

		for (auto& inbox : perFilter.second) {
			inbox->Push(event);
		}
	}
}

/**
 * @returns The process-wide router instance.
 */
EventsRouter& EventsRouter::GetInstance()
{
	return m_Instance;
}

/**
 * Registers the inbox for each of the given event types, grouped by the inbox's filter.
 *
 * @param types Event types to subscribe to.
 * @param inbox The inbox to register.
 */
void EventsRouter::Subscribe(const std::set<EventType>& types, const EventsInbox::Ptr& inbox)
{
	const auto& filter (inbox->GetFilter());
	std::unique_lock<std::mutex> lock (m_Mutex);

	for (const auto& type : types) {
		/* operator[] creates the per-type and per-filter containers on first use. */
		m_Subscribers[type][filter].emplace(inbox);
	}
}

/**
 * Removes the inbox from each of the given event types and prunes
 * per-filter and per-type containers that became empty.
 *
 * @param types Event types to unsubscribe from.
 * @param inbox The inbox to remove.
 */
void EventsRouter::Unsubscribe(const std::set<EventType>& types, const EventsInbox::Ptr& inbox)
{
	const auto& filter (inbox->GetFilter());
	std::unique_lock<std::mutex> lock (m_Mutex);

	for (const auto& type : types) {
		auto perType (m_Subscribers.find(type));

		if (perType != m_Subscribers.end()) {
			auto perFilter (perType->second.find(filter));

			if (perFilter != perType->second.end()) {
				perFilter->second.erase(inbox);

				if (perFilter->second.empty()) {
					perType->second.erase(perFilter);
				}
			}

			if (perType->second.empty()) {
				m_Subscribers.erase(perType);
			}
		}
	}
}

/**
 * Takes a snapshot of all inboxes subscribed to the given event type.
 *
 * @param type The event type.
 * @returns A copy of the subscriptions for that type; empty if there are none.
 */
EventsFilter EventsRouter::GetInboxes(EventType type)
{
	std::unique_lock<std::mutex> lock (m_Mutex);

	auto perType (m_Subscribers.find(type));

	if (perType == m_Subscribers.end()) {
		return EventsFilter({});
	}

	return EventsFilter(perType->second);
}