Fixed subscription code.

This commit is contained in:
Gunnar Beutner 2012-04-25 20:35:37 +02:00
parent 00709b21c8
commit 0d8b352150
8 changed files with 25 additions and 35 deletions

View File

@ -28,6 +28,9 @@ void ConfigRpcComponent::Start(void)
m_ConfigRpcEndpoint->RegisterMethodSource("config::PropertyChanged");
}
m_ConfigRpcEndpoint->RegisterMethodHandler("message::Welcome",
bind_weak(&ConfigRpcComponent::WelcomeMessageHandler, shared_from_this()));
m_ConfigRpcEndpoint->RegisterMethodHandler("config::ObjectCreated",
bind_weak(&ConfigRpcComponent::RemoteObjectUpdatedHandler, shared_from_this()));
m_ConfigRpcEndpoint->RegisterMethodHandler("config::ObjectRemoved",
@ -36,9 +39,6 @@ void ConfigRpcComponent::Start(void)
bind_weak(&ConfigRpcComponent::RemoteObjectUpdatedHandler, shared_from_this()));
endpointManager->RegisterEndpoint(m_ConfigRpcEndpoint);
endpointManager->OnNewEndpoint += bind_weak(&ConfigRpcComponent::NewEndpointHandler, shared_from_this());
endpointManager->ForeachEndpoint(bind(&ConfigRpcComponent::NewEndpointHandler, this, _1));
}
void ConfigRpcComponent::Stop(void)
@ -46,20 +46,12 @@ void ConfigRpcComponent::Stop(void)
// TODO: implement
}
int ConfigRpcComponent::NewEndpointHandler(const NewEndpointEventArgs& ea)
{
ea.Endpoint->OnSessionEstablished += bind_weak(&ConfigRpcComponent::SessionEstablishedHandler, shared_from_this());
return 0;
}
int ConfigRpcComponent::SessionEstablishedHandler(const EventArgs& ea)
int ConfigRpcComponent::WelcomeMessageHandler(const NewRequestEventArgs& ea)
{
JsonRpcRequest request;
request.SetMethod("config::FetchObjects");
Endpoint::Ptr endpoint = static_pointer_cast<Endpoint>(ea.Source);
GetEndpointManager()->SendUnicastRequest(m_ConfigRpcEndpoint, endpoint, request);
GetEndpointManager()->SendUnicastRequest(m_ConfigRpcEndpoint, ea.Sender, request);
return 0;
}

View File

@ -9,8 +9,7 @@ class ConfigRpcComponent : public IcingaComponent
private:
VirtualEndpoint::Ptr m_ConfigRpcEndpoint;
int NewEndpointHandler(const NewEndpointEventArgs& ea);
int SessionEstablishedHandler(const EventArgs& ea);
int WelcomeMessageHandler(const NewRequestEventArgs& ea);
int LocalObjectCreatedHandler(const EventArgs& ea);
int LocalObjectRemovedHandler(const EventArgs& ea);

View File

