diff --git a/base/dictionary.cpp b/base/dictionary.cpp index 8f42647b1..6fcfeaf59 100644 --- a/base/dictionary.cpp +++ b/base/dictionary.cpp @@ -175,6 +175,24 @@ void Dictionary::Remove(Dictionary::Iterator it) m_Data.erase(it); } +/** + * Makes a shallow copy of a dictionary. + * + * @returns a copy of the dictionary. + */ +Dictionary::Ptr Dictionary::ShallowClone(void) const +{ + Dictionary::Ptr clone = boost::make_shared(); + + String key; + Value value; + BOOST_FOREACH(tie(key, value), m_Data) { + clone->Set(key, value); + } + + return clone; +} + /** * Converts a JSON object to a dictionary. * diff --git a/base/dictionary.h b/base/dictionary.h index bdfd2af61..43dbc61c9 100644 --- a/base/dictionary.h +++ b/base/dictionary.h @@ -50,6 +50,8 @@ public: void Remove(const String& key); void Remove(Iterator it); + Dictionary::Ptr ShallowClone(void) const; + static Dictionary::Ptr FromJson(cJSON *json); cJSON *ToJson(void) const; diff --git a/base/dynamicobject.cpp b/base/dynamicobject.cpp index b7faafccf..60471c288 100644 --- a/base/dynamicobject.cpp +++ b/base/dynamicobject.cpp @@ -145,6 +145,11 @@ void DynamicObject::Set(const String& name, const Value& data) InternalSetAttribute(name, data, GetCurrentTx()); } +void DynamicObject::Touch(const String& name) +{ + InternalSetAttribute(name, InternalGetAttribute(name), GetCurrentTx()); +} + Value DynamicObject::Get(const String& name) const { return InternalGetAttribute(name); diff --git a/base/dynamicobject.h b/base/dynamicobject.h index c85a006ca..6ff8b77ac 100644 --- a/base/dynamicobject.h +++ b/base/dynamicobject.h @@ -79,6 +79,7 @@ public: void RegisterAttribute(const String& name, DynamicAttributeType type); void Set(const String& name, const Value& data); + void Touch(const String& name); Value Get(const String& name) const; bool HasAttribute(const String& name) const; @@ -162,8 +163,11 @@ shared_ptr DynamicObjectFactory(const Dictionary::Ptr& serializedUpdate) return boost::make_shared(serializedUpdate); } +#define REGISTER_CLASS_ALIAS(klass, alias) \ + static RegisterClassHelper g_Register ## klass(alias, DynamicObjectFactory) + #define REGISTER_CLASS(klass) \ - static RegisterClassHelper g_Register ## klass(#klass, DynamicObjectFactory) + REGISTER_CLASS_ALIAS(klass, #klass) } diff --git a/base/process.cpp b/base/process.cpp index e8abd61f2..1a8e8aed8 100644 --- a/base/process.cpp +++ b/base/process.cpp @@ -33,6 +33,8 @@ condition_variable Process::m_TasksCV; Process::Process(const String& command) : AsyncTask(), m_Command(command), m_UsePopen(false) { + assert(Application::IsMainThread()); + if (!m_ThreadCreated) { thread t(&Process::WorkerThreadProc); t.detach(); diff --git a/components/Makefile.am b/components/Makefile.am index edf2f2952..7b1c745fb 100644 --- a/components/Makefile.am +++ b/components/Makefile.am @@ -7,5 +7,4 @@ SUBDIRS = \ compat \ convenience \ delegation \ - demo \ - discovery + demo diff --git a/components/checker/checkercomponent.cpp b/components/checker/checkercomponent.cpp index ed98ee86d..648373735 100644 --- a/components/checker/checkercomponent.cpp +++ b/components/checker/checkercomponent.cpp @@ -23,14 +23,12 @@ using namespace icinga; void CheckerComponent::Start(void) { - m_Endpoint = boost::make_shared(); + m_Endpoint = Endpoint::MakeEndpoint("checker", true); /* dummy registration so the delegation module knows this is a checker TODO: figure out a better way for this */ m_Endpoint->RegisterSubscription("checker"); - EndpointManager::GetInstance()->RegisterEndpoint(m_Endpoint); - Service::OnCheckerChanged.connect(bind(&CheckerComponent::CheckerChangedHandler, this, _1)); DynamicObject::OnUnregistered.connect(bind(&CheckerComponent::ServiceRemovedHandler, this, _1)); @@ -50,10 +48,7 @@ void CheckerComponent::Start(void) void CheckerComponent::Stop(void) { - EndpointManager::Ptr mgr = EndpointManager::GetInstance(); - - if (mgr) - mgr->UnregisterEndpoint(m_Endpoint); + m_Endpoint->Unregister(); } void CheckerComponent::CheckTimerHandler(void) @@ -158,7 +153,7 @@ void CheckerComponent::CheckerChangedHandler(const Service::Ptr& service) { String checker = service->GetChecker(); - if (checker == EndpointManager::GetInstance()->GetIdentity() || checker == m_Endpoint->GetIdentity()) { + if (checker == EndpointManager::GetInstance()->GetIdentity() || checker == m_Endpoint->GetName()) { if (m_PendingServices.find(service) != m_PendingServices.end()) return; diff --git a/components/checker/checkercomponent.h b/components/checker/checkercomponent.h index 8e59f6131..b54d5d97f 100644 --- a/components/checker/checkercomponent.h +++ b/components/checker/checkercomponent.h @@ -54,7 +54,7 @@ public: virtual void Stop(void); private: - VirtualEndpoint::Ptr m_Endpoint; + Endpoint::Ptr m_Endpoint; ServiceSet m_IdleServices; ServiceSet m_PendingServices; diff --git a/components/cibsync/cibsynccomponent.cpp b/components/cibsync/cibsynccomponent.cpp index a44a168fb..ed66ef43e 100644 --- a/components/cibsync/cibsynccomponent.cpp +++ b/components/cibsync/cibsynccomponent.cpp @@ -26,18 +26,14 @@ using namespace icinga; */ void CIBSyncComponent::Start(void) { - m_Endpoint = boost::make_shared(); - - /* config objects */ - m_Endpoint->RegisterTopicHandler("config::FetchObjects", - boost::bind(&CIBSyncComponent::FetchObjectsHandler, this, _2)); + m_Endpoint = Endpoint::MakeEndpoint("cibsync", true); DynamicObject::OnRegistered.connect(boost::bind(&CIBSyncComponent::LocalObjectRegisteredHandler, this, _1)); DynamicObject::OnUnregistered.connect(boost::bind(&CIBSyncComponent::LocalObjectUnregisteredHandler, this, _1)); DynamicObject::OnTransactionClosing.connect(boost::bind(&CIBSyncComponent::TransactionClosingHandler, this, _1)); - EndpointManager::GetInstance()->OnNewEndpoint.connect(boost::bind(&CIBSyncComponent::NewEndpointHandler, this, _2)); - + Endpoint::OnConnected.connect(boost::bind(&CIBSyncComponent::EndpointConnectedHandler, this, _1)); + m_Endpoint->RegisterTopicHandler("config::ObjectUpdate", boost::bind(&CIBSyncComponent::RemoteObjectUpdateHandler, this, _2, _3)); m_Endpoint->RegisterTopicHandler("config::ObjectRemoved", @@ -46,8 +42,6 @@ void CIBSyncComponent::Start(void) /* service status */ m_Endpoint->RegisterTopicHandler("checker::ServiceStateChange", boost::bind(&CIBSyncComponent::ServiceStateChangeRequestHandler, _2, _3)); - - EndpointManager::GetInstance()->RegisterEndpoint(m_Endpoint); } /** @@ -55,10 +49,7 @@ void CIBSyncComponent::Start(void) */ void CIBSyncComponent::Stop(void) { - EndpointManager::Ptr endpointManager = EndpointManager::GetInstance(); - - if (endpointManager) - endpointManager->UnregisterEndpoint(m_Endpoint); + m_Endpoint->Unregister(); } void CIBSyncComponent::ServiceStateChangeRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request) @@ -84,21 +75,28 @@ void CIBSyncComponent::ServiceStateChangeRequestHandler(const Endpoint::Ptr& sen CIB::UpdateTaskStatistics(now, 1); } -void CIBSyncComponent::NewEndpointHandler(const Endpoint::Ptr& endpoint) +void CIBSyncComponent::EndpointConnectedHandler(const Endpoint::Ptr& endpoint) { /* no need to sync the config with local endpoints */ - if (endpoint->IsLocal()) + if (endpoint->IsLocalEndpoint()) return; - endpoint->OnSessionEstablished.connect(boost::bind(&CIBSyncComponent::SessionEstablishedHandler, this, _1)); -} + /* we just assume the other endpoint wants object updates */ + endpoint->RegisterSubscription("config::ObjectUpdate"); + endpoint->RegisterSubscription("config::ObjectRemoved"); -void CIBSyncComponent::SessionEstablishedHandler(const Endpoint::Ptr& endpoint) -{ - RequestMessage request; - request.SetMethod("config::FetchObjects"); + pair trange = DynamicObject::GetTypes(); + DynamicObject::TypeMap::iterator tt; + for (tt = trange.first; tt != trange.second; tt++) { + DynamicObject::Ptr object; + BOOST_FOREACH(tie(tuples::ignore, object), tt->second) { + if (!ShouldReplicateObject(object)) + continue; - EndpointManager::GetInstance()->SendUnicastMessage(m_Endpoint, endpoint, request); + RequestMessage request = MakeObjectMessage(object, "config::ObjectUpdate", 0, true); + EndpointManager::GetInstance()->SendUnicastMessage(m_Endpoint, endpoint, request); + } + } } RequestMessage CIBSyncComponent::MakeObjectMessage(const DynamicObject::Ptr& object, const String& method, double sinceTx, bool includeProperties) @@ -123,23 +121,6 @@ bool CIBSyncComponent::ShouldReplicateObject(const DynamicObject::Ptr& object) return (!object->IsLocal()); } -void CIBSyncComponent::FetchObjectsHandler(const Endpoint::Ptr& sender) -{ - pair trange = DynamicObject::GetTypes(); - DynamicObject::TypeMap::iterator tt; - for (tt = trange.first; tt != trange.second; tt++) { - DynamicObject::Ptr object; - BOOST_FOREACH(tie(tuples::ignore, object), tt->second) { - if (!ShouldReplicateObject(object)) - continue; - - RequestMessage request = MakeObjectMessage(object, "config::ObjectUpdate", 0, true); - - EndpointManager::GetInstance()->SendUnicastMessage(m_Endpoint, sender, request); - } - } -} - void CIBSyncComponent::LocalObjectRegisteredHandler(const DynamicObject::Ptr& object) { if (!ShouldReplicateObject(object)) @@ -212,7 +193,7 @@ void CIBSyncComponent::RemoteObjectUpdateHandler(const Endpoint::Ptr& sender, co } if (object->GetSource().IsEmpty()) - object->SetSource(sender->GetIdentity()); + object->SetSource(sender->GetName()); object->Register(); } else { diff --git a/components/cibsync/cibsynccomponent.h b/components/cibsync/cibsynccomponent.h index 56456682f..89fae5513 100644 --- a/components/cibsync/cibsynccomponent.h +++ b/components/cibsync/cibsynccomponent.h @@ -33,18 +33,16 @@ public: virtual void Stop(void); private: - VirtualEndpoint::Ptr m_Endpoint; + Endpoint::Ptr m_Endpoint; static void ServiceStateChangeRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request); - void NewEndpointHandler(const Endpoint::Ptr& endpoint); - void SessionEstablishedHandler(const Endpoint::Ptr& endpoint); + void EndpointConnectedHandler(const Endpoint::Ptr& endpoint); void LocalObjectRegisteredHandler(const DynamicObject::Ptr& object); void LocalObjectUnregisteredHandler(const DynamicObject::Ptr& object); void TransactionClosingHandler(const set& modifiedObjects); - void FetchObjectsHandler(const Endpoint::Ptr& sender); void RemoteObjectUpdateHandler(const Endpoint::Ptr& sender, const RequestMessage& request); void RemoteObjectRemovedHandler(const RequestMessage& request); diff --git a/components/delegation/delegationcomponent.cpp b/components/delegation/delegationcomponent.cpp index 560dac591..7acbccd82 100644 --- a/components/delegation/delegationcomponent.cpp +++ b/components/delegation/delegationcomponent.cpp @@ -40,9 +40,9 @@ vector DelegationComponent::GetCheckerCandidates(const Service::P { vector candidates; - EndpointManager::Iterator it; - for (it = EndpointManager::GetInstance()->Begin(); it != EndpointManager::GetInstance()->End(); it++) { - Endpoint::Ptr endpoint = it->second; + DynamicObject::Ptr object; + BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Endpoint")) { + Endpoint::Ptr endpoint = dynamic_pointer_cast(object); /* ignore disconnected endpoints */ if (!endpoint->IsConnected()) @@ -53,7 +53,7 @@ vector DelegationComponent::GetCheckerCandidates(const Service::P continue; /* ignore endpoints that aren't allowed to check this service */ - if (!service->IsAllowedChecker(it->first)) + if (!service->IsAllowedChecker(endpoint->GetName())) continue; candidates.push_back(endpoint); @@ -66,14 +66,16 @@ void DelegationComponent::DelegationTimerHandler(void) { map histogram; - EndpointManager::Iterator eit; - for (eit = EndpointManager::GetInstance()->Begin(); eit != EndpointManager::GetInstance()->End(); eit++) - histogram[eit->second] = 0; + DynamicObject::Ptr object; + BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Endpoint")) { + Endpoint::Ptr endpoint = dynamic_pointer_cast(object); + + histogram[endpoint] = 0; + } vector services; /* build "checker -> service count" histogram */ - DynamicObject::Ptr object; BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Service")) { Service::Ptr service = dynamic_pointer_cast(object); @@ -86,10 +88,11 @@ void DelegationComponent::DelegationTimerHandler(void) if (checker.IsEmpty()) continue; - Endpoint::Ptr endpoint = EndpointManager::GetInstance()->GetEndpointByIdentity(checker); - if (!endpoint) + if (!Endpoint::Exists(checker)) continue; + Endpoint::Ptr endpoint = Endpoint::GetByName(checker); + histogram[endpoint]++; } @@ -102,8 +105,8 @@ void DelegationComponent::DelegationTimerHandler(void) String checker = service->GetChecker(); Endpoint::Ptr oldEndpoint; - if (!checker.IsEmpty()) - oldEndpoint = EndpointManager::GetInstance()->GetEndpointByIdentity(checker); + if (Endpoint::Exists(checker)) + oldEndpoint = Endpoint::GetByName(checker); vector candidates = GetCheckerCandidates(service); @@ -146,7 +149,7 @@ void DelegationComponent::DelegationTimerHandler(void) if (histogram[candidate] > avg_services) continue; - service->SetChecker(candidate->GetIdentity()); + service->SetChecker(candidate->GetName()); histogram[candidate]++; delegated++; @@ -161,7 +164,7 @@ void DelegationComponent::DelegationTimerHandler(void) int count; BOOST_FOREACH(tie(endpoint, count), histogram) { stringstream msgbuf; - msgbuf << "histogram: " << endpoint->GetIdentity() << " - " << count; + msgbuf << "histogram: " << endpoint->GetName() << " - " << count; Logger::Write(LogInformation, "delegation", msgbuf.str()); } diff --git a/components/demo/democomponent.cpp b/components/demo/democomponent.cpp index edeae95c2..2474ed9b1 100644 --- a/components/demo/democomponent.cpp +++ b/components/demo/democomponent.cpp @@ -26,10 +26,9 @@ using namespace icinga; */ void DemoComponent::Start(void) { - m_Endpoint = boost::make_shared(); + m_Endpoint = Endpoint::MakeEndpoint("demo", true); m_Endpoint->RegisterTopicHandler("demo::HelloWorld", boost::bind(&DemoComponent::HelloWorldRequestHandler, this, _2, _3)); - EndpointManager::GetInstance()->RegisterEndpoint(m_Endpoint); m_DemoTimer = boost::make_shared(); m_DemoTimer->SetInterval(5); @@ -42,10 +41,7 @@ void DemoComponent::Start(void) */ void DemoComponent::Stop(void) { - EndpointManager::Ptr endpointManager = EndpointManager::GetInstance(); - - if (endpointManager) - endpointManager->UnregisterEndpoint(m_Endpoint); + m_Endpoint->Unregister(); } /** @@ -68,7 +64,7 @@ void DemoComponent::DemoTimerHandler(void) */ void DemoComponent::HelloWorldRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request) { - Logger::Write(LogInformation, "demo", "Got 'hello world' from address=" + sender->GetAddress() + ", identity=" + sender->GetIdentity()); + Logger::Write(LogInformation, "demo", "Got 'hello world' from address=" + sender->GetAddress() + ", identity=" + sender->GetName()); } EXPORT_COMPONENT(demo, DemoComponent); diff --git a/components/demo/democomponent.h b/components/demo/democomponent.h index 6eb93af5c..181c8ae09 100644 --- a/components/demo/democomponent.h +++ b/components/demo/democomponent.h @@ -34,7 +34,7 @@ public: private: Timer::Ptr m_DemoTimer; - VirtualEndpoint::Ptr m_Endpoint; + Endpoint::Ptr m_Endpoint; void DemoTimerHandler(void); void HelloWorldRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request); diff --git a/components/discovery/Makefile.am b/components/discovery/Makefile.am deleted file mode 100644 index 7a41350a9..000000000 --- a/components/discovery/Makefile.am +++ /dev/null @@ -1,35 +0,0 @@ -## Process this file with automake to produce Makefile.in - -pkglib_LTLIBRARIES = \ - discovery.la - -discovery_la_SOURCES = \ - discoverycomponent.cpp \ - discoverycomponent.h \ - discoverymessage.cpp \ - discoverymessage.h \ - i2-discovery.h - -discovery_la_CPPFLAGS = \ - $(BOOST_CPPFLAGS) \ - -I${top_srcdir}/base \ - -I${top_srcdir}/dyn \ - -I${top_srcdir}/jsonrpc \ - -I${top_srcdir}/icinga \ - -I${top_srcdir}/cib - -discovery_la_LDFLAGS = \ - $(BOOST_LDFLAGS) \ - -module \ - -no-undefined \ - @RELEASE_INFO@ \ - @VERSION_INFO@ - -discovery_la_LIBADD = \ - $(BOOST_SIGNALS_LIB) \ - $(BOOST_THREAD_LIB) \ - ${top_builddir}/base/libbase.la \ - ${top_builddir}/dyn/libdyn.la \ - ${top_builddir}/jsonrpc/libjsonrpc.la \ - ${top_builddir}/icinga/libicinga.la \ - ${top_builddir}/cib/libcib.la diff --git a/components/discovery/discovery.vcxproj b/components/discovery/discovery.vcxproj deleted file mode 100644 index ba7b39340..000000000 --- a/components/discovery/discovery.vcxproj +++ /dev/null @@ -1,94 +0,0 @@ - - - - - Debug - Win32 - - - Release - Win32 - - - - {EAD41628-BB96-4F99-9070-8A9676801295} - Win32Proj - discovery - - - - DynamicLibrary - true - MultiByte - - - DynamicLibrary - false - true - MultiByte - - - - - - - - - - - - - true - $(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\icinga;$(SolutionDir)\cib;$(SolutionDir)\dyn;$(IncludePath) - $(OutDir);$(LibraryPath) - - - false - $(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\icinga;$(SolutionDir)\cib;$(SolutionDir)\dyn;$(IncludePath) - $(OutDir);$(LibraryPath) - - - - Disabled - WIN32;_DEBUG;_WINDOWS;_USRDLL;DISCOVERY_EXPORTS;%(PreprocessorDefinitions) - Level3 - false - true - - - Windows - true - base.lib;jsonrpc.lib;icinga.lib;cib.lib;%(AdditionalDependencies) - - - - - MaxSpeed - true - true - WIN32;NDEBUG;_WINDOWS;_USRDLL;DISCOVERY_EXPORTS;%(PreprocessorDefinitions) - Level3 - false - true - - - Windows - true - true - true - base.lib;jsonrpc.lib;icinga.lib;cib.lib;%(AdditionalDependencies) - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/components/discovery/discovery.vcxproj.filters b/components/discovery/discovery.vcxproj.filters deleted file mode 100644 index a356d8a92..000000000 --- a/components/discovery/discovery.vcxproj.filters +++ /dev/null @@ -1,30 +0,0 @@ - - - - - Quelldateien - - - Quelldateien - - - - - Headerdateien - - - Headerdateien - - - Headerdateien - - - - - {53341f7e-6bad-4cf1-92cf-be906efe1704} - - - {c7b2deba-743b-4449-ae46-0b7ba1b1350a} - - - \ No newline at end of file diff --git a/components/discovery/discoverycomponent.cpp b/components/discovery/discoverycomponent.cpp deleted file mode 100644 index cdbfddb88..000000000 --- a/components/discovery/discoverycomponent.cpp +++ /dev/null @@ -1,448 +0,0 @@ -/****************************************************************************** - * Icinga 2 * - * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) * - * * - * This program is free software; you can redistribute it and/or * - * modify it under the terms of the GNU General Public License * - * as published by the Free Software Foundation; either version 2 * - * of the License, or (at your option) any later version. * - * * - * This program is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * - * GNU General Public License for more details. * - * * - * You should have received a copy of the GNU General Public License * - * along with this program; if not, write to the Free Software Foundation * - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * - ******************************************************************************/ - -#include "i2-discovery.h" - -using namespace icinga; - -/** - * Starts the discovery component. - */ -void DiscoveryComponent::Start(void) -{ - m_Endpoint = boost::make_shared(); - - m_Endpoint->RegisterTopicHandler("discovery::RegisterComponent", - boost::bind(&DiscoveryComponent::RegisterComponentMessageHandler, this, _2, _3)); - - m_Endpoint->RegisterTopicHandler("discovery::NewComponent", - boost::bind(&DiscoveryComponent::NewComponentMessageHandler, this, _3)); - - m_Endpoint->RegisterTopicHandler("discovery::Welcome", - boost::bind(&DiscoveryComponent::WelcomeMessageHandler, this, _2, _3)); - - EndpointManager::GetInstance()->ForEachEndpoint(boost::bind(&DiscoveryComponent::NewEndpointHandler, this, _2)); - EndpointManager::GetInstance()->OnNewEndpoint.connect(boost::bind(&DiscoveryComponent::NewEndpointHandler, this, _2)); - - EndpointManager::GetInstance()->RegisterEndpoint(m_Endpoint); - - /* create the reconnect timer */ - m_DiscoveryTimer = boost::make_shared(); - m_DiscoveryTimer->SetInterval(30); - m_DiscoveryTimer->OnTimerExpired.connect(boost::bind(&DiscoveryComponent::DiscoveryTimerHandler, this)); - m_DiscoveryTimer->Start(); - - /* call the timer as soon as possible */ - m_DiscoveryTimer->Reschedule(0); -} - -/** - * Stops the discovery component. - */ -void DiscoveryComponent::Stop(void) -{ - EndpointManager::Ptr mgr = EndpointManager::GetInstance(); - - if (mgr) - mgr->UnregisterEndpoint(m_Endpoint); -} - -/** - * Checks whether the specified endpoint is already connected - * and disconnects older endpoints. - * - * @param self The endpoint that is to be checked. - * @param other The other endpoint. - */ -void DiscoveryComponent::CheckExistingEndpoint(const Endpoint::Ptr& self, const Endpoint::Ptr& other) -{ - if (self == other) - return; - - if (!other->IsConnected()) - return; - - if (self->GetIdentity() == other->GetIdentity()) { - Logger::Write(LogWarning, "discovery", "Detected duplicate identity:" + other->GetIdentity() + " - Disconnecting old endpoint."); - - other->Stop(); - EndpointManager::GetInstance()->UnregisterEndpoint(other); - } -} - -/** - * Deals with a new endpoint. - * - * @param endpoint The endpoint. - */ -void DiscoveryComponent::NewEndpointHandler(const Endpoint::Ptr& endpoint) -{ - /* immediately finish session setup for local endpoints */ - if (endpoint->IsLocal()) { - endpoint->OnSessionEstablished(endpoint); - return; - } - - String identity = endpoint->GetIdentity(); - - if (identity == EndpointManager::GetInstance()->GetIdentity()) { - Logger::Write(LogWarning, "discovery", "Detected loop-back connection - Disconnecting endpoint."); - - endpoint->Stop(); - EndpointManager::GetInstance()->UnregisterEndpoint(endpoint); - - return; - } - - EndpointManager::GetInstance()->ForEachEndpoint(boost::bind(&DiscoveryComponent::CheckExistingEndpoint, this, endpoint, _2)); - - // we assume the other component _always_ wants - // discovery::RegisterComponent messages from us - endpoint->RegisterSubscription("discovery::RegisterComponent"); - - // send a discovery::RegisterComponent message, if the - // other component is a broker this makes sure - // the broker knows about our message types - SendDiscoveryMessage("discovery::RegisterComponent", EndpointManager::GetInstance()->GetIdentity(), endpoint); - - map::iterator ic; - - // we assume the other component _always_ wants - // discovery::NewComponent messages from us - endpoint->RegisterSubscription("discovery::NewComponent"); - - // send discovery::NewComponent message for ourselves - SendDiscoveryMessage("discovery::NewComponent", EndpointManager::GetInstance()->GetIdentity(), endpoint); - - // send discovery::NewComponent messages for all components - // we know about - for (ic = m_Components.begin(); ic != m_Components.end(); ic++) { - SendDiscoveryMessage("discovery::NewComponent", ic->first, endpoint); - } - - // check if we already know the other component - ic = m_Components.find(endpoint->GetIdentity()); - - if (ic == m_Components.end()) { - // we don't know the other component yet, so - // wait until we get a discovery::NewComponent message - // from a broker - return; - } - - // register published/subscribed topics for this endpoint - ComponentDiscoveryInfo::Ptr info = ic->second; - BOOST_FOREACH(String subscription, info->Subscriptions) { - endpoint->RegisterSubscription(subscription); - } - - FinishDiscoverySetup(endpoint); -} - -/** - * Registers message Subscriptions/sources in the specified component information object. - * - * @param neea Event arguments for the endpoint. - * @param info Component information object. - * @return 0 - */ -void DiscoveryComponent::DiscoveryEndpointHandler(const Endpoint::Ptr& endpoint, const ComponentDiscoveryInfo::Ptr& info) const -{ - Endpoint::ConstTopicIterator i; - - for (i = endpoint->BeginSubscriptions(); i != endpoint->EndSubscriptions(); i++) - info->Subscriptions.insert(*i); -} - -/** - * Retrieves the component information object for the specified component. - * - * @param component The identity of the component. - * @param info Pointer to the information object. - * @returns true if the info object was successfully retrieved, false otherwise. - */ -bool DiscoveryComponent::GetComponentDiscoveryInfo(String component, ComponentDiscoveryInfo::Ptr *info) const -{ - if (component == EndpointManager::GetInstance()->GetIdentity()) { - /* Build fake discovery info for ourselves */ - *info = boost::make_shared(); - EndpointManager::GetInstance()->ForEachEndpoint(boost::bind(&DiscoveryComponent::DiscoveryEndpointHandler, this, _2, *info)); - - (*info)->LastSeen = 0; - (*info)->Node = IcingaApplication::GetInstance()->GetNode(); - (*info)->Service = IcingaApplication::GetInstance()->GetService(); - - return true; - } - - map::const_iterator i; - - i = m_Components.find(component); - - if (i == m_Components.end()) - return false; - - *info = i->second; - return true; -} - -/** - * Processes discovery::Welcome messages. - * - * @param nrea Event arguments for the request. - * @returns 0 - */ -void DiscoveryComponent::WelcomeMessageHandler(const Endpoint::Ptr& sender, const RequestMessage& request) -{ - if (sender->HasReceivedWelcome()) - return; - - sender->SetReceivedWelcome(true); - - if (sender->HasSentWelcome()) - sender->OnSessionEstablished(sender); -} - -/** - * Finishes the welcome handshake for a new component - * by registering message Subscriptions/sources for the component - * and sending a welcome message if necessary. - * - * @param endpoint The endpoint to set up. - */ -void DiscoveryComponent::FinishDiscoverySetup(const Endpoint::Ptr& endpoint) -{ - if (endpoint->HasSentWelcome()) - return; - - // we assume the other component _always_ wants - // discovery::Welcome messages from us - endpoint->RegisterSubscription("discovery::Welcome"); - RequestMessage request; - request.SetMethod("discovery::Welcome"); - EndpointManager::GetInstance()->SendUnicastMessage(m_Endpoint, endpoint, request); - - endpoint->SetSentWelcome(true); - - if (endpoint->HasReceivedWelcome()) - endpoint->OnSessionEstablished(endpoint); -} - -/** - * Sends a discovery message for the specified identity using the - * specified message type. - * - * @param method The method to use for the message ("discovery::NewComponent" or "discovery::RegisterComponent"). - * @param identity The identity of the component for which a message should be sent. - * @param recipient The recipient of the message. A multicast message is sent if this parameter is empty. - */ -void DiscoveryComponent::SendDiscoveryMessage(const String& method, const String& identity, const Endpoint::Ptr& recipient) -{ - RequestMessage request; - request.SetMethod(method); - - DiscoveryMessage params; - request.SetParams(params); - - params.SetIdentity(identity); - - ComponentDiscoveryInfo::Ptr info; - - if (!GetComponentDiscoveryInfo(identity, &info)) - return; - - if (!info->Node.IsEmpty() && !info->Service.IsEmpty()) { - params.SetNode(info->Node); - params.SetService(info->Service); - } - - set::iterator i; - Dictionary::Ptr subscriptions = boost::make_shared(); - BOOST_FOREACH(String subscription, info->Subscriptions) { - subscriptions->Add(subscription); - } - - params.SetSubscriptions(subscriptions); - - if (recipient) - EndpointManager::GetInstance()->SendUnicastMessage(m_Endpoint, recipient, request); - else - EndpointManager::GetInstance()->SendMulticastMessage(m_Endpoint, request); -} - -/** - * Processes a discovery message by registering the component in the - * discovery component registry. - * - * @param identity The authorative identity of the component. - * @param message The discovery message. - * @param trusted Whether the message comes from a trusted source (i.e. a broker). - */ -void DiscoveryComponent::ProcessDiscoveryMessage(const String& identity, const DiscoveryMessage& message, bool trusted) -{ - /* ignore discovery messages that are about ourselves */ - if (identity == EndpointManager::GetInstance()->GetIdentity()) - return; - - ComponentDiscoveryInfo::Ptr info = boost::make_shared(); - - info->LastSeen = Utility::GetTime(); - - String node; - if (message.GetNode(&node) && !node.IsEmpty()) - info->Node = node; - - String service; - if (message.GetService(&service) && !service.IsEmpty()) - info->Service = service; - - DynamicObject::Ptr endpointConfig = DynamicObject::GetObject("Endpoint", identity); - Dictionary::Ptr roles; - if (endpointConfig) - roles = endpointConfig->Get("roles"); - - Endpoint::Ptr endpoint = EndpointManager::GetInstance()->GetEndpointByIdentity(identity); - - Dictionary::Ptr subscriptions; - if (message.GetSubscriptions(&subscriptions)) { - Value subscription; - BOOST_FOREACH(tie(tuples::ignore, subscription), subscriptions) { - info->Subscriptions.insert(subscription); - if (endpoint) - endpoint->RegisterSubscription(subscription); - } - } - - map::iterator i; - - i = m_Components.find(identity); - - if (i != m_Components.end()) - m_Components.erase(i); - - m_Components[identity] = info; - - SendDiscoveryMessage("discovery::NewComponent", identity, Endpoint::Ptr()); - - /* don't send a welcome message for discovery::NewComponent messages */ - if (endpoint && !trusted) - FinishDiscoverySetup(endpoint); -} - -/** - * Processes "discovery::NewComponent" messages. - * - * @param nrea Event arguments for the request. - */ -void DiscoveryComponent::NewComponentMessageHandler(const RequestMessage& request) -{ - DiscoveryMessage message; - request.GetParams(&message); - - String identity; - if (!message.GetIdentity(&identity)) - return; - - ProcessDiscoveryMessage(identity, message, true); -} - -/** - * Processes "discovery::RegisterComponent" messages. - * - * @param nrea Event arguments for the request. - */ -void DiscoveryComponent::RegisterComponentMessageHandler(const Endpoint::Ptr& sender, const RequestMessage& request) -{ - DiscoveryMessage message; - request.GetParams(&message); - ProcessDiscoveryMessage(sender->GetIdentity(), message, false); -} - -/** - * Checks whether we have to reconnect to other components and removes stale - * components from the registry. - */ -void DiscoveryComponent::DiscoveryTimerHandler(void) -{ - EndpointManager::Ptr endpointManager = EndpointManager::GetInstance(); - - double now = Utility::GetTime(); - - /* check whether we have to reconnect to one of our upstream endpoints */ - DynamicObject::Ptr object; - BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Endpoint")) { - /* Check if we're already connected to this endpoint. */ - if (endpointManager->GetEndpointByIdentity(object->GetName())) - continue; - - String node = object->Get("node"); - String service = object->Get("service"); - if (!node.IsEmpty() && !service.IsEmpty()) { - /* reconnect to this endpoint */ - endpointManager->AddConnection(node, service); - } - } - - map::iterator curr, i; - for (i = m_Components.begin(); i != m_Components.end(); ) { - const String& identity = i->first; - const ComponentDiscoveryInfo::Ptr& info = i->second; - - curr = i; - i++; - - /* there's no need to reconnect to ourself */ - if (identity == EndpointManager::GetInstance()->GetIdentity()) - continue; - - /* for explicitly-configured upstream endpoints - * we prefer to use the node/service from the - * config object - which is what the for loop above does */ - if (DynamicObject::GetObject("endpoint", identity)) - continue; - - if (info->LastSeen < now - DiscoveryComponent::RegistrationTTL) { - /* unregister this component if its registration has expired */ - m_Components.erase(curr); - continue; - } - - /* send discovery message to all connected components to - refresh their TTL for this component */ - SendDiscoveryMessage("discovery::NewComponent", identity, Endpoint::Ptr()); - - Endpoint::Ptr endpoint = endpointManager->GetEndpointByIdentity(identity); - if (endpoint && endpoint->IsConnected()) { - /* update LastSeen if we're still connected to this endpoint */ - info->LastSeen = now; - } else { - /* try and reconnect to this component */ - try { - if (!info->Node.IsEmpty() && !info->Service.IsEmpty()) - endpointManager->AddConnection(info->Node, info->Service); - } catch (const exception& ex) { - stringstream msgbuf; - msgbuf << "Exception while trying to reconnect to endpoint '" << endpoint->GetIdentity() << "': " << ex.what();; - Logger::Write(LogInformation, "discovery", msgbuf.str()); - } - } - } -} - -EXPORT_COMPONENT(discovery, DiscoveryComponent); diff --git a/components/discovery/discoverycomponent.h b/components/discovery/discoverycomponent.h deleted file mode 100644 index 1160a5884..000000000 --- a/components/discovery/discoverycomponent.h +++ /dev/null @@ -1,82 +0,0 @@ -/****************************************************************************** - * Icinga 2 * - * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) * - * * - * This program is free software; you can redistribute it and/or * - * modify it under the terms of the GNU General Public License * - * as published by the Free Software Foundation; either version 2 * - * of the License, or (at your option) any later version. * - * * - * This program is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * - * GNU General Public License for more details. * - * * - * You should have received a copy of the GNU General Public License * - * along with this program; if not, write to the Free Software Foundation * - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * - ******************************************************************************/ - -#ifndef DISCOVERYCOMPONENT_H -#define DISCOVERYCOMPONENT_H - -namespace icinga -{ - -/** - * @ingroup discovery - */ -class ComponentDiscoveryInfo : public Object -{ -public: - typedef shared_ptr Ptr; - typedef weak_ptr WeakPtr; - - String Node; - String Service; - - set Subscriptions; - set Publications; - - double LastSeen; -}; - -/** - * @ingroup discovery - */ -class DiscoveryComponent : public IComponent -{ -public: - virtual void Start(void); - virtual void Stop(void); - -private: - VirtualEndpoint::Ptr m_Endpoint; - map m_Components; - Timer::Ptr m_DiscoveryTimer; - - void NewEndpointHandler(const Endpoint::Ptr& endpoint); - - void NewComponentMessageHandler(const RequestMessage& request); - void RegisterComponentMessageHandler(const Endpoint::Ptr& sender, const RequestMessage& request); - - void WelcomeMessageHandler(const Endpoint::Ptr& sender, const RequestMessage& request); - - void SendDiscoveryMessage(const String& method, const String& identity, const Endpoint::Ptr& recipient); - void ProcessDiscoveryMessage(const String& identity, const DiscoveryMessage& message, bool trusted); - - bool GetComponentDiscoveryInfo(String component, ComponentDiscoveryInfo::Ptr *info) const; - - void CheckExistingEndpoint(const Endpoint::Ptr& self, const Endpoint::Ptr& other); - void DiscoveryEndpointHandler(const Endpoint::Ptr& endpoint, const ComponentDiscoveryInfo::Ptr& info) const; - - void DiscoveryTimerHandler(void); - - void FinishDiscoverySetup(const Endpoint::Ptr& endpoint); - - static const int RegistrationTTL = 300; -}; - -} - -#endif /* DISCOVERYCOMPONENT_H */ diff --git a/components/discovery/discoverymessage.cpp b/components/discovery/discoverymessage.cpp deleted file mode 100644 index fde9df491..000000000 --- a/components/discovery/discoverymessage.cpp +++ /dev/null @@ -1,71 +0,0 @@ -/****************************************************************************** - * Icinga 2 * - * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) * - * * - * This program is free software; you can redistribute it and/or * - * modify it under the terms of the GNU General Public License * - * as published by the Free Software Foundation; either version 2 * - * of the License, or (at your option) any later version. * - * * - * This program is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * - * GNU General Public License for more details. * - * * - * You should have received a copy of the GNU General Public License * - * along with this program; if not, write to the Free Software Foundation * - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * - ******************************************************************************/ - -#include "i2-discovery.h" - -using namespace icinga; - -DiscoveryMessage::DiscoveryMessage(void) - : MessagePart() -{ } - -DiscoveryMessage::DiscoveryMessage(const MessagePart& message) - : MessagePart(message) -{ } - -bool DiscoveryMessage::GetIdentity(String *value) const -{ - return Get("identity", value); -} - -void DiscoveryMessage::SetIdentity(const String& value) -{ - Set("identity", value); -} - -bool DiscoveryMessage::GetNode(String *value) const -{ - return Get("node", value); -} - -void DiscoveryMessage::SetNode(const String& value) -{ - Set("node", value); -} - -bool DiscoveryMessage::GetService(String *value) const -{ - return Get("service", value); -} - -void DiscoveryMessage::SetService(const String& value) -{ - Set("service", value); -} - -bool DiscoveryMessage::GetSubscriptions(Dictionary::Ptr *value) const -{ - return Get("subscriptions", value); -} - -void DiscoveryMessage::SetSubscriptions(const Dictionary::Ptr& value) -{ - Set("subscriptions", value); -} - diff --git a/components/discovery/discoverymessage.h b/components/discovery/discoverymessage.h deleted file mode 100644 index 6f984b451..000000000 --- a/components/discovery/discoverymessage.h +++ /dev/null @@ -1,50 +0,0 @@ -/****************************************************************************** - * Icinga 2 * - * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) * - * * - * This program is free software; you can redistribute it and/or * - * modify it under the terms of the GNU General Public License * - * as published by the Free Software Foundation; either version 2 * - * of the License, or (at your option) any later version. * - * * - * This program is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * - * GNU General Public License for more details. * - * * - * You should have received a copy of the GNU General Public License * - * along with this program; if not, write to the Free Software Foundation * - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * - ******************************************************************************/ - -#ifndef DISCOVERYMESSAGE_H -#define DISCOVERYMESSAGE_H - -namespace icinga -{ - -/** - * @ingroup discovery - */ -class DiscoveryMessage : public MessagePart -{ -public: - DiscoveryMessage(void); - DiscoveryMessage(const MessagePart& message); - - bool GetIdentity(String *value) const; - void SetIdentity(const String& value); - - bool GetNode(String *value) const; - void SetNode(const String& value); - - bool GetService(String *value) const; - void SetService(const String& value); - - bool GetSubscriptions(Dictionary::Ptr *value) const; - void SetSubscriptions(const Dictionary::Ptr& value); -}; - -} - -#endif /* SUBSCRIPTIONMESSAGE_H */ diff --git a/components/discovery/i2-discovery.h b/components/discovery/i2-discovery.h deleted file mode 100644 index 8d1139be0..000000000 --- a/components/discovery/i2-discovery.h +++ /dev/null @@ -1,38 +0,0 @@ -/****************************************************************************** - * Icinga 2 * - * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) * - * * - * This program is free software; you can redistribute it and/or * - * modify it under the terms of the GNU General Public License * - * as published by the Free Software Foundation; either version 2 * - * of the License, or (at your option) any later version. * - * * - * This program is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * - * GNU General Public License for more details. * - * * - * You should have received a copy of the GNU General Public License * - * along with this program; if not, write to the Free Software Foundation * - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * - ******************************************************************************/ - -#ifndef I2DISCOVERY_H -#define I2DISCOVERY_H - -/** - * @defgroup discovery Discovery component - * - * The Discovery component takes care of connecting peers to each other - * and performs authorisation checks for the message subscriptions. - */ - -#include -#include -#include -#include - -#include "discoverymessage.h" -#include "discoverycomponent.h" - -#endif /* I2DISCOVERY_H */ diff --git a/configure.ac b/configure.ac index c9c39f6bd..859abae8f 100644 --- a/configure.ac +++ b/configure.ac @@ -76,7 +76,6 @@ components/compat/Makefile components/convenience/Makefile components/delegation/Makefile components/demo/Makefile -components/discovery/Makefile dyn/Makefile icinga/Makefile icinga-app/Makefile diff --git a/doc/icinga2-config.odt b/doc/icinga2-config.odt index d9d449c3f..8b93cea75 100644 Binary files a/doc/icinga2-config.odt and b/doc/icinga2-config.odt differ diff --git a/icinga-app/Makefile.am b/icinga-app/Makefile.am index 8fb735efc..dbf9c06a4 100644 --- a/icinga-app/Makefile.am +++ b/icinga-app/Makefile.am @@ -33,8 +33,7 @@ icinga_LDADD = \ -dlopen ${top_builddir}/components/compat/compat.la \ -dlopen ${top_builddir}/components/convenience/convenience.la \ -dlopen ${top_builddir}/components/delegation/delegation.la \ - -dlopen ${top_builddir}/components/demo/demo.la \ - -dlopen ${top_builddir}/components/discovery/discovery.la + -dlopen ${top_builddir}/components/demo/demo.la icinga_DEPENDENCIES = \ ${top_builddir}/components/cibsync/cibsync.la \ diff --git a/icinga/Makefile.am b/icinga/Makefile.am index 744f80825..91952ff2f 100644 --- a/icinga/Makefile.am +++ b/icinga/Makefile.am @@ -11,11 +11,7 @@ libicinga_la_SOURCES = \ endpointmanager.h \ icingaapplication.cpp \ icingaapplication.h \ - i2-icinga.h \ - jsonrpcendpoint.cpp \ - jsonrpcendpoint.h \ - virtualendpoint.cpp \ - virtualendpoint.h + i2-icinga.h libicinga_la_CPPFLAGS = \ -DI2_ICINGA_BUILD \ diff --git a/icinga/endpoint.cpp b/icinga/endpoint.cpp index 1209963df..701a240ca 100644 --- a/icinga/endpoint.cpp +++ b/icinga/endpoint.cpp @@ -21,24 +21,109 @@ using namespace icinga; -/** - * Retrieves the endpoint manager this endpoint is registered with. - * - * @returns The EndpointManager object. - */ -EndpointManager::Ptr Endpoint::GetEndpointManager(void) const +REGISTER_CLASS(Endpoint); + +boost::signal Endpoint::OnConnected; +boost::signal Endpoint::OnDisconnected; +boost::signal Endpoint::OnSubscriptionRegistered; +boost::signal Endpoint::OnSubscriptionUnregistered; + +Endpoint::Endpoint(const Dictionary::Ptr& serializedUpdate) + : DynamicObject(serializedUpdate) { - return m_EndpointManager.lock(); + RegisterAttribute("node", Attribute_Replicated); + RegisterAttribute("service", Attribute_Replicated); + RegisterAttribute("local", Attribute_Config); + RegisterAttribute("subscriptions", Attribute_Replicated); + RegisterAttribute("client", Attribute_Transient); +} + +bool Endpoint::Exists(const String& name) +{ + return (DynamicObject::GetObject("Endpoint", name)); +} + +Endpoint::Ptr Endpoint::GetByName(const String& name) +{ + DynamicObject::Ptr configObject = DynamicObject::GetObject("Endpoint", name); + + if (!configObject) + throw_exception(invalid_argument("Endpoint '" + name + "' does not exist.")); + + return dynamic_pointer_cast(configObject); +} + +Endpoint::Ptr Endpoint::MakeEndpoint(const String& name, bool local) +{ + ConfigItemBuilder::Ptr endpointConfig = boost::make_shared(); + endpointConfig->SetType("Endpoint"); + endpointConfig->SetName(local ? "local:" + name : name); + endpointConfig->SetLocal(local ? 1 : 0); + endpointConfig->AddExpression("local", OperatorSet, local); + + DynamicObject::Ptr object = endpointConfig->Compile()->Commit(); + return dynamic_pointer_cast(object); } /** - * Sets the endpoint manager this endpoint is registered with. + * Checks whether this is a local endpoint. * - * @param manager The EndpointManager object. + * @returns true if this is a local endpoint, false otherwise. */ -void Endpoint::SetEndpointManager(EndpointManager::WeakPtr manager) +bool Endpoint::IsLocalEndpoint(void) const { - m_EndpointManager = manager; + Value value = Get("local"); + + return (!value.IsEmpty() && value); +} + +/** + * Checks whether this endpoint is connected. + * + * @returns true if the endpoint is connected, false otherwise. + */ +bool Endpoint::IsConnected(void) const +{ + if (IsLocalEndpoint()) { + return true; + } else { + JsonRpcClient::Ptr client = GetClient(); + + return (client && client->IsConnected()); + } +} + +/** + * Retrieves the address for the endpoint. + * + * @returns The endpoint's address. + */ +String Endpoint::GetAddress(void) const +{ + if (IsLocalEndpoint()) { + return "local:" + GetName(); + } else { + JsonRpcClient::Ptr client = GetClient(); + + if (!client) + return ""; + + return client->GetPeerAddress(); + } +} + +JsonRpcClient::Ptr Endpoint::GetClient(void) const +{ + return Get("client"); +} + +void Endpoint::SetClient(const JsonRpcClient::Ptr& client) +{ + Set("client", client); + client->OnNewMessage.connect(boost::bind(&Endpoint::NewMessageHandler, this, _2)); + client->OnClosed.connect(boost::bind(&Endpoint::ClientClosedHandler, this)); + + OnConnected(GetSelf()); } /** @@ -46,9 +131,18 @@ void Endpoint::SetEndpointManager(EndpointManager::WeakPtr manager) * * @param topic The name of the topic. */ -void Endpoint::RegisterSubscription(String topic) +void Endpoint::RegisterSubscription(const String& topic) { - m_Subscriptions.insert(topic); + Dictionary::Ptr subscriptions = GetSubscriptions(); + + if (!subscriptions) + subscriptions = boost::make_shared(); + + if (!subscriptions->Contains(topic)) { + Dictionary::Ptr newSubscriptions = subscriptions->ShallowClone(); + newSubscriptions->Set(topic, topic); + SetSubscriptions(newSubscriptions); + } } /** @@ -56,9 +150,15 @@ void Endpoint::RegisterSubscription(String topic) * * @param topic The name of the topic. */ -void Endpoint::UnregisterSubscription(String topic) +void Endpoint::UnregisterSubscription(const String& topic) { - m_Subscriptions.erase(topic); + Dictionary::Ptr subscriptions = GetSubscriptions(); + + if (subscriptions && subscriptions->Contains(topic)) { + Dictionary::Ptr newSubscriptions = subscriptions->ShallowClone(); + newSubscriptions->Remove(topic); + SetSubscriptions(newSubscriptions); + } } /** @@ -67,9 +167,11 @@ void Endpoint::UnregisterSubscription(String topic) * @param topic The name of the topic. * @returns true if the endpoint is subscribed to the topic, false otherwise. */ -bool Endpoint::HasSubscription(String topic) const +bool Endpoint::HasSubscription(const String& topic) const { - return (m_Subscriptions.find(topic) != m_Subscriptions.end()); + Dictionary::Ptr subscriptions = GetSubscriptions(); + + return (subscriptions && subscriptions->Contains(topic)); } /** @@ -77,65 +179,168 @@ bool Endpoint::HasSubscription(String topic) const */ void Endpoint::ClearSubscriptions(void) { - m_Subscriptions.clear(); + Set("subscriptions", Empty); } -/** - * Returns the beginning of the subscriptions list. - * - * @returns An iterator that points to the first subscription. - */ -Endpoint::ConstTopicIterator Endpoint::BeginSubscriptions(void) const +Dictionary::Ptr Endpoint::GetSubscriptions(void) const { - return m_Subscriptions.begin(); + return Get("subscriptions"); } -/** - * Returns the end of the subscriptions list. - * - * @returns An iterator that points past the last subscription. - */ -Endpoint::ConstTopicIterator Endpoint::EndSubscriptions(void) const +void Endpoint::SetSubscriptions(const Dictionary::Ptr& subscriptions) { - return m_Subscriptions.end(); + Set("subscriptions", subscriptions); } -/** - * Sets whether a welcome message has been received from this endpoint. - * - * @param value Whether we've received a welcome message. - */ -void Endpoint::SetReceivedWelcome(bool value) +void Endpoint::RegisterTopicHandler(const String& topic, const function& callback) { - m_ReceivedWelcome = value; + map > >::iterator it; + it = m_TopicHandlers.find(topic); + + shared_ptr > sig; + + if (it == m_TopicHandlers.end()) { + sig = boost::make_shared >(); + m_TopicHandlers.insert(make_pair(topic, sig)); + } else { + sig = it->second; + } + + sig->connect(callback); + + RegisterSubscription(topic); } -/** - * Retrieves whether a welcome message has been received from this endpoint. - * - * @returns Whether we've received a welcome message. - */ -bool Endpoint::HasReceivedWelcome(void) const +void Endpoint::UnregisterTopicHandler(const String& topic, const function& callback) { - return m_ReceivedWelcome; + // TODO: implement + //m_TopicHandlers[method] -= callback; + //UnregisterSubscription(method); + + throw_exception(NotImplementedException()); } -/** - * Sets whether a welcome message has been sent to this endpoint. - * - * @param value Whether we've sent a welcome message. - */ -void Endpoint::SetSentWelcome(bool value) +void Endpoint::OnAttributeChanged(const String& name, const Value& oldValue) { - m_SentWelcome = value; + if (name == "subscriptions") { + Dictionary::Ptr oldSubscriptions, newSubscriptions; + + if (oldValue.IsObjectType()) + oldSubscriptions = oldValue; + + newSubscriptions = GetSubscriptions(); + + if (oldSubscriptions) { + String subscription; + BOOST_FOREACH(tie(tuples::ignore, subscription), oldSubscriptions) { + if (!newSubscriptions || !newSubscriptions->Contains(subscription)) { + Logger::Write(LogInformation, "icinga", "Removed subscription for '" + GetName() + "': " + subscription); + OnSubscriptionUnregistered(GetSelf(), subscription); + } + } + } + + if (newSubscriptions) { + String subscription; + BOOST_FOREACH(tie(tuples::ignore, subscription), newSubscriptions) { + if (!oldSubscriptions || !oldSubscriptions->Contains(subscription)) { + Logger::Write(LogInformation, "icinga", "New subscription for '" + GetName() + "': " + subscription); + OnSubscriptionRegistered(GetSelf(), subscription); + } + } + } + } } -/** - * Retrieves whether a welcome message has been sent to this endpoint. - * - * @returns Whether we've sent a welcome message. - */ -bool Endpoint::HasSentWelcome(void) const +void Endpoint::ProcessRequest(const Endpoint::Ptr& sender, const RequestMessage& request) { - return m_SentWelcome; + if (!IsConnected()) { + // TODO: persist the message + return; + } + + if (IsLocalEndpoint()) { + String method; + if (!request.GetMethod(&method)) + return; + + map > >::iterator it; + it = m_TopicHandlers.find(method); + + if (it == m_TopicHandlers.end()) + return; + + (*it->second)(GetSelf(), sender, request); + } else { + GetClient()->SendMessage(request); + } } + +void Endpoint::ProcessResponse(const Endpoint::Ptr& sender, const ResponseMessage& response) +{ + if (!IsConnected()) + return; + + if (IsLocalEndpoint()) + EndpointManager::GetInstance()->ProcessResponseMessage(sender, response); + else + GetClient()->SendMessage(response); +} + +void Endpoint::NewMessageHandler(const MessagePart& message) +{ + Endpoint::Ptr sender = GetSelf(); + + if (ResponseMessage::IsResponseMessage(message)) { + /* rather than routing the message to the right virtual + * endpoint we just process it here right away. */ + EndpointManager::GetInstance()->ProcessResponseMessage(sender, message); + return; + } + + RequestMessage request = message; + + String method; + if (!request.GetMethod(&method)) + return; + + String id; + if (request.GetID(&id)) + EndpointManager::GetInstance()->SendAnycastMessage(sender, request); + else + EndpointManager::GetInstance()->SendMulticastMessage(sender, request); +} + +void Endpoint::ClientClosedHandler(void) +{ + try { + GetClient()->CheckException(); + } catch (const exception& ex) { + stringstream message; + message << "Error occured for JSON-RPC socket: Message=" << ex.what(); + + Logger::Write(LogWarning, "jsonrpc", message.str()); + } + + Logger::Write(LogWarning, "jsonrpc", "Lost connection to endpoint: identity=" + GetName()); + + // TODO: _only_ clear non-persistent subscriptions + // unregister ourselves if no persistent subscriptions are left (use a + // timer for that, once we have a TTL property for the topics) + ClearSubscriptions(); + + Set("client", Empty); + + OnDisconnected(GetSelf()); +} + +String Endpoint::GetNode(void) const +{ + return Get("node"); +} + +String Endpoint::GetService(void) const +{ + return Get("service"); +} + diff --git a/icinga/endpoint.h b/icinga/endpoint.h index f6586bbe3..d2799e823 100644 --- a/icinga/endpoint.h +++ b/icinga/endpoint.h @@ -30,60 +30,65 @@ class EndpointManager; * * @ingroup icinga */ -class I2_ICINGA_API Endpoint : public Object +class I2_ICINGA_API Endpoint : public DynamicObject { public: typedef shared_ptr Ptr; typedef weak_ptr WeakPtr; - typedef set::const_iterator ConstTopicIterator; + typedef void (Callback)(const Endpoint::Ptr&, const Endpoint::Ptr&, const RequestMessage&); - Endpoint(void) - : m_ReceivedWelcome(false), m_SentWelcome(false) - { } + Endpoint(const Dictionary::Ptr& serializedUpdate); - virtual String GetIdentity(void) const = 0; - virtual String GetAddress(void) const = 0; + static bool Exists(const String& name); + static Endpoint::Ptr GetByName(const String& name); - void SetReceivedWelcome(bool value); - bool HasReceivedWelcome(void) const; + String GetAddress(void) const; - void SetSentWelcome(bool value); - bool HasSentWelcome(void) const; + JsonRpcClient::Ptr GetClient(void) const; + void SetClient(const JsonRpcClient::Ptr& client); - shared_ptr GetEndpointManager(void) const; - void SetEndpointManager(weak_ptr manager); + void RegisterSubscription(const String& topic); + void UnregisterSubscription(const String& topic); + bool HasSubscription(const String& topic) const; - void RegisterSubscription(String topic); - void UnregisterSubscription(String topic); - bool HasSubscription(String topic) const; + Dictionary::Ptr GetSubscriptions(void) const; + void SetSubscriptions(const Dictionary::Ptr& subscriptions); - virtual bool IsLocal(void) const = 0; - virtual bool IsConnected(void) const = 0; + bool IsLocalEndpoint(void) const; + bool IsConnected(void) const; - virtual void ProcessRequest(Endpoint::Ptr sender, const RequestMessage& message) = 0; - virtual void ProcessResponse(Endpoint::Ptr sender, const ResponseMessage& message) = 0; - - virtual void Stop(void) = 0; + void ProcessRequest(const Endpoint::Ptr& sender, const RequestMessage& message); + void ProcessResponse(const Endpoint::Ptr& sender, const ResponseMessage& message); void ClearSubscriptions(void); - ConstTopicIterator BeginSubscriptions(void) const; - ConstTopicIterator EndSubscriptions(void) const; + void RegisterTopicHandler(const String& topic, const function& callback); + void UnregisterTopicHandler(const String& topic, const function& callback); - boost::signal OnSessionEstablished; + virtual void OnAttributeChanged(const String& name, const Value& oldValue); + + String GetNode(void) const; + String GetService(void) const; + + static Endpoint::Ptr MakeEndpoint(const String& name, bool local); + + static boost::signal OnConnected; + static boost::signal OnDisconnected; + + static boost::signal OnSubscriptionRegistered; + static boost::signal OnSubscriptionUnregistered; private: - set m_Subscriptions; /**< The topics this endpoint is - subscribed to. */ bool m_ReceivedWelcome; /**< Have we received a welcome message from this endpoint? */ bool m_SentWelcome; /**< Have we sent a welcome message to this endpoint? */ - weak_ptr m_EndpointManager; /**< The endpoint manager - this endpoint is - registered with. */ + map > > m_TopicHandlers; + + void NewMessageHandler(const MessagePart& message); + void ClientClosedHandler(void); }; } diff --git a/icinga/endpointmanager.cpp b/icinga/endpointmanager.cpp index 1e7dc3a4a..4bbef5110 100644 --- a/icinga/endpointmanager.cpp +++ b/icinga/endpointmanager.cpp @@ -31,6 +31,16 @@ EndpointManager::EndpointManager(void) m_RequestTimer->OnTimerExpired.connect(boost::bind(&EndpointManager::RequestTimerHandler, this)); m_RequestTimer->SetInterval(5); m_RequestTimer->Start(); + + m_SubscriptionTimer = boost::make_shared(); + m_SubscriptionTimer->OnTimerExpired.connect(boost::bind(&EndpointManager::SubscriptionTimerHandler, this)); + m_SubscriptionTimer->SetInterval(10); + m_SubscriptionTimer->Start(); + + m_ReconnectTimer = boost::make_shared(); + m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&EndpointManager::ReconnectTimerHandler, this)); + m_ReconnectTimer->SetInterval(10); + m_ReconnectTimer->Start(); } /** @@ -42,6 +52,16 @@ EndpointManager::EndpointManager(void) void EndpointManager::SetIdentity(const String& identity) { m_Identity = identity; + + if (m_Endpoint) + m_Endpoint->Unregister(); + + DynamicObject::Ptr object = DynamicObject::GetObject("Endpoint", identity); + + if (object) + m_Endpoint = dynamic_pointer_cast(object); + else + m_Endpoint = Endpoint::MakeEndpoint(identity, false); } /** @@ -54,26 +74,6 @@ String EndpointManager::GetIdentity(void) const return m_Identity; } -/** - * Sets the SSL context that is used for remote connections. - * - * @param sslContext The new SSL context. - */ -void EndpointManager::SetSSLContext(const shared_ptr& sslContext) -{ - m_SSLContext = sslContext; -} - -/** - * Retrieves the SSL context that is used for remote connections. - * - * @returns The SSL context. - */ -shared_ptr EndpointManager::GetSSLContext(void) const -{ - return m_SSLContext; -} - /** * Creates a new JSON-RPC listener on the specified port. * @@ -81,15 +81,20 @@ shared_ptr EndpointManager::GetSSLContext(void) const */ void EndpointManager::AddListener(const String& service) { - if (!GetSSLContext()) + shared_ptr sslContext = IcingaApplication::GetInstance()->GetSSLContext(); + + if (!sslContext) throw_exception(logic_error("SSL context is required for AddListener()")); stringstream s; s << "Adding new listener: port " << service; Logger::Write(LogInformation, "icinga", s.str()); - JsonRpcServer::Ptr server = boost::make_shared(m_SSLContext); - RegisterServer(server); + JsonRpcServer::Ptr server = boost::make_shared(sslContext); + + m_Servers.insert(server); + server->OnNewClient.connect(boost::bind(&EndpointManager::NewClientHandler, + this, _2)); server->Bind(service, AF_INET6); server->Listen(); @@ -102,107 +107,47 @@ void EndpointManager::AddListener(const String& service) * @param node The remote host. * @param service The remote port. */ -void EndpointManager::AddConnection(const String& node, const String& service) -{ - stringstream s; - s << "Adding new endpoint: [" << node << "]:" << service; - Logger::Write(LogInformation, "icinga", s.str()); - - JsonRpcEndpoint::Ptr endpoint = boost::make_shared(); - RegisterEndpoint(endpoint); - endpoint->Connect(node, service, m_SSLContext); -} - -/** - * Registers a new JSON-RPC server with this endpoint manager. - * - * @param server The JSON-RPC server. - */ -void EndpointManager::RegisterServer(const JsonRpcServer::Ptr& server) -{ - m_Servers.push_back(server); - server->OnNewClient.connect(boost::bind(&EndpointManager::NewClientHandler, - this, _2)); +void EndpointManager::AddConnection(const String& node, const String& service) { + JsonRpcClient::Ptr client = boost::make_shared(RoleOutbound, + IcingaApplication::GetInstance()->GetSSLContext()); + client->Connect(node, service); + NewClientHandler(client); } /** * Processes a new client connection. * - * @param ncea Event arguments. + * @param client The new client. */ void EndpointManager::NewClientHandler(const TcpClient::Ptr& client) { - Logger::Write(LogInformation, "icinga", "Accepted new client from " + client->GetPeerAddress()); + JsonRpcClient::Ptr jclient = static_pointer_cast(client); - JsonRpcEndpoint::Ptr endpoint = boost::make_shared(); - endpoint->SetClient(static_pointer_cast(client)); - client->Start(); - RegisterEndpoint(endpoint); + Logger::Write(LogInformation, "icinga", "New client connection from " + jclient->GetPeerAddress()); + + m_PendingClients.insert(jclient); + jclient->OnConnected.connect(boost::bind(&EndpointManager::ClientConnectedHandler, this, _1)); + jclient->Start(); } -/** - * Unregisters a JSON-RPC server. - * - * @param server The JSON-RPC server. - */ -void EndpointManager::UnregisterServer(const JsonRpcServer::Ptr& server) +void EndpointManager::ClientConnectedHandler(const TcpClient::Ptr& client) { - m_Servers.erase( - remove(m_Servers.begin(), m_Servers.end(), server), - m_Servers.end()); - // TODO: unbind event -} + JsonRpcClient::Ptr jclient = static_pointer_cast(client); -/** - * Registers a new endpoint with this endpoint manager. - * - * @param endpoint The new endpoint. - */ -void EndpointManager::RegisterEndpoint(const Endpoint::Ptr& endpoint) -{ - endpoint->SetEndpointManager(GetSelf()); + m_PendingClients.erase(jclient); - UnregisterEndpoint(endpoint); + shared_ptr cert = jclient->GetPeerCertificate(); - String identity = endpoint->GetIdentity(); + String identity = Utility::GetCertificateCN(cert); - if (!identity.IsEmpty()) { - m_Endpoints[identity] = endpoint; - OnNewEndpoint(GetSelf(), endpoint); - } else { - m_PendingEndpoints.push_back(endpoint); - } + Endpoint::Ptr endpoint; - if (endpoint->IsLocal()) { - /* this endpoint might have introduced new subscriptions - * or publications which affect remote endpoints, we need - * to close all fully-connected remote endpoints to make sure - * these subscriptions/publications are kept up-to-date. */ - Iterator prev, it; - for (it = m_Endpoints.begin(); it != m_Endpoints.end(); ) { - prev = it; - it++; + if (Endpoint::Exists(identity)) + endpoint = Endpoint::GetByName(identity); + else + endpoint = Endpoint::MakeEndpoint(identity, false); - if (!prev->second->IsLocal()) - m_Endpoints.erase(prev); - } - } -} - -/** - * Unregisters an endpoint. - * - * @param endpoint The endpoint. - */ -void EndpointManager::UnregisterEndpoint(const Endpoint::Ptr& endpoint) -{ - m_PendingEndpoints.erase( - remove(m_PendingEndpoints.begin(), m_PendingEndpoints.end(), endpoint), - m_PendingEndpoints.end()); - - String identity = endpoint->GetIdentity(); - if (!identity.IsEmpty()) - m_Endpoints.erase(identity); + endpoint->SetClient(jclient); } /** @@ -240,8 +185,9 @@ void EndpointManager::SendAnycastMessage(const Endpoint::Ptr& sender, throw_exception(invalid_argument("Message is missing the 'method' property.")); vector candidates; - Endpoint::Ptr endpoint; - BOOST_FOREACH(tie(tuples::ignore, endpoint), m_Endpoints) { + DynamicObject::Ptr object; + BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Endpoint")) { + Endpoint::Ptr endpoint = dynamic_pointer_cast(object); /* don't forward messages between non-local endpoints */ if (!sender->IsLocal() && !endpoint->IsLocal()) continue; @@ -275,8 +221,10 @@ void EndpointManager::SendMulticastMessage(const Endpoint::Ptr& sender, if (!message.GetMethod(&method)) throw_exception(invalid_argument("Message is missing the 'method' property.")); - Endpoint::Ptr recipient; - BOOST_FOREACH(tie(tuples::ignore, recipient), m_Endpoints) { + DynamicObject::Ptr object; + BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Endpoint")) { + Endpoint::Ptr recipient = dynamic_pointer_cast(object); + /* don't forward messages back to the sender */ if (sender == recipient) continue; @@ -291,31 +239,16 @@ void EndpointManager::SendMulticastMessage(const Endpoint::Ptr& sender, * * @param callback The callback function. */ -void EndpointManager::ForEachEndpoint(function callback) -{ - map::iterator prev, i; - for (i = m_Endpoints.begin(); i != m_Endpoints.end(); ) { - prev = i; - i++; - - callback(GetSelf(), prev->second); - } -} - -/** - * Retrieves an endpoint that has the specified identity. - * - * @param identity The identity of the endpoint. - */ -Endpoint::Ptr EndpointManager::GetEndpointByIdentity(const String& identity) const -{ - map::const_iterator i; - i = m_Endpoints.find(identity); - if (i != m_Endpoints.end()) - return i->second; - else - return Endpoint::Ptr(); -} +//void EndpointManager::ForEachEndpoint(function callback) +//{ +// map::iterator prev, i; +// for (i = m_Endpoints.begin(); i != m_Endpoints.end(); ) { +// prev = i; +// i++; +// +// callback(GetSelf(), prev->second); +// } +//} void EndpointManager::SendAPIMessage(const Endpoint::Ptr& sender, const Endpoint::Ptr& recipient, RequestMessage& message, @@ -348,6 +281,46 @@ bool EndpointManager::RequestTimeoutLessComparer(const pair(); + + DynamicObject::Ptr object; + BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Endpoint")) { + Endpoint::Ptr endpoint = dynamic_pointer_cast(object); + + if (!endpoint->IsLocalEndpoint()) + continue; + + String topic; + BOOST_FOREACH(tie(tuples::ignore, topic), endpoint->GetSubscriptions()) { + subscriptions->Set(topic, topic); + } + } + + m_Endpoint->SetSubscriptions(subscriptions); +} + +void EndpointManager::ReconnectTimerHandler(void) +{ + DynamicObject::Ptr object; + BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Endpoint")) { + Endpoint::Ptr endpoint = dynamic_pointer_cast(object); + + if (endpoint->IsConnected()) + continue; + + String node, service; + node = endpoint->GetNode(); + service = endpoint->GetService(); + + if (node.IsEmpty() || service.IsEmpty()) + continue; + + AddConnection(node, service); + } +} + void EndpointManager::RequestTimerHandler(void) { map::iterator it; @@ -379,15 +352,15 @@ void EndpointManager::ProcessResponseMessage(const Endpoint::Ptr& sender, const m_Requests.erase(it); } -EndpointManager::Iterator EndpointManager::Begin(void) -{ - return m_Endpoints.begin(); -} +//EndpointManager::Iterator EndpointManager::Begin(void) +//{ +// return m_Endpoints.begin(); +//} -EndpointManager::Iterator EndpointManager::End(void) -{ - return m_Endpoints.end(); -} +//EndpointManager::Iterator EndpointManager::End(void) +//{ +// return m_Endpoints.end(); +//} EndpointManager::Ptr EndpointManager::GetInstance(void) { diff --git a/icinga/endpointmanager.h b/icinga/endpointmanager.h index d8dd41f74..fe57d74d5 100644 --- a/icinga/endpointmanager.h +++ b/icinga/endpointmanager.h @@ -34,7 +34,7 @@ public: typedef shared_ptr Ptr; typedef weak_ptr WeakPtr; - typedef map::iterator Iterator; +// typedef map::iterator Iterator; EndpointManager(void); @@ -49,9 +49,6 @@ public: void AddListener(const String& service); void AddConnection(const String& node, const String& service); - void RegisterEndpoint(const Endpoint::Ptr& endpoint); - void UnregisterEndpoint(const Endpoint::Ptr& endpoint); - void SendUnicastMessage(const Endpoint::Ptr& sender, const Endpoint::Ptr& recipient, const MessagePart& message); void SendAnycastMessage(const Endpoint::Ptr& sender, const RequestMessage& message); void SendMulticastMessage(const Endpoint::Ptr& sender, const RequestMessage& message); @@ -61,21 +58,22 @@ public: void ProcessResponseMessage(const Endpoint::Ptr& sender, const ResponseMessage& message); - void ForEachEndpoint(function callback); - Iterator Begin(void); - Iterator End(void); - - Endpoint::Ptr GetEndpointByIdentity(const String& identity) const; +// void ForEachEndpoint(function callback); +// Iterator Begin(void); +// Iterator End(void); boost::signal OnNewEndpoint; private: String m_Identity; - shared_ptr m_SSLContext; + Endpoint::Ptr m_Endpoint; - vector m_Servers; - vector m_PendingEndpoints; - map m_Endpoints; + Timer::Ptr m_SubscriptionTimer; + + Timer::Ptr m_ReconnectTimer; + + set m_Servers; + set m_PendingClients; /** * Information about a pending API request. @@ -98,13 +96,15 @@ private: map m_Requests; Timer::Ptr m_RequestTimer; - void RegisterServer(const JsonRpcServer::Ptr& server); - void UnregisterServer(const JsonRpcServer::Ptr& server); - static bool RequestTimeoutLessComparer(const pair& a, const pair& b); void RequestTimerHandler(void); + void SubscriptionTimerHandler(void); + + void ReconnectTimerHandler(void); + void NewClientHandler(const TcpClient::Ptr& client); + void ClientConnectedHandler(const TcpClient::Ptr& client); }; } diff --git a/icinga/i2-icinga.h b/icinga/i2-icinga.h index 67dc8c27a..85637333a 100644 --- a/icinga/i2-icinga.h +++ b/icinga/i2-icinga.h @@ -42,8 +42,6 @@ using boost::algorithm::is_any_of; #endif /* I2_ICINGA_BUILD */ #include "endpoint.h" -#include "jsonrpcendpoint.h" -#include "virtualendpoint.h" #include "endpointmanager.h" #include "icingaapplication.h" diff --git a/icinga/icingaapplication.cpp b/icinga/icingaapplication.cpp index 330e28d36..9f65bd9c1 100644 --- a/icinga/icingaapplication.cpp +++ b/icinga/icingaapplication.cpp @@ -133,8 +133,7 @@ int IcingaApplication::Main(const vector& args) Logger::Write(LogInformation, "icinga", "My identity: " + identity); EndpointManager::GetInstance()->SetIdentity(identity); - shared_ptr sslContext = Utility::MakeSSLContext(GetCertificateFile(), GetCertificateFile(), GetCAFile()); - EndpointManager::GetInstance()->SetSSLContext(sslContext); + m_SSLContext = Utility::MakeSSLContext(GetCertificateFile(), GetCertificateFile(), GetCAFile()); } /* create the primary RPC listener */ @@ -215,3 +214,8 @@ double IcingaApplication::GetStartTime(void) const { return m_StartTime; } + +shared_ptr IcingaApplication::GetSSLContext(void) const +{ + return m_SSLContext; +} diff --git a/icinga/icingaapplication.h b/icinga/icingaapplication.h index 2045e1a4c..d1175758b 100644 --- a/icinga/icingaapplication.h +++ b/icinga/icingaapplication.h @@ -47,6 +47,7 @@ public: String GetPidPath(void) const; String GetStatePath(void) const; Dictionary::Ptr GetMacros(void) const; + shared_ptr GetSSLContext(void) const; double GetStartTime(void) const; @@ -61,6 +62,7 @@ private: String m_PidPath; String m_StatePath; Dictionary::Ptr m_Macros; + shared_ptr m_SSLContext; double m_StartTime; diff --git a/icinga/jsonrpcendpoint.cpp b/icinga/jsonrpcendpoint.cpp deleted file mode 100644 index e21215cc5..000000000 --- a/icinga/jsonrpcendpoint.cpp +++ /dev/null @@ -1,148 +0,0 @@ -/****************************************************************************** - * Icinga 2 * - * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) * - * * - * This program is free software; you can redistribute it and/or * - * modify it under the terms of the GNU General Public License * - * as published by the Free Software Foundation; either version 2 * - * of the License, or (at your option) any later version. * - * * - * This program is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * - * GNU General Public License for more details. * - * * - * You should have received a copy of the GNU General Public License * - * along with this program; if not, write to the Free Software Foundation * - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * - ******************************************************************************/ - -#include "i2-icinga.h" - -using namespace icinga; - -String JsonRpcEndpoint::GetIdentity(void) const -{ - return m_Identity; -} - -String JsonRpcEndpoint::GetAddress(void) const -{ - if (!m_Client) - return ""; - - return m_Client->GetPeerAddress(); -} - -JsonRpcClient::Ptr JsonRpcEndpoint::GetClient(void) -{ - return m_Client; -} - -void JsonRpcEndpoint::Connect(String node, String service, shared_ptr sslContext) -{ - JsonRpcClient::Ptr client = boost::make_shared(RoleOutbound, sslContext); - SetClient(client); - client->Connect(node, service); - client->Start(); -} - -void JsonRpcEndpoint::SetClient(JsonRpcClient::Ptr client) -{ - m_Client = client; - client->OnNewMessage.connect(boost::bind(&JsonRpcEndpoint::NewMessageHandler, this, _2)); - client->OnClosed.connect(boost::bind(&JsonRpcEndpoint::ClientClosedHandler, this)); - client->OnConnected.connect(boost::bind(&JsonRpcEndpoint::ClientConnectedHandler, this)); -} - -bool JsonRpcEndpoint::IsLocal(void) const -{ - return false; -} - -bool JsonRpcEndpoint::IsConnected(void) const -{ - return (m_Client && m_Client->IsConnected()); -} - -void JsonRpcEndpoint::ProcessRequest(Endpoint::Ptr sender, const RequestMessage& message) -{ - if (IsConnected()) { - m_Client->SendMessage(message); - } else { - // TODO: persist the event - } -} - -void JsonRpcEndpoint::ProcessResponse(Endpoint::Ptr sender, const ResponseMessage& message) -{ - m_Client->SendMessage(message); -} - -void JsonRpcEndpoint::NewMessageHandler(const MessagePart& message) -{ - Endpoint::Ptr sender = GetSelf(); - - if (ResponseMessage::IsResponseMessage(message)) { - /* rather than routing the message to the right virtual - * endpoint we just process it here right away. */ - GetEndpointManager()->ProcessResponseMessage(sender, message); - return; - } - - RequestMessage request = message; - - String method; - if (!request.GetMethod(&method)) - return; - - String id; - if (request.GetID(&id)) - GetEndpointManager()->SendAnycastMessage(sender, request); - else - GetEndpointManager()->SendMulticastMessage(sender, request); -} - -void JsonRpcEndpoint::ClientClosedHandler(void) -{ - try { - m_Client->CheckException(); - } catch (const exception& ex) { - stringstream message; - message << "Error occured for JSON-RPC socket: Message=" << ex.what(); - - Logger::Write(LogWarning, "jsonrpc", message.str()); - } - - Logger::Write(LogWarning, "jsonrpc", "Lost connection to endpoint: identity=" + GetIdentity()); - - // TODO: _only_ clear non-persistent subscriptions - // unregister ourselves if no persistent subscriptions are left (use a timer for that, once we have a TTL property for the topics) - ClearSubscriptions(); - - // remove the endpoint if there are no more subscriptions */ - if (BeginSubscriptions() == EndSubscriptions()) { - Hold(); - GetEndpointManager()->UnregisterEndpoint(GetSelf()); - } - - m_Client.reset(); - - // TODO: persist events, etc., for now we just disable the endpoint -} - -void JsonRpcEndpoint::ClientConnectedHandler(void) -{ - String identity = Utility::GetCertificateCN(m_Client->GetPeerCertificate()); - - if (GetIdentity().IsEmpty() && !identity.IsEmpty()) { - m_Identity = identity; - GetEndpointManager()->RegisterEndpoint(GetSelf()); - } -} - -void JsonRpcEndpoint::Stop(void) -{ - if (m_Client) - m_Client->Close(); -} diff --git a/icinga/jsonrpcendpoint.h b/icinga/jsonrpcendpoint.h deleted file mode 100644 index e685aac30..000000000 --- a/icinga/jsonrpcendpoint.h +++ /dev/null @@ -1,71 +0,0 @@ -/****************************************************************************** - * Icinga 2 * - * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) * - * * - * This program is free software; you can redistribute it and/or * - * modify it under the terms of the GNU General Public License * - * as published by the Free Software Foundation; either version 2 * - * of the License, or (at your option) any later version. * - * * - * This program is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * - * GNU General Public License for more details. * - * * - * You should have received a copy of the GNU General Public License * - * along with this program; if not, write to the Free Software Foundation * - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * - ******************************************************************************/ - -#ifndef JSONRPCENDPOINT_H -#define JSONRPCENDPOINT_H - -namespace icinga -{ - -/** - * A JSON-RPC endpoint that can be used to communicate with a remote - * Icinga instance. - * - * @ingroup icinga - */ -class I2_ICINGA_API JsonRpcEndpoint : public Endpoint -{ -public: - typedef shared_ptr Ptr; - typedef weak_ptr WeakPtr; - - void Connect(String node, String service, - shared_ptr sslContext); - - JsonRpcClient::Ptr GetClient(void); - void SetClient(JsonRpcClient::Ptr client); - - virtual String GetIdentity(void) const; - virtual String GetAddress(void) const; - - virtual bool IsLocal(void) const; - virtual bool IsConnected(void) const; - - virtual void ProcessRequest(Endpoint::Ptr sender, const RequestMessage& message); - virtual void ProcessResponse(Endpoint::Ptr sender, const ResponseMessage& message); - - virtual void Stop(void); - -private: - String m_Identity; /**< The identity of this endpoint. */ - - shared_ptr m_SSLContext; - String m_Address; - JsonRpcClient::Ptr m_Client; - - void SetAddress(String address); - - void NewMessageHandler(const MessagePart& message); - void ClientClosedHandler(void); - void ClientConnectedHandler(void); -}; - -} - -#endif /* JSONRPCENDPOINT_H */ diff --git a/icinga/virtualendpoint.cpp b/icinga/virtualendpoint.cpp deleted file mode 100644 index c82bf3f24..000000000 --- a/icinga/virtualendpoint.cpp +++ /dev/null @@ -1,97 +0,0 @@ -/****************************************************************************** - * Icinga 2 * - * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) * - * * - * This program is free software; you can redistribute it and/or * - * modify it under the terms of the GNU General Public License * - * as published by the Free Software Foundation; either version 2 * - * of the License, or (at your option) any later version. * - * * - * This program is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * - * GNU General Public License for more details. * - * * - * You should have received a copy of the GNU General Public License * - * along with this program; if not, write to the Free Software Foundation * - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * - ******************************************************************************/ - -#include "i2-icinga.h" - -using namespace icinga; - -String VirtualEndpoint::GetIdentity(void) const -{ - return "__" + GetAddress(); -} - -String VirtualEndpoint::GetAddress(void) const -{ - char address[50]; - sprintf(address, "virtual:%p", (void *)this); - return address; -} - -bool VirtualEndpoint::IsLocal(void) const -{ - return true; -} - -bool VirtualEndpoint::IsConnected(void) const -{ - return true; -} - -void VirtualEndpoint::RegisterTopicHandler(String topic, function callback) -{ - map > >::iterator it; - it = m_TopicHandlers.find(topic); - - shared_ptr > sig; - - if (it == m_TopicHandlers.end()) { - sig = boost::make_shared >(); - m_TopicHandlers.insert(make_pair(topic, sig)); - } else { - sig = it->second; - } - - sig->connect(callback); - - RegisterSubscription(topic); -} - -void VirtualEndpoint::UnregisterTopicHandler(String topic, function callback) -{ - // TODO: implement - //m_TopicHandlers[method] -= callback; - //UnregisterMethodSubscription(method); - - throw_exception(NotImplementedException()); -} - -void VirtualEndpoint::ProcessRequest(Endpoint::Ptr sender, const RequestMessage& request) -{ - String method; - if (!request.GetMethod(&method)) - return; - - map > >::iterator it; - it = m_TopicHandlers.find(method); - - if (it == m_TopicHandlers.end()) - return; - - (*it->second)(GetSelf(), sender, request); -} - -void VirtualEndpoint::ProcessResponse(Endpoint::Ptr sender, const ResponseMessage& response) -{ - GetEndpointManager()->ProcessResponseMessage(sender, response); -} - -void VirtualEndpoint::Stop(void) -{ - /* Nothing to do here. */ -} diff --git a/icinga/virtualendpoint.h b/icinga/virtualendpoint.h deleted file mode 100644 index f81b6f833..000000000 --- a/icinga/virtualendpoint.h +++ /dev/null @@ -1,57 +0,0 @@ -/****************************************************************************** - * Icinga 2 * - * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) * - * * - * This program is free software; you can redistribute it and/or * - * modify it under the terms of the GNU General Public License * - * as published by the Free Software Foundation; either version 2 * - * of the License, or (at your option) any later version. * - * * - * This program is distributed in the hope that it will be useful, * - * but WITHOUT ANY WARRANTY; without even the implied warranty of * - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * - * GNU General Public License for more details. * - * * - * You should have received a copy of the GNU General Public License * - * along with this program; if not, write to the Free Software Foundation * - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * - ******************************************************************************/ - -#ifndef VIRTUALENDPOINT_H -#define VIRTUALENDPOINT_H - -namespace icinga -{ - -/** - * A local endpoint. - * - * @ingroup icinga - */ -class I2_ICINGA_API VirtualEndpoint : public Endpoint -{ -public: - typedef shared_ptr Ptr; - typedef weak_ptr WeakPtr; - - void RegisterTopicHandler(String topic, function callback); - void UnregisterTopicHandler(String topic, function callback); - - virtual String GetIdentity(void) const; - virtual String GetAddress(void) const; - - virtual bool IsLocal(void) const; - virtual bool IsConnected(void) const; - - virtual void ProcessRequest(Endpoint::Ptr sender, const RequestMessage& message); - virtual void ProcessResponse(Endpoint::Ptr sender, const ResponseMessage& message); - - virtual void Stop(void); - -private: - map< String, shared_ptr > > m_TopicHandlers; -}; - -} - -#endif /* VIRTUALENDPOINT_H */ diff --git a/jsonrpc/jsonrpcclient.cpp b/jsonrpc/jsonrpcclient.cpp index 29cca0888..496c0f273 100644 --- a/jsonrpc/jsonrpcclient.cpp +++ b/jsonrpc/jsonrpcclient.cpp @@ -41,7 +41,9 @@ JsonRpcClient::JsonRpcClient(TcpClientRole role, shared_ptr sslContext) void JsonRpcClient::SendMessage(const MessagePart& message) { Value value = message.GetDictionary(); - NetString::WriteStringToIOQueue(this, value.Serialize()); + String json = value.Serialize(); + //std::cerr << ">> " << json << std::endl; + NetString::WriteStringToIOQueue(this, json); } /** @@ -52,6 +54,8 @@ void JsonRpcClient::DataAvailableHandler(void) String jsonString; while (NetString::ReadStringFromIOQueue(this, &jsonString)) { + //std::cerr << "<< " << jsonString << std::endl; + try { Value value = Value::Deserialize(jsonString);