2019-02-25 14:48:22 +01:00
|
|
|
/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
|
2015-10-19 17:31:18 +02:00
|
|
|
|
|
|
|
#include "remote/eventqueue.hpp"
|
|
|
|
#include "remote/filterutility.hpp"
|
2019-02-15 12:32:22 +01:00
|
|
|
#include "base/io-engine.hpp"
|
2015-10-19 17:31:18 +02:00
|
|
|
#include "base/singleton.hpp"
|
2016-09-04 16:53:24 +02:00
|
|
|
#include "base/logger.hpp"
|
2019-02-15 12:32:22 +01:00
|
|
|
#include <boost/asio/spawn.hpp>
|
2015-10-19 17:31:18 +02:00
|
|
|
|
|
|
|
using namespace icinga;
|
|
|
|
|
2018-01-04 08:54:18 +01:00
|
|
|
EventQueue::EventQueue(String name)
|
|
|
|
: m_Name(std::move(name))
|
2015-10-19 17:31:18 +02:00
|
|
|
{ }
|
|
|
|
|
|
|
|
bool EventQueue::CanProcessEvent(const String& type) const
|
|
|
|
{
|
|
|
|
boost::mutex::scoped_lock lock(m_Mutex);
|
|
|
|
|
|
|
|
return m_Types.find(type) != m_Types.end();
|
|
|
|
}
|
|
|
|
|
|
|
|
void EventQueue::ProcessEvent(const Dictionary::Ptr& event)
|
|
|
|
{
|
2018-01-03 10:19:24 +01:00
|
|
|
ScriptFrame frame(true);
|
2015-10-21 15:38:26 +02:00
|
|
|
frame.Sandboxed = true;
|
2015-10-19 17:31:18 +02:00
|
|
|
|
2016-09-02 08:51:51 +02:00
|
|
|
try {
|
2018-02-07 14:24:06 +01:00
|
|
|
if (!FilterUtility::EvaluateFilter(frame, m_Filter.get(), event, "event"))
|
2016-09-02 08:51:51 +02:00
|
|
|
return;
|
|
|
|
} catch (const std::exception& ex) {
|
|
|
|
Log(LogWarning, "EventQueue")
|
2017-12-19 15:50:05 +01:00
|
|
|
<< "Error occurred while evaluating event filter for queue '" << m_Name << "': " << DiagnosticInformation(ex);
|
2015-10-19 17:31:18 +02:00
|
|
|
return;
|
2016-09-02 08:51:51 +02:00
|
|
|
}
|
2015-10-19 17:31:18 +02:00
|
|
|
|
|
|
|
boost::mutex::scoped_lock lock(m_Mutex);
|
|
|
|
|
|
|
|
typedef std::pair<void *const, std::deque<Dictionary::Ptr> > kv_pair;
|
2016-08-25 06:19:44 +02:00
|
|
|
for (kv_pair& kv : m_Events) {
|
2015-10-19 17:31:18 +02:00
|
|
|
kv.second.push_back(event);
|
|
|
|
}
|
|
|
|
|
|
|
|
m_CV.notify_all();
|
|
|
|
}
|
|
|
|
|
|
|
|
void EventQueue::AddClient(void *client)
|
|
|
|
{
|
|
|
|
boost::mutex::scoped_lock lock(m_Mutex);
|
|
|
|
|
2016-08-27 19:56:12 +02:00
|
|
|
auto result = m_Events.insert(std::make_pair(client, std::deque<Dictionary::Ptr>()));
|
2015-10-19 17:31:18 +02:00
|
|
|
ASSERT(result.second);
|
2019-03-08 14:07:29 +01:00
|
|
|
|
|
|
|
#ifndef I2_DEBUG
|
|
|
|
(void)result;
|
|
|
|
#endif /* I2_DEBUG */
|
2015-10-19 17:31:18 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
void EventQueue::RemoveClient(void *client)
|
|
|
|
{
|
|
|
|
boost::mutex::scoped_lock lock(m_Mutex);
|
|
|
|
|
|
|
|
m_Events.erase(client);
|
|
|
|
}
|
|
|
|
|
|
|
|
void EventQueue::UnregisterIfUnused(const String& name, const EventQueue::Ptr& queue)
|
|
|
|
{
|
|
|
|
boost::mutex::scoped_lock lock(queue->m_Mutex);
|
|
|
|
|
|
|
|
if (queue->m_Events.empty())
|
|
|
|
Unregister(name);
|
|
|
|
}
|
|
|
|
|
|
|
|
void EventQueue::SetTypes(const std::set<String>& types)
|
|
|
|
{
|
|
|
|
boost::mutex::scoped_lock lock(m_Mutex);
|
|
|
|
m_Types = types;
|
|
|
|
}
|
|
|
|
|
2017-12-15 05:34:46 +01:00
|
|
|
void EventQueue::SetFilter(std::unique_ptr<Expression> filter)
|
2015-10-19 17:31:18 +02:00
|
|
|
{
|
|
|
|
boost::mutex::scoped_lock lock(m_Mutex);
|
2017-12-15 05:34:46 +01:00
|
|
|
m_Filter.swap(filter);
|
2015-10-19 17:31:18 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
Dictionary::Ptr EventQueue::WaitForEvent(void *client, double timeout)
|
|
|
|
{
|
|
|
|
boost::mutex::scoped_lock lock(m_Mutex);
|
|
|
|
|
|
|
|
for (;;) {
|
2016-08-27 19:56:12 +02:00
|
|
|
auto it = m_Events.find(client);
|
2015-10-19 17:31:18 +02:00
|
|
|
ASSERT(it != m_Events.end());
|
|
|
|
|
|
|
|
if (!it->second.empty()) {
|
|
|
|
Dictionary::Ptr result = *it->second.begin();
|
|
|
|
it->second.pop_front();
|
|
|
|
return result;
|
|
|
|
}
|
|
|
|
|
2018-04-15 06:02:42 +02:00
|
|
|
if (!m_CV.timed_wait(lock, boost::posix_time::milliseconds(long(timeout * 1000))))
|
2017-11-30 08:36:35 +01:00
|
|
|
return nullptr;
|
2015-10-19 17:31:18 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-02-15 12:32:22 +01:00
|
|
|
Dictionary::Ptr EventQueue::WaitForEvent(void *client, boost::asio::yield_context yc)
|
|
|
|
{
|
|
|
|
for (;;) {
|
|
|
|
{
|
|
|
|
boost::mutex::scoped_lock lock(m_Mutex);
|
|
|
|
|
|
|
|
auto it = m_Events.find(client);
|
|
|
|
ASSERT(it != m_Events.end());
|
|
|
|
|
|
|
|
if (!it->second.empty()) {
|
|
|
|
Dictionary::Ptr result = *it->second.begin();
|
|
|
|
it->second.pop_front();
|
|
|
|
return result;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
IoBoundWorkSlot dontLockTheIoThreadWhileWaiting (yc);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-10-19 17:31:18 +02:00
|
|
|
std::vector<EventQueue::Ptr> EventQueue::GetQueuesForType(const String& type)
|
|
|
|
{
|
|
|
|
EventQueueRegistry::ItemMap queues = EventQueueRegistry::GetInstance()->GetItems();
|
|
|
|
|
|
|
|
std::vector<EventQueue::Ptr> availQueues;
|
|
|
|
|
|
|
|
typedef std::pair<String, EventQueue::Ptr> kv_pair;
|
2016-08-25 06:19:44 +02:00
|
|
|
for (const kv_pair& kv : queues) {
|
2015-10-19 17:31:18 +02:00
|
|
|
if (kv.second->CanProcessEvent(type))
|
|
|
|
availQueues.push_back(kv.second);
|
|
|
|
}
|
|
|
|
|
|
|
|
return availQueues;
|
|
|
|
}
|
|
|
|
|
|
|
|
EventQueue::Ptr EventQueue::GetByName(const String& name)
|
|
|
|
{
|
|
|
|
return EventQueueRegistry::GetInstance()->GetItem(name);
|
|
|
|
}
|
|
|
|
|
|
|
|
void EventQueue::Register(const String& name, const EventQueue::Ptr& function)
|
|
|
|
{
|
|
|
|
EventQueueRegistry::GetInstance()->Register(name, function);
|
|
|
|
}
|
|
|
|
|
|
|
|
void EventQueue::Unregister(const String& name)
|
|
|
|
{
|
|
|
|
EventQueueRegistry::GetInstance()->Unregister(name);
|
|
|
|
}
|
|
|
|
|
2018-01-04 04:25:35 +01:00
|
|
|
EventQueueRegistry *EventQueueRegistry::GetInstance()
|
2015-10-19 17:31:18 +02:00
|
|
|
{
|
|
|
|
return Singleton<EventQueueRegistry>::GetInstance();
|
|
|
|
}
|