diff --git a/doc/9-icinga2-api.md b/doc/9-icinga2-api.md index 083d5f761..473dd2f0d 100644 --- a/doc/9-icinga2-api.md +++ b/doc/9-icinga2-api.md @@ -171,9 +171,10 @@ Available permissions for specific url endpoints: config/modify | /v1/config objects/query/<type> | /v1/objects objects/create/<type> | /v1/objects - objects/modify`/<type> | /v1/objects + objects/modify/<type> | /v1/objects objects/delete/<type> | /v1/objects status/query | /v1/status + events/<type> | /v1/events The required actions or types can be replaced by using a wildcard match ("*"). @@ -227,9 +228,9 @@ The Icinga 2 API provides multiple url endpoints: --------------|---------------------------------------------------- /v1/actions | Endpoint for running specific [API actions](9-icinga2-api.md#icinga2-api-actions). /v1/config | Endpoint for [managing configuration modules](9-icinga2-api.md#icinga2-api-config-management). - /v1/events | Endpoint for subscribing to [API events](9-icinga2-api.md#icinga2-api-actions). /v1/objects | Endpoint for querying, creating, modifying and deleting [config objects](9-icinga2-api.md#icinga2-api-config-objects). /v1/status | Endpoint for receiving icinga2 [status and statistics](9-icinga2-api.md#icinga2-api-status). + /v1/events | Endpoint for subscribing to [API event streams](9-icinga2-api.md#icinga2-api-event-streams). /v1/types | Endpoint for listing Icinga 2 configuration object types and their attributes. Please check the respective sections for detailed urls and parameters. @@ -316,7 +317,41 @@ Reschedule a service check for all services in NOT-OK state: ## Event Streams -**TODO** https://dev.icinga.org/issues/9078 +Subscribing to an event stream requires a unique `queue` name +as query parameter. Multiple HTTP clients may use the same queue +with existing filters. + +The following event stream types are available: + + Type | Description + ------------------------------|------------------------------ + CheckResult | Check results for hosts and services. + StateChange | Host/service state changes. + Notification | Notification events including notified users for hosts and services. + AcknowledgementSet | Acknowledgement set on hosts and services. + AcknowledgementCleared | Acknowledgement cleared on hosts and services. + CommentAdded | Comment added for hosts and services. + CommentRemoved | Comment removed for hosts and services. + DowntimeAdded | Downtime added for hosts and services. + DowntimeRemoved | Downtime removed for hosts and services. + DowntimeTriggered | Downtime triggered for hosts and services. + +Multiple event streams can be subscribed to by passing multiple +`types` query parameters. Note: Each type requires [api permissions]() +being set. + + +TODO + +* Types +* Permissions +* Filter + +Event streams can be filtered by attributes using the prefix `event.`. + +* Output + +Long-Polling with new lines as separator. ## Status and Statistics diff --git a/icinga-studio/aboutform.hpp b/icinga-studio/aboutform.hpp index 58c6da516..65704901d 100644 --- a/icinga-studio/aboutform.hpp +++ b/icinga-studio/aboutform.hpp @@ -33,4 +33,4 @@ public: } -#endif /* ABOUTFORM_H */ \ No newline at end of file +#endif /* ABOUTFORM_H */ diff --git a/icinga-studio/connectform.hpp b/icinga-studio/connectform.hpp index 8c0a9a9a5..6c9e25469 100644 --- a/icinga-studio/connectform.hpp +++ b/icinga-studio/connectform.hpp @@ -36,4 +36,4 @@ public: } -#endif /* CONNECTFORM_H */ \ No newline at end of file +#endif /* CONNECTFORM_H */ diff --git a/lib/base/array.hpp b/lib/base/array.hpp index 8aff1b041..f5fc8018b 100644 --- a/lib/base/array.hpp +++ b/lib/base/array.hpp @@ -25,6 +25,7 @@ #include "base/value.hpp" #include #include +#include namespace icinga { @@ -109,7 +110,14 @@ public: std::copy(v.begin(), v.end(), std::back_inserter(result->m_Data)); return result; } - + + template + std::set ToSet(void) + { + ObjectLock olock(this); + return std::set(Begin(), End()); + } + virtual Object::Ptr Clone(void) const override; Array::Ptr Reverse(void) const; diff --git a/lib/base/tlsstream.cpp b/lib/base/tlsstream.cpp index ba0333b25..d68d82fed 100644 --- a/lib/base/tlsstream.cpp +++ b/lib/base/tlsstream.cpp @@ -234,6 +234,8 @@ void TlsStream::OnEvent(int revents) m_Socket->Close(); m_Socket.reset(); + m_Eof = true; + m_ErrorCode = ERR_peek_error(); m_ErrorOccurred = true; diff --git a/lib/icinga/CMakeLists.txt b/lib/icinga/CMakeLists.txt index 7171cecab..cca5f9d61 100644 --- a/lib/icinga/CMakeLists.txt +++ b/lib/icinga/CMakeLists.txt @@ -41,7 +41,7 @@ mkclass_target(user.ti user.tcpp user.thpp) set(icinga_SOURCES api.cpp apiactions.cpp apievents.cpp checkable.cpp checkable.thpp checkable-dependency.cpp checkable-downtime.cpp checkable-event.cpp checkable-flapping.cpp checkcommand.cpp checkcommand.thpp checkresult.cpp checkresult.thpp - cib.cpp command.cpp command.thpp comment.cpp comment.thpp compatutility.cpp dependency.cpp dependency.thpp + cib.cpp clusterevents.cpp command.cpp command.thpp comment.cpp comment.thpp compatutility.cpp dependency.cpp dependency.thpp dependency-apply.cpp downtime.cpp downtime.thpp eventcommand.cpp eventcommand.thpp externalcommandprocessor.cpp host.cpp host.thpp hostgroup.cpp hostgroup.thpp icingaapplication.cpp icingaapplication.thpp customvarobject.cpp customvarobject.thpp icingastatuswriter.cpp icingastatuswriter.thpp diff --git a/lib/icinga/apievents.cpp b/lib/icinga/apievents.cpp index 2dfeda7e5..48331f8a3 100644 --- a/lib/icinga/apievents.cpp +++ b/lib/icinga/apievents.cpp @@ -19,1016 +19,362 @@ #include "icinga/apievents.hpp" #include "icinga/service.hpp" -#include "icinga/perfdatavalue.hpp" -#include "remote/apilistener.hpp" -#include "remote/endpoint.hpp" -#include "remote/messageorigin.hpp" -#include "remote/zone.hpp" -#include "remote/apifunction.hpp" -#include "base/application.hpp" -#include "base/configtype.hpp" -#include "base/utility.hpp" -#include "base/exception.hpp" +#include "remote/eventqueue.hpp" #include "base/initialize.hpp" #include "base/serializer.hpp" -#include "base/json.hpp" -#include +#include "base/logger.hpp" using namespace icinga; INITIALIZE_ONCE(&ApiEvents::StaticInitialize); -REGISTER_APIFUNCTION(CheckResult, event, &ApiEvents::CheckResultAPIHandler); -REGISTER_APIFUNCTION(SetNextCheck, event, &ApiEvents::NextCheckChangedAPIHandler); -REGISTER_APIFUNCTION(SetNextNotification, event, &ApiEvents::NextNotificationChangedAPIHandler); -REGISTER_APIFUNCTION(SetForceNextCheck, event, &ApiEvents::ForceNextCheckChangedAPIHandler); -REGISTER_APIFUNCTION(SetForceNextNotification, event, &ApiEvents::ForceNextNotificationChangedAPIHandler); -REGISTER_APIFUNCTION(AddComment, event, &ApiEvents::CommentAddedAPIHandler); -REGISTER_APIFUNCTION(RemoveComment, event, &ApiEvents::CommentRemovedAPIHandler); -REGISTER_APIFUNCTION(AddDowntime, event, &ApiEvents::DowntimeAddedAPIHandler); -REGISTER_APIFUNCTION(RemoveDowntime, event, &ApiEvents::DowntimeRemovedAPIHandler); -REGISTER_APIFUNCTION(SetAcknowledgement, event, &ApiEvents::AcknowledgementSetAPIHandler); -REGISTER_APIFUNCTION(ClearAcknowledgement, event, &ApiEvents::AcknowledgementClearedAPIHandler); -REGISTER_APIFUNCTION(UpdateRepository, event, &ApiEvents::UpdateRepositoryAPIHandler); -REGISTER_APIFUNCTION(ExecuteCommand, event, &ApiEvents::ExecuteCommandAPIHandler); - -static Timer::Ptr l_RepositoryTimer; - void ApiEvents::StaticInitialize(void) { Checkable::OnNewCheckResult.connect(&ApiEvents::CheckResultHandler); - Checkable::OnNextCheckChanged.connect(&ApiEvents::NextCheckChangedHandler); - Notification::OnNextNotificationChanged.connect(&ApiEvents::NextNotificationChangedHandler); - Checkable::OnForceNextCheckChanged.connect(&ApiEvents::ForceNextCheckChangedHandler); - Checkable::OnForceNextNotificationChanged.connect(&ApiEvents::ForceNextNotificationChangedHandler); + Checkable::OnStateChange.connect(&ApiEvents::StateChangeHandler); + Checkable::OnNotificationSentToAllUsers.connect(&ApiEvents::NotificationSentToAllUsersHandler); + + Checkable::OnFlappingChanged.connect(&ApiEvents::FlappingChangedHandler); - Checkable::OnCommentAdded.connect(&ApiEvents::CommentAddedHandler); - Checkable::OnCommentRemoved.connect(&ApiEvents::CommentRemovedHandler); - Checkable::OnDowntimeAdded.connect(&ApiEvents::DowntimeAddedHandler); - Checkable::OnDowntimeRemoved.connect(&ApiEvents::DowntimeRemovedHandler); Checkable::OnAcknowledgementSet.connect(&ApiEvents::AcknowledgementSetHandler); Checkable::OnAcknowledgementCleared.connect(&ApiEvents::AcknowledgementClearedHandler); - l_RepositoryTimer = new Timer(); - l_RepositoryTimer->SetInterval(30); - l_RepositoryTimer->OnTimerExpired.connect(boost::bind(&ApiEvents::RepositoryTimerHandler)); - l_RepositoryTimer->Start(); - l_RepositoryTimer->Reschedule(0); -} + Checkable::OnCommentAdded.connect(&ApiEvents::CommentAddedHandler); + Checkable::OnCommentRemoved.connect(&ApiEvents::CommentRemovedHandler); -Dictionary::Ptr ApiEvents::MakeCheckResultMessage(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) -{ - Dictionary::Ptr message = new Dictionary(); - message->Set("jsonrpc", "2.0"); - message->Set("method", "event::CheckResult"); - - Host::Ptr host; - Service::Ptr service; - tie(host, service) = GetHostService(checkable); - - Dictionary::Ptr params = new Dictionary(); - params->Set("host", host->GetName()); - if (service) - params->Set("service", service->GetShortName()); - else { - Value agent_service_name = checkable->GetExtension("agent_service_name"); - - if (!agent_service_name.IsEmpty()) - params->Set("service", agent_service_name); - } - params->Set("cr", Serialize(cr)); - - message->Set("params", params); - - return message; + Checkable::OnDowntimeAdded.connect(&ApiEvents::DowntimeAddedHandler); + Checkable::OnDowntimeRemoved.connect(&ApiEvents::DowntimeRemovedHandler); + Checkable::OnDowntimeTriggered.connect(&ApiEvents::DowntimeTriggeredHandler); } void ApiEvents::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr& origin) { - ApiListener::Ptr listener = ApiListener::GetInstance(); + std::vector queues = EventQueue::GetQueuesForType("CheckResult"); - if (!listener) + if (queues.empty()) return; - Dictionary::Ptr message = MakeCheckResultMessage(checkable, cr); - listener->RelayMessage(origin, checkable, message, true); -} + Log(LogDebug, "ApiEvents", "Processing event type 'CheckResult'."); -Value ApiEvents::CheckResultAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) -{ - Endpoint::Ptr endpoint = origin->FromClient->GetEndpoint(); - - if (!endpoint) { - Log(LogNotice, "ApiEvents") - << "Discarding 'check result' message from '" << origin->FromClient->GetIdentity() << "': Invalid endpoint origin (client not allowed)."; - return Empty; - } - - if (!params) - return Empty; - - CheckResult::Ptr cr = new CheckResult(); - - Dictionary::Ptr vcr = params->Get("cr"); - Array::Ptr vperf = vcr->Get("performance_data"); - vcr->Remove("performance_data"); - - Deserialize(cr, params->Get("cr"), true); - - Array::Ptr rperf = new Array(); - - if (vperf) { - ObjectLock olock(vperf); - BOOST_FOREACH(const Value& vp, vperf) { - Value p; - - if (vp.IsObjectType()) { - PerfdataValue::Ptr val = new PerfdataValue(); - Deserialize(val, vp, true); - rperf->Add(val); - } else - rperf->Add(vp); - } - } - - cr->SetPerformanceData(rperf); - - Host::Ptr host = Host::GetByName(params->Get("host")); - - if (!host) - return Empty; - - Checkable::Ptr checkable; - - if (params->Contains("service")) - checkable = host->GetServiceByShortName(params->Get("service")); - else - checkable = host; - - if (!checkable) - return Empty; - - if (origin->FromZone && !origin->FromZone->CanAccessObject(checkable) && endpoint != checkable->GetCommandEndpoint()) { - Log(LogNotice, "ApiEvents") - << "Discarding 'check result' message from '" << origin->FromClient->GetIdentity() << "': Unauthorized access."; - return Empty; - } - - if (endpoint == checkable->GetCommandEndpoint()) - checkable->ProcessCheckResult(cr); - else - checkable->ProcessCheckResult(cr, origin); - - return Empty; -} - -void ApiEvents::NextCheckChangedHandler(const Checkable::Ptr& checkable, const MessageOrigin::Ptr& origin) -{ - ApiListener::Ptr listener = ApiListener::GetInstance(); - - if (!listener) - return; + Dictionary::Ptr result = new Dictionary(); + result->Set("type", "CheckResult"); + result->Set("timestamp", Utility::GetTime()); Host::Ptr host; Service::Ptr service; tie(host, service) = GetHostService(checkable); - Dictionary::Ptr params = new Dictionary(); - params->Set("host", host->GetName()); + result->Set("host", host->GetName()); if (service) - params->Set("service", service->GetShortName()); - params->Set("next_check", checkable->GetNextCheck()); + result->Set("service", service->GetShortName()); - Dictionary::Ptr message = new Dictionary(); - message->Set("jsonrpc", "2.0"); - message->Set("method", "event::SetNextCheck"); - message->Set("params", params); + result->Set("check_result", Serialize(cr)); - listener->RelayMessage(origin, checkable, message, true); + BOOST_FOREACH(const EventQueue::Ptr& queue, queues) { + queue->ProcessEvent(result); + } } -Value ApiEvents::NextCheckChangedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) +void ApiEvents::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type, const MessageOrigin::Ptr& origin) { - Endpoint::Ptr endpoint = origin->FromClient->GetEndpoint(); + std::vector queues = EventQueue::GetQueuesForType("StateChange"); - if (!endpoint) { - Log(LogNotice, "ApiEvents") - << "Discarding 'next check changed' message from '" << origin->FromClient->GetIdentity() << "': Invalid endpoint origin (client not allowed)."; - return Empty; - } - - if (!params) - return Empty; - - Host::Ptr host = Host::GetByName(params->Get("host")); - - if (!host) - return Empty; - - Checkable::Ptr checkable; - - if (params->Contains("service")) - checkable = host->GetServiceByShortName(params->Get("service")); - else - checkable = host; - - if (!checkable) - return Empty; - - if (origin->FromZone && !origin->FromZone->CanAccessObject(checkable)) { - Log(LogNotice, "ApiEvents") - << "Discarding 'next check changed' message from '" << origin->FromClient->GetIdentity() << "': Unauthorized access."; - return Empty; - } - - checkable->SetNextCheck(params->Get("next_check"), false, origin); - - return Empty; -} - -void ApiEvents::NextNotificationChangedHandler(const Notification::Ptr& notification, const MessageOrigin::Ptr& origin) -{ - ApiListener::Ptr listener = ApiListener::GetInstance(); - - if (!listener) + if (queues.empty()) return; - Dictionary::Ptr params = new Dictionary(); - params->Set("notification", notification->GetName()); - params->Set("next_notification", notification->GetNextNotification()); + Log(LogDebug, "ApiEvents", "Processing event type 'StateChange'."); - Dictionary::Ptr message = new Dictionary(); - message->Set("jsonrpc", "2.0"); - message->Set("method", "event::SetNextNotification"); - message->Set("params", params); - - listener->RelayMessage(origin, notification, message, true); -} - -Value ApiEvents::NextNotificationChangedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) -{ - Endpoint::Ptr endpoint = origin->FromClient->GetEndpoint(); - - if (!endpoint) { - Log(LogNotice, "ApiEvents") - << "Discarding 'next notification changed' message from '" << origin->FromClient->GetIdentity() << "': Invalid endpoint origin (client not allowed)."; - return Empty; - } - - if (!params) - return Empty; - - Notification::Ptr notification = Notification::GetByName(params->Get("notification")); - - if (!notification) - return Empty; - - if (origin->FromZone && !origin->FromZone->CanAccessObject(notification)) { - Log(LogNotice, "ApiEvents") - << "Discarding 'next notification changed' message from '" << origin->FromClient->GetIdentity() << "': Unauthorized access."; - return Empty; - } - - notification->SetNextNotification(params->Get("next_notification"), false, origin); - - return Empty; -} - -void ApiEvents::ForceNextCheckChangedHandler(const Checkable::Ptr& checkable, const MessageOrigin::Ptr& origin) -{ - ApiListener::Ptr listener = ApiListener::GetInstance(); - - if (!listener) - return; + Dictionary::Ptr result = new Dictionary(); + result->Set("type", "StateChange"); + result->Set("timestamp", Utility::GetTime()); Host::Ptr host; Service::Ptr service; tie(host, service) = GetHostService(checkable); - Dictionary::Ptr params = new Dictionary(); - params->Set("host", host->GetName()); + result->Set("host", host->GetName()); if (service) - params->Set("service", service->GetShortName()); - params->Set("forced", checkable->GetForceNextCheck()); + result->Set("service", service->GetShortName()); - Dictionary::Ptr message = new Dictionary(); - message->Set("jsonrpc", "2.0"); - message->Set("method", "event::SetForceNextCheck"); - message->Set("params", params); + result->Set("state", service ? service->GetState() : host->GetState()); + result->Set("state_type", checkable->GetStateType()); + result->Set("check_result", Serialize(cr)); - listener->RelayMessage(origin, checkable, message, true); + BOOST_FOREACH(const EventQueue::Ptr& queue, queues) { + queue->ProcessEvent(result); + } } -Value ApiEvents::ForceNextCheckChangedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) +void ApiEvents::NotificationSentToAllUsersHandler(const Notification::Ptr& notification, + const Checkable::Ptr& checkable, const std::set& users, NotificationType type, + const CheckResult::Ptr& cr, const String& author, const String& text) { - Endpoint::Ptr endpoint = origin->FromClient->GetEndpoint(); + std::vector queues = EventQueue::GetQueuesForType("Notification"); - if (!endpoint) { - Log(LogNotice, "ApiEvents") - << "Discarding 'force next check changed' message from '" << origin->FromClient->GetIdentity() << "': Invalid endpoint origin (client not allowed)."; - return Empty; - } - - if (!params) - return Empty; - - Host::Ptr host = Host::GetByName(params->Get("host")); - - if (!host) - return Empty; - - Checkable::Ptr checkable; - - if (params->Contains("service")) - checkable = host->GetServiceByShortName(params->Get("service")); - else - checkable = host; - - if (!checkable) - return Empty; - - if (origin->FromZone && !origin->FromZone->CanAccessObject(checkable)) { - Log(LogNotice, "ApiEvents") - << "Discarding 'force next check' message from '" << origin->FromClient->GetIdentity() << "': Unauthorized access."; - return Empty; - } - - checkable->SetForceNextCheck(params->Get("forced"), false, origin); - - return Empty; -} - -void ApiEvents::ForceNextNotificationChangedHandler(const Checkable::Ptr& checkable, const MessageOrigin::Ptr& origin) -{ - ApiListener::Ptr listener = ApiListener::GetInstance(); - - if (!listener) + if (queues.empty()) return; + Log(LogDebug, "ApiEvents", "Processing event type 'Notification'."); + + Dictionary::Ptr result = new Dictionary(); + result->Set("type", "Notification"); + result->Set("timestamp", Utility::GetTime()); + Host::Ptr host; Service::Ptr service; tie(host, service) = GetHostService(checkable); - Dictionary::Ptr params = new Dictionary(); - params->Set("host", host->GetName()); + result->Set("host", host->GetName()); if (service) - params->Set("service", service->GetShortName()); - params->Set("forced", checkable->GetForceNextNotification()); + result->Set("service", service->GetShortName()); - Dictionary::Ptr message = new Dictionary(); - message->Set("jsonrpc", "2.0"); - message->Set("method", "event::SetForceNextNotification"); - message->Set("params", params); + Array::Ptr userNames = new Array(); - listener->RelayMessage(origin, checkable, message, true); -} - -Value ApiEvents::ForceNextNotificationChangedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) -{ - Endpoint::Ptr endpoint = origin->FromClient->GetEndpoint(); - - if (!endpoint) { - Log(LogNotice, "ApiEvents") - << "Discarding 'force next notification changed' message from '" << origin->FromClient->GetIdentity() << "': Invalid endpoint origin (client not allowed)."; - return Empty; + BOOST_FOREACH(const User::Ptr& user, users) { + userNames->Add(user->GetName()); } - if (!params) - return Empty; + result->Set("users", userNames); + result->Set("notification_type", Notification::NotificationTypeToString(type)); + result->Set("author", author); + result->Set("text", text); + result->Set("check_result", Serialize(cr)); - Host::Ptr host = Host::GetByName(params->Get("host")); - - if (!host) - return Empty; - - Checkable::Ptr checkable; - - if (params->Contains("service")) - checkable = host->GetServiceByShortName(params->Get("service")); - else - checkable = host; - - if (!checkable) - return Empty; - - if (origin->FromZone && !origin->FromZone->CanAccessObject(checkable)) { - Log(LogNotice, "ApiEvents") - << "Discarding 'force next notification' message from '" << origin->FromClient->GetIdentity() << "': Unauthorized access."; - return Empty; + BOOST_FOREACH(const EventQueue::Ptr& queue, queues) { + queue->ProcessEvent(result); } - - checkable->SetForceNextNotification(params->Get("forced"), false, origin); - - return Empty; } -void ApiEvents::CommentAddedHandler(const Checkable::Ptr& checkable, const Comment::Ptr& comment, const MessageOrigin::Ptr& origin) +void ApiEvents::FlappingChangedHandler(const Checkable::Ptr& checkable, const MessageOrigin::Ptr& origin) { - ApiListener::Ptr listener = ApiListener::GetInstance(); + std::vector queues = EventQueue::GetQueuesForType("Flapping"); - if (!listener) + if (queues.empty()) return; + Log(LogDebug, "ApiEvents", "Processing event type 'Flapping'."); + + Dictionary::Ptr result = new Dictionary(); + result->Set("type", "Flapping"); + result->Set("timestamp", Utility::GetTime()); + Host::Ptr host; Service::Ptr service; tie(host, service) = GetHostService(checkable); - Dictionary::Ptr params = new Dictionary(); - params->Set("host", host->GetName()); + result->Set("host", host->GetName()); if (service) - params->Set("service", service->GetShortName()); - params->Set("comment", Serialize(comment)); + result->Set("service", service->GetShortName()); - Dictionary::Ptr message = new Dictionary(); - message->Set("jsonrpc", "2.0"); - message->Set("method", "event::AddComment"); - message->Set("params", params); + result->Set("state", service ? service->GetState() : host->GetState()); + result->Set("state_type", checkable->GetStateType()); + result->Set("is_flapping", checkable->IsFlapping()); - listener->RelayMessage(origin, checkable, message, true); -} - -Value ApiEvents::CommentAddedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) -{ - Endpoint::Ptr endpoint = origin->FromClient->GetEndpoint(); - - if (!endpoint) { - Log(LogNotice, "ApiEvents") - << "Discarding 'comment added' message from '" << origin->FromClient->GetIdentity() << "': Invalid endpoint origin (client not allowed)."; - return Empty; + BOOST_FOREACH(const EventQueue::Ptr& queue, queues) { + queue->ProcessEvent(result); } - - if (!params) - return Empty; - - Host::Ptr host = Host::GetByName(params->Get("host")); - - if (!host) - return Empty; - - Checkable::Ptr checkable; - - if (params->Contains("service")) - checkable = host->GetServiceByShortName(params->Get("service")); - else - checkable = host; - - if (!checkable) - return Empty; - - if (origin->FromZone && !origin->FromZone->CanAccessObject(checkable)) { - Log(LogNotice, "ApiEvents") - << "Discarding 'comment added' message from '" << origin->FromClient->GetIdentity() << "': Unauthorized access."; - return Empty; - } - - Comment::Ptr comment = new Comment(); - Deserialize(comment, params->Get("comment"), true); - - checkable->AddComment(comment->GetEntryType(), comment->GetAuthor(), - comment->GetText(), comment->GetExpireTime(), comment->GetId(), origin); - - return Empty; -} - -void ApiEvents::CommentRemovedHandler(const Checkable::Ptr& checkable, const Comment::Ptr& comment, const MessageOrigin::Ptr& origin) -{ - ApiListener::Ptr listener = ApiListener::GetInstance(); - - if (!listener) - return; - - Host::Ptr host; - Service::Ptr service; - tie(host, service) = GetHostService(checkable); - - Dictionary::Ptr params = new Dictionary(); - params->Set("host", host->GetName()); - if (service) - params->Set("service", service->GetShortName()); - params->Set("id", comment->GetId()); - - Dictionary::Ptr message = new Dictionary(); - message->Set("jsonrpc", "2.0"); - message->Set("method", "event::RemoveComment"); - message->Set("params", params); - - listener->RelayMessage(origin, checkable, message, true); -} - -Value ApiEvents::CommentRemovedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) -{ - Endpoint::Ptr endpoint = origin->FromClient->GetEndpoint(); - - if (!endpoint) { - Log(LogNotice, "ApiEvents") - << "Discarding 'comment removed' message from '" << origin->FromClient->GetIdentity() << "': Invalid endpoint origin (client not allowed)."; - return Empty; - } - - if (!params) - return Empty; - - Host::Ptr host = Host::GetByName(params->Get("host")); - - if (!host) - return Empty; - - Checkable::Ptr checkable; - - if (params->Contains("service")) - checkable = host->GetServiceByShortName(params->Get("service")); - else - checkable = host; - - if (!checkable) - return Empty; - - if (origin->FromZone && !origin->FromZone->CanAccessObject(checkable)) { - Log(LogNotice, "ApiEvents") - << "Discarding 'comment removed' message from '" << origin->FromClient->GetIdentity() << "': Unauthorized access."; - return Empty; - } - - checkable->RemoveComment(params->Get("id"), origin); - - return Empty; -} - -void ApiEvents::DowntimeAddedHandler(const Checkable::Ptr& checkable, const Downtime::Ptr& downtime, const MessageOrigin::Ptr& origin) -{ - ApiListener::Ptr listener = ApiListener::GetInstance(); - - if (!listener) - return; - - Host::Ptr host; - Service::Ptr service; - tie(host, service) = GetHostService(checkable); - - Dictionary::Ptr params = new Dictionary(); - params->Set("host", host->GetName()); - if (service) - params->Set("service", service->GetShortName()); - params->Set("downtime", Serialize(downtime)); - - Dictionary::Ptr message = new Dictionary(); - message->Set("jsonrpc", "2.0"); - message->Set("method", "event::AddDowntime"); - message->Set("params", params); - - listener->RelayMessage(origin, checkable, message, true); -} - -Value ApiEvents::DowntimeAddedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) -{ - Endpoint::Ptr endpoint = origin->FromClient->GetEndpoint(); - - if (!endpoint) { - Log(LogNotice, "ApiEvents") - << "Discarding 'downtime added' message from '" << origin->FromClient->GetIdentity() << "': Invalid endpoint origin (client not allowed)."; - return Empty; - } - - if (!params) - return Empty; - - Host::Ptr host = Host::GetByName(params->Get("host")); - - if (!host) - return Empty; - - Checkable::Ptr checkable; - - if (params->Contains("service")) - checkable = host->GetServiceByShortName(params->Get("service")); - else - checkable = host; - - if (!checkable) - return Empty; - - if (origin->FromZone && !origin->FromZone->CanAccessObject(checkable)) { - Log(LogNotice, "ApiEvents") - << "Discarding 'downtime added' message from '" << origin->FromClient->GetIdentity() << "': Unauthorized access."; - return Empty; - } - - Downtime::Ptr downtime = new Downtime(); - Deserialize(downtime, params->Get("downtime"), true); - - checkable->AddDowntime(downtime->GetAuthor(), downtime->GetComment(), - downtime->GetStartTime(), downtime->GetEndTime(), - downtime->GetFixed(), downtime->GetTriggeredBy(), - downtime->GetDuration(), downtime->GetScheduledBy(), - downtime->GetId(), origin); - - return Empty; -} - -void ApiEvents::DowntimeRemovedHandler(const Checkable::Ptr& checkable, const Downtime::Ptr& downtime, const MessageOrigin::Ptr& origin) -{ - ApiListener::Ptr listener = ApiListener::GetInstance(); - - if (!listener) - return; - - Host::Ptr host; - Service::Ptr service; - tie(host, service) = GetHostService(checkable); - - Dictionary::Ptr params = new Dictionary(); - params->Set("host", host->GetName()); - if (service) - params->Set("service", service->GetShortName()); - params->Set("id", downtime->GetId()); - - Dictionary::Ptr message = new Dictionary(); - message->Set("jsonrpc", "2.0"); - message->Set("method", "event::RemoveDowntime"); - message->Set("params", params); - - listener->RelayMessage(origin, checkable, message, true); -} - -Value ApiEvents::DowntimeRemovedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) -{ - Endpoint::Ptr endpoint = origin->FromClient->GetEndpoint(); - - if (!endpoint) { - Log(LogNotice, "ApiEvents") - << "Discarding 'downtime removed' message from '" << origin->FromClient->GetIdentity() << "': Invalid endpoint origin (client not allowed)."; - return Empty; - } - - if (!params) - return Empty; - - Host::Ptr host = Host::GetByName(params->Get("host")); - - if (!host) - return Empty; - - Checkable::Ptr checkable; - - if (params->Contains("service")) - checkable = host->GetServiceByShortName(params->Get("service")); - else - checkable = host; - - if (!checkable) - return Empty; - - if (origin->FromZone && !origin->FromZone->CanAccessObject(checkable)) { - Log(LogNotice, "ApiEvents") - << "Discarding 'downtime removed' message from '" << origin->FromClient->GetIdentity() << "': Unauthorized access."; - return Empty; - } - - checkable->RemoveDowntime(params->Get("id"), false, origin); - - return Empty; } void ApiEvents::AcknowledgementSetHandler(const Checkable::Ptr& checkable, const String& author, const String& comment, AcknowledgementType type, bool notify, double expiry, const MessageOrigin::Ptr& origin) { - ApiListener::Ptr listener = ApiListener::GetInstance(); + std::vector queues = EventQueue::GetQueuesForType("AcknowledgementSet"); - if (!listener) + if (queues.empty()) return; + Log(LogDebug, "ApiEvents", "Processing event type 'AcknowledgementSet'."); + + Dictionary::Ptr result = new Dictionary(); + result->Set("type", "AcknowledgementSet"); + result->Set("timestamp", Utility::GetTime()); + Host::Ptr host; Service::Ptr service; tie(host, service) = GetHostService(checkable); - Dictionary::Ptr params = new Dictionary(); - params->Set("host", host->GetName()); + result->Set("host", host->GetName()); if (service) - params->Set("service", service->GetShortName()); - params->Set("author", author); - params->Set("comment", comment); - params->Set("acktype", type); - params->Set("notify", notify); - params->Set("expiry", expiry); + result->Set("service", service->GetShortName()); - Dictionary::Ptr message = new Dictionary(); - message->Set("jsonrpc", "2.0"); - message->Set("method", "event::SetAcknowledgement"); - message->Set("params", params); + result->Set("state", service ? service->GetState() : host->GetState()); + result->Set("state_type", checkable->GetStateType()); - listener->RelayMessage(origin, checkable, message, true); -} + result->Set("author", author); + result->Set("comment", comment); + result->Set("acknowledgement_type", type); + result->Set("notify", notify); + result->Set("expiry", expiry); -Value ApiEvents::AcknowledgementSetAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) -{ - Endpoint::Ptr endpoint = origin->FromClient->GetEndpoint(); - - if (!endpoint) { - Log(LogNotice, "ApiEvents") - << "Discarding 'acknowledgement set' message from '" << origin->FromClient->GetIdentity() << "': Invalid endpoint origin (client not allowed)."; - return Empty; + BOOST_FOREACH(const EventQueue::Ptr& queue, queues) { + queue->ProcessEvent(result); } - - if (!params) - return Empty; - - Host::Ptr host = Host::GetByName(params->Get("host")); - - if (!host) - return Empty; - - Checkable::Ptr checkable; - - if (params->Contains("service")) - checkable = host->GetServiceByShortName(params->Get("service")); - else - checkable = host; - - if (!checkable) - return Empty; - - if (origin->FromZone && !origin->FromZone->CanAccessObject(checkable)) { - Log(LogNotice, "ApiEvents") - << "Discarding 'acknowledgement set' message from '" << origin->FromClient->GetIdentity() << "': Unauthorized access."; - return Empty; - } - - checkable->AcknowledgeProblem(params->Get("author"), params->Get("comment"), - static_cast(static_cast(params->Get("acktype"))), - params->Get("notify"), params->Get("expiry"), origin); - - return Empty; } void ApiEvents::AcknowledgementClearedHandler(const Checkable::Ptr& checkable, const MessageOrigin::Ptr& origin) { - ApiListener::Ptr listener = ApiListener::GetInstance(); + std::vector queues = EventQueue::GetQueuesForType("AcknowledgementCleared"); - if (!listener) + if (queues.empty()) return; + Log(LogDebug, "ApiEvents", "Processing event type 'AcknowledgementCleared'."); + + Dictionary::Ptr result = new Dictionary(); + result->Set("type", "AcknowledgementCleared"); + result->Set("timestamp", Utility::GetTime()); + Host::Ptr host; Service::Ptr service; tie(host, service) = GetHostService(checkable); - Dictionary::Ptr params = new Dictionary(); - params->Set("host", host->GetName()); + result->Set("host", host->GetName()); if (service) - params->Set("service", service->GetShortName()); + result->Set("service", service->GetShortName()); - Dictionary::Ptr message = new Dictionary(); - message->Set("jsonrpc", "2.0"); - message->Set("method", "event::ClearAcknowledgement"); - message->Set("params", params); + result->Set("state", service ? service->GetState() : host->GetState()); + result->Set("state_type", checkable->GetStateType()); - listener->RelayMessage(origin, checkable, message, true); + BOOST_FOREACH(const EventQueue::Ptr& queue, queues) { + queue->ProcessEvent(result); + } + + result->Set("acknowledgement_type", AcknowledgementNone); } -Value ApiEvents::AcknowledgementClearedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) +void ApiEvents::CommentAddedHandler(const Checkable::Ptr& checkable, const Comment::Ptr& comment, const MessageOrigin::Ptr& origin) { - Endpoint::Ptr endpoint = origin->FromClient->GetEndpoint(); + std::vector queues = EventQueue::GetQueuesForType("CommentAdded"); - if (!endpoint) { - Log(LogNotice, "ApiEvents") - << "Discarding 'acknowledgement cleared' message from '" << origin->FromClient->GetIdentity() << "': Invalid endpoint origin (client not allowed)."; - return Empty; - } - - if (!params) - return Empty; - - Host::Ptr host = Host::GetByName(params->Get("host")); - - if (!host) - return Empty; - - Checkable::Ptr checkable; - - if (params->Contains("service")) - checkable = host->GetServiceByShortName(params->Get("service")); - else - checkable = host; - - if (!checkable) - return Empty; - - if (origin->FromZone && !origin->FromZone->CanAccessObject(checkable)) { - Log(LogNotice, "ApiEvents") - << "Discarding 'acknowledgement cleared' message from '" << origin->FromClient->GetIdentity() << "': Unauthorized access."; - return Empty; - } - - checkable->ClearAcknowledgement(origin); - - return Empty; -} - -Value ApiEvents::ExecuteCommandAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) -{ - Endpoint::Ptr sourceEndpoint = origin->FromClient->GetEndpoint(); - - if (!sourceEndpoint || (origin->FromZone && !Zone::GetLocalZone()->IsChildOf(origin->FromZone))) { - Log(LogNotice, "ApiEvents") - << "Discarding 'execute command' message from '" << origin->FromClient->GetIdentity() << "': Invalid endpoint origin (client not allowed)."; - return Empty; - } - - ApiListener::Ptr listener = ApiListener::GetInstance(); - - if (!listener) { - Log(LogCritical, "ApiListener", "No instance available."); - return Empty; - } - - if (!listener->GetAcceptCommands()) { - Log(LogWarning, "ApiListener") - << "Ignoring command. '" << listener->GetName() << "' does not accept commands."; - - Host::Ptr host = new Host(); - Dictionary::Ptr attrs = new Dictionary(); - - attrs->Set("__name", params->Get("host")); - attrs->Set("type", "Host"); - - Deserialize(host, attrs, false, FAConfig); - - if (params->Contains("service")) - host->SetExtension("agent_service_name", params->Get("service")); - - CheckResult::Ptr cr = new CheckResult(); - cr->SetState(ServiceUnknown); - cr->SetOutput("Endpoint '" + Endpoint::GetLocalEndpoint()->GetName() + "' does not accept commands."); - Dictionary::Ptr message = MakeCheckResultMessage(host, cr); - listener->SyncSendMessage(sourceEndpoint, message); - - return Empty; - } - - /* use a virtual host object for executing the command */ - Host::Ptr host = new Host(); - Dictionary::Ptr attrs = new Dictionary(); - - attrs->Set("__name", params->Get("host")); - attrs->Set("type", "Host"); - - Deserialize(host, attrs, false, FAConfig); - - if (params->Contains("service")) - host->SetExtension("agent_service_name", params->Get("service")); - - String command = params->Get("command"); - String command_type = params->Get("command_type"); - - if (command_type == "check_command") { - if (!CheckCommand::GetByName(command)) { - CheckResult::Ptr cr = new CheckResult(); - cr->SetState(ServiceUnknown); - cr->SetOutput("Check command '" + command + "' does not exist."); - Dictionary::Ptr message = MakeCheckResultMessage(host, cr); - listener->SyncSendMessage(sourceEndpoint, message); - return Empty; - } - } else if (command_type == "event_command") { - if (!EventCommand::GetByName(command)) { - Log(LogWarning, "ApiEvents") - << "Event command '" << command << "' does not exist."; - return Empty; - } - } else - return Empty; - - attrs->Set(command_type, params->Get("command")); - attrs->Set("command_endpoint", sourceEndpoint->GetName()); - - Deserialize(host, attrs, false, FAConfig); - - host->SetExtension("agent_check", true); - - Dictionary::Ptr macros = params->Get("macros"); - - if (command_type == "check_command") { - try { - host->ExecuteRemoteCheck(macros); - } catch (const std::exception& ex) { - CheckResult::Ptr cr = new CheckResult(); - cr->SetState(ServiceUnknown); - - String output = "Exception occured while checking '" + host->GetName() + "': " + DiagnosticInformation(ex); - cr->SetOutput(output); - - double now = Utility::GetTime(); - cr->SetScheduleStart(now); - cr->SetScheduleEnd(now); - cr->SetExecutionStart(now); - cr->SetExecutionEnd(now); - - Dictionary::Ptr message = MakeCheckResultMessage(host, cr); - listener->SyncSendMessage(sourceEndpoint, message); - - Log(LogCritical, "checker", output); - } - } else if (command_type == "event_command") { - host->ExecuteEventHandler(macros, true); - } - - return Empty; -} - -void ApiEvents::RepositoryTimerHandler(void) -{ - ApiListener::Ptr listener = ApiListener::GetInstance(); - - if (!listener) + if (queues.empty()) return; - Dictionary::Ptr repository = new Dictionary(); + Log(LogDebug, "ApiEvents", "Processing event type 'CommentAdded'."); - BOOST_FOREACH(const Host::Ptr& host, ConfigType::GetObjectsByType()) { - Array::Ptr services = new Array(); + Dictionary::Ptr result = new Dictionary(); + result->Set("type", "CommentAdded"); + result->Set("timestamp", Utility::GetTime()); - BOOST_FOREACH(const Service::Ptr& service, host->GetServices()) { - services->Add(service->GetShortName()); - } + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); - repository->Set(host->GetName(), services); + result->Set("host", host->GetName()); + if (service) + result->Set("service", service->GetShortName()); + + result->Set("comment", Serialize(comment)); + + BOOST_FOREACH(const EventQueue::Ptr& queue, queues) { + queue->ProcessEvent(result); } +} - Endpoint::Ptr my_endpoint = Endpoint::GetLocalEndpoint(); +void ApiEvents::CommentRemovedHandler(const Checkable::Ptr& checkable, const Comment::Ptr& comment, const MessageOrigin::Ptr& origin) +{ + std::vector queues = EventQueue::GetQueuesForType("CommentRemoved"); - if (!my_endpoint) { - Log(LogWarning, "ApiEvents", "No local endpoint defined. Bailing out."); - return; - } - - Zone::Ptr my_zone = my_endpoint->GetZone(); - - if (!my_zone) + if (queues.empty()) return; - Dictionary::Ptr params = new Dictionary(); - params->Set("seen", Utility::GetTime()); - params->Set("endpoint", my_endpoint->GetName()); + Log(LogDebug, "ApiEvents", "Processing event type 'CommentRemoved'."); - Zone::Ptr parent_zone = my_zone->GetParent(); - if (parent_zone) - params->Set("parent_zone", parent_zone->GetName()); + Dictionary::Ptr result = new Dictionary(); + result->Set("type", "CommentRemoved"); + result->Set("timestamp", Utility::GetTime()); - params->Set("zone", my_zone->GetName()); - params->Set("repository", repository); + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); - Dictionary::Ptr message = new Dictionary(); - message->Set("jsonrpc", "2.0"); - message->Set("method", "event::UpdateRepository"); - message->Set("params", params); + result->Set("host", host->GetName()); + if (service) + result->Set("service", service->GetShortName()); - listener->RelayMessage(MessageOrigin::Ptr(), my_zone, message, false); -} - -String ApiEvents::GetRepositoryDir(void) -{ - return Application::GetLocalStateDir() + "/lib/icinga2/api/repository/"; -} - -Value ApiEvents::UpdateRepositoryAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) -{ - if (!params) - return Empty; - - Value vrepository = params->Get("repository"); - if (vrepository.IsEmpty() || !vrepository.IsObjectType()) - return Empty; - - String repositoryFile = GetRepositoryDir() + SHA256(params->Get("endpoint")) + ".repo"; - String repositoryTempFile = repositoryFile + ".tmp"; - - std::ofstream fp(repositoryTempFile.CStr(), std::ofstream::out | std::ostream::trunc); - fp << JsonEncode(params); - fp.close(); - -#ifdef _WIN32 - _unlink(repositoryFile.CStr()); -#endif /* _WIN32 */ - - if (rename(repositoryTempFile.CStr(), repositoryFile.CStr()) < 0) { - BOOST_THROW_EXCEPTION(posix_error() - << boost::errinfo_api_function("rename") - << boost::errinfo_errno(errno) - << boost::errinfo_file_name(repositoryTempFile)); + BOOST_FOREACH(const EventQueue::Ptr& queue, queues) { + queue->ProcessEvent(result); } - - ApiListener::Ptr listener = ApiListener::GetInstance(); - - if (!listener) - return Empty; - - Dictionary::Ptr message = new Dictionary(); - message->Set("jsonrpc", "2.0"); - message->Set("method", "event::UpdateRepository"); - message->Set("params", params); - - listener->RelayMessage(origin, Zone::GetLocalZone(), message, true); - - return Empty; } +void ApiEvents::DowntimeAddedHandler(const Checkable::Ptr& checkable, const Downtime::Ptr& downtime, const MessageOrigin::Ptr& origin) +{ + std::vector queues = EventQueue::GetQueuesForType("DowntimeAdded"); + + if (queues.empty()) + return; + + Log(LogDebug, "ApiEvents", "Processing event type 'DowntimeAdded'."); + + Dictionary::Ptr result = new Dictionary(); + result->Set("type", "DowntimeAdded"); + result->Set("timestamp", Utility::GetTime()); + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + result->Set("host", host->GetName()); + if (service) + result->Set("service", service->GetShortName()); + + result->Set("downtime", Serialize(downtime)); + + BOOST_FOREACH(const EventQueue::Ptr& queue, queues) { + queue->ProcessEvent(result); + } +} + +void ApiEvents::DowntimeRemovedHandler(const Checkable::Ptr& checkable, const Downtime::Ptr& downtime, const MessageOrigin::Ptr& origin) +{ + std::vector queues = EventQueue::GetQueuesForType("DowntimeRemoved"); + + if (queues.empty()) + return; + + Log(LogDebug, "ApiEvents", "Processing event type 'DowntimeRemoved'."); + + Dictionary::Ptr result = new Dictionary(); + result->Set("type", "DowntimeRemoved"); + result->Set("timestamp", Utility::GetTime()); + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + result->Set("host", host->GetName()); + if (service) + result->Set("service", service->GetShortName()); + + BOOST_FOREACH(const EventQueue::Ptr& queue, queues) { + queue->ProcessEvent(result); + } +} + +void ApiEvents::DowntimeTriggeredHandler(const Checkable::Ptr& checkable, const Downtime::Ptr& downtime) +{ + std::vector queues = EventQueue::GetQueuesForType("DowntimeTriggered"); + + if (queues.empty()) + return; + + Log(LogDebug, "ApiEvents", "Processing event type 'DowntimeTriggered'."); + + Dictionary::Ptr result = new Dictionary(); + result->Set("type", "DowntimeTriggered"); + result->Set("timestamp", Utility::GetTime()); + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + result->Set("host", host->GetName()); + if (service) + result->Set("service", service->GetShortName()); + + result->Set("downtime", Serialize(downtime)); + + BOOST_FOREACH(const EventQueue::Ptr& queue, queues) { + queue->ProcessEvent(result); + } +} diff --git a/lib/icinga/apievents.hpp b/lib/icinga/apievents.hpp index f01754b97..9e889ffed 100644 --- a/lib/icinga/apievents.hpp +++ b/lib/icinga/apievents.hpp @@ -22,9 +22,6 @@ #include "icinga/checkable.hpp" #include "icinga/host.hpp" -#include "icinga/checkcommand.hpp" -#include "icinga/eventcommand.hpp" -#include "icinga/notificationcommand.hpp" namespace icinga { @@ -38,46 +35,26 @@ public: static void StaticInitialize(void); static void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr& origin); - static Value CheckResultAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params); + static void StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type, const MessageOrigin::Ptr& origin); - static void NextCheckChangedHandler(const Checkable::Ptr& checkable, const MessageOrigin::Ptr& origin); - static Value NextCheckChangedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params); - static void NextNotificationChangedHandler(const Notification::Ptr& notification, const MessageOrigin::Ptr& origin); - static Value NextNotificationChangedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params); + static void NotificationSentToAllUsersHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable, + const std::set& users, NotificationType type, const CheckResult::Ptr& cr, const String& author, + const String& text); - static void ForceNextCheckChangedHandler(const Checkable::Ptr& checkable, const MessageOrigin::Ptr& origin); - static Value ForceNextCheckChangedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params); + static void FlappingChangedHandler(const Checkable::Ptr& checkable, const MessageOrigin::Ptr& origin); - static void ForceNextNotificationChangedHandler(const Checkable::Ptr& checkable, const MessageOrigin::Ptr& origin); - static Value ForceNextNotificationChangedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params); + static void AcknowledgementSetHandler(const Checkable::Ptr& checkable, + const String& author, const String& comment, AcknowledgementType type, + bool notify, double expiry, const MessageOrigin::Ptr& origin); + static void AcknowledgementClearedHandler(const Checkable::Ptr& checkable, const MessageOrigin::Ptr& origin); static void CommentAddedHandler(const Checkable::Ptr& checkable, const Comment::Ptr& comment, const MessageOrigin::Ptr& origin); - static Value CommentAddedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params); - static void CommentRemovedHandler(const Checkable::Ptr& checkable, const Comment::Ptr& comment, const MessageOrigin::Ptr& origin); - static Value CommentRemovedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params); static void DowntimeAddedHandler(const Checkable::Ptr& checkable, const Downtime::Ptr& downtime, const MessageOrigin::Ptr& origin); - static Value DowntimeAddedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params); - static void DowntimeRemovedHandler(const Checkable::Ptr& checkable, const Downtime::Ptr& downtime, const MessageOrigin::Ptr& origin); - static Value DowntimeRemovedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params); - - static void AcknowledgementSetHandler(const Checkable::Ptr& checkable, const String& author, const String& comment, AcknowledgementType type, - bool notify, double expiry, const MessageOrigin::Ptr& origin); - static Value AcknowledgementSetAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params); - - static void AcknowledgementClearedHandler(const Checkable::Ptr& checkable, const MessageOrigin::Ptr& origin); - static Value AcknowledgementClearedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params); - - static Value ExecuteCommandAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params); - - static String GetRepositoryDir(void); - static void RepositoryTimerHandler(void); - static Value UpdateRepositoryAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params); - - static Dictionary::Ptr MakeCheckResultMessage(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); + static void DowntimeTriggeredHandler(const Checkable::Ptr& checkable, const Downtime::Ptr& downtime); }; } diff --git a/lib/icinga/checkable-check.cpp b/lib/icinga/checkable-check.cpp index 38bd0de1b..6305a5ee3 100644 --- a/lib/icinga/checkable-check.cpp +++ b/lib/icinga/checkable-check.cpp @@ -23,7 +23,7 @@ #include "icinga/checkcommand.hpp" #include "icinga/icingaapplication.hpp" #include "icinga/cib.hpp" -#include "icinga/apievents.hpp" +#include "icinga/clusterevents.hpp" #include "remote/messageorigin.hpp" #include "remote/apilistener.hpp" #include "base/objectlock.hpp" @@ -140,7 +140,7 @@ void Checkable::ProcessCheckResult(const CheckResult::Ptr& cr, const MessageOrig if (listener) { /* send message back to its origin */ - Dictionary::Ptr message = ApiEvents::MakeCheckResultMessage(this, cr); + Dictionary::Ptr message = ClusterEvents::MakeCheckResultMessage(this, cr); listener->SyncSendMessage(command_endpoint, message); } diff --git a/lib/icinga/checkable-downtime.cpp b/lib/icinga/checkable-downtime.cpp index 01e8bba62..a56d71de9 100644 --- a/lib/icinga/checkable-downtime.cpp +++ b/lib/icinga/checkable-downtime.cpp @@ -236,9 +236,12 @@ void Checkable::TriggerDowntime(const String& id) downtime->SetTriggerTime(Utility::GetTime()); Dictionary::Ptr triggers = downtime->GetTriggers(); - ObjectLock olock(triggers); - BOOST_FOREACH(const Dictionary::Pair& kv, triggers) { - TriggerDowntime(kv.first); + + { + ObjectLock olock(triggers); + BOOST_FOREACH(const Dictionary::Pair& kv, triggers) { + TriggerDowntime(kv.first); + } } OnDowntimeTriggered(owner, downtime); diff --git a/lib/icinga/clusterevents.cpp b/lib/icinga/clusterevents.cpp new file mode 100644 index 000000000..1228673d8 --- /dev/null +++ b/lib/icinga/clusterevents.cpp @@ -0,0 +1,1034 @@ +/****************************************************************************** + * Icinga 2 * + * Copyright (C) 2012-2015 Icinga Development Team (http://www.icinga.org) * + * * + * This program is free software; you can redistribute it and/or * + * modify it under the terms of the GNU General Public License * + * as published by the Free Software Foundation; either version 2 * + * of the License, or (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the Free Software Foundation * + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * + ******************************************************************************/ + +#include "icinga/clusterevents.hpp" +#include "icinga/service.hpp" +#include "icinga/perfdatavalue.hpp" +#include "remote/apilistener.hpp" +#include "remote/endpoint.hpp" +#include "remote/messageorigin.hpp" +#include "remote/zone.hpp" +#include "remote/apifunction.hpp" +#include "remote/eventqueue.hpp" +#include "base/application.hpp" +#include "base/configtype.hpp" +#include "base/utility.hpp" +#include "base/exception.hpp" +#include "base/initialize.hpp" +#include "base/serializer.hpp" +#include "base/json.hpp" +#include + +using namespace icinga; + +INITIALIZE_ONCE(&ClusterEvents::StaticInitialize); + +REGISTER_APIFUNCTION(CheckResult, event, &ClusterEvents::CheckResultAPIHandler); +REGISTER_APIFUNCTION(SetNextCheck, event, &ClusterEvents::NextCheckChangedAPIHandler); +REGISTER_APIFUNCTION(SetNextNotification, event, &ClusterEvents::NextNotificationChangedAPIHandler); +REGISTER_APIFUNCTION(SetForceNextCheck, event, &ClusterEvents::ForceNextCheckChangedAPIHandler); +REGISTER_APIFUNCTION(SetForceNextNotification, event, &ClusterEvents::ForceNextNotificationChangedAPIHandler); +REGISTER_APIFUNCTION(AddComment, event, &ClusterEvents::CommentAddedAPIHandler); +REGISTER_APIFUNCTION(RemoveComment, event, &ClusterEvents::CommentRemovedAPIHandler); +REGISTER_APIFUNCTION(AddDowntime, event, &ClusterEvents::DowntimeAddedAPIHandler); +REGISTER_APIFUNCTION(RemoveDowntime, event, &ClusterEvents::DowntimeRemovedAPIHandler); +REGISTER_APIFUNCTION(SetAcknowledgement, event, &ClusterEvents::AcknowledgementSetAPIHandler); +REGISTER_APIFUNCTION(ClearAcknowledgement, event, &ClusterEvents::AcknowledgementClearedAPIHandler); +REGISTER_APIFUNCTION(UpdateRepository, event, &ClusterEvents::UpdateRepositoryAPIHandler); +REGISTER_APIFUNCTION(ExecuteCommand, event, &ClusterEvents::ExecuteCommandAPIHandler); + +static Timer::Ptr l_RepositoryTimer; + +void ClusterEvents::StaticInitialize(void) +{ + Checkable::OnNewCheckResult.connect(&ClusterEvents::CheckResultHandler); + Checkable::OnNextCheckChanged.connect(&ClusterEvents::NextCheckChangedHandler); + Notification::OnNextNotificationChanged.connect(&ClusterEvents::NextNotificationChangedHandler); + Checkable::OnForceNextCheckChanged.connect(&ClusterEvents::ForceNextCheckChangedHandler); + Checkable::OnForceNextNotificationChanged.connect(&ClusterEvents::ForceNextNotificationChangedHandler); + + Checkable::OnCommentAdded.connect(&ClusterEvents::CommentAddedHandler); + Checkable::OnCommentRemoved.connect(&ClusterEvents::CommentRemovedHandler); + Checkable::OnDowntimeAdded.connect(&ClusterEvents::DowntimeAddedHandler); + Checkable::OnDowntimeRemoved.connect(&ClusterEvents::DowntimeRemovedHandler); + Checkable::OnAcknowledgementSet.connect(&ClusterEvents::AcknowledgementSetHandler); + Checkable::OnAcknowledgementCleared.connect(&ClusterEvents::AcknowledgementClearedHandler); + + l_RepositoryTimer = new Timer(); + l_RepositoryTimer->SetInterval(30); + l_RepositoryTimer->OnTimerExpired.connect(boost::bind(&ClusterEvents::RepositoryTimerHandler)); + l_RepositoryTimer->Start(); + l_RepositoryTimer->Reschedule(0); +} + +Dictionary::Ptr ClusterEvents::MakeCheckResultMessage(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) +{ + Dictionary::Ptr message = new Dictionary(); + message->Set("jsonrpc", "2.0"); + message->Set("method", "event::CheckResult"); + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + Dictionary::Ptr params = new Dictionary(); + params->Set("host", host->GetName()); + if (service) + params->Set("service", service->GetShortName()); + else { + Value agent_service_name = checkable->GetExtension("agent_service_name"); + + if (!agent_service_name.IsEmpty()) + params->Set("service", agent_service_name); + } + params->Set("cr", Serialize(cr)); + + message->Set("params", params); + + return message; +} + +void ClusterEvents::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr& origin) +{ + ApiListener::Ptr listener = ApiListener::GetInstance(); + + if (!listener) + return; + + Dictionary::Ptr message = MakeCheckResultMessage(checkable, cr); + listener->RelayMessage(origin, checkable, message, true); +} + +Value ClusterEvents::CheckResultAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) +{ + Endpoint::Ptr endpoint = origin->FromClient->GetEndpoint(); + + if (!endpoint) { + Log(LogNotice, "ClusterEvents") + << "Discarding 'check result' message from '" << origin->FromClient->GetIdentity() << "': Invalid endpoint origin (client not allowed)."; + return Empty; + } + + if (!params) + return Empty; + + CheckResult::Ptr cr = new CheckResult(); + + Dictionary::Ptr vcr = params->Get("cr"); + Array::Ptr vperf = vcr->Get("performance_data"); + vcr->Remove("performance_data"); + + Deserialize(cr, params->Get("cr"), true); + + Array::Ptr rperf = new Array(); + + if (vperf) { + ObjectLock olock(vperf); + BOOST_FOREACH(const Value& vp, vperf) { + Value p; + + if (vp.IsObjectType()) { + PerfdataValue::Ptr val = new PerfdataValue(); + Deserialize(val, vp, true); + rperf->Add(val); + } else + rperf->Add(vp); + } + } + + cr->SetPerformanceData(rperf); + + Host::Ptr host = Host::GetByName(params->Get("host")); + + if (!host) + return Empty; + + Checkable::Ptr checkable; + + if (params->Contains("service")) + checkable = host->GetServiceByShortName(params->Get("service")); + else + checkable = host; + + if (!checkable) + return Empty; + + if (origin->FromZone && !origin->FromZone->CanAccessObject(checkable) && endpoint != checkable->GetCommandEndpoint()) { + Log(LogNotice, "ClusterEvents") + << "Discarding 'check result' message from '" << origin->FromClient->GetIdentity() << "': Unauthorized access."; + return Empty; + } + + if (endpoint == checkable->GetCommandEndpoint()) + checkable->ProcessCheckResult(cr); + else + checkable->ProcessCheckResult(cr, origin); + + return Empty; +} + +void ClusterEvents::NextCheckChangedHandler(const Checkable::Ptr& checkable, const MessageOrigin::Ptr& origin) +{ + ApiListener::Ptr listener = ApiListener::GetInstance(); + + if (!listener) + return; + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + Dictionary::Ptr params = new Dictionary(); + params->Set("host", host->GetName()); + if (service) + params->Set("service", service->GetShortName()); + params->Set("next_check", checkable->GetNextCheck()); + + Dictionary::Ptr message = new Dictionary(); + message->Set("jsonrpc", "2.0"); + message->Set("method", "event::SetNextCheck"); + message->Set("params", params); + + listener->RelayMessage(origin, checkable, message, true); +} + +Value ClusterEvents::NextCheckChangedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) +{ + Endpoint::Ptr endpoint = origin->FromClient->GetEndpoint(); + + if (!endpoint) { + Log(LogNotice, "ClusterEvents") + << "Discarding 'next check changed' message from '" << origin->FromClient->GetIdentity() << "': Invalid endpoint origin (client not allowed)."; + return Empty; + } + + if (!params) + return Empty; + + Host::Ptr host = Host::GetByName(params->Get("host")); + + if (!host) + return Empty; + + Checkable::Ptr checkable; + + if (params->Contains("service")) + checkable = host->GetServiceByShortName(params->Get("service")); + else + checkable = host; + + if (!checkable) + return Empty; + + if (origin->FromZone && !origin->FromZone->CanAccessObject(checkable)) { + Log(LogNotice, "ClusterEvents") + << "Discarding 'next check changed' message from '" << origin->FromClient->GetIdentity() << "': Unauthorized access."; + return Empty; + } + + checkable->SetNextCheck(params->Get("next_check"), false, origin); + + return Empty; +} + +void ClusterEvents::NextNotificationChangedHandler(const Notification::Ptr& notification, const MessageOrigin::Ptr& origin) +{ + ApiListener::Ptr listener = ApiListener::GetInstance(); + + if (!listener) + return; + + Dictionary::Ptr params = new Dictionary(); + params->Set("notification", notification->GetName()); + params->Set("next_notification", notification->GetNextNotification()); + + Dictionary::Ptr message = new Dictionary(); + message->Set("jsonrpc", "2.0"); + message->Set("method", "event::SetNextNotification"); + message->Set("params", params); + + listener->RelayMessage(origin, notification, message, true); +} + +Value ClusterEvents::NextNotificationChangedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) +{ + Endpoint::Ptr endpoint = origin->FromClient->GetEndpoint(); + + if (!endpoint) { + Log(LogNotice, "ClusterEvents") + << "Discarding 'next notification changed' message from '" << origin->FromClient->GetIdentity() << "': Invalid endpoint origin (client not allowed)."; + return Empty; + } + + if (!params) + return Empty; + + Notification::Ptr notification = Notification::GetByName(params->Get("notification")); + + if (!notification) + return Empty; + + if (origin->FromZone && !origin->FromZone->CanAccessObject(notification)) { + Log(LogNotice, "ClusterEvents") + << "Discarding 'next notification changed' message from '" << origin->FromClient->GetIdentity() << "': Unauthorized access."; + return Empty; + } + + notification->SetNextNotification(params->Get("next_notification"), false, origin); + + return Empty; +} + +void ClusterEvents::ForceNextCheckChangedHandler(const Checkable::Ptr& checkable, const MessageOrigin::Ptr& origin) +{ + ApiListener::Ptr listener = ApiListener::GetInstance(); + + if (!listener) + return; + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + Dictionary::Ptr params = new Dictionary(); + params->Set("host", host->GetName()); + if (service) + params->Set("service", service->GetShortName()); + params->Set("forced", checkable->GetForceNextCheck()); + + Dictionary::Ptr message = new Dictionary(); + message->Set("jsonrpc", "2.0"); + message->Set("method", "event::SetForceNextCheck"); + message->Set("params", params); + + listener->RelayMessage(origin, checkable, message, true); +} + +Value ClusterEvents::ForceNextCheckChangedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) +{ + Endpoint::Ptr endpoint = origin->FromClient->GetEndpoint(); + + if (!endpoint) { + Log(LogNotice, "ClusterEvents") + << "Discarding 'force next check changed' message from '" << origin->FromClient->GetIdentity() << "': Invalid endpoint origin (client not allowed)."; + return Empty; + } + + if (!params) + return Empty; + + Host::Ptr host = Host::GetByName(params->Get("host")); + + if (!host) + return Empty; + + Checkable::Ptr checkable; + + if (params->Contains("service")) + checkable = host->GetServiceByShortName(params->Get("service")); + else + checkable = host; + + if (!checkable) + return Empty; + + if (origin->FromZone && !origin->FromZone->CanAccessObject(checkable)) { + Log(LogNotice, "ClusterEvents") + << "Discarding 'force next check' message from '" << origin->FromClient->GetIdentity() << "': Unauthorized access."; + return Empty; + } + + checkable->SetForceNextCheck(params->Get("forced"), false, origin); + + return Empty; +} + +void ClusterEvents::ForceNextNotificationChangedHandler(const Checkable::Ptr& checkable, const MessageOrigin::Ptr& origin) +{ + ApiListener::Ptr listener = ApiListener::GetInstance(); + + if (!listener) + return; + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + Dictionary::Ptr params = new Dictionary(); + params->Set("host", host->GetName()); + if (service) + params->Set("service", service->GetShortName()); + params->Set("forced", checkable->GetForceNextNotification()); + + Dictionary::Ptr message = new Dictionary(); + message->Set("jsonrpc", "2.0"); + message->Set("method", "event::SetForceNextNotification"); + message->Set("params", params); + + listener->RelayMessage(origin, checkable, message, true); +} + +Value ClusterEvents::ForceNextNotificationChangedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) +{ + Endpoint::Ptr endpoint = origin->FromClient->GetEndpoint(); + + if (!endpoint) { + Log(LogNotice, "ClusterEvents") + << "Discarding 'force next notification changed' message from '" << origin->FromClient->GetIdentity() << "': Invalid endpoint origin (client not allowed)."; + return Empty; + } + + if (!params) + return Empty; + + Host::Ptr host = Host::GetByName(params->Get("host")); + + if (!host) + return Empty; + + Checkable::Ptr checkable; + + if (params->Contains("service")) + checkable = host->GetServiceByShortName(params->Get("service")); + else + checkable = host; + + if (!checkable) + return Empty; + + if (origin->FromZone && !origin->FromZone->CanAccessObject(checkable)) { + Log(LogNotice, "ClusterEvents") + << "Discarding 'force next notification' message from '" << origin->FromClient->GetIdentity() << "': Unauthorized access."; + return Empty; + } + + checkable->SetForceNextNotification(params->Get("forced"), false, origin); + + return Empty; +} + +void ClusterEvents::CommentAddedHandler(const Checkable::Ptr& checkable, const Comment::Ptr& comment, const MessageOrigin::Ptr& origin) +{ + ApiListener::Ptr listener = ApiListener::GetInstance(); + + if (!listener) + return; + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + Dictionary::Ptr params = new Dictionary(); + params->Set("host", host->GetName()); + if (service) + params->Set("service", service->GetShortName()); + params->Set("comment", Serialize(comment)); + + Dictionary::Ptr message = new Dictionary(); + message->Set("jsonrpc", "2.0"); + message->Set("method", "event::AddComment"); + message->Set("params", params); + + listener->RelayMessage(origin, checkable, message, true); +} + +Value ClusterEvents::CommentAddedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) +{ + Endpoint::Ptr endpoint = origin->FromClient->GetEndpoint(); + + if (!endpoint) { + Log(LogNotice, "ClusterEvents") + << "Discarding 'comment added' message from '" << origin->FromClient->GetIdentity() << "': Invalid endpoint origin (client not allowed)."; + return Empty; + } + + if (!params) + return Empty; + + Host::Ptr host = Host::GetByName(params->Get("host")); + + if (!host) + return Empty; + + Checkable::Ptr checkable; + + if (params->Contains("service")) + checkable = host->GetServiceByShortName(params->Get("service")); + else + checkable = host; + + if (!checkable) + return Empty; + + if (origin->FromZone && !origin->FromZone->CanAccessObject(checkable)) { + Log(LogNotice, "ClusterEvents") + << "Discarding 'comment added' message from '" << origin->FromClient->GetIdentity() << "': Unauthorized access."; + return Empty; + } + + Comment::Ptr comment = new Comment(); + Deserialize(comment, params->Get("comment"), true); + + checkable->AddComment(comment->GetEntryType(), comment->GetAuthor(), + comment->GetText(), comment->GetExpireTime(), comment->GetId(), origin); + + return Empty; +} + +void ClusterEvents::CommentRemovedHandler(const Checkable::Ptr& checkable, const Comment::Ptr& comment, const MessageOrigin::Ptr& origin) +{ + ApiListener::Ptr listener = ApiListener::GetInstance(); + + if (!listener) + return; + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + Dictionary::Ptr params = new Dictionary(); + params->Set("host", host->GetName()); + if (service) + params->Set("service", service->GetShortName()); + params->Set("id", comment->GetId()); + + Dictionary::Ptr message = new Dictionary(); + message->Set("jsonrpc", "2.0"); + message->Set("method", "event::RemoveComment"); + message->Set("params", params); + + listener->RelayMessage(origin, checkable, message, true); +} + +Value ClusterEvents::CommentRemovedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) +{ + Endpoint::Ptr endpoint = origin->FromClient->GetEndpoint(); + + if (!endpoint) { + Log(LogNotice, "ClusterEvents") + << "Discarding 'comment removed' message from '" << origin->FromClient->GetIdentity() << "': Invalid endpoint origin (client not allowed)."; + return Empty; + } + + if (!params) + return Empty; + + Host::Ptr host = Host::GetByName(params->Get("host")); + + if (!host) + return Empty; + + Checkable::Ptr checkable; + + if (params->Contains("service")) + checkable = host->GetServiceByShortName(params->Get("service")); + else + checkable = host; + + if (!checkable) + return Empty; + + if (origin->FromZone && !origin->FromZone->CanAccessObject(checkable)) { + Log(LogNotice, "ClusterEvents") + << "Discarding 'comment removed' message from '" << origin->FromClient->GetIdentity() << "': Unauthorized access."; + return Empty; + } + + checkable->RemoveComment(params->Get("id"), origin); + + return Empty; +} + +void ClusterEvents::DowntimeAddedHandler(const Checkable::Ptr& checkable, const Downtime::Ptr& downtime, const MessageOrigin::Ptr& origin) +{ + ApiListener::Ptr listener = ApiListener::GetInstance(); + + if (!listener) + return; + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + Dictionary::Ptr params = new Dictionary(); + params->Set("host", host->GetName()); + if (service) + params->Set("service", service->GetShortName()); + params->Set("downtime", Serialize(downtime)); + + Dictionary::Ptr message = new Dictionary(); + message->Set("jsonrpc", "2.0"); + message->Set("method", "event::AddDowntime"); + message->Set("params", params); + + listener->RelayMessage(origin, checkable, message, true); +} + +Value ClusterEvents::DowntimeAddedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) +{ + Endpoint::Ptr endpoint = origin->FromClient->GetEndpoint(); + + if (!endpoint) { + Log(LogNotice, "ClusterEvents") + << "Discarding 'downtime added' message from '" << origin->FromClient->GetIdentity() << "': Invalid endpoint origin (client not allowed)."; + return Empty; + } + + if (!params) + return Empty; + + Host::Ptr host = Host::GetByName(params->Get("host")); + + if (!host) + return Empty; + + Checkable::Ptr checkable; + + if (params->Contains("service")) + checkable = host->GetServiceByShortName(params->Get("service")); + else + checkable = host; + + if (!checkable) + return Empty; + + if (origin->FromZone && !origin->FromZone->CanAccessObject(checkable)) { + Log(LogNotice, "ClusterEvents") + << "Discarding 'downtime added' message from '" << origin->FromClient->GetIdentity() << "': Unauthorized access."; + return Empty; + } + + Downtime::Ptr downtime = new Downtime(); + Deserialize(downtime, params->Get("downtime"), true); + + checkable->AddDowntime(downtime->GetAuthor(), downtime->GetComment(), + downtime->GetStartTime(), downtime->GetEndTime(), + downtime->GetFixed(), downtime->GetTriggeredBy(), + downtime->GetDuration(), downtime->GetScheduledBy(), + downtime->GetId(), origin); + + return Empty; +} + +void ClusterEvents::DowntimeRemovedHandler(const Checkable::Ptr& checkable, const Downtime::Ptr& downtime, const MessageOrigin::Ptr& origin) +{ + ApiListener::Ptr listener = ApiListener::GetInstance(); + + if (!listener) + return; + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + Dictionary::Ptr params = new Dictionary(); + params->Set("host", host->GetName()); + if (service) + params->Set("service", service->GetShortName()); + params->Set("id", downtime->GetId()); + + Dictionary::Ptr message = new Dictionary(); + message->Set("jsonrpc", "2.0"); + message->Set("method", "event::RemoveDowntime"); + message->Set("params", params); + + listener->RelayMessage(origin, checkable, message, true); +} + +Value ClusterEvents::DowntimeRemovedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) +{ + Endpoint::Ptr endpoint = origin->FromClient->GetEndpoint(); + + if (!endpoint) { + Log(LogNotice, "ClusterEvents") + << "Discarding 'downtime removed' message from '" << origin->FromClient->GetIdentity() << "': Invalid endpoint origin (client not allowed)."; + return Empty; + } + + if (!params) + return Empty; + + Host::Ptr host = Host::GetByName(params->Get("host")); + + if (!host) + return Empty; + + Checkable::Ptr checkable; + + if (params->Contains("service")) + checkable = host->GetServiceByShortName(params->Get("service")); + else + checkable = host; + + if (!checkable) + return Empty; + + if (origin->FromZone && !origin->FromZone->CanAccessObject(checkable)) { + Log(LogNotice, "ClusterEvents") + << "Discarding 'downtime removed' message from '" << origin->FromClient->GetIdentity() << "': Unauthorized access."; + return Empty; + } + + checkable->RemoveDowntime(params->Get("id"), false, origin); + + return Empty; +} + +void ClusterEvents::AcknowledgementSetHandler(const Checkable::Ptr& checkable, + const String& author, const String& comment, AcknowledgementType type, + bool notify, double expiry, const MessageOrigin::Ptr& origin) +{ + ApiListener::Ptr listener = ApiListener::GetInstance(); + + if (!listener) + return; + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + Dictionary::Ptr params = new Dictionary(); + params->Set("host", host->GetName()); + if (service) + params->Set("service", service->GetShortName()); + params->Set("author", author); + params->Set("comment", comment); + params->Set("acktype", type); + params->Set("notify", notify); + params->Set("expiry", expiry); + + Dictionary::Ptr message = new Dictionary(); + message->Set("jsonrpc", "2.0"); + message->Set("method", "event::SetAcknowledgement"); + message->Set("params", params); + + listener->RelayMessage(origin, checkable, message, true); +} + +Value ClusterEvents::AcknowledgementSetAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) +{ + Endpoint::Ptr endpoint = origin->FromClient->GetEndpoint(); + + if (!endpoint) { + Log(LogNotice, "ClusterEvents") + << "Discarding 'acknowledgement set' message from '" << origin->FromClient->GetIdentity() << "': Invalid endpoint origin (client not allowed)."; + return Empty; + } + + if (!params) + return Empty; + + Host::Ptr host = Host::GetByName(params->Get("host")); + + if (!host) + return Empty; + + Checkable::Ptr checkable; + + if (params->Contains("service")) + checkable = host->GetServiceByShortName(params->Get("service")); + else + checkable = host; + + if (!checkable) + return Empty; + + if (origin->FromZone && !origin->FromZone->CanAccessObject(checkable)) { + Log(LogNotice, "ClusterEvents") + << "Discarding 'acknowledgement set' message from '" << origin->FromClient->GetIdentity() << "': Unauthorized access."; + return Empty; + } + + checkable->AcknowledgeProblem(params->Get("author"), params->Get("comment"), + static_cast(static_cast(params->Get("acktype"))), + params->Get("notify"), params->Get("expiry"), origin); + + return Empty; +} + +void ClusterEvents::AcknowledgementClearedHandler(const Checkable::Ptr& checkable, const MessageOrigin::Ptr& origin) +{ + ApiListener::Ptr listener = ApiListener::GetInstance(); + + if (!listener) + return; + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + Dictionary::Ptr params = new Dictionary(); + params->Set("host", host->GetName()); + if (service) + params->Set("service", service->GetShortName()); + + Dictionary::Ptr message = new Dictionary(); + message->Set("jsonrpc", "2.0"); + message->Set("method", "event::ClearAcknowledgement"); + message->Set("params", params); + + listener->RelayMessage(origin, checkable, message, true); +} + +Value ClusterEvents::AcknowledgementClearedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) +{ + Endpoint::Ptr endpoint = origin->FromClient->GetEndpoint(); + + if (!endpoint) { + Log(LogNotice, "ClusterEvents") + << "Discarding 'acknowledgement cleared' message from '" << origin->FromClient->GetIdentity() << "': Invalid endpoint origin (client not allowed)."; + return Empty; + } + + if (!params) + return Empty; + + Host::Ptr host = Host::GetByName(params->Get("host")); + + if (!host) + return Empty; + + Checkable::Ptr checkable; + + if (params->Contains("service")) + checkable = host->GetServiceByShortName(params->Get("service")); + else + checkable = host; + + if (!checkable) + return Empty; + + if (origin->FromZone && !origin->FromZone->CanAccessObject(checkable)) { + Log(LogNotice, "ClusterEvents") + << "Discarding 'acknowledgement cleared' message from '" << origin->FromClient->GetIdentity() << "': Unauthorized access."; + return Empty; + } + + checkable->ClearAcknowledgement(origin); + + return Empty; +} + +Value ClusterEvents::ExecuteCommandAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) +{ + Endpoint::Ptr sourceEndpoint = origin->FromClient->GetEndpoint(); + + if (!sourceEndpoint || (origin->FromZone && !Zone::GetLocalZone()->IsChildOf(origin->FromZone))) { + Log(LogNotice, "ClusterEvents") + << "Discarding 'execute command' message from '" << origin->FromClient->GetIdentity() << "': Invalid endpoint origin (client not allowed)."; + return Empty; + } + + ApiListener::Ptr listener = ApiListener::GetInstance(); + + if (!listener) { + Log(LogCritical, "ApiListener", "No instance available."); + return Empty; + } + + if (!listener->GetAcceptCommands()) { + Log(LogWarning, "ApiListener") + << "Ignoring command. '" << listener->GetName() << "' does not accept commands."; + + Host::Ptr host = new Host(); + Dictionary::Ptr attrs = new Dictionary(); + + attrs->Set("__name", params->Get("host")); + attrs->Set("type", "Host"); + + Deserialize(host, attrs, false, FAConfig); + + if (params->Contains("service")) + host->SetExtension("agent_service_name", params->Get("service")); + + CheckResult::Ptr cr = new CheckResult(); + cr->SetState(ServiceUnknown); + cr->SetOutput("Endpoint '" + Endpoint::GetLocalEndpoint()->GetName() + "' does not accept commands."); + Dictionary::Ptr message = MakeCheckResultMessage(host, cr); + listener->SyncSendMessage(sourceEndpoint, message); + + return Empty; + } + + /* use a virtual host object for executing the command */ + Host::Ptr host = new Host(); + Dictionary::Ptr attrs = new Dictionary(); + + attrs->Set("__name", params->Get("host")); + attrs->Set("type", "Host"); + + Deserialize(host, attrs, false, FAConfig); + + if (params->Contains("service")) + host->SetExtension("agent_service_name", params->Get("service")); + + String command = params->Get("command"); + String command_type = params->Get("command_type"); + + if (command_type == "check_command") { + if (!CheckCommand::GetByName(command)) { + CheckResult::Ptr cr = new CheckResult(); + cr->SetState(ServiceUnknown); + cr->SetOutput("Check command '" + command + "' does not exist."); + Dictionary::Ptr message = MakeCheckResultMessage(host, cr); + listener->SyncSendMessage(sourceEndpoint, message); + return Empty; + } + } else if (command_type == "event_command") { + if (!EventCommand::GetByName(command)) { + Log(LogWarning, "ClusterEvents") + << "Event command '" << command << "' does not exist."; + return Empty; + } + } else + return Empty; + + attrs->Set(command_type, params->Get("command")); + attrs->Set("command_endpoint", sourceEndpoint->GetName()); + + Deserialize(host, attrs, false, FAConfig); + + host->SetExtension("agent_check", true); + + Dictionary::Ptr macros = params->Get("macros"); + + if (command_type == "check_command") { + try { + host->ExecuteRemoteCheck(macros); + } catch (const std::exception& ex) { + CheckResult::Ptr cr = new CheckResult(); + cr->SetState(ServiceUnknown); + + String output = "Exception occured while checking '" + host->GetName() + "': " + DiagnosticInformation(ex); + cr->SetOutput(output); + + double now = Utility::GetTime(); + cr->SetScheduleStart(now); + cr->SetScheduleEnd(now); + cr->SetExecutionStart(now); + cr->SetExecutionEnd(now); + + Dictionary::Ptr message = MakeCheckResultMessage(host, cr); + listener->SyncSendMessage(sourceEndpoint, message); + + Log(LogCritical, "checker", output); + } + } else if (command_type == "event_command") { + host->ExecuteEventHandler(macros, true); + } + + return Empty; +} + +void ClusterEvents::RepositoryTimerHandler(void) +{ + ApiListener::Ptr listener = ApiListener::GetInstance(); + + if (!listener) + return; + + Dictionary::Ptr repository = new Dictionary(); + + BOOST_FOREACH(const Host::Ptr& host, ConfigType::GetObjectsByType()) { + Array::Ptr services = new Array(); + + BOOST_FOREACH(const Service::Ptr& service, host->GetServices()) { + services->Add(service->GetShortName()); + } + + repository->Set(host->GetName(), services); + } + + Endpoint::Ptr my_endpoint = Endpoint::GetLocalEndpoint(); + + if (!my_endpoint) { + Log(LogWarning, "ClusterEvents", "No local endpoint defined. Bailing out."); + return; + } + + Zone::Ptr my_zone = my_endpoint->GetZone(); + + if (!my_zone) + return; + + Dictionary::Ptr params = new Dictionary(); + params->Set("seen", Utility::GetTime()); + params->Set("endpoint", my_endpoint->GetName()); + + Zone::Ptr parent_zone = my_zone->GetParent(); + if (parent_zone) + params->Set("parent_zone", parent_zone->GetName()); + + params->Set("zone", my_zone->GetName()); + params->Set("repository", repository); + + Dictionary::Ptr message = new Dictionary(); + message->Set("jsonrpc", "2.0"); + message->Set("method", "event::UpdateRepository"); + message->Set("params", params); + + listener->RelayMessage(MessageOrigin::Ptr(), my_zone, message, false); +} + +String ClusterEvents::GetRepositoryDir(void) +{ + return Application::GetLocalStateDir() + "/lib/icinga2/api/repository/"; +} + +Value ClusterEvents::UpdateRepositoryAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) +{ + if (!params) + return Empty; + + Value vrepository = params->Get("repository"); + if (vrepository.IsEmpty() || !vrepository.IsObjectType()) + return Empty; + + String repositoryFile = GetRepositoryDir() + SHA256(params->Get("endpoint")) + ".repo"; + String repositoryTempFile = repositoryFile + ".tmp"; + + std::ofstream fp(repositoryTempFile.CStr(), std::ofstream::out | std::ostream::trunc); + fp << JsonEncode(params); + fp.close(); + +#ifdef _WIN32 + _unlink(repositoryFile.CStr()); +#endif /* _WIN32 */ + + if (rename(repositoryTempFile.CStr(), repositoryFile.CStr()) < 0) { + BOOST_THROW_EXCEPTION(posix_error() + << boost::errinfo_api_function("rename") + << boost::errinfo_errno(errno) + << boost::errinfo_file_name(repositoryTempFile)); + } + + ApiListener::Ptr listener = ApiListener::GetInstance(); + + if (!listener) + return Empty; + + Dictionary::Ptr message = new Dictionary(); + message->Set("jsonrpc", "2.0"); + message->Set("method", "event::UpdateRepository"); + message->Set("params", params); + + listener->RelayMessage(origin, Zone::GetLocalZone(), message, true); + + return Empty; +} diff --git a/lib/icinga/clusterevents.hpp b/lib/icinga/clusterevents.hpp new file mode 100644 index 000000000..ab6c9d166 --- /dev/null +++ b/lib/icinga/clusterevents.hpp @@ -0,0 +1,85 @@ +/****************************************************************************** + * Icinga 2 * + * Copyright (C) 2012-2015 Icinga Development Team (http://www.icinga.org) * + * * + * This program is free software; you can redistribute it and/or * + * modify it under the terms of the GNU General Public License * + * as published by the Free Software Foundation; either version 2 * + * of the License, or (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the Free Software Foundation * + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * + ******************************************************************************/ + +#ifndef CLUSTEREVENTS_H +#define CLUSTEREVENTS_H + +#include "icinga/checkable.hpp" +#include "icinga/host.hpp" +#include "icinga/checkcommand.hpp" +#include "icinga/eventcommand.hpp" +#include "icinga/notificationcommand.hpp" + +namespace icinga +{ + +/** + * @ingroup icinga + */ +class I2_ICINGA_API ClusterEvents +{ +public: + static void StaticInitialize(void); + + static void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr& origin); + static Value CheckResultAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params); + + static void NextCheckChangedHandler(const Checkable::Ptr& checkable, const MessageOrigin::Ptr& origin); + static Value NextCheckChangedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params); + + static void NextNotificationChangedHandler(const Notification::Ptr& notification, const MessageOrigin::Ptr& origin); + static Value NextNotificationChangedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params); + + static void ForceNextCheckChangedHandler(const Checkable::Ptr& checkable, const MessageOrigin::Ptr& origin); + static Value ForceNextCheckChangedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params); + + static void ForceNextNotificationChangedHandler(const Checkable::Ptr& checkable, const MessageOrigin::Ptr& origin); + static Value ForceNextNotificationChangedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params); + + static void CommentAddedHandler(const Checkable::Ptr& checkable, const Comment::Ptr& comment, const MessageOrigin::Ptr& origin); + static Value CommentAddedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params); + + static void CommentRemovedHandler(const Checkable::Ptr& checkable, const Comment::Ptr& comment, const MessageOrigin::Ptr& origin); + static Value CommentRemovedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params); + + static void DowntimeAddedHandler(const Checkable::Ptr& checkable, const Downtime::Ptr& downtime, const MessageOrigin::Ptr& origin); + static Value DowntimeAddedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params); + + static void DowntimeRemovedHandler(const Checkable::Ptr& checkable, const Downtime::Ptr& downtime, const MessageOrigin::Ptr& origin); + static Value DowntimeRemovedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params); + + static void AcknowledgementSetHandler(const Checkable::Ptr& checkable, const String& author, const String& comment, AcknowledgementType type, + bool notify, double expiry, const MessageOrigin::Ptr& origin); + static Value AcknowledgementSetAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params); + + static void AcknowledgementClearedHandler(const Checkable::Ptr& checkable, const MessageOrigin::Ptr& origin); + static Value AcknowledgementClearedAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params); + + static Value ExecuteCommandAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params); + + static String GetRepositoryDir(void); + static void RepositoryTimerHandler(void); + static Value UpdateRepositoryAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params); + + static Dictionary::Ptr MakeCheckResultMessage(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); +}; + +} + +#endif /* CLUSTEREVENTS_H */ diff --git a/lib/remote/CMakeLists.txt b/lib/remote/CMakeLists.txt index 66cc2eefe..8916b2c8f 100644 --- a/lib/remote/CMakeLists.txt +++ b/lib/remote/CMakeLists.txt @@ -26,7 +26,7 @@ set(remote_SOURCES apilistener-filesync.cpp apiuser.cpp apiuser.thpp authority.cpp base64.cpp configfileshandler.cpp configpackageshandler.cpp configpackageutility.cpp configobjectutility.cpp configstageshandler.cpp createobjecthandler.cpp deleteobjecthandler.cpp - endpoint.cpp endpoint.thpp filterutility.cpp + endpoint.cpp endpoint.thpp eventshandler.cpp eventqueue.cpp filterutility.cpp httpchunkedencoding.cpp httpclientconnection.cpp httpserverconnection.cpp httphandler.cpp httprequest.cpp httpresponse.cpp httputility.cpp jsonrpc.cpp jsonrpcconnection.cpp jsonrpcconnection-heartbeat.cpp messageorigin.cpp modifyobjecthandler.cpp statushandler.cpp objectqueryhandler.cpp typequeryhandler.cpp diff --git a/lib/remote/apilistener.cpp b/lib/remote/apilistener.cpp index 29da538e6..0b91b40cd 100644 --- a/lib/remote/apilistener.cpp +++ b/lib/remote/apilistener.cpp @@ -327,7 +327,6 @@ void ApiListener::NewClientHandlerInternal(const Socket::Ptr& client, const Stri Log(LogInformation, "ApiListener") << "New client connection for identity '" << identity << "'" << (verify_ok ? "" : " (unauthenticated)"); - if (verify_ok) endpoint = Endpoint::GetByName(identity); } else { diff --git a/lib/remote/eventqueue.cpp b/lib/remote/eventqueue.cpp new file mode 100644 index 000000000..eb4e581fc --- /dev/null +++ b/lib/remote/eventqueue.cpp @@ -0,0 +1,149 @@ +/****************************************************************************** + * Icinga 2 * + * Copyright (C) 2012-2015 Icinga Development Team (http://www.icinga.org) * + * * + * This program is free software; you can redistribute it and/or * + * modify it under the terms of the GNU General Public License * + * as published by the Free Software Foundation; either version 2 * + * of the License, or (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the Free Software Foundation * + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * + ******************************************************************************/ + +#include "remote/eventqueue.hpp" +#include "remote/filterutility.hpp" +#include "base/singleton.hpp" + +using namespace icinga; + +EventQueue::EventQueue(void) + : m_Filter(NULL) +{ } + +EventQueue::~EventQueue(void) +{ + delete m_Filter; +} + +bool EventQueue::CanProcessEvent(const String& type) const +{ + boost::mutex::scoped_lock lock(m_Mutex); + + return m_Types.find(type) != m_Types.end(); +} + +void EventQueue::ProcessEvent(const Dictionary::Ptr& event) +{ + ScriptFrame frame; + + if (!FilterUtility::EvaluateFilter(frame, m_Filter, event, "event")) + return; + + boost::mutex::scoped_lock lock(m_Mutex); + + typedef std::pair > kv_pair; + BOOST_FOREACH(kv_pair& kv, m_Events) { + kv.second.push_back(event); + } + + m_CV.notify_all(); +} + +void EventQueue::AddClient(void *client) +{ + boost::mutex::scoped_lock lock(m_Mutex); + + typedef std::map >::iterator it_type; + std::pair result = m_Events.insert(std::make_pair(client, std::deque())); + ASSERT(result.second); +} + +void EventQueue::RemoveClient(void *client) +{ + boost::mutex::scoped_lock lock(m_Mutex); + + m_Events.erase(client); +} + +void EventQueue::UnregisterIfUnused(const String& name, const EventQueue::Ptr& queue) +{ + boost::mutex::scoped_lock lock(queue->m_Mutex); + + if (queue->m_Events.empty()) + Unregister(name); +} + +void EventQueue::SetTypes(const std::set& types) +{ + boost::mutex::scoped_lock lock(m_Mutex); + m_Types = types; +} + +void EventQueue::SetFilter(Expression *filter) +{ + boost::mutex::scoped_lock lock(m_Mutex); + delete m_Filter; + m_Filter = filter; +} + +Dictionary::Ptr EventQueue::WaitForEvent(void *client, double timeout) +{ + boost::mutex::scoped_lock lock(m_Mutex); + + for (;;) { + std::map >::iterator it = m_Events.find(client); + ASSERT(it != m_Events.end()); + + if (!it->second.empty()) { + Dictionary::Ptr result = *it->second.begin(); + it->second.pop_front(); + return result; + } + + if (!m_CV.timed_wait(lock, boost::posix_time::milliseconds(timeout * 1000))) + return Dictionary::Ptr(); + } +} + + +std::vector EventQueue::GetQueuesForType(const String& type) +{ + EventQueueRegistry::ItemMap queues = EventQueueRegistry::GetInstance()->GetItems(); + + std::vector availQueues; + + typedef std::pair kv_pair; + BOOST_FOREACH(const kv_pair& kv, queues) { + if (kv.second->CanProcessEvent(type)) + availQueues.push_back(kv.second); + } + + return availQueues; +} + +EventQueue::Ptr EventQueue::GetByName(const String& name) +{ + return EventQueueRegistry::GetInstance()->GetItem(name); +} + +void EventQueue::Register(const String& name, const EventQueue::Ptr& function) +{ + EventQueueRegistry::GetInstance()->Register(name, function); +} + +void EventQueue::Unregister(const String& name) +{ + EventQueueRegistry::GetInstance()->Unregister(name); +} + +EventQueueRegistry *EventQueueRegistry::GetInstance(void) +{ + return Singleton::GetInstance(); +} diff --git a/lib/remote/eventqueue.hpp b/lib/remote/eventqueue.hpp new file mode 100644 index 000000000..9a6a4c822 --- /dev/null +++ b/lib/remote/eventqueue.hpp @@ -0,0 +1,85 @@ +/****************************************************************************** + * Icinga 2 * + * Copyright (C) 2012-2015 Icinga Development Team (http://www.icinga.org) * + * * + * This program is free software; you can redistribute it and/or * + * modify it under the terms of the GNU General Public License * + * as published by the Free Software Foundation; either version 2 * + * of the License, or (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the Free Software Foundation * + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * + ******************************************************************************/ + +#ifndef EVENTQUEUE_H +#define EVENTQUEUE_H + +#include "remote/httphandler.hpp" +#include "base/object.hpp" +#include "config/expression.hpp" +#include +#include +#include +#include +#include +#include + +namespace icinga +{ + +class I2_REMOTE_API EventQueue : public Object +{ +public: + DECLARE_PTR_TYPEDEFS(EventQueue); + + EventQueue(void); + ~EventQueue(void); + + bool CanProcessEvent(const String& type) const; + void ProcessEvent(const Dictionary::Ptr& event); + void AddClient(void *client); + void RemoveClient(void *client); + + void SetTypes(const std::set& types); + void SetFilter(Expression *filter); + + Dictionary::Ptr WaitForEvent(void *client, double timeout = 5); + + static std::vector GetQueuesForType(const String& type); + static void UnregisterIfUnused(const String& name, const EventQueue::Ptr& queue); + + static EventQueue::Ptr GetByName(const String& name); + static void Register(const String& name, const EventQueue::Ptr& function); + static void Unregister(const String& name); + +private: + mutable boost::mutex m_Mutex; + boost::condition_variable m_CV; + + std::set m_Types; + Expression *m_Filter; + double m_Ttl; + + std::map > m_Events; +}; + +/** + * A registry for API event queues. + * + * @ingroup base + */ +class I2_REMOTE_API EventQueueRegistry : public Registry +{ +public: + static EventQueueRegistry *GetInstance(void); +}; + +} + +#endif /* EVENTQUEUE_H */ diff --git a/lib/remote/eventshandler.cpp b/lib/remote/eventshandler.cpp new file mode 100644 index 000000000..7d1beb807 --- /dev/null +++ b/lib/remote/eventshandler.cpp @@ -0,0 +1,119 @@ +/****************************************************************************** + * Icinga 2 * + * Copyright (C) 2012-2015 Icinga Development Team (http://www.icinga.org) * + * * + * This program is free software; you can redistribute it and/or * + * modify it under the terms of the GNU General Public License * + * as published by the Free Software Foundation; either version 2 * + * of the License, or (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the Free Software Foundation * + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * + ******************************************************************************/ + +#include "remote/eventshandler.hpp" +#include "remote/httputility.hpp" +#include "remote/filterutility.hpp" +#include "config/configcompiler.hpp" +#include "config/expression.hpp" +#include "base/objectlock.hpp" +#include "base/json.hpp" +#include +#include + +using namespace icinga; + +REGISTER_URLHANDLER("/v1/events", EventsHandler); + +bool EventsHandler::HandleRequest(const ApiUser::Ptr& user, HttpRequest& request, HttpResponse& response) +{ + if (request.RequestUrl->GetPath().size() != 2) + return false; + + if (request.RequestMethod != "POST") + return false; + + if (request.ProtocolVersion == HttpVersion10) { + HttpUtility::SendJsonError(response, 400, "HTTP/1.0 not supported for event streams."); + return true; + } + + Dictionary::Ptr params = HttpUtility::FetchRequestParameters(request); + + Array::Ptr types = params->Get("types"); + + if (!types) { + HttpUtility::SendJsonError(response, 400, "'types' query parameter is required."); + return true; + } + + { + ObjectLock olock(types); + BOOST_FOREACH(const String& type, types) { + FilterUtility::CheckPermission(user, "events/" + type); + } + } + + String queueName = HttpUtility::GetLastParameter(params, "queue"); + + if (queueName.IsEmpty()) { + HttpUtility::SendJsonError(response, 400, "'queue' query parameter is required."); + return true; + } + + String filter = HttpUtility::GetLastParameter(params, "filter"); + + Expression *ufilter = NULL; + + if (!filter.IsEmpty()) + ufilter = ConfigCompiler::CompileText("", filter); + + /* create a new queue or update an existing one */ + EventQueue::Ptr queue = EventQueue::GetByName(queueName); + + if (!queue) { + queue = new EventQueue(); + EventQueue::Register(queueName, queue); + } + + queue->SetTypes(types->ToSet()); + queue->SetFilter(ufilter); + + queue->AddClient(&request); + + response.SetStatus(200, "OK"); + response.AddHeader("Content-Type", "application/json"); + + for (;;) { + Dictionary::Ptr result = queue->WaitForEvent(&request); + + if (!response.IsPeerConnected()) { + queue->RemoveClient(&request); + EventQueue::UnregisterIfUnused(queueName, queue); + return true; + } + + if (!result) + continue; + + String body = JsonEncode(result); + + boost::algorithm::replace_all(body, "\n", ""); + + try { + response.WriteBody(body.CStr(), body.GetLength()); + response.WriteBody("\n", 1); + } catch (const std::exception&) { + queue->RemoveClient(&request); + EventQueue::UnregisterIfUnused(queueName, queue); + throw; + } + } +} + diff --git a/lib/remote/eventshandler.hpp b/lib/remote/eventshandler.hpp new file mode 100644 index 000000000..75c82c083 --- /dev/null +++ b/lib/remote/eventshandler.hpp @@ -0,0 +1,39 @@ +/****************************************************************************** + * Icinga 2 * + * Copyright (C) 2012-2015 Icinga Development Team (http://www.icinga.org) * + * * + * This program is free software; you can redistribute it and/or * + * modify it under the terms of the GNU General Public License * + * as published by the Free Software Foundation; either version 2 * + * of the License, or (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the Free Software Foundation * + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * + ******************************************************************************/ + +#ifndef EVENTSHANDLER_H +#define EVENTSHANDLER_H + +#include "remote/httphandler.hpp" +#include "remote/eventqueue.hpp" + +namespace icinga +{ + +class I2_REMOTE_API EventsHandler : public HttpHandler +{ +public: + DECLARE_PTR_TYPEDEFS(EventsHandler); + + virtual bool HandleRequest(const ApiUser::Ptr& user, HttpRequest& request, HttpResponse& response) override; +}; + +} + +#endif /* EVENTSHANDLER_H */ diff --git a/lib/remote/filterutility.cpp b/lib/remote/filterutility.cpp index f48510699..8ae5a575a 100644 --- a/lib/remote/filterutility.cpp +++ b/lib/remote/filterutility.cpp @@ -90,13 +90,19 @@ String ConfigObjectTargetProvider::GetPluralName(const String& type) const return Type::GetByName(type)->GetPluralName(); } -static bool EvaluateFilter(ScriptFrame& frame, Expression *filter, const Object::Ptr& target) +bool FilterUtility::EvaluateFilter(ScriptFrame& frame, Expression *filter, + const Object::Ptr& target, const String& variableName) { if (!filter) return true; Type::Ptr type = target->GetReflectionType(); - String varName = type->GetName().ToLower(); + String varName; + + if (variableName.IsEmpty()) + varName = type->GetName().ToLower(); + else + varName = variableName; Dictionary::Ptr vars; @@ -128,7 +134,7 @@ static bool EvaluateFilter(ScriptFrame& frame, Expression *filter, const Object: static void FilteredAddTarget(ScriptFrame& permissionFrame, Expression *permissionFilter, ScriptFrame& frame, Expression *ufilter, std::vector& result, const Object::Ptr& target) { - if (EvaluateFilter(permissionFrame, permissionFilter, target) && EvaluateFilter(frame, ufilter, target)) + if (FilterUtility::EvaluateFilter(permissionFrame, permissionFilter, target) && FilterUtility::EvaluateFilter(frame, ufilter, target)) result.push_back(target); } @@ -206,7 +212,7 @@ std::vector FilterUtility::GetFilterTargets(const QueryDescription& qd, c if (query->Contains(attr)) { Object::Ptr target = provider->GetTargetByName(type, HttpUtility::GetLastParameter(query, attr)); - if (EvaluateFilter(permissionFrame, permissionFilter, target)) + if (FilterUtility::EvaluateFilter(permissionFrame, permissionFilter, target)) result.push_back(target); } @@ -220,7 +226,7 @@ std::vector FilterUtility::GetFilterTargets(const QueryDescription& qd, c BOOST_FOREACH(const String& name, names) { Object::Ptr target = provider->GetTargetByName(type, name); - if (EvaluateFilter(permissionFrame, permissionFilter, target)) + if (FilterUtility::EvaluateFilter(permissionFrame, permissionFilter, target)) result.push_back(target); } } diff --git a/lib/remote/filterutility.hpp b/lib/remote/filterutility.hpp index f2b13260d..dc9f994ec 100644 --- a/lib/remote/filterutility.hpp +++ b/lib/remote/filterutility.hpp @@ -70,6 +70,8 @@ public: static Type::Ptr TypeFromPluralName(const String& pluralName); static void CheckPermission(const ApiUser::Ptr& user, const String& permission, Expression **filter = NULL); static std::vector GetFilterTargets(const QueryDescription& qd, const Dictionary::Ptr& query, const ApiUser::Ptr& user); + static bool EvaluateFilter(ScriptFrame& frame, Expression *filter, + const Object::Ptr& target, const String& variableName = String()); }; } diff --git a/lib/remote/httpresponse.cpp b/lib/remote/httpresponse.cpp index c10cc0e6a..dbd841e2a 100644 --- a/lib/remote/httpresponse.cpp +++ b/lib/remote/httpresponse.cpp @@ -241,3 +241,7 @@ size_t HttpResponse::ReadBody(char *data, size_t count) return m_Body->Read(data, count, true); } +bool HttpResponse::IsPeerConnected(void) const +{ + return !m_Stream->IsEof(); +} diff --git a/lib/remote/httpresponse.hpp b/lib/remote/httpresponse.hpp index 94c11d3f0..74b50f7da 100644 --- a/lib/remote/httpresponse.hpp +++ b/lib/remote/httpresponse.hpp @@ -61,6 +61,8 @@ public: void WriteBody(const char *data, size_t count); void Finish(void); + bool IsPeerConnected(void) const; + private: HttpResponseState m_State; boost::shared_ptr m_ChunkContext; diff --git a/lib/remote/httputility.hpp b/lib/remote/httputility.hpp index f560723a3..515491507 100644 --- a/lib/remote/httputility.hpp +++ b/lib/remote/httputility.hpp @@ -40,7 +40,7 @@ public: static void SendJsonBody(HttpResponse& response, const Value& val); static Value GetLastParameter(const Dictionary::Ptr& params, const String& key); static void SendJsonError(HttpResponse& response, const int code, - const String& verbose="", const String& diagnosticInformation=""); + const String& verbose = String(), const String& diagnosticInformation = String()); private: static String GetErrorNameByCode(int code);