@ -4,7 +4,10 @@
"demo": { "replicate": "0" }
},
"rpcconnection": {
"kekslistener": { "replicate": "0", "hostname": "10.0.10.14", "port": "7777" }
"kekslistener": { "replicate": "0", "hostname": "127.0.0.1", "port": "7777" }
},
"rpclistener": {
"kekslistener": { "replicate": "0", "port": "7777" }
},
"host": {
"localhost": { "ipaddr": "127.0.0.1" }

View File

@ -10,6 +10,9 @@ string DiscoveryComponent::GetName(void) const
void DiscoveryComponent::Start(void)
{
m_DiscoveryEndpoint = make_shared<VirtualEndpoint>();
m_DiscoveryEndpoint->RegisterMethodHandler("message::Welcome",
bind_weak(&DiscoveryComponent::GetPeersMessageHandler, shared_from_this()));
m_DiscoveryEndpoint->RegisterMethodSource("discovery::PeerAvailable");
m_DiscoveryEndpoint->RegisterMethodHandler("discovery::GetPeers",
bind_weak(&DiscoveryComponent::GetPeersMessageHandler, shared_from_this()));
@ -25,22 +28,11 @@ void DiscoveryComponent::Stop(void)
mgr->UnregisterEndpoint(m_DiscoveryEndpoint);
}
int DiscoveryComponent::NewEndpointHandler(const NewEndpointEventArgs& neea)
{
neea.Endpoint->OnSessionEstablished += bind_weak(&DiscoveryComponent::SessionEstablishedHandler, shared_from_this());
/* TODO: register handler for new sink/source */
return 0;
}
int DiscoveryComponent::SessionEstablishedHandler(const EventArgs& neea)
int DiscoveryComponent::WelcomeMessageHandler(const NewRequestEventArgs& neea)
{
JsonRpcRequest request;
request.SetMethod("discovery::GetPeers");
Endpoint::Ptr endpoint = static_pointer_cast<Endpoint>(neea.Source);
GetEndpointManager()->SendUnicastRequest(m_DiscoveryEndpoint, endpoint, request);
GetEndpointManager()->SendUnicastRequest(m_DiscoveryEndpoint, neea.Sender, request);
/* TODO: send information about this client to all other clients */

View File

@ -11,8 +11,7 @@ private:
IcingaApplication::Ptr GetIcingaApplication(void) const;
int NewEndpointHandler(const NewEndpointEventArgs& neea);
int SessionEstablishedHandler(const EventArgs& neea);
int WelcomeMessageHandler(const NewRequestEventArgs& neea);
int GetPeersMessageHandler(const NewRequestEventArgs& nrea);
public:

View File

@ -14,8 +14,6 @@ void Endpoint::SetIdentity(string identity)
EventArgs ea;
ea.Source = shared_from_this();
OnIdentityChanged(ea);
OnSessionEstablished(ea);
}
bool Endpoint::HasIdentity(void) const

View File

@ -66,7 +66,6 @@ public:
int CountMethodSources(void) const;
Event<EventArgs> OnIdentityChanged;
Event<EventArgs> OnSessionEstablished;
};
}

View File

@ -12,6 +12,7 @@ void SubscriptionComponent::Start(void)
m_SubscriptionEndpoint = make_shared<VirtualEndpoint>();
m_SubscriptionEndpoint->RegisterMethodHandler("message::Subscribe", bind_weak(&SubscriptionComponent::SubscribeMessageHandler, shared_from_this()));
m_SubscriptionEndpoint->RegisterMethodHandler("message::Provide", bind_weak(&SubscriptionComponent::ProvideMessageHandler, shared_from_this()));
m_SubscriptionEndpoint->RegisterMethodSource("message::Welcome");
m_SubscriptionEndpoint->RegisterMethodSource("message::Subscribe");
m_SubscriptionEndpoint->RegisterMethodSource("message::Provide");
@ -38,7 +39,6 @@ int SubscriptionComponent::SyncSubscription(Endpoint::Ptr target, string type, c
SubscriptionMessage subscriptionMessage;
subscriptionMessage.SetMethod(nmea.Method);
request.SetParams(subscriptionMessage);
GetEndpointManager()->SendUnicastRequest(m_SubscriptionEndpoint, target, request);
return 0;
@ -68,11 +68,19 @@ int SubscriptionComponent::NewEndpointHandler(const NewEndpointEventArgs& neea)
neea.Endpoint->AddAllowedMethodSinkPrefix("message::");
neea.Endpoint->AddAllowedMethodSourcePrefix("message::");
/* we just assume the peer wants those messages */
neea.Endpoint->RegisterMethodSink("message::Welcome");
neea.Endpoint->RegisterMethodSink("message::Subscribe");
neea.Endpoint->RegisterMethodSink("message::Provide");
GetEndpointManager()->ForeachEndpoint(bind(&SubscriptionComponent::SyncSubscriptions, this, neea.Endpoint, _1));
/* signal the peer that we're done syncing subscriptions and are now
* ready to accept messages. */
JsonRpcRequest request;
request.SetMethod("message::Welcome");
GetEndpointManager()->SendUnicastRequest(m_SubscriptionEndpoint, neea.Endpoint, request);
return 0;
}