Cleaned up JSON-RPC client code.

This commit is contained in:
Gunnar Beutner 2012-05-07 13:48:17 +02:00
parent 3f1fb0e36d
commit 28bdbe1ffe
15 changed files with 170 additions and 159 deletions

View File

@ -23,14 +23,10 @@ void TCPClient::Start(void)
OnWritable += bind_weak(&TCPClient::WritableEventHandler, shared_from_this());
}
void TCPClient::Connect(const string& hostname, unsigned short port)
void TCPClient::Connect(const string& node, const string& service)
{
m_Role = RoleOutbound;
stringstream s;
s << port;
string strPort = s.str();
addrinfo hints;
addrinfo *result;
@ -39,7 +35,7 @@ void TCPClient::Connect(const string& hostname, unsigned short port)
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
int rc = getaddrinfo(hostname.c_str(), strPort.c_str(), &hints, &result);
int rc = getaddrinfo(node.c_str(), service.c_str(), &hints, &result);
if (rc < 0) {
HandleSocketError();

View File

@ -31,7 +31,7 @@ public:
virtual void Start(void);
void Connect(const string& hostname, unsigned short port);
void Connect(const string& node, const string& service);
FIFO::Ptr GetSendQueue(void);
FIFO::Ptr GetRecvQueue(void);

View File

@ -29,7 +29,7 @@ void TCPServer::Listen(void)
int rc = listen(GetFD(), SOMAXCONN);
if (rc < 0) {
Close();
HandleSocketError();
return;
}
}

View File

@ -17,17 +17,13 @@ void TCPSocket::MakeSocket(int family)
SetFD(fd);
}
void TCPSocket::Bind(unsigned short port, int family)
void TCPSocket::Bind(string service, int family)
{
Bind(NULL, port, family);
Bind(string(), service, family);
}
void TCPSocket::Bind(const char *hostname, unsigned short port, int family)
void TCPSocket::Bind(string node, string service, int family)
{
stringstream s;
s << port;
string strPort = s.str();
addrinfo hints;
addrinfo *result;
@ -37,7 +33,7 @@ void TCPSocket::Bind(const char *hostname, unsigned short port, int family)
hints.ai_protocol = IPPROTO_TCP;
hints.ai_flags = AI_PASSIVE;
if (getaddrinfo(hostname, strPort.c_str(), &hints, &result) < 0) {
if (getaddrinfo(node.c_str(), service.c_str(), &hints, &result) < 0) {
HandleSocketError();
return;

View File

@ -13,8 +13,8 @@ public:
typedef shared_ptr<TCPSocket> Ptr;
typedef weak_ptr<TCPSocket> WeakPtr;
void Bind(unsigned short port, int family);
void Bind(const char *hostname, unsigned short port, int family);
void Bind(string service, int family);
void Bind(string node, string service, int family);
};
}

View File

@ -29,6 +29,11 @@ void DiscoveryComponent::Start(void)
GetEndpointManager()->OnNewEndpoint += bind_weak(&DiscoveryComponent::NewEndpointHandler, shared_from_this());
GetEndpointManager()->RegisterEndpoint(m_DiscoveryEndpoint);
m_DiscoveryConnectTimer = make_shared<Timer>();
m_DiscoveryConnectTimer->SetInterval(30);
m_DiscoveryConnectTimer->OnTimerExpired += bind_weak(&DiscoveryComponent::ReconnectTimerHandler, shared_from_this());
m_DiscoveryConnectTimer->Start();
}
void DiscoveryComponent::Stop(void)
@ -61,8 +66,15 @@ int DiscoveryComponent::NewEndpointHandler(const NewEndpointEventArgs& neea)
{
neea.Endpoint->OnIdentityChanged += bind_weak(&DiscoveryComponent::NewIdentityHandler, shared_from_this());
/* accept discovery::RegisterComponent messages from any endpoint */
neea.Endpoint->RegisterMethodSource("discovery::RegisterComponent");
if (IsBroker()) {
/* accept discovery::RegisterComponent messages from any endpoint */
neea.Endpoint->RegisterMethodSource("discovery::RegisterComponent");
}
/* TODO: implement message broker authorisation */
neea.Endpoint->RegisterMethodSource("discovery::NewComponent");
/* TODO: register handler to unregister this endpoint when it's closed */
return 0;
}
@ -120,17 +132,19 @@ int DiscoveryComponent::NewIdentityHandler(const EventArgs& ea)
Endpoint::Ptr endpoint = static_pointer_cast<Endpoint>(ea.Source);
string identity = endpoint->GetIdentity();
if (identity == GetEndpointManager()->GetIdentity()) {
Application::Log("Detected loop-back connection - Disconnecting endpoint.");
if (!GetIcingaApplication()->IsDebugging()) {
if (identity == GetEndpointManager()->GetIdentity()) {
Application::Log("Detected loop-back connection - Disconnecting endpoint.");
endpoint->Stop();
GetEndpointManager()->UnregisterEndpoint(endpoint);
endpoint->Stop();
GetEndpointManager()->UnregisterEndpoint(endpoint);
return 0;
return 0;
}
GetEndpointManager()->ForeachEndpoint(bind(&DiscoveryComponent::CheckExistingEndpoint, this, endpoint, _1));
}
GetEndpointManager()->ForeachEndpoint(bind(&DiscoveryComponent::CheckExistingEndpoint, this, endpoint, _1));
// we assume the other component _always_ wants
// discovery::RegisterComponent messages from us
endpoint->RegisterMethodSink("discovery::RegisterComponent");
@ -194,8 +208,8 @@ void DiscoveryComponent::SendDiscoveryMessage(string method, string identity, En
return;
if (!info->Node.empty() && !info->Service.empty()) {
params.SetPropertyString("node", info->Node);
params.SetPropertyString("service", info->Service);
params.SetNode(info->Node);
params.SetService(info->Service);
}
set<string>::iterator i;
@ -214,74 +228,73 @@ void DiscoveryComponent::SendDiscoveryMessage(string method, string identity, En
void DiscoveryComponent::ProcessDiscoveryMessage(string identity, DiscoveryMessage message)
{
ComponentDiscoveryInfo::Ptr info = make_shared<ComponentDiscoveryInfo>();
message.GetNode(&info->Node);
message.GetService(&info->Service);
Message provides;
if (message.GetProvides(&provides)) {
DictionaryIterator i;
for (i = provides.GetDictionary()->Begin(); i != provides.GetDictionary()->End(); i++) {
info->PublishedMethods.insert(i->second);
}
}
Message subscribes;
if (message.GetSubscribes(&subscribes)) {
DictionaryIterator i;
for (i = subscribes.GetDictionary()->Begin(); i != subscribes.GetDictionary()->End(); i++) {
info->SubscribedMethods.insert(i->second);
}
}
map<string, ComponentDiscoveryInfo::Ptr>::iterator i;
i = m_Components.find(identity);
if (i != m_Components.end())
m_Components.erase(i);
m_Components[identity] = info;
SendDiscoveryMessage("discovery::NewComponent", identity, Endpoint::Ptr());
}
int DiscoveryComponent::NewComponentMessageHandler(const NewRequestEventArgs& nrea)
{
/*Message message;
DiscoveryMessage message;
nrea.Request.GetParams(&message);
ProcessDiscoveryMessage(message.GetPropertyString(, DiscoveryMessage(message));*/
string identity;
if (!message.GetIdentity(&identity))
return 0;
ProcessDiscoveryMessage(identity, message);
return 0;
}
int DiscoveryComponent::RegisterComponentMessageHandler(const NewRequestEventArgs& nrea)
{
Message message;
DiscoveryMessage message;
nrea.Request.GetParams(&message);
ProcessDiscoveryMessage(nrea.Sender->GetIdentity(), DiscoveryMessage(message));
ProcessDiscoveryMessage(nrea.Sender->GetIdentity(), message);
return 0;
}
void DiscoveryComponent::AddSubscribedMethod(string identity, string method)
int DiscoveryComponent::ReconnectTimerHandler(const TimerEventArgs& tea)
{
ComponentDiscoveryInfo::Ptr info;
EndpointManager::Ptr endpointManager = GetEndpointManager();
if (!GetComponentDiscoveryInfo(identity, &info))
return;
map<string, ComponentDiscoveryInfo::Ptr>::iterator i;
for (i = m_Components.begin(); i != m_Components.end(); i++) {
if (endpointManager->HasConnectedEndpoint(i->first))
continue;
info->SubscribedMethods.insert(method);
}
ComponentDiscoveryInfo::Ptr info = i->second;
endpointManager->AddConnection(info->Node, info->Service);
}
bool DiscoveryComponent::IsSubscribedMethod(string identity, string method) const
{
if (GetEndpointManager()->GetIdentity() == identity)
return true;
ComponentDiscoveryInfo::Ptr info;
if (!GetComponentDiscoveryInfo(identity, &info))
return false;
set<string>::const_iterator i;
i = info->SubscribedMethods.find(method);
return (i != info->SubscribedMethods.end());
}
void DiscoveryComponent::AddPublishedMethod(string identity, string method)
{
ComponentDiscoveryInfo::Ptr info;
if (!GetComponentDiscoveryInfo(identity, &info))
return;
info->PublishedMethods.insert(method);
}
bool DiscoveryComponent::IsPublishedMethod(string identity, string method) const
{
if (GetEndpointManager()->GetIdentity() == identity)
return true;
ComponentDiscoveryInfo::Ptr info;
if (!GetComponentDiscoveryInfo(identity, &info))
return false;
set<string>::const_iterator i;
i = info->PublishedMethods.find(method);
return (i != info->PublishedMethods.end());
return 0;
}
EXPORT_COMPONENT(DiscoveryComponent);

View File

@ -22,8 +22,8 @@ class DiscoveryComponent : public IcingaComponent
private:
VirtualEndpoint::Ptr m_DiscoveryEndpoint;
map<string, ComponentDiscoveryInfo::Ptr> m_Components;
bool m_Broker;
Timer::Ptr m_DiscoveryConnectTimer;
int NewEndpointHandler(const NewEndpointEventArgs& neea);
int NewIdentityHandler(const EventArgs& ea);
@ -41,18 +41,14 @@ private:
int DiscoverySinkHandler(const NewMethodEventArgs& nmea, ComponentDiscoveryInfo::Ptr info) const;
int DiscoverySourceHandler(const NewMethodEventArgs& nmea, ComponentDiscoveryInfo::Ptr info) const;
int ReconnectTimerHandler(const TimerEventArgs& tea);
bool IsBroker(void) const;
public:
virtual string GetName(void) const;
virtual void Start(void);
virtual void Stop(void);
void AddSubscribedMethod(string identity, string method);
bool IsSubscribedMethod(string identity, string method) const;
void AddPublishedMethod(string identity, string method);
bool IsPublishedMethod(string identity, string method) const;
};
}

View File

@ -21,6 +21,26 @@ public:
SetPropertyString("identity", value);
}
inline bool GetNode(string *value) const
{
return GetPropertyString("node", value);
}
inline void SetNode(const string& value)
{
SetPropertyString("node", value);
}
inline bool GetService(string *value) const
{
return GetPropertyString("service", value);
}
inline void SetService(const string& value)
{
SetPropertyString("service", value);
}
inline bool GetSubscribes(Message *value) const
{
return GetPropertyMessage("subscribes", value);

View File

@ -14,9 +14,6 @@
"discovery": { "replicate": "0", "broker": "1" }
},
"rpclistener": {
"kekslistener": { "replicate": "0", "port": "7777" }
},
"host": {
"localhost": { "ipaddr": "127.0.0.1" }
"kekslistener": { "replicate": "0", "service": "7777" }
}
}

