Refactored subscription handling into a separate component.

This commit is contained in:
Gunnar Beutner 2012-04-19 11:29:36 +02:00
parent fd64dc5a99
commit 2cd43ed8c6
23 changed files with 187 additions and 72 deletions

View File

@ -81,13 +81,13 @@ void Application::RunEventLoop(void)
time_t now = time(NULL);
time_t next = Timer::GetNextCall();
long sleep = (next < now) ? 0 : (next - now);
time_t sleep = (next < now) ? 0 : (next - now);
if (m_ShuttingDown)
break;
timeval tv;
tv.tv_sec = (sleep < 0) ? 0 : sleep;
tv.tv_sec = (sleep < 0) ? 0 : (long)sleep;
tv.tv_usec = 0;
int ready;

View File

@ -7,7 +7,7 @@ void Component::SetApplication(const Application::WeakPtr& application)
m_Application = application;
}
Application::Ptr Component::GetApplication(void)
Application::Ptr Component::GetApplication(void) const
{
return m_Application.lock();
}
@ -17,7 +17,7 @@ void Component::SetConfig(const ConfigObject::Ptr& componentConfig)
m_Config = componentConfig;
}
ConfigObject::Ptr Component::GetConfig(void)
ConfigObject::Ptr Component::GetConfig(void) const
{
return m_Config;
}

View File

@ -15,12 +15,12 @@ public:
typedef weak_ptr<Component> WeakPtr;
void SetApplication(const Application::WeakPtr& application);
Application::Ptr GetApplication(void);
Application::Ptr GetApplication(void) const;
void SetConfig(const ConfigObject::Ptr& componentConfig);
ConfigObject::Ptr GetConfig(void);
ConfigObject::Ptr GetConfig(void) const;
virtual string GetName(void) = 0;
virtual string GetName(void) const = 0;
virtual void Start(void) = 0;
virtual void Stop(void) = 0;
};

View File

@ -4,6 +4,7 @@
#ifdef _MSC_VER
# define HAVE_CXX11
# pragma warning(disable:4251)
# define _CRT_SECURE_NO_DEPRECATE
#else /* _MSC_VER */
# include "config.h"
#endif /* _MSC_VER */

View File

@ -23,6 +23,7 @@ void Variant::Convert(VariantType newType) const
if (newType == m_Type)
return;
// TODO: convert variant data
throw NotImplementedException();
}

View File

@ -5,7 +5,7 @@
using namespace icinga;
string ConfigFileComponent::GetName(void)
string ConfigFileComponent::GetName(void) const
{
return "configfilecomponent";
}

View File

@ -12,7 +12,7 @@ public:
typedef shared_ptr<ConfigFileComponent> Ptr;
typedef weak_ptr<ConfigFileComponent> WeakPtr;
virtual string GetName(void);
virtual string GetName(void) const;
virtual void Start(void);
virtual void Stop(void);
};

View File

