Implemented replication for Endpoint objects.

This commit is contained in:
Gunnar Beutner 2012-09-03 10:28:14 +02:00
parent 082c627fb9
commit dc6246642f
37 changed files with 528 additions and 1561 deletions

View File

@ -175,6 +175,24 @@ void Dictionary::Remove(Dictionary::Iterator it)
m_Data.erase(it);
}
/**
* Makes a shallow copy of a dictionary.
*
* @returns a copy of the dictionary.
*/
Dictionary::Ptr Dictionary::ShallowClone(void) const
{
Dictionary::Ptr clone = boost::make_shared<Dictionary>();
String key;
Value value;
BOOST_FOREACH(tie(key, value), m_Data) {
clone->Set(key, value);
}
return clone;
}
/**
* Converts a JSON object to a dictionary.
*

View File

@ -50,6 +50,8 @@ public:
void Remove(const String& key);
void Remove(Iterator it);
Dictionary::Ptr ShallowClone(void) const;
static Dictionary::Ptr FromJson(cJSON *json);
cJSON *ToJson(void) const;

View File

@ -145,6 +145,11 @@ void DynamicObject::Set(const String& name, const Value& data)
InternalSetAttribute(name, data, GetCurrentTx());
}
void DynamicObject::Touch(const String& name)
{
InternalSetAttribute(name, InternalGetAttribute(name), GetCurrentTx());
}
Value DynamicObject::Get(const String& name) const
{
return InternalGetAttribute(name);

View File

@ -79,6 +79,7 @@ public:
void RegisterAttribute(const String& name, DynamicAttributeType type);
void Set(const String& name, const Value& data);
void Touch(const String& name);
Value Get(const String& name) const;
bool HasAttribute(const String& name) const;
@ -162,8 +163,11 @@ shared_ptr<T> DynamicObjectFactory(const Dictionary::Ptr& serializedUpdate)
return boost::make_shared<T>(serializedUpdate);
}
#define REGISTER_CLASS_ALIAS(klass, alias) \
static RegisterClassHelper g_Register ## klass(alias, DynamicObjectFactory<klass>)
#define REGISTER_CLASS(klass) \
static RegisterClassHelper g_Register ## klass(#klass, DynamicObjectFactory<klass>)
REGISTER_CLASS_ALIAS(klass, #klass)
}

View File

@ -33,6 +33,8 @@ condition_variable Process::m_TasksCV;
Process::Process(const String& command)
: AsyncTask<Process, ProcessResult>(), m_Command(command), m_UsePopen(false)
{
assert(Application::IsMainThread());
if (!m_ThreadCreated) {
thread t(&Process::WorkerThreadProc);
t.detach();

View File

@ -7,5 +7,4 @@ SUBDIRS = \
compat \
convenience \
delegation \
demo \
discovery
demo

View File

@ -23,14 +23,12 @@ using namespace icinga;
void CheckerComponent::Start(void)
{
m_Endpoint = boost::make_shared<VirtualEndpoint>();
m_Endpoint = Endpoint::MakeEndpoint("checker", true);
/* dummy registration so the delegation module knows this is a checker
TODO: figure out a better way for this */
m_Endpoint->RegisterSubscription("checker");
EndpointManager::GetInstance()->RegisterEndpoint(m_Endpoint);
Service::OnCheckerChanged.connect(bind(&CheckerComponent::CheckerChangedHandler, this, _1));
DynamicObject::OnUnregistered.connect(bind(&CheckerComponent::ServiceRemovedHandler, this, _1));
@ -50,10 +48,7 @@ void CheckerComponent::Start(void)
void CheckerComponent::Stop(void)
{
EndpointManager::Ptr mgr = EndpointManager::GetInstance();
if (mgr)
mgr->UnregisterEndpoint(m_Endpoint);
m_Endpoint->Unregister();
}
void CheckerComponent::CheckTimerHandler(void)
@ -158,7 +153,7 @@ void CheckerComponent::CheckerChangedHandler(const Service::Ptr& service)
{
String checker = service->GetChecker();
if (checker == EndpointManager::GetInstance()->GetIdentity() || checker == m_Endpoint->GetIdentity()) {
if (checker == EndpointManager::GetInstance()->GetIdentity() || checker == m_Endpoint->GetName()) {
if (m_PendingServices.find(service) != m_PendingServices.end())
return;

View File

@ -54,7 +54,7 @@ public:
virtual void Stop(void);
private:
VirtualEndpoint::Ptr m_Endpoint;
Endpoint::Ptr m_Endpoint;
ServiceSet m_IdleServices;
ServiceSet m_PendingServices;

View File

@ -26,18 +26,14 @@ using namespace icinga;
*/
void CIBSyncComponent::Start(void)
{
m_Endpoint = boost::make_shared<VirtualEndpoint>();
/* config objects */
m_Endpoint->RegisterTopicHandler("config::FetchObjects",
boost::bind(&CIBSyncComponent::FetchObjectsHandler, this, _2));
m_Endpoint = Endpoint::MakeEndpoint("cibsync", true);
DynamicObject::OnRegistered.connect(boost::bind(&CIBSyncComponent::LocalObjectRegisteredHandler, this, _1));
DynamicObject::OnUnregistered.connect(boost::bind(&CIBSyncComponent::LocalObjectUnregisteredHandler, this, _1));
DynamicObject::OnTransactionClosing.connect(boost::bind(&CIBSyncComponent::TransactionClosingHandler, this, _1));
EndpointManager::GetInstance()->OnNewEndpoint.connect(boost::bind(&CIBSyncComponent::NewEndpointHandler, this, _2));
Endpoint::OnConnected.connect(boost::bind(&CIBSyncComponent::EndpointConnectedHandler, this, _1));
m_Endpoint->RegisterTopicHandler("config::ObjectUpdate",
boost::bind(&CIBSyncComponent::RemoteObjectUpdateHandler, this, _2, _3));
m_Endpoint->RegisterTopicHandler("config::ObjectRemoved",
@ -46,8 +42,6 @@ void CIBSyncComponent::Start(void)
/* service status */
m_Endpoint->RegisterTopicHandler("checker::ServiceStateChange",
boost::bind(&CIBSyncComponent::ServiceStateChangeRequestHandler, _2, _3));
EndpointManager::GetInstance()->RegisterEndpoint(m_Endpoint);
}
/**
@ -55,10 +49,7 @@ void CIBSyncComponent::Start(void)
*/
void CIBSyncComponent::Stop(void)
{
EndpointManager::Ptr endpointManager = EndpointManager::GetInstance();
if (endpointManager)
endpointManager->UnregisterEndpoint(m_Endpoint);
m_Endpoint->Unregister();
}
void CIBSyncComponent::ServiceStateChangeRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request)
@ -84,21 +75,28 @@ void CIBSyncComponent::ServiceStateChangeRequestHandler(const Endpoint::Ptr& sen
CIB::UpdateTaskStatistics(now, 1);
}
void CIBSyncComponent::NewEndpointHandler(const Endpoint::Ptr& endpoint)
void CIBSyncComponent::EndpointConnectedHandler(const Endpoint::Ptr& endpoint)
{
/* no need to sync the config with local endpoints */
if (endpoint->IsLocal())
if (endpoint->IsLocalEndpoint())
return;
endpoint->OnSessionEstablished.connect(boost::bind(&CIBSyncComponent::SessionEstablishedHandler, this, _1));
}
/* we just assume the other endpoint wants object updates */
endpoint->RegisterSubscription("config::ObjectUpdate");
endpoint->RegisterSubscription("config::ObjectRemoved");
void CIBSyncComponent::SessionEstablishedHandler(const Endpoint::Ptr& endpoint)
{
RequestMessage request;
request.SetMethod("config::FetchObjects");
pair<DynamicObject::TypeMap::iterator, DynamicObject::TypeMap::iterator> trange = DynamicObject::GetTypes();
DynamicObject::TypeMap::iterator tt;
for (tt = trange.first; tt != trange.second; tt++) {
DynamicObject::Ptr object;
BOOST_FOREACH(tie(tuples::ignore, object), tt->second) {
if (!ShouldReplicateObject(object))
continue;
EndpointManager::GetInstance()->SendUnicastMessage(m_Endpoint, endpoint, request);
RequestMessage request = MakeObjectMessage(object, "config::ObjectUpdate", 0, true);
EndpointManager::GetInstance()->SendUnicastMessage(m_Endpoint, endpoint, request);
}
}
}
RequestMessage CIBSyncComponent::MakeObjectMessage(const DynamicObject::Ptr& object, const String& method, double sinceTx, bool includeProperties)
@ -123,23 +121,6 @@ bool CIBSyncComponent::ShouldReplicateObject(const DynamicObject::Ptr& object)
return (!object->IsLocal());
}
void CIBSyncComponent::FetchObjectsHandler(const Endpoint::Ptr& sender)
{
pair<DynamicObject::TypeMap::iterator, DynamicObject::TypeMap::iterator> trange = DynamicObject::GetTypes();
DynamicObject::TypeMap::iterator tt;
for (tt = trange.first; tt != trange.second; tt++) {
DynamicObject::Ptr object;
BOOST_FOREACH(tie(tuples::ignore, object), tt->second) {
if (!ShouldReplicateObject(object))
continue;
RequestMessage request = MakeObjectMessage(object, "config::ObjectUpdate", 0, true);
EndpointManager::GetInstance()->SendUnicastMessage(m_Endpoint, sender, request);
}
}
}
void CIBSyncComponent::LocalObjectRegisteredHandler(const DynamicObject::Ptr& object)
{
if (!ShouldReplicateObject(object))
@ -212,7 +193,7 @@ void CIBSyncComponent::RemoteObjectUpdateHandler(const Endpoint::Ptr& sender, co
}
if (object->GetSource().IsEmpty())
object->SetSource(sender->GetIdentity());
object->SetSource(sender->GetName());
object->Register();
} else {

View File

@ -33,18 +33,16 @@ public:
virtual void Stop(void);
private:
VirtualEndpoint::Ptr m_Endpoint;
Endpoint::Ptr m_Endpoint;
static void ServiceStateChangeRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request);
void NewEndpointHandler(const Endpoint::Ptr& endpoint);
void SessionEstablishedHandler(const Endpoint::Ptr& endpoint);
void EndpointConnectedHandler(const Endpoint::Ptr& endpoint);
void LocalObjectRegisteredHandler(const DynamicObject::Ptr& object);
void LocalObjectUnregisteredHandler(const DynamicObject::Ptr& object);
void TransactionClosingHandler(const set<DynamicObject::Ptr>& modifiedObjects);
void FetchObjectsHandler(const Endpoint::Ptr& sender);
void RemoteObjectUpdateHandler(const Endpoint::Ptr& sender, const RequestMessage& request);
void RemoteObjectRemovedHandler(const RequestMessage& request);

View File

@ -40,9 +40,9 @@ vector<Endpoint::Ptr> DelegationComponent::GetCheckerCandidates(const Service::P
{
vector<Endpoint::Ptr> candidates;
EndpointManager::Iterator it;
for (it = EndpointManager::GetInstance()->Begin(); it != EndpointManager::GetInstance()->End(); it++) {
Endpoint::Ptr endpoint = it->second;
DynamicObject::Ptr object;
BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Endpoint")) {
Endpoint::Ptr endpoint = dynamic_pointer_cast<Endpoint>(object);
/* ignore disconnected endpoints */
if (!endpoint->IsConnected())
@ -53,7 +53,7 @@ vector<Endpoint::Ptr> DelegationComponent::GetCheckerCandidates(const Service::P
continue;
/* ignore endpoints that aren't allowed to check this service */
if (!service->IsAllowedChecker(it->first))
if (!service->IsAllowedChecker(endpoint->GetName()))
continue;
candidates.push_back(endpoint);
@ -66,14 +66,16 @@ void DelegationComponent::DelegationTimerHandler(void)
{
map<Endpoint::Ptr, int> histogram;
EndpointManager::Iterator eit;
for (eit = EndpointManager::GetInstance()->Begin(); eit != EndpointManager::GetInstance()->End(); eit++)
histogram[eit->second] = 0;
DynamicObject::Ptr object;
BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Endpoint")) {
Endpoint::Ptr endpoint = dynamic_pointer_cast<Endpoint>(object);
histogram[endpoint] = 0;
}
vector<Service::Ptr> services;
/* build "checker -> service count" histogram */
DynamicObject::Ptr object;
BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Service")) {
Service::Ptr service = dynamic_pointer_cast<Service>(object);
@ -86,10 +88,11 @@ void DelegationComponent::DelegationTimerHandler(void)
if (checker.IsEmpty())
continue;
Endpoint::Ptr endpoint = EndpointManager::GetInstance()->GetEndpointByIdentity(checker);
if (!endpoint)
if (!Endpoint::Exists(checker))
continue;
Endpoint::Ptr endpoint = Endpoint::GetByName(checker);
histogram[endpoint]++;
}
@ -102,8 +105,8 @@ void DelegationComponent::DelegationTimerHandler(void)
String checker = service->GetChecker();
Endpoint::Ptr oldEndpoint;
if (!checker.IsEmpty())
oldEndpoint = EndpointManager::GetInstance()->GetEndpointByIdentity(checker);
if (Endpoint::Exists(checker))
oldEndpoint = Endpoint::GetByName(checker);
vector<Endpoint::Ptr> candidates = GetCheckerCandidates(service);
@ -146,7 +149,7 @@ void DelegationComponent::DelegationTimerHandler(void)
if (histogram[candidate] > avg_services)
continue;
service->SetChecker(candidate->GetIdentity());
service->SetChecker(candidate->GetName());
histogram[candidate]++;
delegated++;
@ -161,7 +164,7 @@ void DelegationComponent::DelegationTimerHandler(void)
int count;
BOOST_FOREACH(tie(endpoint, count), histogram) {
stringstream msgbuf;
msgbuf << "histogram: " << endpoint->GetIdentity() << " - " << count;
msgbuf << "histogram: " << endpoint->GetName() << " - " << count;
Logger::Write(LogInformation, "delegation", msgbuf.str());
}

View File

@ -26,10 +26,9 @@ using namespace icinga;
*/
void DemoComponent::Start(void)
{
m_Endpoint = boost::make_shared<VirtualEndpoint>();
m_Endpoint = Endpoint::MakeEndpoint("demo", true);
m_Endpoint->RegisterTopicHandler("demo::HelloWorld",
boost::bind(&DemoComponent::HelloWorldRequestHandler, this, _2, _3));
EndpointManager::GetInstance()->RegisterEndpoint(m_Endpoint);
m_DemoTimer = boost::make_shared<Timer>();
m_DemoTimer->SetInterval(5);
@ -42,10 +41,7 @@ void DemoComponent::Start(void)
*/
void DemoComponent::Stop(void)
{
EndpointManager::Ptr endpointManager = EndpointManager::GetInstance();
if (endpointManager)
endpointManager->UnregisterEndpoint(m_Endpoint);
m_Endpoint->Unregister();
}
/**
@ -68,7 +64,7 @@ void DemoComponent::DemoTimerHandler(void)
*/
void DemoComponent::HelloWorldRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request)
{
Logger::Write(LogInformation, "demo", "Got 'hello world' from address=" + sender->GetAddress() + ", identity=" + sender->GetIdentity());
Logger::Write(LogInformation, "demo", "Got 'hello world' from address=" + sender->GetAddress() + ", identity=" + sender->GetName());
}
EXPORT_COMPONENT(demo, DemoComponent);

View File

@ -34,7 +34,7 @@ public:
private:
Timer::Ptr m_DemoTimer;
VirtualEndpoint::Ptr m_Endpoint;
Endpoint::Ptr m_Endpoint;
void DemoTimerHandler(void);
void HelloWorldRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request);

View File

@ -1,35 +0,0 @@
## Process this file with automake to produce Makefile.in
pkglib_LTLIBRARIES = \
discovery.la
discovery_la_SOURCES = \
discoverycomponent.cpp \
discoverycomponent.h \
discoverymessage.cpp \
discoverymessage.h \
i2-discovery.h
discovery_la_CPPFLAGS = \
$(BOOST_CPPFLAGS) \
-I${top_srcdir}/base \
-I${top_srcdir}/dyn \
-I${top_srcdir}/jsonrpc \
-I${top_srcdir}/icinga \
-I${top_srcdir}/cib
discovery_la_LDFLAGS = \
$(BOOST_LDFLAGS) \
-module \
-no-undefined \
@RELEASE_INFO@ \
@VERSION_INFO@
discovery_la_LIBADD = \
$(BOOST_SIGNALS_LIB) \
$(BOOST_THREAD_LIB) \
${top_builddir}/base/libbase.la \
${top_builddir}/dyn/libdyn.la \
${top_builddir}/jsonrpc/libjsonrpc.la \
${top_builddir}/icinga/libicinga.la \
${top_builddir}/cib/libcib.la

View File

@ -1,94 +0,0 @@
<?xml version="1.0" encoding="utf-8"?>
<Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ItemGroup Label="ProjectConfigurations">
<ProjectConfiguration Include="Debug|Win32">
<Configuration>Debug</Configuration>
<Platform>Win32</Platform>
</ProjectConfiguration>
<ProjectConfiguration Include="Release|Win32">
<Configuration>Release</Configuration>
<Platform>Win32</Platform>
</ProjectConfiguration>
</ItemGroup>
<PropertyGroup Label="Globals">
<ProjectGuid>{EAD41628-BB96-4F99-9070-8A9676801295}</ProjectGuid>
<Keyword>Win32Proj</Keyword>
<RootNamespace>discovery</RootNamespace>
</PropertyGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" />
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'" Label="Configuration">
<ConfigurationType>DynamicLibrary</ConfigurationType>
<UseDebugLibraries>true</UseDebugLibraries>
<CharacterSet>MultiByte</CharacterSet>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'" Label="Configuration">
<ConfigurationType>DynamicLibrary</ConfigurationType>
<UseDebugLibraries>false</UseDebugLibraries>
<WholeProgramOptimization>true</WholeProgramOptimization>
<CharacterSet>MultiByte</CharacterSet>
</PropertyGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" />
<ImportGroup Label="ExtensionSettings">
</ImportGroup>
<ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
<Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
</ImportGroup>
<ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
<Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
</ImportGroup>
<PropertyGroup Label="UserMacros" />
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
<LinkIncremental>true</LinkIncremental>
<IncludePath>$(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\icinga;$(SolutionDir)\cib;$(SolutionDir)\dyn;$(IncludePath)</IncludePath>
<LibraryPath>$(OutDir);$(LibraryPath)</LibraryPath>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
<LinkIncremental>false</LinkIncremental>
<IncludePath>$(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\icinga;$(SolutionDir)\cib;$(SolutionDir)\dyn;$(IncludePath)</IncludePath>
<LibraryPath>$(OutDir);$(LibraryPath)</LibraryPath>
</PropertyGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
<ClCompile>
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>WIN32;_DEBUG;_WINDOWS;_USRDLL;DISCOVERY_EXPORTS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<WarningLevel>Level3</WarningLevel>
<MinimalRebuild>false</MinimalRebuild>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
<GenerateDebugInformation>true</GenerateDebugInformation>
<AdditionalDependencies>base.lib;jsonrpc.lib;icinga.lib;cib.lib;%(AdditionalDependencies)</AdditionalDependencies>
</Link>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
<ClCompile>
<Optimization>MaxSpeed</Optimization>
<FunctionLevelLinking>true</FunctionLevelLinking>
<IntrinsicFunctions>true</IntrinsicFunctions>
<PreprocessorDefinitions>WIN32;NDEBUG;_WINDOWS;_USRDLL;DISCOVERY_EXPORTS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<WarningLevel>Level3</WarningLevel>
<MinimalRebuild>false</MinimalRebuild>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
<GenerateDebugInformation>true</GenerateDebugInformation>
<EnableCOMDATFolding>true</EnableCOMDATFolding>
<OptimizeReferences>true</OptimizeReferences>
<AdditionalDependencies>base.lib;jsonrpc.lib;icinga.lib;cib.lib;%(AdditionalDependencies)</AdditionalDependencies>
</Link>
</ItemDefinitionGroup>
<ItemGroup>
<ClCompile Include="discoverycomponent.cpp" />
<ClCompile Include="discoverymessage.cpp" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="discoverycomponent.h" />
<ClInclude Include="discoverymessage.h" />
<ClInclude Include="i2-discovery.h" />
</ItemGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">
</ImportGroup>
</Project>

View File

@ -1,30 +0,0 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ItemGroup>
<ClCompile Include="discoverycomponent.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
<ClCompile Include="discoverymessage.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="discoverycomponent.h">
<Filter>Headerdateien</Filter>
</ClInclude>
<ClInclude Include="i2-discovery.h">
<Filter>Headerdateien</Filter>
</ClInclude>
<ClInclude Include="discoverymessage.h">
<Filter>Headerdateien</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<Filter Include="Headerdateien">
<UniqueIdentifier>{53341f7e-6bad-4cf1-92cf-be906efe1704}</UniqueIdentifier>
</Filter>
<Filter Include="Quelldateien">
<UniqueIdentifier>{c7b2deba-743b-4449-ae46-0b7ba1b1350a}</UniqueIdentifier>
</Filter>
</ItemGroup>
</Project>

View File

@ -1,448 +0,0 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "i2-discovery.h"
using namespace icinga;
/**
* Starts the discovery component.
*/
void DiscoveryComponent::Start(void)
{
m_Endpoint = boost::make_shared<VirtualEndpoint>();
m_Endpoint->RegisterTopicHandler("discovery::RegisterComponent",
boost::bind(&DiscoveryComponent::RegisterComponentMessageHandler, this, _2, _3));
m_Endpoint->RegisterTopicHandler("discovery::NewComponent",
boost::bind(&DiscoveryComponent::NewComponentMessageHandler, this, _3));
m_Endpoint->RegisterTopicHandler("discovery::Welcome",
boost::bind(&DiscoveryComponent::WelcomeMessageHandler, this, _2, _3));
EndpointManager::GetInstance()->ForEachEndpoint(boost::bind(&DiscoveryComponent::NewEndpointHandler, this, _2));
EndpointManager::GetInstance()->OnNewEndpoint.connect(boost::bind(&DiscoveryComponent::NewEndpointHandler, this, _2));
EndpointManager::GetInstance()->RegisterEndpoint(m_Endpoint);
/* create the reconnect timer */
m_DiscoveryTimer = boost::make_shared<Timer>();
m_DiscoveryTimer->SetInterval(30);
m_DiscoveryTimer->OnTimerExpired.connect(boost::bind(&DiscoveryComponent::DiscoveryTimerHandler, this));
m_DiscoveryTimer->Start();
/* call the timer as soon as possible */
m_DiscoveryTimer->Reschedule(0);
}
/**
* Stops the discovery component.
*/
void DiscoveryComponent::Stop(void)
{
EndpointManager::Ptr mgr = EndpointManager::GetInstance();
if (mgr)
mgr->UnregisterEndpoint(m_Endpoint);
}
/**
* Checks whether the specified endpoint is already connected
* and disconnects older endpoints.
*
* @param self The endpoint that is to be checked.
* @param other The other endpoint.
*/
void DiscoveryComponent::CheckExistingEndpoint(const Endpoint::Ptr& self, const Endpoint::Ptr& other)
{
if (self == other)
return;
if (!other->IsConnected())
return;
if (self->GetIdentity() == other->GetIdentity()) {
Logger::Write(LogWarning, "discovery", "Detected duplicate identity:" + other->GetIdentity() + " - Disconnecting old endpoint.");
other->Stop();
EndpointManager::GetInstance()->UnregisterEndpoint(other);
}
}
/**
* Deals with a new endpoint.
*
* @param endpoint The endpoint.
*/
void DiscoveryComponent::NewEndpointHandler(const Endpoint::Ptr& endpoint)
{
/* immediately finish session setup for local endpoints */
if (endpoint->IsLocal()) {
endpoint->OnSessionEstablished(endpoint);
return;
}
String identity = endpoint->GetIdentity();
if (identity == EndpointManager::GetInstance()->GetIdentity()) {
Logger::Write(LogWarning, "discovery", "Detected loop-back connection - Disconnecting endpoint.");
endpoint->Stop();
EndpointManager::GetInstance()->UnregisterEndpoint(endpoint);
return;
}
EndpointManager::GetInstance()->ForEachEndpoint(boost::bind(&DiscoveryComponent::CheckExistingEndpoint, this, endpoint, _2));
// we assume the other component _always_ wants
// discovery::RegisterComponent messages from us
endpoint->RegisterSubscription("discovery::RegisterComponent");
// send a discovery::RegisterComponent message, if the
// other component is a broker this makes sure
// the broker knows about our message types
SendDiscoveryMessage("discovery::RegisterComponent", EndpointManager::GetInstance()->GetIdentity(), endpoint);
map<String, ComponentDiscoveryInfo::Ptr>::iterator ic;
// we assume the other component _always_ wants
// discovery::NewComponent messages from us
endpoint->RegisterSubscription("discovery::NewComponent");
// send discovery::NewComponent message for ourselves
SendDiscoveryMessage("discovery::NewComponent", EndpointManager::GetInstance()->GetIdentity(), endpoint);
// send discovery::NewComponent messages for all components
// we know about
for (ic = m_Components.begin(); ic != m_Components.end(); ic++) {
SendDiscoveryMessage("discovery::NewComponent", ic->first, endpoint);
}
// check if we already know the other component
ic = m_Components.find(endpoint->GetIdentity());
if (ic == m_Components.end()) {
// we don't know the other component yet, so
// wait until we get a discovery::NewComponent message
// from a broker
return;
}
// register published/subscribed topics for this endpoint
ComponentDiscoveryInfo::Ptr info = ic->second;
BOOST_FOREACH(String subscription, info->Subscriptions) {
endpoint->RegisterSubscription(subscription);
}
FinishDiscoverySetup(endpoint);
}
/**
* Registers message Subscriptions/sources in the specified component information object.
*
* @param neea Event arguments for the endpoint.
* @param info Component information object.
* @return 0
*/
void DiscoveryComponent::DiscoveryEndpointHandler(const Endpoint::Ptr& endpoint, const ComponentDiscoveryInfo::Ptr& info) const
{
Endpoint::ConstTopicIterator i;
for (i = endpoint->BeginSubscriptions(); i != endpoint->EndSubscriptions(); i++)
info->Subscriptions.insert(*i);
}
/**
* Retrieves the component information object for the specified component.
*
* @param component The identity of the component.
* @param info Pointer to the information object.
* @returns true if the info object was successfully retrieved, false otherwise.
*/
bool DiscoveryComponent::GetComponentDiscoveryInfo(String component, ComponentDiscoveryInfo::Ptr *info) const
{
if (component == EndpointManager::GetInstance()->GetIdentity()) {
/* Build fake discovery info for ourselves */
*info = boost::make_shared<ComponentDiscoveryInfo>();
EndpointManager::GetInstance()->ForEachEndpoint(boost::bind(&DiscoveryComponent::DiscoveryEndpointHandler, this, _2, *info));
(*info)->LastSeen = 0;
(*info)->Node = IcingaApplication::GetInstance()->GetNode();
(*info)->Service = IcingaApplication::GetInstance()->GetService();
return true;
}
map<String, ComponentDiscoveryInfo::Ptr>::const_iterator i;
i = m_Components.find(component);
if (i == m_Components.end())
return false;
*info = i->second;
return true;
}
/**
* Processes discovery::Welcome messages.
*
* @param nrea Event arguments for the request.
* @returns 0
*/
void DiscoveryComponent::WelcomeMessageHandler(const Endpoint::Ptr& sender, const RequestMessage& request)
{
if (sender->HasReceivedWelcome())
return;
sender->SetReceivedWelcome(true);
if (sender->HasSentWelcome())
sender->OnSessionEstablished(sender);
}
/**
* Finishes the welcome handshake for a new component
* by registering message Subscriptions/sources for the component
* and sending a welcome message if necessary.
*
* @param endpoint The endpoint to set up.
*/
void DiscoveryComponent::FinishDiscoverySetup(const Endpoint::Ptr& endpoint)
{
if (endpoint->HasSentWelcome())
return;
// we assume the other component _always_ wants
// discovery::Welcome messages from us
endpoint->RegisterSubscription("discovery::Welcome");
RequestMessage request;
request.SetMethod("discovery::Welcome");
EndpointManager::GetInstance()->SendUnicastMessage(m_Endpoint, endpoint, request);
endpoint->SetSentWelcome(true);
if (endpoint->HasReceivedWelcome())
endpoint->OnSessionEstablished(endpoint);
}
/**
* Sends a discovery message for the specified identity using the
* specified message type.
*
* @param method The method to use for the message ("discovery::NewComponent" or "discovery::RegisterComponent").
* @param identity The identity of the component for which a message should be sent.
* @param recipient The recipient of the message. A multicast message is sent if this parameter is empty.
*/
void DiscoveryComponent::SendDiscoveryMessage(const String& method, const String& identity, const Endpoint::Ptr& recipient)
{
RequestMessage request;
request.SetMethod(method);
DiscoveryMessage params;
request.SetParams(params);
params.SetIdentity(identity);
ComponentDiscoveryInfo::Ptr info;
if (!GetComponentDiscoveryInfo(identity, &info))
return;
if (!info->Node.IsEmpty() && !info->Service.IsEmpty()) {
params.SetNode(info->Node);
params.SetService(info->Service);
}
set<String>::iterator i;
Dictionary::Ptr subscriptions = boost::make_shared<Dictionary>();
BOOST_FOREACH(String subscription, info->Subscriptions) {
subscriptions->Add(subscription);
}
params.SetSubscriptions(subscriptions);
if (recipient)
EndpointManager::GetInstance()->SendUnicastMessage(m_Endpoint, recipient, request);
else
EndpointManager::GetInstance()->SendMulticastMessage(m_Endpoint, request);
}
/**
* Processes a discovery message by registering the component in the
* discovery component registry.
*
* @param identity The authorative identity of the component.
* @param message The discovery message.
* @param trusted Whether the message comes from a trusted source (i.e. a broker).
*/
void DiscoveryComponent::ProcessDiscoveryMessage(const String& identity, const DiscoveryMessage& message, bool trusted)
{
/* ignore discovery messages that are about ourselves */
if (identity == EndpointManager::GetInstance()->GetIdentity())
return;
ComponentDiscoveryInfo::Ptr info = boost::make_shared<ComponentDiscoveryInfo>();
info->LastSeen = Utility::GetTime();
String node;
if (message.GetNode(&node) && !node.IsEmpty())
info->Node = node;
String service;
if (message.GetService(&service) && !service.IsEmpty())
info->Service = service;
DynamicObject::Ptr endpointConfig = DynamicObject::GetObject("Endpoint", identity);
Dictionary::Ptr roles;
if (endpointConfig)
roles = endpointConfig->Get("roles");
Endpoint::Ptr endpoint = EndpointManager::GetInstance()->GetEndpointByIdentity(identity);
Dictionary::Ptr subscriptions;
if (message.GetSubscriptions(&subscriptions)) {
Value subscription;
BOOST_FOREACH(tie(tuples::ignore, subscription), subscriptions) {
info->Subscriptions.insert(subscription);
if (endpoint)
endpoint->RegisterSubscription(subscription);
}
}
map<String, ComponentDiscoveryInfo::Ptr>::iterator i;
i = m_Components.find(identity);
if (i != m_Components.end())
m_Components.erase(i);
m_Components[identity] = info;
SendDiscoveryMessage("discovery::NewComponent", identity, Endpoint::Ptr());
/* don't send a welcome message for discovery::NewComponent messages */
if (endpoint && !trusted)
FinishDiscoverySetup(endpoint);
}
/**
* Processes "discovery::NewComponent" messages.
*
* @param nrea Event arguments for the request.
*/
void DiscoveryComponent::NewComponentMessageHandler(const RequestMessage& request)
{
DiscoveryMessage message;
request.GetParams(&message);
String identity;
if (!message.GetIdentity(&identity))
return;
ProcessDiscoveryMessage(identity, message, true);
}
/**
* Processes "discovery::RegisterComponent" messages.
*
* @param nrea Event arguments for the request.
*/
void DiscoveryComponent::RegisterComponentMessageHandler(const Endpoint::Ptr& sender, const RequestMessage& request)
{
DiscoveryMessage message;
request.GetParams(&message);
ProcessDiscoveryMessage(sender->GetIdentity(), message, false);
}
/**
* Checks whether we have to reconnect to other components and removes stale
* components from the registry.
*/
void DiscoveryComponent::DiscoveryTimerHandler(void)
{
EndpointManager::Ptr endpointManager = EndpointManager::GetInstance();
double now = Utility::GetTime();
/* check whether we have to reconnect to one of our upstream endpoints */
DynamicObject::Ptr object;
BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Endpoint")) {
/* Check if we're already connected to this endpoint. */
if (endpointManager->GetEndpointByIdentity(object->GetName()))
continue;
String node = object->Get("node");
String service = object->Get("service");
if (!node.IsEmpty() && !service.IsEmpty()) {
/* reconnect to this endpoint */
endpointManager->AddConnection(node, service);
}
}
map<String, ComponentDiscoveryInfo::Ptr>::iterator curr, i;
for (i = m_Components.begin(); i != m_Components.end(); ) {
const String& identity = i->first;
const ComponentDiscoveryInfo::Ptr& info = i->second;
curr = i;
i++;
/* there's no need to reconnect to ourself */
if (identity == EndpointManager::GetInstance()->GetIdentity())
continue;
/* for explicitly-configured upstream endpoints
* we prefer to use the node/service from the
* config object - which is what the for loop above does */
if (DynamicObject::GetObject("endpoint", identity))
continue;
if (info->LastSeen < now - DiscoveryComponent::RegistrationTTL) {
/* unregister this component if its registration has expired */
m_Components.erase(curr);
continue;
}
/* send discovery message to all connected components to
refresh their TTL for this component */
SendDiscoveryMessage("discovery::NewComponent", identity, Endpoint::Ptr());
Endpoint::Ptr endpoint = endpointManager->GetEndpointByIdentity(identity);
if (endpoint && endpoint->IsConnected()) {
/* update LastSeen if we're still connected to this endpoint */
info->LastSeen = now;
} else {
/* try and reconnect to this component */
try {
if (!info->Node.IsEmpty() && !info->Service.IsEmpty())
endpointManager->AddConnection(info->Node, info->Service);
} catch (const exception& ex) {
stringstream msgbuf;
msgbuf << "Exception while trying to reconnect to endpoint '" << endpoint->GetIdentity() << "': " << ex.what();;
Logger::Write(LogInformation, "discovery", msgbuf.str());
}
}
}
}
EXPORT_COMPONENT(discovery, DiscoveryComponent);

View File

@ -1,82 +0,0 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#ifndef DISCOVERYCOMPONENT_H
#define DISCOVERYCOMPONENT_H
namespace icinga
{
/**
* @ingroup discovery
*/
class ComponentDiscoveryInfo : public Object
{
public:
typedef shared_ptr<ComponentDiscoveryInfo> Ptr;
typedef weak_ptr<ComponentDiscoveryInfo> WeakPtr;
String Node;
String Service;
set<String> Subscriptions;
set<String> Publications;
double LastSeen;
};
/**
* @ingroup discovery
*/
class DiscoveryComponent : public IComponent
{
public:
virtual void Start(void);
virtual void Stop(void);
private:
VirtualEndpoint::Ptr m_Endpoint;
map<String, ComponentDiscoveryInfo::Ptr> m_Components;
Timer::Ptr m_DiscoveryTimer;
void NewEndpointHandler(const Endpoint::Ptr& endpoint);
void NewComponentMessageHandler(const RequestMessage& request);
void RegisterComponentMessageHandler(const Endpoint::Ptr& sender, const RequestMessage& request);
void WelcomeMessageHandler(const Endpoint::Ptr& sender, const RequestMessage& request);
void SendDiscoveryMessage(const String& method, const String& identity, const Endpoint::Ptr& recipient);
void ProcessDiscoveryMessage(const String& identity, const DiscoveryMessage& message, bool trusted);
bool GetComponentDiscoveryInfo(String component, ComponentDiscoveryInfo::Ptr *info) const;
void CheckExistingEndpoint(const Endpoint::Ptr& self, const Endpoint::Ptr& other);
void DiscoveryEndpointHandler(const Endpoint::Ptr& endpoint, const ComponentDiscoveryInfo::Ptr& info) const;
void DiscoveryTimerHandler(void);
void FinishDiscoverySetup(const Endpoint::Ptr& endpoint);
static const int RegistrationTTL = 300;
};
}
#endif /* DISCOVERYCOMPONENT_H */

View File

@ -1,71 +0,0 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "i2-discovery.h"
using namespace icinga;
DiscoveryMessage::DiscoveryMessage(void)
: MessagePart()
{ }
DiscoveryMessage::DiscoveryMessage(const MessagePart& message)
: MessagePart(message)
{ }
bool DiscoveryMessage::GetIdentity(String *value) const
{
return Get("identity", value);
}
void DiscoveryMessage::SetIdentity(const String& value)
{
Set("identity", value);
}
bool DiscoveryMessage::GetNode(String *value) const
{
return Get("node", value);
}
void DiscoveryMessage::SetNode(const String& value)
{
Set("node", value);
}
bool DiscoveryMessage::GetService(String *value) const
{
return Get("service", value);
}
void DiscoveryMessage::SetService(const String& value)
{
Set("service", value);
}
bool DiscoveryMessage::GetSubscriptions(Dictionary::Ptr *value) const
{
return Get("subscriptions", value);
}
void DiscoveryMessage::SetSubscriptions(const Dictionary::Ptr& value)
{
Set("subscriptions", value);
}

View File

@ -1,50 +0,0 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#ifndef DISCOVERYMESSAGE_H
#define DISCOVERYMESSAGE_H
namespace icinga
{
/**
* @ingroup discovery
*/
class DiscoveryMessage : public MessagePart
{
public:
DiscoveryMessage(void);
DiscoveryMessage(const MessagePart& message);
bool GetIdentity(String *value) const;
void SetIdentity(const String& value);
bool GetNode(String *value) const;
void SetNode(const String& value);
bool GetService(String *value) const;
void SetService(const String& value);
bool GetSubscriptions(Dictionary::Ptr *value) const;
void SetSubscriptions(const Dictionary::Ptr& value);
};
}
#endif /* SUBSCRIPTIONMESSAGE_H */

View File

@ -1,38 +0,0 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#ifndef I2DISCOVERY_H
#define I2DISCOVERY_H
/**
* @defgroup discovery Discovery component
*
* The Discovery component takes care of connecting peers to each other
* and performs authorisation checks for the message subscriptions.
*/
#include <i2-base.h>
#include <i2-jsonrpc.h>
#include <i2-icinga.h>
#include <i2-cib.h>
#include "discoverymessage.h"
#include "discoverycomponent.h"
#endif /* I2DISCOVERY_H */

View File

@ -76,7 +76,6 @@ components/compat/Makefile
components/convenience/Makefile
components/delegation/Makefile
components/demo/Makefile
components/discovery/Makefile
dyn/Makefile
icinga/Makefile
icinga-app/Makefile

Binary file not shown.

View File

@ -33,8 +33,7 @@ icinga_LDADD = \
-dlopen ${top_builddir}/components/compat/compat.la \
-dlopen ${top_builddir}/components/convenience/convenience.la \
-dlopen ${top_builddir}/components/delegation/delegation.la \
-dlopen ${top_builddir}/components/demo/demo.la \
-dlopen ${top_builddir}/components/discovery/discovery.la
-dlopen ${top_builddir}/components/demo/demo.la
icinga_DEPENDENCIES = \
${top_builddir}/components/cibsync/cibsync.la \

View File

@ -11,11 +11,7 @@ libicinga_la_SOURCES = \
endpointmanager.h \
icingaapplication.cpp \
icingaapplication.h \
i2-icinga.h \
jsonrpcendpoint.cpp \
jsonrpcendpoint.h \
virtualendpoint.cpp \
virtualendpoint.h
i2-icinga.h
libicinga_la_CPPFLAGS = \
-DI2_ICINGA_BUILD \

View File

@ -21,24 +21,109 @@
using namespace icinga;
/**
* Retrieves the endpoint manager this endpoint is registered with.
*
* @returns The EndpointManager object.
*/
EndpointManager::Ptr Endpoint::GetEndpointManager(void) const
REGISTER_CLASS(Endpoint);
boost::signal<void (const Endpoint::Ptr&)> Endpoint::OnConnected;
boost::signal<void (const Endpoint::Ptr&)> Endpoint::OnDisconnected;
boost::signal<void (const Endpoint::Ptr&, const String& topic)> Endpoint::OnSubscriptionRegistered;
boost::signal<void (const Endpoint::Ptr&, const String& topic)> Endpoint::OnSubscriptionUnregistered;
Endpoint::Endpoint(const Dictionary::Ptr& serializedUpdate)
: DynamicObject(serializedUpdate)
{
return m_EndpointManager.lock();
RegisterAttribute("node", Attribute_Replicated);
RegisterAttribute("service", Attribute_Replicated);
RegisterAttribute("local", Attribute_Config);
RegisterAttribute("subscriptions", Attribute_Replicated);
RegisterAttribute("client", Attribute_Transient);
}
bool Endpoint::Exists(const String& name)
{
return (DynamicObject::GetObject("Endpoint", name));
}
Endpoint::Ptr Endpoint::GetByName(const String& name)
{
DynamicObject::Ptr configObject = DynamicObject::GetObject("Endpoint", name);
if (!configObject)
throw_exception(invalid_argument("Endpoint '" + name + "' does not exist."));
return dynamic_pointer_cast<Endpoint>(configObject);
}
Endpoint::Ptr Endpoint::MakeEndpoint(const String& name, bool local)
{
ConfigItemBuilder::Ptr endpointConfig = boost::make_shared<ConfigItemBuilder>();
endpointConfig->SetType("Endpoint");
endpointConfig->SetName(local ? "local:" + name : name);
endpointConfig->SetLocal(local ? 1 : 0);
endpointConfig->AddExpression("local", OperatorSet, local);
DynamicObject::Ptr object = endpointConfig->Compile()->Commit();
return dynamic_pointer_cast<Endpoint>(object);
}
/**
* Sets the endpoint manager this endpoint is registered with.
* Checks whether this is a local endpoint.
*
* @param manager The EndpointManager object.
* @returns true if this is a local endpoint, false otherwise.
*/
void Endpoint::SetEndpointManager(EndpointManager::WeakPtr manager)
bool Endpoint::IsLocalEndpoint(void) const
{
m_EndpointManager = manager;
Value value = Get("local");
return (!value.IsEmpty() && value);
}
/**
* Checks whether this endpoint is connected.
*
* @returns true if the endpoint is connected, false otherwise.
*/
bool Endpoint::IsConnected(void) const
{
if (IsLocalEndpoint()) {
return true;
} else {
JsonRpcClient::Ptr client = GetClient();
return (client && client->IsConnected());
}
}
/**
* Retrieves the address for the endpoint.
*
* @returns The endpoint's address.
*/
String Endpoint::GetAddress(void) const
{
if (IsLocalEndpoint()) {
return "local:" + GetName();
} else {
JsonRpcClient::Ptr client = GetClient();
if (!client)
return "<disconnected endpoint>";
return client->GetPeerAddress();
}
}
JsonRpcClient::Ptr Endpoint::GetClient(void) const
{
return Get("client");
}
void Endpoint::SetClient(const JsonRpcClient::Ptr& client)
{
Set("client", client);
client->OnNewMessage.connect(boost::bind(&Endpoint::NewMessageHandler, this, _2));
client->OnClosed.connect(boost::bind(&Endpoint::ClientClosedHandler, this));
OnConnected(GetSelf());
}
/**
@ -46,9 +131,18 @@ void Endpoint::SetEndpointManager(EndpointManager::WeakPtr manager)
*
* @param topic The name of the topic.
*/
void Endpoint::RegisterSubscription(String topic)
void Endpoint::RegisterSubscription(const String& topic)
{
m_Subscriptions.insert(topic);
Dictionary::Ptr subscriptions = GetSubscriptions();
if (!subscriptions)
subscriptions = boost::make_shared<Dictionary>();
if (!subscriptions->Contains(topic)) {
Dictionary::Ptr newSubscriptions = subscriptions->ShallowClone();
newSubscriptions->Set(topic, topic);
SetSubscriptions(newSubscriptions);
}
}
/**
@ -56,9 +150,15 @@ void Endpoint::RegisterSubscription(String topic)
*
* @param topic The name of the topic.
*/
void Endpoint::UnregisterSubscription(String topic)
void Endpoint::UnregisterSubscription(const String& topic)
{
m_Subscriptions.erase(topic);
Dictionary::Ptr subscriptions = GetSubscriptions();
if (subscriptions && subscriptions->Contains(topic)) {
Dictionary::Ptr newSubscriptions = subscriptions->ShallowClone();
newSubscriptions->Remove(topic);
SetSubscriptions(newSubscriptions);
}
}
/**
@ -67,9 +167,11 @@ void Endpoint::UnregisterSubscription(String topic)
* @param topic The name of the topic.
* @returns true if the endpoint is subscribed to the topic, false otherwise.
*/
bool Endpoint::HasSubscription(String topic) const
bool Endpoint::HasSubscription(const String& topic) const
{
return (m_Subscriptions.find(topic) != m_Subscriptions.end());
Dictionary::Ptr subscriptions = GetSubscriptions();
return (subscriptions && subscriptions->Contains(topic));
}
/**
@ -77,65 +179,168 @@ bool Endpoint::HasSubscription(String topic) const
*/
void Endpoint::ClearSubscriptions(void)
{
m_Subscriptions.clear();
Set("subscriptions", Empty);
}
/**
* Returns the beginning of the subscriptions list.
*
* @returns An iterator that points to the first subscription.
*/
Endpoint::ConstTopicIterator Endpoint::BeginSubscriptions(void) const
Dictionary::Ptr Endpoint::GetSubscriptions(void) const
{
return m_Subscriptions.begin();
return Get("subscriptions");
}
/**
* Returns the end of the subscriptions list.
*
* @returns An iterator that points past the last subscription.
*/
Endpoint::ConstTopicIterator Endpoint::EndSubscriptions(void) const
void Endpoint::SetSubscriptions(const Dictionary::Ptr& subscriptions)
{
return m_Subscriptions.end();
Set("subscriptions", subscriptions);
}
/**
* Sets whether a welcome message has been received from this endpoint.
*
* @param value Whether we've received a welcome message.
*/
void Endpoint::SetReceivedWelcome(bool value)
void Endpoint::RegisterTopicHandler(const String& topic, const function<Endpoint::Callback>& callback)
{
m_ReceivedWelcome = value;
map<String, shared_ptr<boost::signal<Endpoint::Callback> > >::iterator it;
it = m_TopicHandlers.find(topic);
shared_ptr<boost::signal<Endpoint::Callback> > sig;
if (it == m_TopicHandlers.end()) {
sig = boost::make_shared<boost::signal<Endpoint::Callback> >();
m_TopicHandlers.insert(make_pair(topic, sig));
} else {
sig = it->second;
}
sig->connect(callback);
RegisterSubscription(topic);
}
/**
* Retrieves whether a welcome message has been received from this endpoint.
*
* @returns Whether we've received a welcome message.
*/
bool Endpoint::HasReceivedWelcome(void) const
void Endpoint::UnregisterTopicHandler(const String& topic, const function<Endpoint::Callback>& callback)
{
return m_ReceivedWelcome;
// TODO: implement
//m_TopicHandlers[method] -= callback;
//UnregisterSubscription(method);
throw_exception(NotImplementedException());
}
/**
* Sets whether a welcome message has been sent to this endpoint.
*
* @param value Whether we've sent a welcome message.
*/
void Endpoint::SetSentWelcome(bool value)
void Endpoint::OnAttributeChanged(const String& name, const Value& oldValue)
{
m_SentWelcome = value;
if (name == "subscriptions") {
Dictionary::Ptr oldSubscriptions, newSubscriptions;
if (oldValue.IsObjectType<Dictionary>())
oldSubscriptions = oldValue;
newSubscriptions = GetSubscriptions();
if (oldSubscriptions) {
String subscription;
BOOST_FOREACH(tie(tuples::ignore, subscription), oldSubscriptions) {
if (!newSubscriptions || !newSubscriptions->Contains(subscription)) {
Logger::Write(LogInformation, "icinga", "Removed subscription for '" + GetName() + "': " + subscription);
OnSubscriptionUnregistered(GetSelf(), subscription);
}
}
}
if (newSubscriptions) {
String subscription;
BOOST_FOREACH(tie(tuples::ignore, subscription), newSubscriptions) {
if (!oldSubscriptions || !oldSubscriptions->Contains(subscription)) {
Logger::Write(LogInformation, "icinga", "New subscription for '" + GetName() + "': " + subscription);
OnSubscriptionRegistered(GetSelf(), subscription);
}
}
}
}
}
/**
* Retrieves whether a welcome message has been sent to this endpoint.
*
* @returns Whether we've sent a welcome message.
*/
bool Endpoint::HasSentWelcome(void) const
void Endpoint::ProcessRequest(const Endpoint::Ptr& sender, const RequestMessage& request)
{
return m_SentWelcome;
if (!IsConnected()) {
// TODO: persist the message
return;
}
if (IsLocalEndpoint()) {
String method;
if (!request.GetMethod(&method))
return;
map<String, shared_ptr<boost::signal<Endpoint::Callback> > >::iterator it;
it = m_TopicHandlers.find(method);
if (it == m_TopicHandlers.end())
return;
(*it->second)(GetSelf(), sender, request);
} else {
GetClient()->SendMessage(request);
}
}
void Endpoint::ProcessResponse(const Endpoint::Ptr& sender, const ResponseMessage& response)
{
if (!IsConnected())
return;
if (IsLocalEndpoint())
EndpointManager::GetInstance()->ProcessResponseMessage(sender, response);
else
GetClient()->SendMessage(response);
}
void Endpoint::NewMessageHandler(const MessagePart& message)
{
Endpoint::Ptr sender = GetSelf();
if (ResponseMessage::IsResponseMessage(message)) {
/* rather than routing the message to the right virtual
* endpoint we just process it here right away. */
EndpointManager::GetInstance()->ProcessResponseMessage(sender, message);
return;
}
RequestMessage request = message;
String method;
if (!request.GetMethod(&method))
return;
String id;
if (request.GetID(&id))
EndpointManager::GetInstance()->SendAnycastMessage(sender, request);
else
EndpointManager::GetInstance()->SendMulticastMessage(sender, request);
}
void Endpoint::ClientClosedHandler(void)
{
try {
GetClient()->CheckException();
} catch (const exception& ex) {
stringstream message;
message << "Error occured for JSON-RPC socket: Message=" << ex.what();
Logger::Write(LogWarning, "jsonrpc", message.str());
}
Logger::Write(LogWarning, "jsonrpc", "Lost connection to endpoint: identity=" + GetName());
// TODO: _only_ clear non-persistent subscriptions
// unregister ourselves if no persistent subscriptions are left (use a
// timer for that, once we have a TTL property for the topics)
ClearSubscriptions();
Set("client", Empty);
OnDisconnected(GetSelf());
}
String Endpoint::GetNode(void) const
{
return Get("node");
}
String Endpoint::GetService(void) const
{
return Get("service");
}

View File

@ -30,60 +30,65 @@ class EndpointManager;
*
* @ingroup icinga
*/
class I2_ICINGA_API Endpoint : public Object
class I2_ICINGA_API Endpoint : public DynamicObject
{
public:
typedef shared_ptr<Endpoint> Ptr;
typedef weak_ptr<Endpoint> WeakPtr;
typedef set<String>::const_iterator ConstTopicIterator;
typedef void (Callback)(const Endpoint::Ptr&, const Endpoint::Ptr&, const RequestMessage&);
Endpoint(void)
: m_ReceivedWelcome(false), m_SentWelcome(false)
{ }
Endpoint(const Dictionary::Ptr& serializedUpdate);
virtual String GetIdentity(void) const = 0;
virtual String GetAddress(void) const = 0;
static bool Exists(const String& name);
static Endpoint::Ptr GetByName(const String& name);
void SetReceivedWelcome(bool value);
bool HasReceivedWelcome(void) const;
String GetAddress(void) const;
void SetSentWelcome(bool value);
bool HasSentWelcome(void) const;
JsonRpcClient::Ptr GetClient(void) const;
void SetClient(const JsonRpcClient::Ptr& client);
shared_ptr<EndpointManager> GetEndpointManager(void) const;
void SetEndpointManager(weak_ptr<EndpointManager> manager);
void RegisterSubscription(const String& topic);
void UnregisterSubscription(const String& topic);
bool HasSubscription(const String& topic) const;
void RegisterSubscription(String topic);
void UnregisterSubscription(String topic);
bool HasSubscription(String topic) const;
Dictionary::Ptr GetSubscriptions(void) const;
void SetSubscriptions(const Dictionary::Ptr& subscriptions);
virtual bool IsLocal(void) const = 0;
virtual bool IsConnected(void) const = 0;
bool IsLocalEndpoint(void) const;
bool IsConnected(void) const;
virtual void ProcessRequest(Endpoint::Ptr sender, const RequestMessage& message) = 0;
virtual void ProcessResponse(Endpoint::Ptr sender, const ResponseMessage& message) = 0;
virtual void Stop(void) = 0;
void ProcessRequest(const Endpoint::Ptr& sender, const RequestMessage& message);
void ProcessResponse(const Endpoint::Ptr& sender, const ResponseMessage& message);
void ClearSubscriptions(void);
ConstTopicIterator BeginSubscriptions(void) const;
ConstTopicIterator EndSubscriptions(void) const;
void RegisterTopicHandler(const String& topic, const function<Callback>& callback);
void UnregisterTopicHandler(const String& topic, const function<Callback>& callback);
boost::signal<void (const Endpoint::Ptr&)> OnSessionEstablished;
virtual void OnAttributeChanged(const String& name, const Value& oldValue);
String GetNode(void) const;
String GetService(void) const;
static Endpoint::Ptr MakeEndpoint(const String& name, bool local);
static boost::signal<void (const Endpoint::Ptr&)> OnConnected;
static boost::signal<void (const Endpoint::Ptr&)> OnDisconnected;
static boost::signal<void (const Endpoint::Ptr&, const String& topic)> OnSubscriptionRegistered;
static boost::signal<void (const Endpoint::Ptr&, const String& topic)> OnSubscriptionUnregistered;
private:
set<String> m_Subscriptions; /**< The topics this endpoint is
subscribed to. */
bool m_ReceivedWelcome; /**< Have we received a welcome message
from this endpoint? */
bool m_SentWelcome; /**< Have we sent a welcome message to this
endpoint? */
weak_ptr<EndpointManager> m_EndpointManager; /**< The endpoint manager
this endpoint is
registered with. */
map<String, shared_ptr<boost::signal<Callback> > > m_TopicHandlers;
void NewMessageHandler(const MessagePart& message);
void ClientClosedHandler(void);
};
}

View File

@ -31,6 +31,16 @@ EndpointManager::EndpointManager(void)
m_RequestTimer->OnTimerExpired.connect(boost::bind(&EndpointManager::RequestTimerHandler, this));
m_RequestTimer->SetInterval(5);
m_RequestTimer->Start();
m_SubscriptionTimer = boost::make_shared<Timer>();
m_SubscriptionTimer->OnTimerExpired.connect(boost::bind(&EndpointManager::SubscriptionTimerHandler, this));
m_SubscriptionTimer->SetInterval(10);
m_SubscriptionTimer->Start();
m_ReconnectTimer = boost::make_shared<Timer>();
m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&EndpointManager::ReconnectTimerHandler, this));
m_ReconnectTimer->SetInterval(10);
m_ReconnectTimer->Start();
}
/**
@ -42,6 +52,16 @@ EndpointManager::EndpointManager(void)
void EndpointManager::SetIdentity(const String& identity)
{
m_Identity = identity;
if (m_Endpoint)
m_Endpoint->Unregister();
DynamicObject::Ptr object = DynamicObject::GetObject("Endpoint", identity);
if (object)
m_Endpoint = dynamic_pointer_cast<Endpoint>(object);
else
m_Endpoint = Endpoint::MakeEndpoint(identity, false);
}
/**
@ -54,26 +74,6 @@ String EndpointManager::GetIdentity(void) const
return m_Identity;
}
/**
* Sets the SSL context that is used for remote connections.
*
* @param sslContext The new SSL context.
*/
void EndpointManager::SetSSLContext(const shared_ptr<SSL_CTX>& sslContext)
{
m_SSLContext = sslContext;
}
/**
* Retrieves the SSL context that is used for remote connections.
*
* @returns The SSL context.
*/
shared_ptr<SSL_CTX> EndpointManager::GetSSLContext(void) const
{
return m_SSLContext;
}
/**
* Creates a new JSON-RPC listener on the specified port.
*
@ -81,15 +81,20 @@ shared_ptr<SSL_CTX> EndpointManager::GetSSLContext(void) const
*/
void EndpointManager::AddListener(const String& service)
{
if (!GetSSLContext())
shared_ptr<SSL_CTX> sslContext = IcingaApplication::GetInstance()->GetSSLContext();
if (!sslContext)
throw_exception(logic_error("SSL context is required for AddListener()"));
stringstream s;
s << "Adding new listener: port " << service;
Logger::Write(LogInformation, "icinga", s.str());
JsonRpcServer::Ptr server = boost::make_shared<JsonRpcServer>(m_SSLContext);
RegisterServer(server);
JsonRpcServer::Ptr server = boost::make_shared<JsonRpcServer>(sslContext);
m_Servers.insert(server);
server->OnNewClient.connect(boost::bind(&EndpointManager::NewClientHandler,
this, _2));
server->Bind(service, AF_INET6);
server->Listen();
@ -102,107 +107,47 @@ void EndpointManager::AddListener(const String& service)
* @param node The remote host.
* @param service The remote port.
*/
void EndpointManager::AddConnection(const String& node, const String& service)
{
stringstream s;
s << "Adding new endpoint: [" << node << "]:" << service;
Logger::Write(LogInformation, "icinga", s.str());
JsonRpcEndpoint::Ptr endpoint = boost::make_shared<JsonRpcEndpoint>();
RegisterEndpoint(endpoint);
endpoint->Connect(node, service, m_SSLContext);
}
/**
* Registers a new JSON-RPC server with this endpoint manager.
*
* @param server The JSON-RPC server.
*/
void EndpointManager::RegisterServer(const JsonRpcServer::Ptr& server)
{
m_Servers.push_back(server);
server->OnNewClient.connect(boost::bind(&EndpointManager::NewClientHandler,
this, _2));
void EndpointManager::AddConnection(const String& node, const String& service) {
JsonRpcClient::Ptr client = boost::make_shared<JsonRpcClient>(RoleOutbound,
IcingaApplication::GetInstance()->GetSSLContext());
client->Connect(node, service);
NewClientHandler(client);
}
/**
* Processes a new client connection.
*
* @param ncea Event arguments.
* @param client The new client.
*/
void EndpointManager::NewClientHandler(const TcpClient::Ptr& client)
{
Logger::Write(LogInformation, "icinga", "Accepted new client from " + client->GetPeerAddress());
JsonRpcClient::Ptr jclient = static_pointer_cast<JsonRpcClient>(client);
JsonRpcEndpoint::Ptr endpoint = boost::make_shared<JsonRpcEndpoint>();
endpoint->SetClient(static_pointer_cast<JsonRpcClient>(client));
client->Start();
RegisterEndpoint(endpoint);
Logger::Write(LogInformation, "icinga", "New client connection from " + jclient->GetPeerAddress());
m_PendingClients.insert(jclient);
jclient->OnConnected.connect(boost::bind(&EndpointManager::ClientConnectedHandler, this, _1));
jclient->Start();
}
/**
* Unregisters a JSON-RPC server.
*
* @param server The JSON-RPC server.
*/
void EndpointManager::UnregisterServer(const JsonRpcServer::Ptr& server)
void EndpointManager::ClientConnectedHandler(const TcpClient::Ptr& client)
{
m_Servers.erase(
remove(m_Servers.begin(), m_Servers.end(), server),
m_Servers.end());
// TODO: unbind event
}
JsonRpcClient::Ptr jclient = static_pointer_cast<JsonRpcClient>(client);
/**
* Registers a new endpoint with this endpoint manager.
*
* @param endpoint The new endpoint.
*/
void EndpointManager::RegisterEndpoint(const Endpoint::Ptr& endpoint)
{
endpoint->SetEndpointManager(GetSelf());
m_PendingClients.erase(jclient);
UnregisterEndpoint(endpoint);
shared_ptr<X509> cert = jclient->GetPeerCertificate();
String identity = endpoint->GetIdentity();
String identity = Utility::GetCertificateCN(cert);
if (!identity.IsEmpty()) {
m_Endpoints[identity] = endpoint;
OnNewEndpoint(GetSelf(), endpoint);
} else {
m_PendingEndpoints.push_back(endpoint);
}
Endpoint::Ptr endpoint;
if (endpoint->IsLocal()) {
/* this endpoint might have introduced new subscriptions
* or publications which affect remote endpoints, we need
* to close all fully-connected remote endpoints to make sure
* these subscriptions/publications are kept up-to-date. */
Iterator prev, it;
for (it = m_Endpoints.begin(); it != m_Endpoints.end(); ) {
prev = it;
it++;
if (Endpoint::Exists(identity))
endpoint = Endpoint::GetByName(identity);
else
endpoint = Endpoint::MakeEndpoint(identity, false);
if (!prev->second->IsLocal())
m_Endpoints.erase(prev);
}
}
}
/**
* Unregisters an endpoint.
*
* @param endpoint The endpoint.
*/
void EndpointManager::UnregisterEndpoint(const Endpoint::Ptr& endpoint)
{
m_PendingEndpoints.erase(
remove(m_PendingEndpoints.begin(), m_PendingEndpoints.end(), endpoint),
m_PendingEndpoints.end());
String identity = endpoint->GetIdentity();
if (!identity.IsEmpty())
m_Endpoints.erase(identity);
endpoint->SetClient(jclient);
}
/**
@ -240,8 +185,9 @@ void EndpointManager::SendAnycastMessage(const Endpoint::Ptr& sender,
throw_exception(invalid_argument("Message is missing the 'method' property."));
vector<Endpoint::Ptr> candidates;
Endpoint::Ptr endpoint;
BOOST_FOREACH(tie(tuples::ignore, endpoint), m_Endpoints) {
DynamicObject::Ptr object;
BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Endpoint")) {
Endpoint::Ptr endpoint = dynamic_pointer_cast<Endpoint>(object);
/* don't forward messages between non-local endpoints */
if (!sender->IsLocal() && !endpoint->IsLocal())
continue;
@ -275,8 +221,10 @@ void EndpointManager::SendMulticastMessage(const Endpoint::Ptr& sender,
if (!message.GetMethod(&method))
throw_exception(invalid_argument("Message is missing the 'method' property."));
Endpoint::Ptr recipient;
BOOST_FOREACH(tie(tuples::ignore, recipient), m_Endpoints) {
DynamicObject::Ptr object;
BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Endpoint")) {
Endpoint::Ptr recipient = dynamic_pointer_cast<Endpoint>(object);
/* don't forward messages back to the sender */
if (sender == recipient)
continue;
@ -291,31 +239,16 @@ void EndpointManager::SendMulticastMessage(const Endpoint::Ptr& sender,
*
* @param callback The callback function.
*/
void EndpointManager::ForEachEndpoint(function<void (const EndpointManager::Ptr&, const Endpoint::Ptr&)> callback)
{
map<String, Endpoint::Ptr>::iterator prev, i;
for (i = m_Endpoints.begin(); i != m_Endpoints.end(); ) {
prev = i;
i++;
callback(GetSelf(), prev->second);
}
}
/**
* Retrieves an endpoint that has the specified identity.
*
* @param identity The identity of the endpoint.
*/
Endpoint::Ptr EndpointManager::GetEndpointByIdentity(const String& identity) const
{
map<String, Endpoint::Ptr>::const_iterator i;
i = m_Endpoints.find(identity);
if (i != m_Endpoints.end())
return i->second;
else
return Endpoint::Ptr();
}
//void EndpointManager::ForEachEndpoint(function<void (const EndpointManager::Ptr&, const Endpoint::Ptr&)> callback)
//{
// map<String, Endpoint::Ptr>::iterator prev, i;
// for (i = m_Endpoints.begin(); i != m_Endpoints.end(); ) {
// prev = i;
// i++;
//
// callback(GetSelf(), prev->second);
// }
//}
void EndpointManager::SendAPIMessage(const Endpoint::Ptr& sender, const Endpoint::Ptr& recipient,
RequestMessage& message,
@ -348,6 +281,46 @@ bool EndpointManager::RequestTimeoutLessComparer(const pair<String, PendingReque
return a.second.Timeout < b.second.Timeout;
}
void EndpointManager::SubscriptionTimerHandler(void)
{
Dictionary::Ptr subscriptions = boost::make_shared<Dictionary>();
DynamicObject::Ptr object;
BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Endpoint")) {
Endpoint::Ptr endpoint = dynamic_pointer_cast<Endpoint>(object);
if (!endpoint->IsLocalEndpoint())
continue;
String topic;
BOOST_FOREACH(tie(tuples::ignore, topic), endpoint->GetSubscriptions()) {
subscriptions->Set(topic, topic);
}
}
m_Endpoint->SetSubscriptions(subscriptions);
}
void EndpointManager::ReconnectTimerHandler(void)
{
DynamicObject::Ptr object;
BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Endpoint")) {
Endpoint::Ptr endpoint = dynamic_pointer_cast<Endpoint>(object);
if (endpoint->IsConnected())
continue;
String node, service;
node = endpoint->GetNode();
service = endpoint->GetService();
if (node.IsEmpty() || service.IsEmpty())
continue;
AddConnection(node, service);
}
}
void EndpointManager::RequestTimerHandler(void)
{
map<String, PendingRequest>::iterator it;
@ -379,15 +352,15 @@ void EndpointManager::ProcessResponseMessage(const Endpoint::Ptr& sender, const
m_Requests.erase(it);
}
EndpointManager::Iterator EndpointManager::Begin(void)
{
return m_Endpoints.begin();
}
//EndpointManager::Iterator EndpointManager::Begin(void)
//{
// return m_Endpoints.begin();
//}
EndpointManager::Iterator EndpointManager::End(void)
{
return m_Endpoints.end();
}
//EndpointManager::Iterator EndpointManager::End(void)
//{
// return m_Endpoints.end();
//}
EndpointManager::Ptr EndpointManager::GetInstance(void)
{

View File

@ -34,7 +34,7 @@ public:
typedef shared_ptr<EndpointManager> Ptr;
typedef weak_ptr<EndpointManager> WeakPtr;
typedef map<String, Endpoint::Ptr>::iterator Iterator;
// typedef map<String, Endpoint::Ptr>::iterator Iterator;
EndpointManager(void);
@ -49,9 +49,6 @@ public:
void AddListener(const String& service);
void AddConnection(const String& node, const String& service);
void RegisterEndpoint(const Endpoint::Ptr& endpoint);
void UnregisterEndpoint(const Endpoint::Ptr& endpoint);
void SendUnicastMessage(const Endpoint::Ptr& sender, const Endpoint::Ptr& recipient, const MessagePart& message);
void SendAnycastMessage(const Endpoint::Ptr& sender, const RequestMessage& message);
void SendMulticastMessage(const Endpoint::Ptr& sender, const RequestMessage& message);
@ -61,21 +58,22 @@ public:
void ProcessResponseMessage(const Endpoint::Ptr& sender, const ResponseMessage& message);
void ForEachEndpoint(function<void (const EndpointManager::Ptr&, const Endpoint::Ptr&)> callback);
Iterator Begin(void);
Iterator End(void);
Endpoint::Ptr GetEndpointByIdentity(const String& identity) const;
// void ForEachEndpoint(function<void (const EndpointManager::Ptr&, const Endpoint::Ptr&)> callback);
// Iterator Begin(void);
// Iterator End(void);
boost::signal<void (const EndpointManager::Ptr&, const Endpoint::Ptr&)> OnNewEndpoint;
private:
String m_Identity;
shared_ptr<SSL_CTX> m_SSLContext;
Endpoint::Ptr m_Endpoint;
vector<JsonRpcServer::Ptr> m_Servers;
vector<Endpoint::Ptr> m_PendingEndpoints;
map<String, Endpoint::Ptr> m_Endpoints;
Timer::Ptr m_SubscriptionTimer;
Timer::Ptr m_ReconnectTimer;
set<JsonRpcServer::Ptr> m_Servers;
set<JsonRpcClient::Ptr> m_PendingClients;
/**
* Information about a pending API request.
@ -98,13 +96,15 @@ private:
map<String, PendingRequest> m_Requests;
Timer::Ptr m_RequestTimer;
void RegisterServer(const JsonRpcServer::Ptr& server);
void UnregisterServer(const JsonRpcServer::Ptr& server);
static bool RequestTimeoutLessComparer(const pair<String, PendingRequest>& a, const pair<String, PendingRequest>& b);
void RequestTimerHandler(void);
void SubscriptionTimerHandler(void);
void ReconnectTimerHandler(void);
void NewClientHandler(const TcpClient::Ptr& client);
void ClientConnectedHandler(const TcpClient::Ptr& client);
};
}

View File

@ -42,8 +42,6 @@ using boost::algorithm::is_any_of;
#endif /* I2_ICINGA_BUILD */
#include "endpoint.h"
#include "jsonrpcendpoint.h"
#include "virtualendpoint.h"
#include "endpointmanager.h"
#include "icingaapplication.h"

View File

@ -133,8 +133,7 @@ int IcingaApplication::Main(const vector<String>& args)
Logger::Write(LogInformation, "icinga", "My identity: " + identity);
EndpointManager::GetInstance()->SetIdentity(identity);
shared_ptr<SSL_CTX> sslContext = Utility::MakeSSLContext(GetCertificateFile(), GetCertificateFile(), GetCAFile());
EndpointManager::GetInstance()->SetSSLContext(sslContext);
m_SSLContext = Utility::MakeSSLContext(GetCertificateFile(), GetCertificateFile(), GetCAFile());
}
/* create the primary RPC listener */
@ -215,3 +214,8 @@ double IcingaApplication::GetStartTime(void) const
{
return m_StartTime;
}
shared_ptr<SSL_CTX> IcingaApplication::GetSSLContext(void) const
{
return m_SSLContext;
}

View File

@ -47,6 +47,7 @@ public:
String GetPidPath(void) const;
String GetStatePath(void) const;
Dictionary::Ptr GetMacros(void) const;
shared_ptr<SSL_CTX> GetSSLContext(void) const;
double GetStartTime(void) const;
@ -61,6 +62,7 @@ private:
String m_PidPath;
String m_StatePath;
Dictionary::Ptr m_Macros;
shared_ptr<SSL_CTX> m_SSLContext;
double m_StartTime;

View File

@ -1,148 +0,0 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "i2-icinga.h"
using namespace icinga;
String JsonRpcEndpoint::GetIdentity(void) const
{
return m_Identity;
}
String JsonRpcEndpoint::GetAddress(void) const
{
if (!m_Client)
return "<disconnected endpoint>";
return m_Client->GetPeerAddress();
}
JsonRpcClient::Ptr JsonRpcEndpoint::GetClient(void)
{
return m_Client;
}
void JsonRpcEndpoint::Connect(String node, String service, shared_ptr<SSL_CTX> sslContext)
{
JsonRpcClient::Ptr client = boost::make_shared<JsonRpcClient>(RoleOutbound, sslContext);
SetClient(client);
client->Connect(node, service);
client->Start();
}
void JsonRpcEndpoint::SetClient(JsonRpcClient::Ptr client)
{
m_Client = client;
client->OnNewMessage.connect(boost::bind(&JsonRpcEndpoint::NewMessageHandler, this, _2));
client->OnClosed.connect(boost::bind(&JsonRpcEndpoint::ClientClosedHandler, this));
client->OnConnected.connect(boost::bind(&JsonRpcEndpoint::ClientConnectedHandler, this));
}
bool JsonRpcEndpoint::IsLocal(void) const
{
return false;
}
bool JsonRpcEndpoint::IsConnected(void) const
{
return (m_Client && m_Client->IsConnected());
}
void JsonRpcEndpoint::ProcessRequest(Endpoint::Ptr sender, const RequestMessage& message)
{
if (IsConnected()) {
m_Client->SendMessage(message);
} else {
// TODO: persist the event
}
}
void JsonRpcEndpoint::ProcessResponse(Endpoint::Ptr sender, const ResponseMessage& message)
{
m_Client->SendMessage(message);
}
void JsonRpcEndpoint::NewMessageHandler(const MessagePart& message)
{
Endpoint::Ptr sender = GetSelf();
if (ResponseMessage::IsResponseMessage(message)) {
/* rather than routing the message to the right virtual
* endpoint we just process it here right away. */
GetEndpointManager()->ProcessResponseMessage(sender, message);
return;
}
RequestMessage request = message;
String method;
if (!request.GetMethod(&method))
return;
String id;
if (request.GetID(&id))
GetEndpointManager()->SendAnycastMessage(sender, request);
else
GetEndpointManager()->SendMulticastMessage(sender, request);
}
void JsonRpcEndpoint::ClientClosedHandler(void)
{
try {
m_Client->CheckException();
} catch (const exception& ex) {
stringstream message;
message << "Error occured for JSON-RPC socket: Message=" << ex.what();
Logger::Write(LogWarning, "jsonrpc", message.str());
}
Logger::Write(LogWarning, "jsonrpc", "Lost connection to endpoint: identity=" + GetIdentity());
// TODO: _only_ clear non-persistent subscriptions
// unregister ourselves if no persistent subscriptions are left (use a timer for that, once we have a TTL property for the topics)
ClearSubscriptions();
// remove the endpoint if there are no more subscriptions */
if (BeginSubscriptions() == EndSubscriptions()) {
Hold();
GetEndpointManager()->UnregisterEndpoint(GetSelf());
}
m_Client.reset();
// TODO: persist events, etc., for now we just disable the endpoint
}
void JsonRpcEndpoint::ClientConnectedHandler(void)
{
String identity = Utility::GetCertificateCN(m_Client->GetPeerCertificate());
if (GetIdentity().IsEmpty() && !identity.IsEmpty()) {
m_Identity = identity;
GetEndpointManager()->RegisterEndpoint(GetSelf());
}
}
void JsonRpcEndpoint::Stop(void)
{
if (m_Client)
m_Client->Close();
}

View File

@ -1,71 +0,0 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#ifndef JSONRPCENDPOINT_H
#define JSONRPCENDPOINT_H
namespace icinga
{
/**
* A JSON-RPC endpoint that can be used to communicate with a remote
* Icinga instance.
*
* @ingroup icinga
*/
class I2_ICINGA_API JsonRpcEndpoint : public Endpoint
{
public:
typedef shared_ptr<JsonRpcEndpoint> Ptr;
typedef weak_ptr<JsonRpcEndpoint> WeakPtr;
void Connect(String node, String service,
shared_ptr<SSL_CTX> sslContext);
JsonRpcClient::Ptr GetClient(void);
void SetClient(JsonRpcClient::Ptr client);
virtual String GetIdentity(void) const;
virtual String GetAddress(void) const;
virtual bool IsLocal(void) const;
virtual bool IsConnected(void) const;
virtual void ProcessRequest(Endpoint::Ptr sender, const RequestMessage& message);
virtual void ProcessResponse(Endpoint::Ptr sender, const ResponseMessage& message);
virtual void Stop(void);
private:
String m_Identity; /**< The identity of this endpoint. */
shared_ptr<SSL_CTX> m_SSLContext;
String m_Address;
JsonRpcClient::Ptr m_Client;
void SetAddress(String address);
void NewMessageHandler(const MessagePart& message);
void ClientClosedHandler(void);
void ClientConnectedHandler(void);
};
}
#endif /* JSONRPCENDPOINT_H */

View File

@ -1,97 +0,0 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "i2-icinga.h"
using namespace icinga;
String VirtualEndpoint::GetIdentity(void) const
{
return "__" + GetAddress();
}
String VirtualEndpoint::GetAddress(void) const
{
char address[50];
sprintf(address, "virtual:%p", (void *)this);
return address;
}
bool VirtualEndpoint::IsLocal(void) const
{
return true;
}
bool VirtualEndpoint::IsConnected(void) const
{
return true;
}
void VirtualEndpoint::RegisterTopicHandler(String topic, function<void (const VirtualEndpoint::Ptr&, const Endpoint::Ptr, const RequestMessage&)> callback)
{
map<String, shared_ptr<boost::signal<void (const VirtualEndpoint::Ptr&, const Endpoint::Ptr, const RequestMessage&)> > >::iterator it;
it = m_TopicHandlers.find(topic);
shared_ptr<boost::signal<void (const VirtualEndpoint::Ptr&, const Endpoint::Ptr, const RequestMessage&)> > sig;
if (it == m_TopicHandlers.end()) {
sig = boost::make_shared<boost::signal<void (const VirtualEndpoint::Ptr&, const Endpoint::Ptr, const RequestMessage&)> >();
m_TopicHandlers.insert(make_pair(topic, sig));
} else {
sig = it->second;
}
sig->connect(callback);
RegisterSubscription(topic);
}
void VirtualEndpoint::UnregisterTopicHandler(String topic, function<void (const VirtualEndpoint::Ptr&, const Endpoint::Ptr, const RequestMessage&)> callback)
{
// TODO: implement
//m_TopicHandlers[method] -= callback;
//UnregisterMethodSubscription(method);
throw_exception(NotImplementedException());
}
void VirtualEndpoint::ProcessRequest(Endpoint::Ptr sender, const RequestMessage& request)
{
String method;
if (!request.GetMethod(&method))
return;
map<String, shared_ptr<boost::signal<void (const VirtualEndpoint::Ptr&, const Endpoint::Ptr, const RequestMessage&)> > >::iterator it;
it = m_TopicHandlers.find(method);
if (it == m_TopicHandlers.end())
return;
(*it->second)(GetSelf(), sender, request);
}
void VirtualEndpoint::ProcessResponse(Endpoint::Ptr sender, const ResponseMessage& response)
{
GetEndpointManager()->ProcessResponseMessage(sender, response);
}
void VirtualEndpoint::Stop(void)
{
/* Nothing to do here. */
}

View File

@ -1,57 +0,0 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#ifndef VIRTUALENDPOINT_H
#define VIRTUALENDPOINT_H
namespace icinga
{
/**
* A local endpoint.
*
* @ingroup icinga
*/
class I2_ICINGA_API VirtualEndpoint : public Endpoint
{
public:
typedef shared_ptr<VirtualEndpoint> Ptr;
typedef weak_ptr<VirtualEndpoint> WeakPtr;
void RegisterTopicHandler(String topic, function<void (const VirtualEndpoint::Ptr&, const Endpoint::Ptr, const RequestMessage&)> callback);
void UnregisterTopicHandler(String topic, function<void (const VirtualEndpoint::Ptr&, const Endpoint::Ptr, const RequestMessage&)> callback);
virtual String GetIdentity(void) const;
virtual String GetAddress(void) const;
virtual bool IsLocal(void) const;
virtual bool IsConnected(void) const;
virtual void ProcessRequest(Endpoint::Ptr sender, const RequestMessage& message);
virtual void ProcessResponse(Endpoint::Ptr sender, const ResponseMessage& message);
virtual void Stop(void);
private:
map< String, shared_ptr<boost::signal<void (const VirtualEndpoint::Ptr&, const Endpoint::Ptr, const RequestMessage&)> > > m_TopicHandlers;
};
}
#endif /* VIRTUALENDPOINT_H */

View File

@ -41,7 +41,9 @@ JsonRpcClient::JsonRpcClient(TcpClientRole role, shared_ptr<SSL_CTX> sslContext)
void JsonRpcClient::SendMessage(const MessagePart& message)
{
Value value = message.GetDictionary();
NetString::WriteStringToIOQueue(this, value.Serialize());
String json = value.Serialize();
//std::cerr << ">> " << json << std::endl;
NetString::WriteStringToIOQueue(this, json);
}
/**
@ -52,6 +54,8 @@ void JsonRpcClient::DataAvailableHandler(void)
String jsonString;
while (NetString::ReadStringFromIOQueue(this, &jsonString)) {
//std::cerr << "<< " << jsonString << std::endl;
try {
Value value = Value::Deserialize(jsonString);