22
icinga-app/icinga2.conf Normal file
View File

@ -0,0 +1,22 @@
{
"icinga": {
"icinga": {
"privkey": "icinga-c2.key",
"pubkey": "icinga-c2.crt",
"cakey": "ca.crt",
"node": "10.0.10.3",
"service": "8888"
}
},
"component": {
"configrpc": { "replicate": "0", "configSource": "1" },
"demo": { "replicate": "0" },
"discovery": { "replicate": "0", "broker": "0" }
},
"rpclistener": {
"kekslistener": { "replicate": "0", "service": "8888" }
},
"rpcconnection": {
"keksclient": { "replicate": "0", "node": "127.0.0.1", "service": "7777" }
}
}

View File

@ -22,29 +22,29 @@ shared_ptr<SSL_CTX> EndpointManager::GetSSLContext(void) const
return m_SSLContext;
}
void EndpointManager::AddListener(unsigned short port)
void EndpointManager::AddListener(string service)
{
stringstream s;
s << "Adding new listener: port " << port;
s << "Adding new listener: port " << service;
Application::Log(s.str());
JsonRpcServer::Ptr server = make_shared<JsonRpcServer>(m_SSLContext);
RegisterServer(server);
server->Bind(port, AF_INET6);
server->Bind(service, AF_INET6);
server->Listen();
server->Start();
}
void EndpointManager::AddConnection(string host, unsigned short port)
void EndpointManager::AddConnection(string node, string service)
{
stringstream s;
s << "Adding new endpoint: [" << host << "]:" << port;
s << "Adding new endpoint: [" << node << "]:" << service;
Application::Log(s.str());
JsonRpcEndpoint::Ptr endpoint = make_shared<JsonRpcEndpoint>();
endpoint->Connect(host, port, m_SSLContext);
RegisterEndpoint(endpoint);
endpoint->Connect(node, service, m_SSLContext);
}
void EndpointManager::RegisterServer(JsonRpcServer::Ptr server)
@ -103,8 +103,10 @@ void EndpointManager::SendUnicastRequest(Endpoint::Ptr sender, Endpoint::Ptr rec
if (!request.GetMethod(&method))
throw InvalidArgumentException("Missing 'method' parameter.");
if (recipient->IsMethodSink(method))
if (recipient->IsMethodSink(method)) {
Application::Log(sender->GetAddress() + " -> " + recipient->GetAddress() + ": " + method);
recipient->ProcessRequest(sender, request);
}
}
void EndpointManager::SendAnycastRequest(Endpoint::Ptr sender, const JsonRpcRequest& request, bool fromLocal)
@ -144,3 +146,14 @@ void EndpointManager::ForeachEndpoint(function<int (const NewEndpointEventArgs&)
callback(neea);
}
}
bool EndpointManager::HasConnectedEndpoint(string identity) const
{
list<Endpoint::Ptr>::const_iterator i;
for (i = m_Endpoints.begin(); i != m_Endpoints.end(); i++) {
if ((*i)->GetIdentity() == identity)
return true;
}
return false;
}

