From 2cd43ed8c6804b7633bc5f6227d77976bf9e5fc0 Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Thu, 19 Apr 2012 11:29:36 +0200 Subject: [PATCH] Refactored subscription handling into a separate component. --- base/application.cpp | 4 +- base/component.cpp | 4 +- base/component.h | 6 +- base/i2-base.h | 1 + base/variant.cpp | 1 + components/configfile/configfilecomponent.cpp | 2 +- components/configfile/configfilecomponent.h | 2 +- components/configrpc/configrpc.vcxproj | 4 +- components/configrpc/configrpccomponent.cpp | 2 +- components/configrpc/configrpccomponent.h | 2 +- icinga-app/icinga-app.vcxproj | 4 +- icinga/endpointmanager.cpp | 18 ++- icinga/endpointmanager.h | 9 +- icinga/i2-icinga.h | 1 + icinga/icinga.vcxproj | 6 +- icinga/icingaapplication.cpp | 5 +- icinga/jsonrpcendpoint.cpp | 39 ------- icinga/jsonrpcendpoint.h | 3 - icinga/subscriptioncomponent.cpp | 103 ++++++++++++++++++ icinga/subscriptioncomponent.h | 29 +++++ jsonrpc/i2-jsonrpc.h | 1 - jsonrpc/netstring.cpp | 5 +- jsonrpc/netstring.h | 8 +- 23 files changed, 187 insertions(+), 72 deletions(-) create mode 100644 icinga/subscriptioncomponent.cpp create mode 100644 icinga/subscriptioncomponent.h diff --git a/base/application.cpp b/base/application.cpp index f7ba08925..8a1eeb3d9 100644 --- a/base/application.cpp +++ b/base/application.cpp @@ -81,13 +81,13 @@ void Application::RunEventLoop(void) time_t now = time(NULL); time_t next = Timer::GetNextCall(); - long sleep = (next < now) ? 0 : (next - now); + time_t sleep = (next < now) ? 0 : (next - now); if (m_ShuttingDown) break; timeval tv; - tv.tv_sec = (sleep < 0) ? 0 : sleep; + tv.tv_sec = (sleep < 0) ? 0 : (long)sleep; tv.tv_usec = 0; int ready; diff --git a/base/component.cpp b/base/component.cpp index 90a2013c6..4c37bda00 100644 --- a/base/component.cpp +++ b/base/component.cpp @@ -7,7 +7,7 @@ void Component::SetApplication(const Application::WeakPtr& application) m_Application = application; } -Application::Ptr Component::GetApplication(void) +Application::Ptr Component::GetApplication(void) const { return m_Application.lock(); } @@ -17,7 +17,7 @@ void Component::SetConfig(const ConfigObject::Ptr& componentConfig) m_Config = componentConfig; } -ConfigObject::Ptr Component::GetConfig(void) +ConfigObject::Ptr Component::GetConfig(void) const { return m_Config; } diff --git a/base/component.h b/base/component.h index 68da36b5c..97f1dd870 100644 --- a/base/component.h +++ b/base/component.h @@ -15,12 +15,12 @@ public: typedef weak_ptr WeakPtr; void SetApplication(const Application::WeakPtr& application); - Application::Ptr GetApplication(void); + Application::Ptr GetApplication(void) const; void SetConfig(const ConfigObject::Ptr& componentConfig); - ConfigObject::Ptr GetConfig(void); + ConfigObject::Ptr GetConfig(void) const; - virtual string GetName(void) = 0; + virtual string GetName(void) const = 0; virtual void Start(void) = 0; virtual void Stop(void) = 0; }; diff --git a/base/i2-base.h b/base/i2-base.h index a07cbc54e..3b0e2a632 100644 --- a/base/i2-base.h +++ b/base/i2-base.h @@ -4,6 +4,7 @@ #ifdef _MSC_VER # define HAVE_CXX11 # pragma warning(disable:4251) +# define _CRT_SECURE_NO_DEPRECATE #else /* _MSC_VER */ # include "config.h" #endif /* _MSC_VER */ diff --git a/base/variant.cpp b/base/variant.cpp index 953fcf907..d1ab9bd18 100644 --- a/base/variant.cpp +++ b/base/variant.cpp @@ -23,6 +23,7 @@ void Variant::Convert(VariantType newType) const if (newType == m_Type) return; + // TODO: convert variant data throw NotImplementedException(); } diff --git a/components/configfile/configfilecomponent.cpp b/components/configfile/configfilecomponent.cpp index d0bf7d051..3c1b7b4d4 100644 --- a/components/configfile/configfilecomponent.cpp +++ b/components/configfile/configfilecomponent.cpp @@ -5,7 +5,7 @@ using namespace icinga; -string ConfigFileComponent::GetName(void) +string ConfigFileComponent::GetName(void) const { return "configfilecomponent"; } diff --git a/components/configfile/configfilecomponent.h b/components/configfile/configfilecomponent.h index 8d195f6ab..2ebd7f643 100644 --- a/components/configfile/configfilecomponent.h +++ b/components/configfile/configfilecomponent.h @@ -12,7 +12,7 @@ public: typedef shared_ptr Ptr; typedef weak_ptr WeakPtr; - virtual string GetName(void); + virtual string GetName(void) const; virtual void Start(void); virtual void Stop(void); }; diff --git a/components/configrpc/configrpc.vcxproj b/components/configrpc/configrpc.vcxproj index ac93b490f..de2fc3264 100644 --- a/components/configrpc/configrpc.vcxproj +++ b/components/configrpc/configrpc.vcxproj @@ -46,12 +46,12 @@ true - $(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\icinga;$(SolutionDir)\cJSON;$(IncludePath) + $(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\icinga;$(IncludePath) $(OutDir);$(LibraryPath) false - $(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\icinga;$(SolutionDir)\cJSON;$(IncludePath) + $(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\icinga;$(IncludePath) $(OutDir);$(LibraryPath) diff --git a/components/configrpc/configrpccomponent.cpp b/components/configrpc/configrpccomponent.cpp index 9edcf0733..0e0194d3a 100644 --- a/components/configrpc/configrpccomponent.cpp +++ b/components/configrpc/configrpccomponent.cpp @@ -7,7 +7,7 @@ IcingaApplication::Ptr ConfigRpcComponent::GetIcingaApplication(void) return static_pointer_cast(GetApplication()); } -string ConfigRpcComponent::GetName(void) +string ConfigRpcComponent::GetName(void) const { return "configcomponent"; } diff --git a/components/configrpc/configrpccomponent.h b/components/configrpc/configrpccomponent.h index ede2cdd18..7b65cf9e8 100644 --- a/components/configrpc/configrpccomponent.h +++ b/components/configrpc/configrpccomponent.h @@ -22,7 +22,7 @@ private: JsonRpcRequest MakeObjectMessage(const ConfigObject::Ptr& object, string method, bool includeProperties); public: - virtual string GetName(void); + virtual string GetName(void) const; virtual void Start(void); virtual void Stop(void); }; diff --git a/icinga-app/icinga-app.vcxproj b/icinga-app/icinga-app.vcxproj index 077bd72ce..ea6bb9328 100644 --- a/icinga-app/icinga-app.vcxproj +++ b/icinga-app/icinga-app.vcxproj @@ -43,12 +43,12 @@ true - $(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\cJSON;$(SolutionDir)\icinga;$(IncludePath) + $(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\icinga;$(IncludePath) $(OutDir);$(LibraryPath) false - $(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\cJSON;$(SolutionDir)\icinga;$(IncludePath) + $(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\icinga;$(IncludePath) $(OutDir);$(LibraryPath) diff --git a/icinga/endpointmanager.cpp b/icinga/endpointmanager.cpp index bf6b877ce..8d543a020 100644 --- a/icinga/endpointmanager.cpp +++ b/icinga/endpointmanager.cpp @@ -26,9 +26,8 @@ void EndpointManager::AddListener(unsigned short port) void EndpointManager::AddConnection(string host, unsigned short port) { JsonRpcEndpoint::Ptr endpoint = make_shared(); - RegisterEndpoint(endpoint); - endpoint->Connect(host, port); + RegisterEndpoint(endpoint); } void EndpointManager::RegisterServer(JsonRpcServer::Ptr server) @@ -40,9 +39,8 @@ void EndpointManager::RegisterServer(JsonRpcServer::Ptr server) int EndpointManager::NewClientHandler(const NewClientEventArgs& ncea) { JsonRpcEndpoint::Ptr endpoint = make_shared(); - RegisterEndpoint(endpoint); - endpoint->SetClient(static_pointer_cast(ncea.Client)); + RegisterEndpoint(endpoint); return 0; } @@ -63,6 +61,11 @@ void EndpointManager::RegisterEndpoint(Endpoint::Ptr endpoint) endpoint->OnNewMethodSource += bind_weak(&EndpointManager::NewMethodSourceHandler, shared_from_this()); endpoint->ForeachMethodSource(bind(&EndpointManager::NewMethodSourceHandler, this, _1)); + + NewEndpointEventArgs neea; + neea.Source = shared_from_this(); + neea.Endpoint = endpoint; + OnNewEndpoint(neea); } void EndpointManager::UnregisterEndpoint(Endpoint::Ptr endpoint) @@ -143,9 +146,12 @@ int EndpointManager::NewMethodSourceHandler(const NewMethodEventArgs& ea) return 0; } -void EndpointManager::ForeachEndpoint(function callback) +void EndpointManager::ForeachEndpoint(function callback) { + NewEndpointEventArgs neea; + neea.Source = shared_from_this(); for (list::iterator i = m_Endpoints.begin(); i != m_Endpoints.end(); i++) { - callback(*i); + neea.Endpoint = *i; + callback(neea); } } diff --git a/icinga/endpointmanager.h b/icinga/endpointmanager.h index 642004fc9..ed981297f 100644 --- a/icinga/endpointmanager.h +++ b/icinga/endpointmanager.h @@ -4,6 +4,11 @@ namespace icinga { +struct I2_ICINGA_API NewEndpointEventArgs : public EventArgs +{ + Endpoint::Ptr Endpoint; +}; + class I2_ICINGA_API EndpointManager : public Object { list m_Servers; @@ -34,7 +39,9 @@ public: void SendAnycastRequest(Endpoint::Ptr sender, const JsonRpcRequest& request, bool fromLocal = true); void SendMulticastRequest(Endpoint::Ptr sender, const JsonRpcRequest& request, bool fromLocal = true); - void ForeachEndpoint(function callback); + void ForeachEndpoint(function callback); + + Event OnNewEndpoint; }; } diff --git a/icinga/i2-icinga.h b/icinga/i2-icinga.h index 8b2a0f68e..a17e5557d 100644 --- a/icinga/i2-icinga.h +++ b/icinga/i2-icinga.h @@ -16,5 +16,6 @@ #include "virtualendpoint.h" #include "endpointmanager.h" #include "icingaapplication.h" +#include "subscriptioncomponent.h" #endif /* I2ICINGA_H */ diff --git a/icinga/icinga.vcxproj b/icinga/icinga.vcxproj index 9457eed4e..90df9d096 100644 --- a/icinga/icinga.vcxproj +++ b/icinga/icinga.vcxproj @@ -15,6 +15,7 @@ + @@ -23,6 +24,7 @@ + @@ -55,12 +57,12 @@ true - $(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\cJSON;$(IncludePath) + $(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(IncludePath) $(OutDir);$(LibraryPath) false - $(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(SolutionDir)\cJSON;$(IncludePath) + $(SolutionDir)\base;$(SolutionDir)\jsonrpc;$(IncludePath) $(OutDir);$(LibraryPath) diff --git a/icinga/icingaapplication.cpp b/icinga/icingaapplication.cpp index 235d44292..e2fdd9fc2 100644 --- a/icinga/icingaapplication.cpp +++ b/icinga/icingaapplication.cpp @@ -54,6 +54,9 @@ int IcingaApplication::Main(const vector& args) connectionCollection->OnObjectRemoved += bind_weak(&IcingaApplication::DeletedRpcConnectionHandler, shared_from_this()); + SubscriptionComponent::Ptr subscriptionsComponent = make_shared(); + RegisterComponent(subscriptionsComponent); + ConfigObject::Ptr fileComponentConfig = make_shared("component", "configfile"); fileComponentConfig->SetProperty("configFilename", args[1]); fileComponentConfig->SetPropertyInteger("replicate", 0); @@ -84,7 +87,7 @@ int IcingaApplication::TestTimerHandler(const TimerEventArgs& tea) request.SetVersion("2.0"); request.SetMethod("test"); - for (int i = 0; i < 10000; i++) + for (int i = 0; i < 5; i++) m_EndpointManager->SendMulticastRequest(m_TestEndpoint, request); return 0; diff --git a/icinga/jsonrpcendpoint.cpp b/icinga/jsonrpcendpoint.cpp index 84a55a1ff..604686c3d 100644 --- a/icinga/jsonrpcendpoint.cpp +++ b/icinga/jsonrpcendpoint.cpp @@ -17,51 +17,12 @@ void JsonRpcEndpoint::Connect(string host, unsigned short port) SetClient(client); } -int JsonRpcEndpoint::SyncSubscription(string type, const NewMethodEventArgs& nmea) -{ - JsonRpcRequest request; - request.SetVersion("2.0"); - request.SetMethod(type); - - Message params; - params.GetDictionary()->SetValueString("method", nmea.Method); - request.SetParams(params); - - m_Client->SendMessage(request); - - return 0; -} - -int JsonRpcEndpoint::SyncSubscriptions(Endpoint::Ptr endpoint) -{ - if (!endpoint->IsLocal()) - return 0; - - endpoint->ForeachMethodSink(bind(&JsonRpcEndpoint::SyncSubscription, this, "message::Subscribe", _1)); - endpoint->ForeachMethodSource(bind(&JsonRpcEndpoint::SyncSubscription, this, "message::Provide", _1)); - - return 0; -} - void JsonRpcEndpoint::SetClient(JsonRpcClient::Ptr client) { m_Client = client; client->OnNewMessage += bind_weak(&JsonRpcEndpoint::NewMessageHandler, shared_from_this()); client->OnClosed += bind_weak(&JsonRpcEndpoint::ClientClosedHandler, shared_from_this()); client->OnError += bind_weak(&JsonRpcEndpoint::ClientErrorHandler, shared_from_this()); - - NewMethodEventArgs nmea; - nmea.Source = shared_from_this(); - - nmea.Method = "message::Subscribe"; - SyncSubscription("message::Subscribe", nmea); - SyncSubscription("message::Provide", nmea); - - nmea.Method = "message::Provide"; - SyncSubscription("message::Subscribe", nmea); - SyncSubscription("message::Provide", nmea); - - GetEndpointManager()->ForeachEndpoint(bind(&JsonRpcEndpoint::SyncSubscriptions, this, _1)); } bool JsonRpcEndpoint::IsLocal(void) const diff --git a/icinga/jsonrpcendpoint.h b/icinga/jsonrpcendpoint.h index e9d1b330d..306ca9329 100644 --- a/icinga/jsonrpcendpoint.h +++ b/icinga/jsonrpcendpoint.h @@ -18,9 +18,6 @@ private: int ClientErrorHandler(const SocketErrorEventArgs& ea); int ClientReconnectHandler(const TimerEventArgs& ea); - int SyncSubscription(string type, const NewMethodEventArgs& nmea); - int SyncSubscriptions(Endpoint::Ptr endpoint); - public: typedef shared_ptr Ptr; typedef weak_ptr WeakPtr; diff --git a/icinga/subscriptioncomponent.cpp b/icinga/subscriptioncomponent.cpp new file mode 100644 index 000000000..b30688689 --- /dev/null +++ b/icinga/subscriptioncomponent.cpp @@ -0,0 +1,103 @@ +#include "i2-icinga.h" + +using namespace icinga; + +IcingaApplication::Ptr SubscriptionComponent::GetIcingaApplication(void) const +{ + return static_pointer_cast(GetApplication()); +} + +string SubscriptionComponent::GetName(void) const +{ + return "subscriptioncomponent"; +} + +void SubscriptionComponent::Start(void) +{ + m_SubscriptionEndpoint = make_shared(); + m_SubscriptionEndpoint->RegisterMethodHandler("message::Subscribe", bind_weak(&SubscriptionComponent::SubscribeMessageHandler, shared_from_this())); + m_SubscriptionEndpoint->RegisterMethodHandler("message::Provide", bind_weak(&SubscriptionComponent::ProvideMessageHandler, shared_from_this())); + m_SubscriptionEndpoint->RegisterMethodSource("message::Subscribe"); + m_SubscriptionEndpoint->RegisterMethodSource("message::Provide"); + + EndpointManager::Ptr mgr = GetIcingaApplication()->GetEndpointManager(); + mgr->OnNewEndpoint += bind_weak(&SubscriptionComponent::NewEndpointHandler, shared_from_this()); + mgr->ForeachEndpoint(bind(&SubscriptionComponent::NewEndpointHandler, this, _1)); + mgr->RegisterEndpoint(m_SubscriptionEndpoint); +} + +void SubscriptionComponent::Stop(void) +{ + EndpointManager::Ptr mgr = GetIcingaApplication()->GetEndpointManager(); + mgr->UnregisterEndpoint(m_SubscriptionEndpoint); +} + +int SubscriptionComponent::SyncSubscription(Endpoint::Ptr target, string type, const NewMethodEventArgs& nmea) +{ + JsonRpcRequest request; + request.SetVersion("2.0"); + request.SetMethod(type); + + Message params; + params.GetDictionary()->SetValueString("method", nmea.Method); + request.SetParams(params); + + target->ProcessRequest(m_SubscriptionEndpoint, request); + + return 0; +} + +int SubscriptionComponent::SyncSubscriptions(Endpoint::Ptr target, const NewEndpointEventArgs& neea) +{ + Endpoint::Ptr source = neea.Endpoint; + + if (!source->IsLocal()) + return 0; + + source->ForeachMethodSink(bind(&SubscriptionComponent::SyncSubscription, this, target, "message::Subscribe", _1)); + source->ForeachMethodSource(bind(&SubscriptionComponent::SyncSubscription, this, target, "message::Provide", _1)); + + // TODO: bind to endpoint's events + //endpoint->OnNewMethodSink... + + return 0; +} + +int SubscriptionComponent::NewEndpointHandler(const NewEndpointEventArgs& neea) +{ + if (neea.Endpoint->IsLocal()) + return 0; + + EndpointManager::Ptr mgr = GetIcingaApplication()->GetEndpointManager(); + mgr->ForeachEndpoint(bind(&SubscriptionComponent::SyncSubscriptions, this, neea.Endpoint, _1)); + + return 0; +} + +int SubscriptionComponent::SubscribeMessageHandler(const NewRequestEventArgs& nrea) +{ + Message params; + if (!nrea.Request.GetParams(¶ms)) + return 0; + + string method; + if (!params.GetDictionary()->GetValueString("method", &method)) + return 0; + + nrea.Sender->RegisterMethodSink(method); + return 0; +} + +int SubscriptionComponent::ProvideMessageHandler(const NewRequestEventArgs& nrea) +{ + Message params; + if (!nrea.Request.GetParams(¶ms)) + return 0; + + string method; + if (!params.GetDictionary()->GetValueString("method", &method)) + return 0; + + nrea.Sender->RegisterMethodSource(method); + return 0; +} diff --git a/icinga/subscriptioncomponent.h b/icinga/subscriptioncomponent.h new file mode 100644 index 000000000..b58bb8a5b --- /dev/null +++ b/icinga/subscriptioncomponent.h @@ -0,0 +1,29 @@ +#ifndef I2_SUBSCRIPTIONCOMPONENT_H +#define I2_SUBSCRIPTIONCOMPONENT_H + +namespace icinga +{ + +class SubscriptionComponent : public Component +{ +private: + VirtualEndpoint::Ptr m_SubscriptionEndpoint; + + IcingaApplication::Ptr GetIcingaApplication(void) const; + + int NewEndpointHandler(const NewEndpointEventArgs& neea); + int SubscribeMessageHandler(const NewRequestEventArgs& nrea); + int ProvideMessageHandler(const NewRequestEventArgs& nrea); + + int SyncSubscription(Endpoint::Ptr target, string type, const NewMethodEventArgs& nmea); + int SyncSubscriptions(Endpoint::Ptr target, const NewEndpointEventArgs& neea); + +public: + virtual string GetName(void) const; + virtual void Start(void); + virtual void Stop(void); +}; + +} + +#endif /* I2_SUBSCRIPTIONCOMPONENT_H */ diff --git a/jsonrpc/i2-jsonrpc.h b/jsonrpc/i2-jsonrpc.h index 7aed38444..8f5b6eef0 100644 --- a/jsonrpc/i2-jsonrpc.h +++ b/jsonrpc/i2-jsonrpc.h @@ -3,7 +3,6 @@ #include #include -#include #ifdef I2_JSONRPC_BUILD # define I2_JSONRPC_API I2_EXPORT diff --git a/jsonrpc/netstring.cpp b/jsonrpc/netstring.cpp index 27052739d..48ca07419 100644 --- a/jsonrpc/netstring.cpp +++ b/jsonrpc/netstring.cpp @@ -1,9 +1,10 @@ #include #include "i2-jsonrpc.h" +#include using namespace icinga; -Dictionary::Ptr Netstring::GetDictionaryFromJson(cJSON *json) +Dictionary::Ptr Netstring::GetDictionaryFromJson(json_t *json) { Dictionary::Ptr dictionary = make_shared(); @@ -26,7 +27,7 @@ Dictionary::Ptr Netstring::GetDictionaryFromJson(cJSON *json) return dictionary; } -cJSON *Netstring::GetJsonFromDictionary(const Dictionary::Ptr& dictionary) +json_t *Netstring::GetJsonFromDictionary(const Dictionary::Ptr& dictionary) { cJSON *json; string valueString; diff --git a/jsonrpc/netstring.h b/jsonrpc/netstring.h index 7e3d2dbba..bb49a8de5 100644 --- a/jsonrpc/netstring.h +++ b/jsonrpc/netstring.h @@ -1,17 +1,21 @@ #ifndef NETSTRING_H #define NETSTRING_H +struct cJSON; + namespace icinga { +typedef ::cJSON json_t; + class I2_JSONRPC_API Netstring : public Object { private: size_t m_Length; void *m_Data; - static Dictionary::Ptr GetDictionaryFromJson(cJSON *json); - static cJSON *GetJsonFromDictionary(const Dictionary::Ptr& dictionary); + static Dictionary::Ptr GetDictionaryFromJson(json_t *json); + static json_t *GetJsonFromDictionary(const Dictionary::Ptr& dictionary); public: typedef shared_ptr Ptr;