From 0d8b352150f0b4f5a1238e9a1430706c7bb1c0fd Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Wed, 25 Apr 2012 20:35:37 +0200 Subject: [PATCH] Fixed subscription code. --- components/configrpc/configrpccomponent.cpp | 18 +++++------------- components/configrpc/configrpccomponent.h | 3 +-- icinga-app/icinga.conf | 5 ++++- icinga/discoverycomponent.cpp | 18 +++++------------- icinga/discoverycomponent.h | 3 +-- icinga/endpoint.cpp | 2 -- icinga/endpoint.h | 1 - icinga/subscriptioncomponent.cpp | 10 +++++++++- 8 files changed, 25 insertions(+), 35 deletions(-) diff --git a/components/configrpc/configrpccomponent.cpp b/components/configrpc/configrpccomponent.cpp index b4a82f86c..7cfdf069b 100644 --- a/components/configrpc/configrpccomponent.cpp +++ b/components/configrpc/configrpccomponent.cpp @@ -28,6 +28,9 @@ void ConfigRpcComponent::Start(void) m_ConfigRpcEndpoint->RegisterMethodSource("config::PropertyChanged"); } + m_ConfigRpcEndpoint->RegisterMethodHandler("message::Welcome", + bind_weak(&ConfigRpcComponent::WelcomeMessageHandler, shared_from_this())); + m_ConfigRpcEndpoint->RegisterMethodHandler("config::ObjectCreated", bind_weak(&ConfigRpcComponent::RemoteObjectUpdatedHandler, shared_from_this())); m_ConfigRpcEndpoint->RegisterMethodHandler("config::ObjectRemoved", @@ -36,9 +39,6 @@ void ConfigRpcComponent::Start(void) bind_weak(&ConfigRpcComponent::RemoteObjectUpdatedHandler, shared_from_this())); endpointManager->RegisterEndpoint(m_ConfigRpcEndpoint); - - endpointManager->OnNewEndpoint += bind_weak(&ConfigRpcComponent::NewEndpointHandler, shared_from_this()); - endpointManager->ForeachEndpoint(bind(&ConfigRpcComponent::NewEndpointHandler, this, _1)); } void ConfigRpcComponent::Stop(void) @@ -46,20 +46,12 @@ void ConfigRpcComponent::Stop(void) // TODO: implement } -int ConfigRpcComponent::NewEndpointHandler(const NewEndpointEventArgs& ea) -{ - ea.Endpoint->OnSessionEstablished += bind_weak(&ConfigRpcComponent::SessionEstablishedHandler, shared_from_this()); - - return 0; -} - -int ConfigRpcComponent::SessionEstablishedHandler(const EventArgs& ea) +int ConfigRpcComponent::WelcomeMessageHandler(const NewRequestEventArgs& ea) { JsonRpcRequest request; request.SetMethod("config::FetchObjects"); - Endpoint::Ptr endpoint = static_pointer_cast(ea.Source); - GetEndpointManager()->SendUnicastRequest(m_ConfigRpcEndpoint, endpoint, request); + GetEndpointManager()->SendUnicastRequest(m_ConfigRpcEndpoint, ea.Sender, request); return 0; } diff --git a/components/configrpc/configrpccomponent.h b/components/configrpc/configrpccomponent.h index 27ce4bb3b..823c61d58 100644 --- a/components/configrpc/configrpccomponent.h +++ b/components/configrpc/configrpccomponent.h @@ -9,8 +9,7 @@ class ConfigRpcComponent : public IcingaComponent private: VirtualEndpoint::Ptr m_ConfigRpcEndpoint; - int NewEndpointHandler(const NewEndpointEventArgs& ea); - int SessionEstablishedHandler(const EventArgs& ea); + int WelcomeMessageHandler(const NewRequestEventArgs& ea); int LocalObjectCreatedHandler(const EventArgs& ea); int LocalObjectRemovedHandler(const EventArgs& ea); diff --git a/icinga-app/icinga.conf b/icinga-app/icinga.conf index 38d761f35..c7ec54767 100644 --- a/icinga-app/icinga.conf +++ b/icinga-app/icinga.conf @@ -4,7 +4,10 @@ "demo": { "replicate": "0" } }, "rpcconnection": { - "kekslistener": { "replicate": "0", "hostname": "10.0.10.14", "port": "7777" } + "kekslistener": { "replicate": "0", "hostname": "127.0.0.1", "port": "7777" } + }, + "rpclistener": { + "kekslistener": { "replicate": "0", "port": "7777" } }, "host": { "localhost": { "ipaddr": "127.0.0.1" } diff --git a/icinga/discoverycomponent.cpp b/icinga/discoverycomponent.cpp index d2fd5bee7..2b09a0705 100644 --- a/icinga/discoverycomponent.cpp +++ b/icinga/discoverycomponent.cpp @@ -10,6 +10,9 @@ string DiscoveryComponent::GetName(void) const void DiscoveryComponent::Start(void) { m_DiscoveryEndpoint = make_shared(); + m_DiscoveryEndpoint->RegisterMethodHandler("message::Welcome", + bind_weak(&DiscoveryComponent::GetPeersMessageHandler, shared_from_this())); + m_DiscoveryEndpoint->RegisterMethodSource("discovery::PeerAvailable"); m_DiscoveryEndpoint->RegisterMethodHandler("discovery::GetPeers", bind_weak(&DiscoveryComponent::GetPeersMessageHandler, shared_from_this())); @@ -25,22 +28,11 @@ void DiscoveryComponent::Stop(void) mgr->UnregisterEndpoint(m_DiscoveryEndpoint); } -int DiscoveryComponent::NewEndpointHandler(const NewEndpointEventArgs& neea) -{ - neea.Endpoint->OnSessionEstablished += bind_weak(&DiscoveryComponent::SessionEstablishedHandler, shared_from_this()); - - /* TODO: register handler for new sink/source */ - - return 0; -} - -int DiscoveryComponent::SessionEstablishedHandler(const EventArgs& neea) +int DiscoveryComponent::WelcomeMessageHandler(const NewRequestEventArgs& neea) { JsonRpcRequest request; request.SetMethod("discovery::GetPeers"); - - Endpoint::Ptr endpoint = static_pointer_cast(neea.Source); - GetEndpointManager()->SendUnicastRequest(m_DiscoveryEndpoint, endpoint, request); + GetEndpointManager()->SendUnicastRequest(m_DiscoveryEndpoint, neea.Sender, request); /* TODO: send information about this client to all other clients */ diff --git a/icinga/discoverycomponent.h b/icinga/discoverycomponent.h index 3e8325e95..7d5e65008 100644 --- a/icinga/discoverycomponent.h +++ b/icinga/discoverycomponent.h @@ -11,8 +11,7 @@ private: IcingaApplication::Ptr GetIcingaApplication(void) const; - int NewEndpointHandler(const NewEndpointEventArgs& neea); - int SessionEstablishedHandler(const EventArgs& neea); + int WelcomeMessageHandler(const NewRequestEventArgs& neea); int GetPeersMessageHandler(const NewRequestEventArgs& nrea); public: diff --git a/icinga/endpoint.cpp b/icinga/endpoint.cpp index 7d5e82f4f..981650329 100644 --- a/icinga/endpoint.cpp +++ b/icinga/endpoint.cpp @@ -14,8 +14,6 @@ void Endpoint::SetIdentity(string identity) EventArgs ea; ea.Source = shared_from_this(); OnIdentityChanged(ea); - - OnSessionEstablished(ea); } bool Endpoint::HasIdentity(void) const diff --git a/icinga/endpoint.h b/icinga/endpoint.h index 0f22867de..1980fdd48 100644 --- a/icinga/endpoint.h +++ b/icinga/endpoint.h @@ -66,7 +66,6 @@ public: int CountMethodSources(void) const; Event OnIdentityChanged; - Event OnSessionEstablished; }; } diff --git a/icinga/subscriptioncomponent.cpp b/icinga/subscriptioncomponent.cpp index 190b45ea0..c4b03362e 100644 --- a/icinga/subscriptioncomponent.cpp +++ b/icinga/subscriptioncomponent.cpp @@ -12,6 +12,7 @@ void SubscriptionComponent::Start(void) m_SubscriptionEndpoint = make_shared(); m_SubscriptionEndpoint->RegisterMethodHandler("message::Subscribe", bind_weak(&SubscriptionComponent::SubscribeMessageHandler, shared_from_this())); m_SubscriptionEndpoint->RegisterMethodHandler("message::Provide", bind_weak(&SubscriptionComponent::ProvideMessageHandler, shared_from_this())); + m_SubscriptionEndpoint->RegisterMethodSource("message::Welcome"); m_SubscriptionEndpoint->RegisterMethodSource("message::Subscribe"); m_SubscriptionEndpoint->RegisterMethodSource("message::Provide"); @@ -38,7 +39,6 @@ int SubscriptionComponent::SyncSubscription(Endpoint::Ptr target, string type, c SubscriptionMessage subscriptionMessage; subscriptionMessage.SetMethod(nmea.Method); request.SetParams(subscriptionMessage); - GetEndpointManager()->SendUnicastRequest(m_SubscriptionEndpoint, target, request); return 0; @@ -68,11 +68,19 @@ int SubscriptionComponent::NewEndpointHandler(const NewEndpointEventArgs& neea) neea.Endpoint->AddAllowedMethodSinkPrefix("message::"); neea.Endpoint->AddAllowedMethodSourcePrefix("message::"); + /* we just assume the peer wants those messages */ + neea.Endpoint->RegisterMethodSink("message::Welcome"); neea.Endpoint->RegisterMethodSink("message::Subscribe"); neea.Endpoint->RegisterMethodSink("message::Provide"); GetEndpointManager()->ForeachEndpoint(bind(&SubscriptionComponent::SyncSubscriptions, this, neea.Endpoint, _1)); + /* signal the peer that we're done syncing subscriptions and are now + * ready to accept messages. */ + JsonRpcRequest request; + request.SetMethod("message::Welcome"); + GetEndpointManager()->SendUnicastRequest(m_SubscriptionEndpoint, neea.Endpoint, request); + return 0; }