View File

@ -32,8 +32,8 @@ public:
void SetSSLContext(shared_ptr<SSL_CTX> sslContext);
shared_ptr<SSL_CTX> GetSSLContext(void) const;
void AddListener(unsigned short port);
void AddConnection(string host, unsigned short port);
void AddListener(string service);
void AddConnection(string node, string service);
void RegisterEndpoint(Endpoint::Ptr endpoint);
void UnregisterEndpoint(Endpoint::Ptr endpoint);
@ -44,6 +44,8 @@ public:
void ForeachEndpoint(function<int (const NewEndpointEventArgs&)> callback);
bool HasConnectedEndpoint(string identity) const;
Event<NewEndpointEventArgs> OnNewEndpoint;
};

View File

@ -172,16 +172,11 @@ int IcingaApplication::NewRpcListenerHandler(const EventArgs& ea)
if (object->GetReplicated())
return 0;
long portValue;
if (!object->GetPropertyInteger("port", &portValue))
throw InvalidArgumentException("Parameter 'port' is required for 'rpclistener' objects.");
string service;
if (!object->GetPropertyString("service", &service))
throw InvalidArgumentException("Parameter 'service' is required for 'rpclistener' objects.");
if (portValue < 0 || portValue > USHRT_MAX)
throw InvalidArgumentException("Parameter 'port' contains an invalid value.");
unsigned short port = (unsigned short)portValue;
GetEndpointManager()->AddListener(port);
GetEndpointManager()->AddListener(service);
return 0;
}
@ -196,26 +191,20 @@ int IcingaApplication::DeletedRpcListenerHandler(const EventArgs& ea)
int IcingaApplication::NewRpcConnectionHandler(const EventArgs& ea)
{
ConfigObject::Ptr object = static_pointer_cast<ConfigObject>(ea.Source);
string hostname;
long portValue;
unsigned short port;
/* don't allow replicated config objects */
if (object->GetReplicated())
return 0;
if (!object->GetPropertyString("hostname", &hostname))
throw InvalidArgumentException("Parameter 'hostname' is required for 'rpcconnection' objects.");
string node;
if (!object->GetPropertyString("node", &node))
throw InvalidArgumentException("Parameter 'node' is required for 'rpcconnection' objects.");
if (!object->GetPropertyInteger("port", &portValue))
throw InvalidArgumentException("Parameter 'port' is required for 'rpcconnection' objects.");
string service;
if (!object->GetPropertyString("service", &service))
throw InvalidArgumentException("Parameter 'service' is required for 'rpcconnection' objects.");
if (portValue < 0 || portValue > USHRT_MAX)
throw InvalidArgumentException("Parameter 'port' contains an invalid value.");
port = (unsigned short)portValue;
GetEndpointManager()->AddConnection(hostname, port);
GetEndpointManager()->AddConnection(node, service);
return 0;
}

