mirror of https://github.com/Icinga/icinga2.git
Implement api event streams
Documentation is not yet complete. refs #9078
This commit is contained in:
parent
2a2da04af6
commit
286538c17e
|
@ -171,9 +171,10 @@ Available permissions for specific url endpoints:
|
|||
config/modify | /v1/config
|
||||
objects/query/<type> | /v1/objects
|
||||
objects/create/<type> | /v1/objects
|
||||
objects/modify`/<type> | /v1/objects
|
||||
objects/modify/<type> | /v1/objects
|
||||
objects/delete/<type> | /v1/objects
|
||||
status/query | /v1/status
|
||||
events/<type> | /v1/events
|
||||
|
||||
The required actions or types can be replaced by using a wildcard match ("*").
|
||||
|
||||
|
@ -227,9 +228,9 @@ The Icinga 2 API provides multiple url endpoints:
|
|||
--------------|----------------------------------------------------
|
||||
/v1/actions | Endpoint for running specific [API actions](9-icinga2-api.md#icinga2-api-actions).
|
||||
/v1/config | Endpoint for [managing configuration modules](9-icinga2-api.md#icinga2-api-config-management).
|
||||
/v1/events | Endpoint for subscribing to [API events](9-icinga2-api.md#icinga2-api-actions).
|
||||
/v1/objects | Endpoint for querying, creating, modifying and deleting [config objects](9-icinga2-api.md#icinga2-api-config-objects).
|
||||
/v1/status | Endpoint for receiving icinga2 [status and statistics](9-icinga2-api.md#icinga2-api-status).
|
||||
/v1/events | Endpoint for subscribing to [API event streams](9-icinga2-api.md#icinga2-api-event-streams).
|
||||
/v1/types | Endpoint for listing Icinga 2 configuration object types and their attributes.
|
||||
|
||||
Please check the respective sections for detailed urls and parameters.
|
||||
|
@ -316,7 +317,41 @@ Reschedule a service check for all services in NOT-OK state:
|
|||
|
||||
## <a id="icinga2-api-event-streams"></a> Event Streams
|
||||
|
||||
**TODO** https://dev.icinga.org/issues/9078
|
||||
Subscribing to an event stream requires a unique `queue` name
|
||||
as query parameter. Multiple HTTP clients may use the same queue
|
||||
with existing filters.
|
||||
|
||||
The following event stream types are available:
|
||||
|
||||
Type | Description
|
||||
------------------------------|------------------------------
|
||||
CheckResult | Check results for hosts and services.
|
||||
StateChange | Host/service state changes.
|
||||
Notification | Notification events including notified users for hosts and services.
|
||||
AcknowledgementSet | Acknowledgement set on hosts and services.
|
||||
AcknowledgementCleared | Acknowledgement cleared on hosts and services.
|
||||
CommentAdded | Comment added for hosts and services.
|
||||
CommentRemoved | Comment removed for hosts and services.
|
||||
DowntimeAdded | Downtime added for hosts and services.
|
||||
DowntimeRemoved | Downtime removed for hosts and services.
|
||||
DowntimeTriggered | Downtime triggered for hosts and services.
|
||||
|
||||
Multiple event streams can be subscribed to by passing multiple
|
||||
`types` query parameters. Note: Each type requires [api permissions]()
|
||||
being set.
|
||||
|
||||
|
||||
TODO
|
||||
|
||||
* Types
|
||||
* Permissions
|
||||
* Filter
|
||||
|
||||
Event streams can be filtered by attributes using the prefix `event.`.
|
||||
|
||||
* Output
|
||||
|
||||
Long-Polling with new lines as separator.
|
||||
|
||||
## <a id="icinga2-api-status"></a> Status and Statistics
|
||||
|
||||
|
|
|
@ -33,4 +33,4 @@ public:
|
|||
|
||||
}
|
||||
|
||||
#endif /* ABOUTFORM_H */
|
||||
#endif /* ABOUTFORM_H */
|
||||
|
|
|
@ -36,4 +36,4 @@ public:
|
|||
|
||||
}
|
||||
|
||||
#endif /* CONNECTFORM_H */
|
||||
#endif /* CONNECTFORM_H */
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
#include "base/value.hpp"
|
||||
#include <boost/range/iterator.hpp>
|
||||
#include <vector>
|
||||
#include <set>
|
||||
|
||||
namespace icinga
|
||||
{
|
||||
|
@ -109,7 +110,14 @@ public:
|
|||
std::copy(v.begin(), v.end(), std::back_inserter(result->m_Data));
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
template<typename T>
|
||||
std::set<T> ToSet(void)
|
||||
{
|
||||
ObjectLock olock(this);
|
||||
return std::set<T>(Begin(), End());
|
||||
}
|
||||
|
||||
virtual Object::Ptr Clone(void) const override;
|
||||
|
||||
Array::Ptr Reverse(void) const;
|
||||
|
|
|
@ -234,6 +234,8 @@ void TlsStream::OnEvent(int revents)
|
|||
m_Socket->Close();
|
||||
m_Socket.reset();
|
||||
|
||||
m_Eof = true;
|
||||
|
||||
m_ErrorCode = ERR_peek_error();
|
||||
m_ErrorOccurred = true;
|
||||
|
||||
|
|
|
@ -41,7 +41,7 @@ mkclass_target(user.ti user.tcpp user.thpp)
|
|||
set(icinga_SOURCES
|
||||
api.cpp apiactions.cpp apievents.cpp checkable.cpp checkable.thpp checkable-dependency.cpp checkable-downtime.cpp checkable-event.cpp
|
||||
checkable-flapping.cpp checkcommand.cpp checkcommand.thpp checkresult.cpp checkresult.thpp
|
||||
cib.cpp command.cpp command.thpp comment.cpp comment.thpp compatutility.cpp dependency.cpp dependency.thpp
|
||||
cib.cpp clusterevents.cpp command.cpp command.thpp comment.cpp comment.thpp compatutility.cpp dependency.cpp dependency.thpp
|
||||
dependency-apply.cpp downtime.cpp downtime.thpp eventcommand.cpp eventcommand.thpp
|
||||
externalcommandprocessor.cpp host.cpp host.thpp hostgroup.cpp hostgroup.thpp icingaapplication.cpp icingaapplication.thpp
|
||||
customvarobject.cpp customvarobject.thpp icingastatuswriter.cpp icingastatuswriter.thpp
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -22,9 +22,6 @@
|
|||
|
||||
#include "icinga/checkable.hpp"
|
||||
#include "icinga/host.hpp"
|
||||
#include "icinga/checkcommand.hpp"
|
||||
#include "icinga/eventcommand.hpp"
|
||||
#include "icinga/notificationcommand.hpp"
|
||||
|
||||
namespace icinga
|
||||
{
|
||||
|
@ -38,46 +35,26 @@ public:
|
|||
static void StaticInitialize(void);
|
||||
|
||||
static void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr& origin);
|
||||
static Value CheckResultAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
|
||||
static void StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type, const MessageOrigin::Ptr& origin);
|
||||
|
||||
static void NextCheckChangedHandler(const Checkable::Ptr& checkable, const MessageOrigin::Ptr& origin);
|
||||
static Value NextCheckChangedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
|
||||
|
||||
static void NextNotificationChangedHandler(const Notification::Ptr& notification, const MessageOrigin::Ptr& origin);
|
||||
static Value NextNotificationChangedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
|
||||
static void NotificationSentToAllUsersHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
|
||||
const std::set<User::Ptr>& users, NotificationType type, const CheckResult::Ptr& cr, const String& author,
|
||||
const String& text);
|
||||
|
||||
static void ForceNextCheckChangedHandler(const Checkable::Ptr& checkable, const MessageOrigin::Ptr& origin);
|
||||
static Value ForceNextCheckChangedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
|
||||
static void FlappingChangedHandler(const Checkable::Ptr& checkable, const MessageOrigin::Ptr& origin);
|
||||
|
||||
static void ForceNextNotificationChangedHandler(const Checkable::Ptr& checkable, const MessageOrigin::Ptr& origin);
|
||||
static Value ForceNextNotificationChangedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
|
||||
static void AcknowledgementSetHandler(const Checkable::Ptr& checkable,
|
||||
const String& author, const String& comment, AcknowledgementType type,
|
||||
bool notify, double expiry, const MessageOrigin::Ptr& origin);
|
||||
static void AcknowledgementClearedHandler(const Checkable::Ptr& checkable, const MessageOrigin::Ptr& origin);
|
||||
|
||||
static void CommentAddedHandler(const Checkable::Ptr& checkable, const Comment::Ptr& comment, const MessageOrigin::Ptr& origin);
|
||||
static Value CommentAddedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
|
||||
|
||||
static void CommentRemovedHandler(const Checkable::Ptr& checkable, const Comment::Ptr& comment, const MessageOrigin::Ptr& origin);
|
||||
static Value CommentRemovedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
|
||||
|
||||
static void DowntimeAddedHandler(const Checkable::Ptr& checkable, const Downtime::Ptr& downtime, const MessageOrigin::Ptr& origin);
|
||||
static Value DowntimeAddedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
|
||||
|
||||
static void DowntimeRemovedHandler(const Checkable::Ptr& checkable, const Downtime::Ptr& downtime, const MessageOrigin::Ptr& origin);
|
||||
static Value DowntimeRemovedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
|
||||
|
||||
static void AcknowledgementSetHandler(const Checkable::Ptr& checkable, const String& author, const String& comment, AcknowledgementType type,
|
||||
bool notify, double expiry, const MessageOrigin::Ptr& origin);
|
||||
static Value AcknowledgementSetAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
|
||||
|
||||
static void AcknowledgementClearedHandler(const Checkable::Ptr& checkable, const MessageOrigin::Ptr& origin);
|
||||
static Value AcknowledgementClearedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
|
||||
|
||||
static Value ExecuteCommandAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
|
||||
|
||||
static String GetRepositoryDir(void);
|
||||
static void RepositoryTimerHandler(void);
|
||||
static Value UpdateRepositoryAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
|
||||
|
||||
static Dictionary::Ptr MakeCheckResultMessage(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
|
||||
static void DowntimeTriggeredHandler(const Checkable::Ptr& checkable, const Downtime::Ptr& downtime);
|
||||
};
|
||||
|
||||
}
|
||||
|
|
|
@ -23,7 +23,7 @@
|
|||
#include "icinga/checkcommand.hpp"
|
||||
#include "icinga/icingaapplication.hpp"
|
||||
#include "icinga/cib.hpp"
|
||||
#include "icinga/apievents.hpp"
|
||||
#include "icinga/clusterevents.hpp"
|
||||
#include "remote/messageorigin.hpp"
|
||||
#include "remote/apilistener.hpp"
|
||||
#include "base/objectlock.hpp"
|
||||
|
@ -140,7 +140,7 @@ void Checkable::ProcessCheckResult(const CheckResult::Ptr& cr, const MessageOrig
|
|||
|
||||
if (listener) {
|
||||
/* send message back to its origin */
|
||||
Dictionary::Ptr message = ApiEvents::MakeCheckResultMessage(this, cr);
|
||||
Dictionary::Ptr message = ClusterEvents::MakeCheckResultMessage(this, cr);
|
||||
listener->SyncSendMessage(command_endpoint, message);
|
||||
}
|
||||
|
||||
|
|
|
@ -236,9 +236,12 @@ void Checkable::TriggerDowntime(const String& id)
|
|||
downtime->SetTriggerTime(Utility::GetTime());
|
||||
|
||||
Dictionary::Ptr triggers = downtime->GetTriggers();
|
||||
ObjectLock olock(triggers);
|
||||
BOOST_FOREACH(const Dictionary::Pair& kv, triggers) {
|
||||
TriggerDowntime(kv.first);
|
||||
|
||||
{
|
||||
ObjectLock olock(triggers);
|
||||
BOOST_FOREACH(const Dictionary::Pair& kv, triggers) {
|
||||
TriggerDowntime(kv.first);
|
||||
}
|
||||
}
|
||||
|
||||
OnDowntimeTriggered(owner, downtime);
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,85 @@
|
|||
/******************************************************************************
|
||||
* Icinga 2 *
|
||||
* Copyright (C) 2012-2015 Icinga Development Team (http://www.icinga.org) *
|
||||
* *
|
||||
* This program is free software; you can redistribute it and/or *
|
||||
* modify it under the terms of the GNU General Public License *
|
||||
* as published by the Free Software Foundation; either version 2 *
|
||||
* of the License, or (at your option) any later version. *
|
||||
* *
|
||||
* This program is distributed in the hope that it will be useful, *
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
|
||||
* GNU General Public License for more details. *
|
||||
* *
|
||||
* You should have received a copy of the GNU General Public License *
|
||||
* along with this program; if not, write to the Free Software Foundation *
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
|
||||
******************************************************************************/
|
||||
|
||||
#ifndef CLUSTEREVENTS_H
|
||||
#define CLUSTEREVENTS_H
|
||||
|
||||
#include "icinga/checkable.hpp"
|
||||
#include "icinga/host.hpp"
|
||||
#include "icinga/checkcommand.hpp"
|
||||
#include "icinga/eventcommand.hpp"
|
||||
#include "icinga/notificationcommand.hpp"
|
||||
|
||||
namespace icinga
|
||||
{
|
||||
|
||||
/**
|
||||
* @ingroup icinga
|
||||
*/
|
||||
class I2_ICINGA_API ClusterEvents
|
||||
{
|
||||
public:
|
||||
static void StaticInitialize(void);
|
||||
|
||||
static void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr& origin);
|
||||
static Value CheckResultAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
|
||||
|
||||
static void NextCheckChangedHandler(const Checkable::Ptr& checkable, const MessageOrigin::Ptr& origin);
|
||||
static Value NextCheckChangedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
|
||||
|
||||
static void NextNotificationChangedHandler(const Notification::Ptr& notification, const MessageOrigin::Ptr& origin);
|
||||
static Value NextNotificationChangedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
|
||||
|
||||
static void ForceNextCheckChangedHandler(const Checkable::Ptr& checkable, const MessageOrigin::Ptr& origin);
|
||||
static Value ForceNextCheckChangedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
|
||||
|
||||
static void ForceNextNotificationChangedHandler(const Checkable::Ptr& checkable, const MessageOrigin::Ptr& origin);
|
||||
static Value ForceNextNotificationChangedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
|
||||
|
||||
static void CommentAddedHandler(const Checkable::Ptr& checkable, const Comment::Ptr& comment, const MessageOrigin::Ptr& origin);
|
||||
static Value CommentAddedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
|
||||
|
||||
static void CommentRemovedHandler(const Checkable::Ptr& checkable, const Comment::Ptr& comment, const MessageOrigin::Ptr& origin);
|
||||
static Value CommentRemovedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
|
||||
|
||||
static void DowntimeAddedHandler(const Checkable::Ptr& checkable, const Downtime::Ptr& downtime, const MessageOrigin::Ptr& origin);
|
||||
static Value DowntimeAddedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
|
||||
|
||||
static void DowntimeRemovedHandler(const Checkable::Ptr& checkable, const Downtime::Ptr& downtime, const MessageOrigin::Ptr& origin);
|
||||
static Value DowntimeRemovedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
|
||||
|
||||
static void AcknowledgementSetHandler(const Checkable::Ptr& checkable, const String& author, const String& comment, AcknowledgementType type,
|
||||
bool notify, double expiry, const MessageOrigin::Ptr& origin);
|
||||
static Value AcknowledgementSetAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
|
||||
|
||||
static void AcknowledgementClearedHandler(const Checkable::Ptr& checkable, const MessageOrigin::Ptr& origin);
|
||||
static Value AcknowledgementClearedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
|
||||
|
||||
static Value ExecuteCommandAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
|
||||
|
||||
static String GetRepositoryDir(void);
|
||||
static void RepositoryTimerHandler(void);
|
||||
static Value UpdateRepositoryAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
|
||||
|
||||
static Dictionary::Ptr MakeCheckResultMessage(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#endif /* CLUSTEREVENTS_H */
|
|
@ -26,7 +26,7 @@ set(remote_SOURCES
|
|||
apilistener-filesync.cpp apiuser.cpp apiuser.thpp authority.cpp base64.cpp
|
||||
configfileshandler.cpp configpackageshandler.cpp configpackageutility.cpp configobjectutility.cpp
|
||||
configstageshandler.cpp createobjecthandler.cpp deleteobjecthandler.cpp
|
||||
endpoint.cpp endpoint.thpp filterutility.cpp
|
||||
endpoint.cpp endpoint.thpp eventshandler.cpp eventqueue.cpp filterutility.cpp
|
||||
httpchunkedencoding.cpp httpclientconnection.cpp httpserverconnection.cpp httphandler.cpp httprequest.cpp httpresponse.cpp
|
||||
httputility.cpp jsonrpc.cpp jsonrpcconnection.cpp jsonrpcconnection-heartbeat.cpp
|
||||
messageorigin.cpp modifyobjecthandler.cpp statushandler.cpp objectqueryhandler.cpp typequeryhandler.cpp
|
||||
|
|
|
@ -327,7 +327,6 @@ void ApiListener::NewClientHandlerInternal(const Socket::Ptr& client, const Stri
|
|||
Log(LogInformation, "ApiListener")
|
||||
<< "New client connection for identity '" << identity << "'" << (verify_ok ? "" : " (unauthenticated)");
|
||||
|
||||
|
||||
if (verify_ok)
|
||||
endpoint = Endpoint::GetByName(identity);
|
||||
} else {
|
||||
|
|
|
@ -0,0 +1,149 @@
|
|||
/******************************************************************************
|
||||
* Icinga 2 *
|
||||
* Copyright (C) 2012-2015 Icinga Development Team (http://www.icinga.org) *
|
||||
* *
|
||||
* This program is free software; you can redistribute it and/or *
|
||||
* modify it under the terms of the GNU General Public License *
|
||||
* as published by the Free Software Foundation; either version 2 *
|
||||
* of the License, or (at your option) any later version. *
|
||||
* *
|
||||
* This program is distributed in the hope that it will be useful, *
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
|
||||
* GNU General Public License for more details. *
|
||||
* *
|
||||
* You should have received a copy of the GNU General Public License *
|
||||
* along with this program; if not, write to the Free Software Foundation *
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
|
||||
******************************************************************************/
|
||||
|
||||
#include "remote/eventqueue.hpp"
|
||||
#include "remote/filterutility.hpp"
|
||||
#include "base/singleton.hpp"
|
||||
|
||||
using namespace icinga;
|
||||
|
||||
EventQueue::EventQueue(void)
|
||||
: m_Filter(NULL)
|
||||
{ }
|
||||
|
||||
EventQueue::~EventQueue(void)
|
||||
{
|
||||
delete m_Filter;
|
||||
}
|
||||
|
||||
bool EventQueue::CanProcessEvent(const String& type) const
|
||||
{
|
||||
boost::mutex::scoped_lock lock(m_Mutex);
|
||||
|
||||
return m_Types.find(type) != m_Types.end();
|
||||
}
|
||||
|
||||
void EventQueue::ProcessEvent(const Dictionary::Ptr& event)
|
||||
{
|
||||
ScriptFrame frame;
|
||||
|
||||
if (!FilterUtility::EvaluateFilter(frame, m_Filter, event, "event"))
|
||||
return;
|
||||
|
||||
boost::mutex::scoped_lock lock(m_Mutex);
|
||||
|
||||
typedef std::pair<void *const, std::deque<Dictionary::Ptr> > kv_pair;
|
||||
BOOST_FOREACH(kv_pair& kv, m_Events) {
|
||||
kv.second.push_back(event);
|
||||
}
|
||||
|
||||
m_CV.notify_all();
|
||||
}
|
||||
|
||||
void EventQueue::AddClient(void *client)
|
||||
{
|
||||
boost::mutex::scoped_lock lock(m_Mutex);
|
||||
|
||||
typedef std::map<void *, std::deque<Dictionary::Ptr> >::iterator it_type;
|
||||
std::pair<it_type, bool> result = m_Events.insert(std::make_pair(client, std::deque<Dictionary::Ptr>()));
|
||||
ASSERT(result.second);
|
||||
}
|
||||
|
||||
void EventQueue::RemoveClient(void *client)
|
||||
{
|
||||
boost::mutex::scoped_lock lock(m_Mutex);
|
||||
|
||||
m_Events.erase(client);
|
||||
}
|
||||
|
||||
void EventQueue::UnregisterIfUnused(const String& name, const EventQueue::Ptr& queue)
|
||||
{
|
||||
boost::mutex::scoped_lock lock(queue->m_Mutex);
|
||||
|
||||
if (queue->m_Events.empty())
|
||||
Unregister(name);
|
||||
}
|
||||
|
||||
void EventQueue::SetTypes(const std::set<String>& types)
|
||||
{
|
||||
boost::mutex::scoped_lock lock(m_Mutex);
|
||||
m_Types = types;
|
||||
}
|
||||
|
||||
void EventQueue::SetFilter(Expression *filter)
|
||||
{
|
||||
boost::mutex::scoped_lock lock(m_Mutex);
|
||||
delete m_Filter;
|
||||
m_Filter = filter;
|
||||
}
|
||||
|
||||
Dictionary::Ptr EventQueue::WaitForEvent(void *client, double timeout)
|
||||
{
|
||||
boost::mutex::scoped_lock lock(m_Mutex);
|
||||
|
||||
for (;;) {
|
||||
std::map<void *, std::deque<Dictionary::Ptr> >::iterator it = m_Events.find(client);
|
||||
ASSERT(it != m_Events.end());
|
||||
|
||||
if (!it->second.empty()) {
|
||||
Dictionary::Ptr result = *it->second.begin();
|
||||
it->second.pop_front();
|
||||
return result;
|
||||
}
|
||||
|
||||
if (!m_CV.timed_wait(lock, boost::posix_time::milliseconds(timeout * 1000)))
|
||||
return Dictionary::Ptr();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
std::vector<EventQueue::Ptr> EventQueue::GetQueuesForType(const String& type)
|
||||
{
|
||||
EventQueueRegistry::ItemMap queues = EventQueueRegistry::GetInstance()->GetItems();
|
||||
|
||||
std::vector<EventQueue::Ptr> availQueues;
|
||||
|
||||
typedef std::pair<String, EventQueue::Ptr> kv_pair;
|
||||
BOOST_FOREACH(const kv_pair& kv, queues) {
|
||||
if (kv.second->CanProcessEvent(type))
|
||||
availQueues.push_back(kv.second);
|
||||
}
|
||||
|
||||
return availQueues;
|
||||
}
|
||||
|
||||
EventQueue::Ptr EventQueue::GetByName(const String& name)
|
||||
{
|
||||
return EventQueueRegistry::GetInstance()->GetItem(name);
|
||||
}
|
||||
|
||||
void EventQueue::Register(const String& name, const EventQueue::Ptr& function)
|
||||
{
|
||||
EventQueueRegistry::GetInstance()->Register(name, function);
|
||||
}
|
||||
|
||||
void EventQueue::Unregister(const String& name)
|
||||
{
|
||||
EventQueueRegistry::GetInstance()->Unregister(name);
|
||||
}
|
||||
|
||||
EventQueueRegistry *EventQueueRegistry::GetInstance(void)
|
||||
{
|
||||
return Singleton<EventQueueRegistry>::GetInstance();
|
||||
}
|
|
@ -0,0 +1,85 @@
|
|||
/******************************************************************************
|
||||
* Icinga 2 *
|
||||
* Copyright (C) 2012-2015 Icinga Development Team (http://www.icinga.org) *
|
||||
* *
|
||||
* This program is free software; you can redistribute it and/or *
|
||||
* modify it under the terms of the GNU General Public License *
|
||||
* as published by the Free Software Foundation; either version 2 *
|
||||
* of the License, or (at your option) any later version. *
|
||||
* *
|
||||
* This program is distributed in the hope that it will be useful, *
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
|
||||
* GNU General Public License for more details. *
|
||||
* *
|
||||
* You should have received a copy of the GNU General Public License *
|
||||
* along with this program; if not, write to the Free Software Foundation *
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
|
||||
******************************************************************************/
|
||||
|
||||
#ifndef EVENTQUEUE_H
|
||||
#define EVENTQUEUE_H
|
||||
|
||||
#include "remote/httphandler.hpp"
|
||||
#include "base/object.hpp"
|
||||
#include "config/expression.hpp"
|
||||
#include <boost/thread/thread.hpp>
|
||||
#include <boost/thread/mutex.hpp>
|
||||
#include <boost/thread/condition_variable.hpp>
|
||||
#include <set>
|
||||
#include <map>
|
||||
#include <deque>
|
||||
|
||||
namespace icinga
|
||||
{
|
||||
|
||||
class I2_REMOTE_API EventQueue : public Object
|
||||
{
|
||||
public:
|
||||
DECLARE_PTR_TYPEDEFS(EventQueue);
|
||||
|
||||
EventQueue(void);
|
||||
~EventQueue(void);
|
||||
|
||||
bool CanProcessEvent(const String& type) const;
|
||||
void ProcessEvent(const Dictionary::Ptr& event);
|
||||
void AddClient(void *client);
|
||||
void RemoveClient(void *client);
|
||||
|
||||
void SetTypes(const std::set<String>& types);
|
||||
void SetFilter(Expression *filter);
|
||||
|
||||
Dictionary::Ptr WaitForEvent(void *client, double timeout = 5);
|
||||
|
||||
static std::vector<EventQueue::Ptr> GetQueuesForType(const String& type);
|
||||
static void UnregisterIfUnused(const String& name, const EventQueue::Ptr& queue);
|
||||
|
||||
static EventQueue::Ptr GetByName(const String& name);
|
||||
static void Register(const String& name, const EventQueue::Ptr& function);
|
||||
static void Unregister(const String& name);
|
||||
|
||||
private:
|
||||
mutable boost::mutex m_Mutex;
|
||||
boost::condition_variable m_CV;
|
||||
|
||||
std::set<String> m_Types;
|
||||
Expression *m_Filter;
|
||||
double m_Ttl;
|
||||
|
||||
std::map<void *, std::deque<Dictionary::Ptr> > m_Events;
|
||||
};
|
||||
|
||||
/**
|
||||
* A registry for API event queues.
|
||||
*
|
||||
* @ingroup base
|
||||
*/
|
||||
class I2_REMOTE_API EventQueueRegistry : public Registry<EventQueueRegistry, EventQueue::Ptr>
|
||||
{
|
||||
public:
|
||||
static EventQueueRegistry *GetInstance(void);
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#endif /* EVENTQUEUE_H */
|
|
@ -0,0 +1,119 @@
|
|||
/******************************************************************************
|
||||
* Icinga 2 *
|
||||
* Copyright (C) 2012-2015 Icinga Development Team (http://www.icinga.org) *
|
||||
* *
|
||||
* This program is free software; you can redistribute it and/or *
|
||||
* modify it under the terms of the GNU General Public License *
|
||||
* as published by the Free Software Foundation; either version 2 *
|
||||
* of the License, or (at your option) any later version. *
|
||||
* *
|
||||
* This program is distributed in the hope that it will be useful, *
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
|
||||
* GNU General Public License for more details. *
|
||||
* *
|
||||
* You should have received a copy of the GNU General Public License *
|
||||
* along with this program; if not, write to the Free Software Foundation *
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
|
||||
******************************************************************************/
|
||||
|
||||
#include "remote/eventshandler.hpp"
|
||||
#include "remote/httputility.hpp"
|
||||
#include "remote/filterutility.hpp"
|
||||
#include "config/configcompiler.hpp"
|
||||
#include "config/expression.hpp"
|
||||
#include "base/objectlock.hpp"
|
||||
#include "base/json.hpp"
|
||||
#include <boost/foreach.hpp>
|
||||
#include <boost/algorithm/string/replace.hpp>
|
||||
|
||||
using namespace icinga;
|
||||
|
||||
REGISTER_URLHANDLER("/v1/events", EventsHandler);
|
||||
|
||||
bool EventsHandler::HandleRequest(const ApiUser::Ptr& user, HttpRequest& request, HttpResponse& response)
|
||||
{
|
||||
if (request.RequestUrl->GetPath().size() != 2)
|
||||
return false;
|
||||
|
||||
if (request.RequestMethod != "POST")
|
||||
return false;
|
||||
|
||||
if (request.ProtocolVersion == HttpVersion10) {
|
||||
HttpUtility::SendJsonError(response, 400, "HTTP/1.0 not supported for event streams.");
|
||||
return true;
|
||||
}
|
||||
|
||||
Dictionary::Ptr params = HttpUtility::FetchRequestParameters(request);
|
||||
|
||||
Array::Ptr types = params->Get("types");
|
||||
|
||||
if (!types) {
|
||||
HttpUtility::SendJsonError(response, 400, "'types' query parameter is required.");
|
||||
return true;
|
||||
}
|
||||
|
||||
{
|
||||
ObjectLock olock(types);
|
||||
BOOST_FOREACH(const String& type, types) {
|
||||
FilterUtility::CheckPermission(user, "events/" + type);
|
||||
}
|
||||
}
|
||||
|
||||
String queueName = HttpUtility::GetLastParameter(params, "queue");
|
||||
|
||||
if (queueName.IsEmpty()) {
|
||||
HttpUtility::SendJsonError(response, 400, "'queue' query parameter is required.");
|
||||
return true;
|
||||
}
|
||||
|
||||
String filter = HttpUtility::GetLastParameter(params, "filter");
|
||||
|
||||
Expression *ufilter = NULL;
|
||||
|
||||
if (!filter.IsEmpty())
|
||||
ufilter = ConfigCompiler::CompileText("<API query>", filter);
|
||||
|
||||
/* create a new queue or update an existing one */
|
||||
EventQueue::Ptr queue = EventQueue::GetByName(queueName);
|
||||
|
||||
if (!queue) {
|
||||
queue = new EventQueue();
|
||||
EventQueue::Register(queueName, queue);
|
||||
}
|
||||
|
||||
queue->SetTypes(types->ToSet<String>());
|
||||
queue->SetFilter(ufilter);
|
||||
|
||||
queue->AddClient(&request);
|
||||
|
||||
response.SetStatus(200, "OK");
|
||||
response.AddHeader("Content-Type", "application/json");
|
||||
|
||||
for (;;) {
|
||||
Dictionary::Ptr result = queue->WaitForEvent(&request);
|
||||
|
||||
if (!response.IsPeerConnected()) {
|
||||
queue->RemoveClient(&request);
|
||||
EventQueue::UnregisterIfUnused(queueName, queue);
|
||||
return true;
|
||||
}
|
||||
|
||||
if (!result)
|
||||
continue;
|
||||
|
||||
String body = JsonEncode(result);
|
||||
|
||||
boost::algorithm::replace_all(body, "\n", "");
|
||||
|
||||
try {
|
||||
response.WriteBody(body.CStr(), body.GetLength());
|
||||
response.WriteBody("\n", 1);
|
||||
} catch (const std::exception&) {
|
||||
queue->RemoveClient(&request);
|
||||
EventQueue::UnregisterIfUnused(queueName, queue);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,39 @@
|
|||
/******************************************************************************
|
||||
* Icinga 2 *
|
||||
* Copyright (C) 2012-2015 Icinga Development Team (http://www.icinga.org) *
|
||||
* *
|
||||
* This program is free software; you can redistribute it and/or *
|
||||
* modify it under the terms of the GNU General Public License *
|
||||
* as published by the Free Software Foundation; either version 2 *
|
||||
* of the License, or (at your option) any later version. *
|
||||
* *
|
||||
* This program is distributed in the hope that it will be useful, *
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
|
||||
* GNU General Public License for more details. *
|
||||
* *
|
||||
* You should have received a copy of the GNU General Public License *
|
||||
* along with this program; if not, write to the Free Software Foundation *
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
|
||||
******************************************************************************/
|
||||
|
||||
#ifndef EVENTSHANDLER_H
|
||||
#define EVENTSHANDLER_H
|
||||
|
||||
#include "remote/httphandler.hpp"
|
||||
#include "remote/eventqueue.hpp"
|
||||
|
||||
namespace icinga
|
||||
{
|
||||
|
||||
class I2_REMOTE_API EventsHandler : public HttpHandler
|
||||
{
|
||||
public:
|
||||
DECLARE_PTR_TYPEDEFS(EventsHandler);
|
||||
|
||||
virtual bool HandleRequest(const ApiUser::Ptr& user, HttpRequest& request, HttpResponse& response) override;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#endif /* EVENTSHANDLER_H */
|
|
@ -90,13 +90,19 @@ String ConfigObjectTargetProvider::GetPluralName(const String& type) const
|
|||
return Type::GetByName(type)->GetPluralName();
|
||||
}
|
||||
|
||||
static bool EvaluateFilter(ScriptFrame& frame, Expression *filter, const Object::Ptr& target)
|
||||
bool FilterUtility::EvaluateFilter(ScriptFrame& frame, Expression *filter,
|
||||
const Object::Ptr& target, const String& variableName)
|
||||
{
|
||||
if (!filter)
|
||||
return true;
|
||||
|
||||
Type::Ptr type = target->GetReflectionType();
|
||||
String varName = type->GetName().ToLower();
|
||||
String varName;
|
||||
|
||||
if (variableName.IsEmpty())
|
||||
varName = type->GetName().ToLower();
|
||||
else
|
||||
varName = variableName;
|
||||
|
||||
Dictionary::Ptr vars;
|
||||
|
||||
|
@ -128,7 +134,7 @@ static bool EvaluateFilter(ScriptFrame& frame, Expression *filter, const Object:
|
|||
static void FilteredAddTarget(ScriptFrame& permissionFrame, Expression *permissionFilter,
|
||||
ScriptFrame& frame, Expression *ufilter, std::vector<Value>& result, const Object::Ptr& target)
|
||||
{
|
||||
if (EvaluateFilter(permissionFrame, permissionFilter, target) && EvaluateFilter(frame, ufilter, target))
|
||||
if (FilterUtility::EvaluateFilter(permissionFrame, permissionFilter, target) && FilterUtility::EvaluateFilter(frame, ufilter, target))
|
||||
result.push_back(target);
|
||||
}
|
||||
|
||||
|
@ -206,7 +212,7 @@ std::vector<Value> FilterUtility::GetFilterTargets(const QueryDescription& qd, c
|
|||
if (query->Contains(attr)) {
|
||||
Object::Ptr target = provider->GetTargetByName(type, HttpUtility::GetLastParameter(query, attr));
|
||||
|
||||
if (EvaluateFilter(permissionFrame, permissionFilter, target))
|
||||
if (FilterUtility::EvaluateFilter(permissionFrame, permissionFilter, target))
|
||||
result.push_back(target);
|
||||
}
|
||||
|
||||
|
@ -220,7 +226,7 @@ std::vector<Value> FilterUtility::GetFilterTargets(const QueryDescription& qd, c
|
|||
BOOST_FOREACH(const String& name, names) {
|
||||
Object::Ptr target = provider->GetTargetByName(type, name);
|
||||
|
||||
if (EvaluateFilter(permissionFrame, permissionFilter, target))
|
||||
if (FilterUtility::EvaluateFilter(permissionFrame, permissionFilter, target))
|
||||
result.push_back(target);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -70,6 +70,8 @@ public:
|
|||
static Type::Ptr TypeFromPluralName(const String& pluralName);
|
||||
static void CheckPermission(const ApiUser::Ptr& user, const String& permission, Expression **filter = NULL);
|
||||
static std::vector<Value> GetFilterTargets(const QueryDescription& qd, const Dictionary::Ptr& query, const ApiUser::Ptr& user);
|
||||
static bool EvaluateFilter(ScriptFrame& frame, Expression *filter,
|
||||
const Object::Ptr& target, const String& variableName = String());
|
||||
};
|
||||
|
||||
}
|
||||
|
|
|
@ -241,3 +241,7 @@ size_t HttpResponse::ReadBody(char *data, size_t count)
|
|||
return m_Body->Read(data, count, true);
|
||||
}
|
||||
|
||||
bool HttpResponse::IsPeerConnected(void) const
|
||||
{
|
||||
return !m_Stream->IsEof();
|
||||
}
|
||||
|
|
|
@ -61,6 +61,8 @@ public:
|
|||
void WriteBody(const char *data, size_t count);
|
||||
void Finish(void);
|
||||
|
||||
bool IsPeerConnected(void) const;
|
||||
|
||||
private:
|
||||
HttpResponseState m_State;
|
||||
boost::shared_ptr<ChunkReadContext> m_ChunkContext;
|
||||
|
|
|
@ -40,7 +40,7 @@ public:
|
|||
static void SendJsonBody(HttpResponse& response, const Value& val);
|
||||
static Value GetLastParameter(const Dictionary::Ptr& params, const String& key);
|
||||
static void SendJsonError(HttpResponse& response, const int code,
|
||||
const String& verbose="", const String& diagnosticInformation="");
|
||||
const String& verbose = String(), const String& diagnosticInformation = String());
|
||||
|
||||
private:
|
||||
static String GetErrorNameByCode(int code);
|
||||
|
|
Loading…
Reference in New Issue