@ -46,12 +46,12 @@
<PropertyGroup Label="UserMacros" />
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
<LinkIncremental>true</LinkIncremental>
<IncludePath>$(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\icinga;$(SolutionDir)\cJSON;$(IncludePath)</IncludePath>
<IncludePath>$(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\icinga;$(IncludePath)</IncludePath>
<LibraryPath>$(OutDir);$(LibraryPath)</LibraryPath>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
<LinkIncremental>false</LinkIncremental>
<IncludePath>$(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\icinga;$(SolutionDir)\cJSON;$(IncludePath)</IncludePath>
<IncludePath>$(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\icinga;$(IncludePath)</IncludePath>
<LibraryPath>$(OutDir);$(LibraryPath)</LibraryPath>
</PropertyGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">

View File

@ -7,7 +7,7 @@ IcingaApplication::Ptr ConfigRpcComponent::GetIcingaApplication(void)
return static_pointer_cast<IcingaApplication>(GetApplication());
}
string ConfigRpcComponent::GetName(void)
string ConfigRpcComponent::GetName(void) const
{
return "configcomponent";
}

View File

@ -22,7 +22,7 @@ private:
JsonRpcRequest MakeObjectMessage(const ConfigObject::Ptr& object, string method, bool includeProperties);
public:
virtual string GetName(void);
virtual string GetName(void) const;
virtual void Start(void);
virtual void Stop(void);
};

View File

@ -43,12 +43,12 @@
<PropertyGroup Label="UserMacros" />
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
<LinkIncremental>true</LinkIncremental>
<IncludePath>$(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\cJSON;$(SolutionDir)\icinga;$(IncludePath)</IncludePath>
<IncludePath>$(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\icinga;$(IncludePath)</IncludePath>
<LibraryPath>$(OutDir);$(LibraryPath)</LibraryPath>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
<LinkIncremental>false</LinkIncremental>
<IncludePath>$(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\cJSON;$(SolutionDir)\icinga;$(IncludePath)</IncludePath>
<IncludePath>$(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\icinga;$(IncludePath)</IncludePath>
<LibraryPath>$(OutDir);$(LibraryPath)</LibraryPath>
</PropertyGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">

View File

@ -26,9 +26,8 @@ void EndpointManager::AddListener(unsigned short port)
void EndpointManager::AddConnection(string host, unsigned short port)
{
JsonRpcEndpoint::Ptr endpoint = make_shared<JsonRpcEndpoint>();
RegisterEndpoint(endpoint);
endpoint->Connect(host, port);
RegisterEndpoint(endpoint);
}
void EndpointManager::RegisterServer(JsonRpcServer::Ptr server)
@ -40,9 +39,8 @@ void EndpointManager::RegisterServer(JsonRpcServer::Ptr server)
int EndpointManager::NewClientHandler(const NewClientEventArgs& ncea)
{
JsonRpcEndpoint::Ptr endpoint = make_shared<JsonRpcEndpoint>();
RegisterEndpoint(endpoint);
endpoint->SetClient(static_pointer_cast<JsonRpcClient>(ncea.Client));
RegisterEndpoint(endpoint);
return 0;
}
@ -63,6 +61,11 @@ void EndpointManager::RegisterEndpoint(Endpoint::Ptr endpoint)
endpoint->OnNewMethodSource += bind_weak(&EndpointManager::NewMethodSourceHandler, shared_from_this());
endpoint->ForeachMethodSource(bind(&EndpointManager::NewMethodSourceHandler, this, _1));
NewEndpointEventArgs neea;
neea.Source = shared_from_this();
neea.Endpoint = endpoint;
OnNewEndpoint(neea);
}
void EndpointManager::UnregisterEndpoint(Endpoint::Ptr endpoint)
@ -143,9 +146,12 @@ int EndpointManager::NewMethodSourceHandler(const NewMethodEventArgs& ea)
return 0;
}
void EndpointManager::ForeachEndpoint(function<int (Endpoint::Ptr)> callback)
void EndpointManager::ForeachEndpoint(function<int (const NewEndpointEventArgs&)> callback)
{
NewEndpointEventArgs neea;
neea.Source = shared_from_this();
for (list<Endpoint::Ptr>::iterator i = m_Endpoints.begin(); i != m_Endpoints.end(); i++) {
callback(*i);
neea.Endpoint = *i;
callback(neea);
}
}

View File

@ -4,6 +4,11 @@
namespace icinga
{
struct I2_ICINGA_API NewEndpointEventArgs : public EventArgs
{
Endpoint::Ptr Endpoint;
};
class I2_ICINGA_API EndpointManager : public Object
{
list<JsonRpcServer::Ptr> m_Servers;
@ -34,7 +39,9 @@ public:
void SendAnycastRequest(Endpoint::Ptr sender, const JsonRpcRequest& request, bool fromLocal = true);
void SendMulticastRequest(Endpoint::Ptr sender, const JsonRpcRequest& request, bool fromLocal = true);
void ForeachEndpoint(function<int (Endpoint::Ptr)> callback);
void ForeachEndpoint(function<int (const NewEndpointEventArgs&)> callback);
Event<NewEndpointEventArgs> OnNewEndpoint;
};
}

View File

@ -16,5 +16,6 @@
#include "virtualendpoint.h"
#include "endpointmanager.h"
#include "icingaapplication.h"
#include "subscriptioncomponent.h"
#endif /* I2ICINGA_H */

View File

@ -15,6 +15,7 @@
<ClCompile Include="endpointmanager.cpp" />
<ClCompile Include="icingaapplication.cpp" />
<ClCompile Include="jsonrpcendpoint.cpp" />
<ClCompile Include="subscriptioncomponent.cpp" />
<ClCompile Include="virtualendpoint.cpp" />
</ItemGroup>
<ItemGroup>
@ -23,6 +24,7 @@
<ClInclude Include="i2-icinga.h" />
<ClInclude Include="icingaapplication.h" />
<ClInclude Include="jsonrpcendpoint.h" />
<ClInclude Include="subscriptioncomponent.h" />
<ClInclude Include="virtualendpoint.h" />
</ItemGroup>
<PropertyGroup Label="Globals">
@ -55,12 +57,12 @@
<PropertyGroup Label="UserMacros" />
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
<LinkIncremental>true</LinkIncremental>
<IncludePath>$(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\cJSON;$(IncludePath)</IncludePath>
<IncludePath>$(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(IncludePath)</IncludePath>
<LibraryPath>$(OutDir);$(LibraryPath)</LibraryPath>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
<LinkIncremental>false</LinkIncremental>
<IncludePath>$(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\cJSON;$(IncludePath)</IncludePath>
<IncludePath>$(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(IncludePath)</IncludePath>
<LibraryPath>$(OutDir);$(LibraryPath)</LibraryPath>
</PropertyGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">

View File

@ -54,6 +54,9 @@ int IcingaApplication::Main(const vector<string>& args)
connectionCollection->OnObjectRemoved += bind_weak(&IcingaApplication::DeletedRpcConnectionHandler, shared_from_this());
SubscriptionComponent::Ptr subscriptionsComponent = make_shared<SubscriptionComponent>();
RegisterComponent(subscriptionsComponent);
ConfigObject::Ptr fileComponentConfig = make_shared<ConfigObject>("component", "configfile");
fileComponentConfig->SetProperty("configFilename", args[1]);
fileComponentConfig->SetPropertyInteger("replicate", 0);
@ -84,7 +87,7 @@ int IcingaApplication::TestTimerHandler(const TimerEventArgs& tea)
request.SetVersion("2.0");
request.SetMethod("test");
for (int i = 0; i < 10000; i++)
for (int i = 0; i < 5; i++)
m_EndpointManager->SendMulticastRequest(m_TestEndpoint, request);
return 0;

View File

@ -17,51 +17,12 @@ void JsonRpcEndpoint::Connect(string host, unsigned short port)
SetClient(client);
}
int JsonRpcEndpoint::SyncSubscription(string type, const NewMethodEventArgs& nmea)
{
JsonRpcRequest request;
request.SetVersion("2.0");
request.SetMethod(type);
Message params;
params.GetDictionary()->SetValueString("method", nmea.Method);
request.SetParams(params);
m_Client->SendMessage(request);
return 0;
}
int JsonRpcEndpoint::SyncSubscriptions(Endpoint::Ptr endpoint)
{
if (!endpoint->IsLocal())
return 0;
endpoint->ForeachMethodSink(bind(&JsonRpcEndpoint::SyncSubscription, this, "message::Subscribe", _1));
endpoint->ForeachMethodSource(bind(&JsonRpcEndpoint::SyncSubscription, this, "message::Provide", _1));
return 0;
}
void JsonRpcEndpoint::SetClient(JsonRpcClient::Ptr client)
{
m_Client = client;
client->OnNewMessage += bind_weak(&JsonRpcEndpoint::NewMessageHandler, shared_from_this());
client->OnClosed += bind_weak(&JsonRpcEndpoint::ClientClosedHandler, shared_from_this());
client->OnError += bind_weak(&JsonRpcEndpoint::ClientErrorHandler, shared_from_this());
NewMethodEventArgs nmea;
nmea.Source = shared_from_this();
nmea.Method = "message::Subscribe";
SyncSubscription("message::Subscribe", nmea);
SyncSubscription("message::Provide", nmea);
nmea.Method = "message::Provide";
SyncSubscription("message::Subscribe", nmea);
SyncSubscription("message::Provide", nmea);
GetEndpointManager()->ForeachEndpoint(bind(&JsonRpcEndpoint::SyncSubscriptions, this, _1));
}
bool JsonRpcEndpoint::IsLocal(void) const

View File

@ -18,9 +18,6 @@ private:
int ClientErrorHandler(const SocketErrorEventArgs& ea);
int ClientReconnectHandler(const TimerEventArgs& ea);
int SyncSubscription(string type, const NewMethodEventArgs& nmea);
int SyncSubscriptions(Endpoint::Ptr endpoint);
public:
typedef shared_ptr<JsonRpcEndpoint> Ptr;
typedef weak_ptr<JsonRpcEndpoint> WeakPtr;

View File

@ -0,0 +1,103 @@
#include "i2-icinga.h"
using namespace icinga;
IcingaApplication::Ptr SubscriptionComponent::GetIcingaApplication(void) const
{
return static_pointer_cast<IcingaApplication>(GetApplication());
}
string SubscriptionComponent::GetName(void) const
{
return "subscriptioncomponent";
}
void SubscriptionComponent::Start(void)
{
m_SubscriptionEndpoint = make_shared<VirtualEndpoint>();
m_SubscriptionEndpoint->RegisterMethodHandler("message::Subscribe", bind_weak(&SubscriptionComponent::SubscribeMessageHandler, shared_from_this()));
m_SubscriptionEndpoint->RegisterMethodHandler("message::Provide", bind_weak(&SubscriptionComponent::ProvideMessageHandler, shared_from_this()));
m_SubscriptionEndpoint->RegisterMethodSource("message::Subscribe");
m_SubscriptionEndpoint->RegisterMethodSource("message::Provide");
EndpointManager::Ptr mgr = GetIcingaApplication()->GetEndpointManager();
mgr->OnNewEndpoint += bind_weak(&SubscriptionComponent::NewEndpointHandler, shared_from_this());
mgr->ForeachEndpoint(bind(&SubscriptionComponent::NewEndpointHandler, this, _1));
mgr->RegisterEndpoint(m_SubscriptionEndpoint);
}
void SubscriptionComponent::Stop(void)
{
EndpointManager::Ptr mgr = GetIcingaApplication()->GetEndpointManager();
mgr->UnregisterEndpoint(m_SubscriptionEndpoint);
}
int SubscriptionComponent::SyncSubscription(Endpoint::Ptr target, string type, const NewMethodEventArgs& nmea)
{
JsonRpcRequest request;
request.SetVersion("2.0");
request.SetMethod(type);
Message params;
params.GetDictionary()->SetValueString("method", nmea.Method);
request.SetParams(params);
target->ProcessRequest(m_SubscriptionEndpoint, request);
return 0;
}
int SubscriptionComponent::SyncSubscriptions(Endpoint::Ptr target, const NewEndpointEventArgs& neea)
{
Endpoint::Ptr source = neea.Endpoint;
if (!source->IsLocal())
return 0;
source->ForeachMethodSink(bind(&SubscriptionComponent::SyncSubscription, this, target, "message::Subscribe", _1));
source->ForeachMethodSource(bind(&SubscriptionComponent::SyncSubscription, this, target, "message::Provide", _1));
// TODO: bind to endpoint's events
//endpoint->OnNewMethodSink...
return 0;
}
int SubscriptionComponent::NewEndpointHandler(const NewEndpointEventArgs& neea)
{
if (neea.Endpoint->IsLocal())
return 0;
EndpointManager::Ptr mgr = GetIcingaApplication()->GetEndpointManager();
mgr->ForeachEndpoint(bind(&SubscriptionComponent::SyncSubscriptions, this, neea.Endpoint, _1));
return 0;
}
int SubscriptionComponent::SubscribeMessageHandler(const NewRequestEventArgs& nrea)
{
Message params;
if (!nrea.Request.GetParams(&params))
return 0;
string method;
if (!params.GetDictionary()->GetValueString("method", &method))
return 0;
nrea.Sender->RegisterMethodSink(method);
return 0;
}
int SubscriptionComponent::ProvideMessageHandler(const NewRequestEventArgs& nrea)
{
Message params;
if (!nrea.Request.GetParams(&params))
return 0;
string method;
if (!params.GetDictionary()->GetValueString("method", &method))
return 0;
nrea.Sender->RegisterMethodSource(method);
return 0;
}

View File

@ -0,0 +1,29 @@
#ifndef I2_SUBSCRIPTIONCOMPONENT_H
#define I2_SUBSCRIPTIONCOMPONENT_H
namespace icinga
{
class SubscriptionComponent : public Component
{
private:
VirtualEndpoint::Ptr m_SubscriptionEndpoint;
IcingaApplication::Ptr GetIcingaApplication(void) const;
int NewEndpointHandler(const NewEndpointEventArgs& neea);
int SubscribeMessageHandler(const NewRequestEventArgs& nrea);
int ProvideMessageHandler(const NewRequestEventArgs& nrea);
int SyncSubscription(Endpoint::Ptr target, string type, const NewMethodEventArgs& nmea);
int SyncSubscriptions(Endpoint::Ptr target, const NewEndpointEventArgs& neea);
public:
virtual string GetName(void) const;
virtual void Start(void);
virtual void Stop(void);
};
}
#endif /* I2_SUBSCRIPTIONCOMPONENT_H */

View File

@ -3,7 +3,6 @@
#include <map>
#include <i2-base.h>
#include <cJSON.h>
#ifdef I2_JSONRPC_BUILD
# define I2_JSONRPC_API I2_EXPORT

View File

@ -1,9 +1,10 @@
#include <cstdio>
#include "i2-jsonrpc.h"
#include <cJSON.h>
using namespace icinga;
Dictionary::Ptr Netstring::GetDictionaryFromJson(cJSON *json)
Dictionary::Ptr Netstring::GetDictionaryFromJson(json_t *json)
{
Dictionary::Ptr dictionary = make_shared<Dictionary>();
@ -26,7 +27,7 @@ Dictionary::Ptr Netstring::GetDictionaryFromJson(cJSON *json)
return dictionary;
}
cJSON *Netstring::GetJsonFromDictionary(const Dictionary::Ptr& dictionary)
json_t *Netstring::GetJsonFromDictionary(const Dictionary::Ptr& dictionary)
{
cJSON *json;
string valueString;

View File

@ -1,17 +1,21 @@
#ifndef NETSTRING_H
#define NETSTRING_H
struct cJSON;
namespace icinga
{
typedef ::cJSON json_t;
class I2_JSONRPC_API Netstring : public Object
{
private:
size_t m_Length;
void *m_Data;
static Dictionary::Ptr GetDictionaryFromJson(cJSON *json);
static cJSON *GetJsonFromDictionary(const Dictionary::Ptr& dictionary);
static Dictionary::Ptr GetDictionaryFromJson(json_t *json);
static json_t *GetJsonFromDictionary(const Dictionary::Ptr& dictionary);
public:
typedef shared_ptr<Netstring> Ptr;