View File

@ -15,16 +15,12 @@ JsonRpcClient::Ptr JsonRpcEndpoint::GetClient(void)
return m_Client;
}
void JsonRpcEndpoint::Connect(string host, unsigned short port, shared_ptr<SSL_CTX> sslContext)
void JsonRpcEndpoint::Connect(string node, string service, shared_ptr<SSL_CTX> sslContext)
{
m_PeerHostname = host;
m_PeerPort = port;
JsonRpcClient::Ptr client = make_shared<JsonRpcClient>(RoleOutbound, sslContext);
client->Connect(host, port);
client->Start();
SetClient(client);
client->Connect(node, service);
client->Start();
}
void JsonRpcEndpoint::SetClient(JsonRpcClient::Ptr client)
@ -99,23 +95,12 @@ int JsonRpcEndpoint::ClientClosedHandler(const EventArgs& ea)
m_PendingCalls.clear();
if (m_PeerHostname != string()) {
Timer::Ptr timer = make_shared<Timer>();
timer->SetInterval(30);
timer->SetUserArgs(ea);
timer->OnTimerExpired += bind_weak(&JsonRpcEndpoint::ClientReconnectHandler, shared_from_this());
timer->Start();
m_ReconnectTimer = timer;
Application::Log("Spawned reconnect timer (30 seconds)");
}
// TODO: _only_ clear non-persistent method sources/sinks
// unregister ourselves if no persistent sources/sinks are left (use a timer for that, once we have a TTL property for the methods)
ClearMethodSinks();
ClearMethodSources();
if (CountMethodSinks() == 0 && !m_ReconnectTimer)
if (CountMethodSinks() == 0)
GetEndpointManager()->UnregisterEndpoint(static_pointer_cast<Endpoint>(shared_from_this()));
m_Client.reset();
@ -132,19 +117,6 @@ int JsonRpcEndpoint::ClientErrorHandler(const SocketErrorEventArgs& ea)
return 0;
}
int JsonRpcEndpoint::ClientReconnectHandler(const TimerEventArgs& ea)
{
JsonRpcClient::Ptr client = static_pointer_cast<JsonRpcClient>(ea.UserArgs.Source);
Timer::Ptr timer = static_pointer_cast<Timer>(ea.Source);
GetEndpointManager()->AddConnection(m_PeerHostname, m_PeerPort);
timer->Stop();
m_ReconnectTimer.reset();
return 0;
}
int JsonRpcEndpoint::VerifyCertificateHandler(const VerifyCertificateEventArgs& ea)
{
if (ea.Certificate && ea.ValidCertificate) {

View File

@ -11,22 +11,17 @@ private:
string m_Address;
JsonRpcClient::Ptr m_Client;
map<string, Endpoint::Ptr> m_PendingCalls;
Timer::Ptr m_ReconnectTimer;
string m_PeerHostname;
unsigned short m_PeerPort;
int NewMessageHandler(const NewMessageEventArgs& nmea);
int ClientClosedHandler(const EventArgs& ea);
int ClientErrorHandler(const SocketErrorEventArgs& ea);
int ClientReconnectHandler(const TimerEventArgs& ea);
int VerifyCertificateHandler(const VerifyCertificateEventArgs& ea);
public:
typedef shared_ptr<JsonRpcEndpoint> Ptr;
typedef weak_ptr<JsonRpcEndpoint> WeakPtr;
void Connect(string host, unsigned short port,
void Connect(string node, string service,
shared_ptr<SSL_CTX> sslContext);
JsonRpcClient::Ptr GetClient(void);