Implemented outbound JSON-RPC client connections.

Made socket error handling more robust.
This commit is contained in:
Gunnar Beutner 2012-04-04 12:22:46 +02:00
parent f20e71ace8
commit 261329e483
14 changed files with 277 additions and 35 deletions

View File

@ -57,6 +57,22 @@ void ConfigObject::SetProperty(const string& name, const string& value)
} }
} }
void ConfigObject::SetPropertyInteger(const string& name, int value)
{
char valueString[20];
sprintf(valueString, "%d", value);
SetProperty(name, string(valueString));
}
void ConfigObject::SetPropertyDouble(const string& name, double value)
{
char valueString[20];
sprintf(valueString, "%f", value);
SetProperty(name, string(valueString));
}
bool ConfigObject::GetProperty(const string& name, string *value) const bool ConfigObject::GetProperty(const string& name, string *value) const
{ {
map<string, string>::const_iterator vi = Properties.find(name); map<string, string>::const_iterator vi = Properties.find(name);

View File

@ -16,6 +16,8 @@ Socket::~Socket(void)
void Socket::Start(void) void Socket::Start(void)
{ {
OnException += bind_weak(&Socket::ExceptionEventHandler, shared_from_this());
Sockets.push_front(static_pointer_cast<Socket>(shared_from_this())); Sockets.push_front(static_pointer_cast<Socket>(shared_from_this()));
} }
@ -62,6 +64,49 @@ void Socket::Close(bool from_dtor)
Stop(); Stop();
} }
string Socket::FormatErrorCode(int code)
{
char *message;
string result = "Unknown socket error.";
#ifdef _WIN32
if (FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM, NULL, code, 0, (char *)&message, 0, NULL) != 0) {
result = string(message);
LocalFree(message);
}
#else /* _WIN32 */
if (code != 0)
message = strerror(code);
result = string(message);
#endif /* _WIN32 */
return result;
}
int Socket::ExceptionEventHandler(EventArgs::Ptr ea)
{
int opt;
socklen_t optlen = sizeof(opt);
int rc = getsockopt(GetFD(), SOL_SOCKET, SO_ERROR, (char *)&opt, &optlen);
if (rc < 0) {
Close();
return 0;
}
if (opt != 0) {
SocketErrorEventArgs::Ptr ea = make_shared<SocketErrorEventArgs>();
ea->Code = opt;
ea->Message = FormatErrorCode(opt);
Close();
}
return 0;
}
void Socket::CloseAllSockets(void) void Socket::CloseAllSockets(void)
{ {
for (list<Socket::WeakPtr>::iterator i = Sockets.begin(); i != Sockets.end(); ) { for (list<Socket::WeakPtr>::iterator i = Sockets.begin(); i != Sockets.end(); ) {

View File

@ -3,11 +3,24 @@
namespace icinga { namespace icinga {
struct SocketErrorEventArgs : public EventArgs
{
typedef shared_ptr<SocketErrorEventArgs> Ptr;
typedef weak_ptr<SocketErrorEventArgs> WeakPtr;
int Code;
string Message;
};
class Socket : public Object class Socket : public Object
{ {
private: private:
SOCKET m_FD; SOCKET m_FD;
int ExceptionEventHandler(EventArgs::Ptr ea);
string FormatErrorCode(int errorCode);
protected: protected:
Socket(void); Socket(void);
@ -30,6 +43,7 @@ public:
event<EventArgs::Ptr> OnWritable; event<EventArgs::Ptr> OnWritable;
event<EventArgs::Ptr> OnException; event<EventArgs::Ptr> OnException;
event<SocketErrorEventArgs::Ptr> OnError;
event<EventArgs::Ptr> OnClosed; event<EventArgs::Ptr> OnClosed;
virtual bool WantsToRead(void) const; virtual bool WantsToRead(void) const;

View File

@ -6,6 +6,8 @@ TCPClient::TCPClient(void)
{ {
m_SendQueue = make_shared<FIFO>(); m_SendQueue = make_shared<FIFO>();
m_RecvQueue = make_shared<FIFO>(); m_RecvQueue = make_shared<FIFO>();
m_PeerPort = 0;
} }
void TCPClient::Start(void) void TCPClient::Start(void)
@ -16,6 +18,35 @@ void TCPClient::Start(void)
OnWritable += bind_weak(&TCPClient::WritableEventHandler, shared_from_this()); OnWritable += bind_weak(&TCPClient::WritableEventHandler, shared_from_this());
} }
void TCPClient::Connect(const string& hostname, unsigned short port)
{
hostent *hent;
sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_port = htons(port);
hent = gethostbyname(hostname.c_str());
if (hent != NULL)
sin.sin_addr.s_addr = ((in_addr *)hent->h_addr_list[0])->s_addr;
else
sin.sin_addr.s_addr = inet_addr(hostname.c_str());
int rc = connect(GetFD(), (sockaddr *)&sin, sizeof(sin));
#ifdef _WIN32
if (rc < 0 && WSAGetLastError() != WSAEWOULDBLOCK)
#else /* _WIN32 */
if (rc < 0 && errno != EINPROGRESS)
#endif /* _WIN32 */
Close();
m_PeerHost = hostname;
m_PeerPort = port;
}
FIFO::Ptr TCPClient::GetSendQueue(void) FIFO::Ptr TCPClient::GetSendQueue(void)
{ {
return m_SendQueue; return m_SendQueue;
@ -26,6 +57,17 @@ FIFO::Ptr TCPClient::GetRecvQueue(void)
return m_RecvQueue; return m_RecvQueue;
} }
string TCPClient::GetPeerHost(void)
{
return m_PeerHost;
}
int TCPClient::GetPeerPort(void)
{
return m_PeerPort;
}
int TCPClient::ReadableEventHandler(EventArgs::Ptr ea) int TCPClient::ReadableEventHandler(EventArgs::Ptr ea)
{ {
int rc; int rc;

View File

@ -7,6 +7,9 @@ namespace icinga
class TCPClient : public TCPSocket class TCPClient : public TCPSocket
{ {
private: private:
string m_PeerHost;
int m_PeerPort;
FIFO::Ptr m_SendQueue; FIFO::Ptr m_SendQueue;
FIFO::Ptr m_RecvQueue; FIFO::Ptr m_RecvQueue;
@ -21,11 +24,14 @@ public:
virtual void Start(void); virtual void Start(void);
void Connect(const char *hostname, unsigned short port); void Connect(const string& hostname, unsigned short port);
FIFO::Ptr GetSendQueue(void); FIFO::Ptr GetSendQueue(void);
FIFO::Ptr GetRecvQueue(void); FIFO::Ptr GetRecvQueue(void);
string GetPeerHost(void);
int GetPeerPort(void);
virtual bool WantsToRead(void) const; virtual bool WantsToRead(void) const;
virtual bool WantsToWrite(void) const; virtual bool WantsToWrite(void) const;

View File

@ -26,7 +26,12 @@ void TCPServer::Start(void)
void TCPServer::Listen(void) void TCPServer::Listen(void)
{ {
listen(GetFD(), SOMAXCONN); int rc = listen(GetFD(), SOMAXCONN);
if (rc < 0) {
Close();
return;
}
Start(); Start();
} }

View File

@ -27,5 +27,9 @@ void TCPSocket::Bind(const char *hostname, unsigned short port)
sin.sin_family = AF_INET; sin.sin_family = AF_INET;
sin.sin_addr.s_addr = hostname ? inet_addr(hostname) : htonl(INADDR_ANY); sin.sin_addr.s_addr = hostname ? inet_addr(hostname) : htonl(INADDR_ANY);
sin.sin_port = htons(port); sin.sin_port = htons(port);
::bind(GetFD(), (sockaddr *)&sin, sizeof(sin));
int rc = ::bind(GetFD(), (sockaddr *)&sin, sizeof(sin));
if (rc < 0)
Close();
} }

View File

@ -90,6 +90,17 @@ unsigned int Timer::GetInterval(void) const
return m_Interval; return m_Interval;
} }
void Timer::SetUserArgs(const EventArgs::Ptr& userArgs)
{
m_UserArgs = userArgs;
}
EventArgs::Ptr Timer::GetUserArgs(void) const
{
return m_UserArgs;
}
void Timer::Start(void) void Timer::Start(void)
{ {
Stop(); Stop();

View File

@ -7,6 +7,7 @@
#include <sys/stat.h> #include <sys/stat.h>
#include <fcntl.h> #include <fcntl.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <netdb.h>
#include <sys/ioctl.h> #include <sys/ioctl.h>
#include <netinet/in.h> #include <netinet/in.h>
#include <arpa/inet.h> #include <arpa/inet.h>

View File

@ -80,8 +80,14 @@ int ConfigRpcComponent::FetchObjectsHandler(NewMessageEventArgs::Ptr ea)
int ConfigRpcComponent::LocalObjectCreatedHandler(ConfigObjectEventArgs::Ptr ea) int ConfigRpcComponent::LocalObjectCreatedHandler(ConfigObjectEventArgs::Ptr ea)
{ {
ConfigObject::Ptr object = static_pointer_cast<ConfigObject>(ea->Source); ConfigObject::Ptr object = static_pointer_cast<ConfigObject>(ea->Source);
int replicate = 0;
object->GetPropertyInteger("replicate", &replicate);
if (replicate) {
ConnectionManager::Ptr connectionManager = GetIcingaApplication()->GetConnectionManager(); ConnectionManager::Ptr connectionManager = GetIcingaApplication()->GetConnectionManager();
connectionManager->SendMessage(MakeObjectMessage(object, "config::ObjectCreated", true)); connectionManager->SendMessage(MakeObjectMessage(object, "config::ObjectCreated", true));
}
return 0; return 0;
} }
@ -89,8 +95,14 @@ int ConfigRpcComponent::LocalObjectCreatedHandler(ConfigObjectEventArgs::Ptr ea)
int ConfigRpcComponent::LocalObjectRemovedHandler(ConfigObjectEventArgs::Ptr ea) int ConfigRpcComponent::LocalObjectRemovedHandler(ConfigObjectEventArgs::Ptr ea)
{ {
ConfigObject::Ptr object = static_pointer_cast<ConfigObject>(ea->Source); ConfigObject::Ptr object = static_pointer_cast<ConfigObject>(ea->Source);
int replicate = 0;
object->GetPropertyInteger("replicate", &replicate);
if (replicate) {
ConnectionManager::Ptr connectionManager = GetIcingaApplication()->GetConnectionManager(); ConnectionManager::Ptr connectionManager = GetIcingaApplication()->GetConnectionManager();
connectionManager->SendMessage(MakeObjectMessage(object, "config::ObjectRemoved", false)); connectionManager->SendMessage(MakeObjectMessage(object, "config::ObjectRemoved", false));
}
return 0; return 0;
} }
@ -98,6 +110,11 @@ int ConfigRpcComponent::LocalObjectRemovedHandler(ConfigObjectEventArgs::Ptr ea)
int ConfigRpcComponent::LocalPropertyChangedHandler(ConfigObjectEventArgs::Ptr ea) int ConfigRpcComponent::LocalPropertyChangedHandler(ConfigObjectEventArgs::Ptr ea)
{ {
ConfigObject::Ptr object = static_pointer_cast<ConfigObject>(ea->Source); ConfigObject::Ptr object = static_pointer_cast<ConfigObject>(ea->Source);
int replicate = 0;
object->GetPropertyInteger("replicate", &replicate);
if (replicate) {
JsonRpcMessage::Ptr msg = MakeObjectMessage(object, "config::ObjectRemoved", false); JsonRpcMessage::Ptr msg = MakeObjectMessage(object, "config::ObjectRemoved", false);
cJSON *params = msg->GetParams(); cJSON *params = msg->GetParams();
cJSON_AddStringToObject(params, "property", ea->Property.c_str()); cJSON_AddStringToObject(params, "property", ea->Property.c_str());
@ -107,6 +124,7 @@ int ConfigRpcComponent::LocalPropertyChangedHandler(ConfigObjectEventArgs::Ptr e
ConnectionManager::Ptr connectionManager = GetIcingaApplication()->GetConnectionManager(); ConnectionManager::Ptr connectionManager = GetIcingaApplication()->GetConnectionManager();
connectionManager->SendMessage(msg); connectionManager->SendMessage(msg);
}
return 0; return 0;
} }

View File

@ -30,28 +30,33 @@ int IcingaApplication::Main(const vector<string>& args)
string componentDirectory = GetExeDirectory() + "/../lib/icinga"; string componentDirectory = GetExeDirectory() + "/../lib/icinga";
AddComponentSearchDir(componentDirectory); AddComponentSearchDir(componentDirectory);
function<int (ConfigObjectEventArgs::Ptr)> NewComponentHandler;
NewComponentHandler = bind_weak(&IcingaApplication::NewComponentHandler, shared_from_this());
ConfigCollection::Ptr componentCollection = GetConfigHive()->GetCollection("component"); ConfigCollection::Ptr componentCollection = GetConfigHive()->GetCollection("component");
function<int (ConfigObjectEventArgs::Ptr)> NewComponentHandler = bind_weak(&IcingaApplication::NewComponentHandler, shared_from_this());
componentCollection->OnObjectCreated += NewComponentHandler; componentCollection->OnObjectCreated += NewComponentHandler;
componentCollection->ForEachObject(NewComponentHandler); componentCollection->ForEachObject(NewComponentHandler);
function<int (ConfigObjectEventArgs::Ptr)> DeletedComponentHandler; componentCollection->OnObjectRemoved += bind_weak(&IcingaApplication::DeletedComponentHandler, shared_from_this());
DeletedComponentHandler = bind_weak(&IcingaApplication::DeletedComponentHandler, shared_from_this());
componentCollection->OnObjectRemoved += DeletedComponentHandler;
function<int (ConfigObjectEventArgs::Ptr)> NewRpcListenerHandler;
NewRpcListenerHandler = bind_weak(&IcingaApplication::NewRpcListenerHandler, shared_from_this());
ConfigCollection::Ptr listenerCollection = GetConfigHive()->GetCollection("rpclistener"); ConfigCollection::Ptr listenerCollection = GetConfigHive()->GetCollection("rpclistener");
function<int (ConfigObjectEventArgs::Ptr)> NewRpcListenerHandler = bind_weak(&IcingaApplication::NewRpcListenerHandler, shared_from_this());
listenerCollection->OnObjectCreated += NewRpcListenerHandler; listenerCollection->OnObjectCreated += NewRpcListenerHandler;
listenerCollection->ForEachObject(NewRpcListenerHandler); listenerCollection->ForEachObject(NewRpcListenerHandler);
function<int (ConfigObjectEventArgs::Ptr)> DeletedRpcListenerHandler; listenerCollection->OnObjectRemoved += bind_weak(&IcingaApplication::DeletedRpcListenerHandler, shared_from_this());
DeletedRpcListenerHandler = bind_weak(&IcingaApplication::DeletedRpcListenerHandler, shared_from_this());
listenerCollection->OnObjectRemoved += DeletedRpcListenerHandler; ConfigCollection::Ptr connectionCollection = GetConfigHive()->GetCollection("rpcconnection");
function<int (ConfigObjectEventArgs::Ptr)> NewRpcConnectionHandler = bind_weak(&IcingaApplication::NewRpcConnectionHandler, shared_from_this());
connectionCollection->OnObjectCreated += NewRpcConnectionHandler;
connectionCollection->ForEachObject(NewRpcConnectionHandler);
connectionCollection->OnObjectRemoved += bind_weak(&IcingaApplication::DeletedRpcConnectionHandler, shared_from_this());
ConfigObject::Ptr fileComponentConfig = make_shared<ConfigObject>("component", "configfilecomponent"); ConfigObject::Ptr fileComponentConfig = make_shared<ConfigObject>("component", "configfilecomponent");
fileComponentConfig->SetProperty("configFilename", args[1]); fileComponentConfig->SetProperty("configFilename", args[1]);
fileComponentConfig->SetPropertyInteger("replicate", 0);
GetConfigHive()->AddObject(fileComponentConfig); GetConfigHive()->AddObject(fileComponentConfig);
ConfigCollection::Ptr collection = GetConfigHive()->GetCollection("rpclistener"); ConfigCollection::Ptr collection = GetConfigHive()->GetCollection("rpclistener");
@ -109,10 +114,7 @@ int IcingaApplication::NewRpcListenerHandler(ConfigObjectEventArgs::Ptr ea)
Log("Creating JSON-RPC listener on port %d", port); Log("Creating JSON-RPC listener on port %d", port);
JsonRpcServer::Ptr server = make_shared<JsonRpcServer>(); GetConnectionManager()->AddListener(port);
server->Bind(port);
server->Start();
GetConnectionManager()->RegisterServer(server);
return 0; return 0;
} }
@ -124,4 +126,31 @@ int IcingaApplication::DeletedRpcListenerHandler(ConfigObjectEventArgs::Ptr ea)
return 0; return 0;
} }
int IcingaApplication::NewRpcConnectionHandler(ConfigObjectEventArgs::Ptr ea)
{
ConfigObject::Ptr object = static_pointer_cast<ConfigObject>(ea->Source);
string hostname;
int port;
if (!object->GetProperty("hostname", &hostname))
throw Exception("Parameter 'hostname' is required for 'rpcconnection' objects.");
if (!object->GetPropertyInteger("port", &port))
throw Exception("Parameter 'port' is required for 'rpcconnection' objects.");
Log("Creating JSON-RPC connection to %s:%d", hostname.c_str(), port);
GetConnectionManager()->AddConnection(hostname, port);
return 0;
}
int IcingaApplication::DeletedRpcConnectionHandler(ConfigObjectEventArgs::Ptr ea)
{
throw Exception("Unsupported operation.");
return 0;
}
SET_START_CLASS(icinga::IcingaApplication); SET_START_CLASS(icinga::IcingaApplication);

View File

@ -15,6 +15,9 @@ private:
int NewRpcListenerHandler(ConfigObjectEventArgs::Ptr ea); int NewRpcListenerHandler(ConfigObjectEventArgs::Ptr ea);
int DeletedRpcListenerHandler(ConfigObjectEventArgs::Ptr ea); int DeletedRpcListenerHandler(ConfigObjectEventArgs::Ptr ea);
int NewRpcConnectionHandler(ConfigObjectEventArgs::Ptr ea);
int DeletedRpcConnectionHandler(ConfigObjectEventArgs::Ptr ea);
public: public:
typedef shared_ptr<IcingaApplication> Ptr; typedef shared_ptr<IcingaApplication> Ptr;
typedef weak_ptr<IcingaApplication> WeakPtr; typedef weak_ptr<IcingaApplication> WeakPtr;

View File

@ -2,6 +2,28 @@
using namespace icinga; using namespace icinga;
void ConnectionManager::AddListener(unsigned short port)
{
JsonRpcServer::Ptr server = make_shared<JsonRpcServer>();
RegisterServer(server);
server->MakeSocket();
server->Bind(port);
server->Listen();
server->Start();
}
void ConnectionManager::AddConnection(string host, short port)
{
JsonRpcClient::Ptr client = make_shared<JsonRpcClient>();
RegisterClient(client);
client->MakeSocket();
client->Connect(host, port);
client->Start();
}
void ConnectionManager::RegisterServer(JsonRpcServer::Ptr server) void ConnectionManager::RegisterServer(JsonRpcServer::Ptr server)
{ {
m_Servers.push_front(server); m_Servers.push_front(server);
@ -18,6 +40,7 @@ void ConnectionManager::RegisterClient(JsonRpcClient::Ptr client)
{ {
m_Clients.push_front(client); m_Clients.push_front(client);
client->OnNewMessage += bind_weak(&ConnectionManager::NewMessageHandler, shared_from_this()); client->OnNewMessage += bind_weak(&ConnectionManager::NewMessageHandler, shared_from_this());
client->OnClosed += bind_weak(&ConnectionManager::CloseClientHandler, shared_from_this());
} }
void ConnectionManager::UnregisterClient(JsonRpcClient::Ptr client) void ConnectionManager::UnregisterClient(JsonRpcClient::Ptr client)
@ -36,7 +59,28 @@ int ConnectionManager::NewClientHandler(NewClientEventArgs::Ptr ncea)
int ConnectionManager::CloseClientHandler(EventArgs::Ptr ea) int ConnectionManager::CloseClientHandler(EventArgs::Ptr ea)
{ {
UnregisterClient(static_pointer_cast<JsonRpcClient>(ea->Source)); JsonRpcClient::Ptr client = static_pointer_cast<JsonRpcClient>(ea->Source);
UnregisterClient(client);
Timer::Ptr timer = make_shared<Timer>();
timer->SetInterval(30);
timer->SetUserArgs(ea);
timer->OnTimerExpired += bind_weak(&ConnectionManager::ReconnectClientHandler, shared_from_this());
timer->Start();
m_ReconnectTimers.push_front(timer);
return 0;
}
int ConnectionManager::ReconnectClientHandler(TimerEventArgs::Ptr ea)
{
JsonRpcClient::Ptr client = static_pointer_cast<JsonRpcClient>(ea->UserArgs->Source);
Timer::Ptr timer = static_pointer_cast<Timer>(ea->Source);
AddConnection(client->GetPeerHost(), client->GetPeerPort());
timer->Stop();
m_ReconnectTimers.remove(timer);
return 0; return 0;
} }

View File

@ -9,20 +9,24 @@ class ConnectionManager : public Object
list<JsonRpcServer::Ptr> m_Servers; list<JsonRpcServer::Ptr> m_Servers;
list<JsonRpcClient::Ptr> m_Clients; list<JsonRpcClient::Ptr> m_Clients;
map< string, event<NewMessageEventArgs::Ptr> > m_Methods; map< string, event<NewMessageEventArgs::Ptr> > m_Methods;
list<Timer::Ptr> m_ReconnectTimers;
int NewClientHandler(NewClientEventArgs::Ptr ncea); int NewClientHandler(NewClientEventArgs::Ptr ncea);
int CloseClientHandler(EventArgs::Ptr ea); int CloseClientHandler(EventArgs::Ptr ea);
int ReconnectClientHandler(TimerEventArgs::Ptr ea);
int NewMessageHandler(NewMessageEventArgs::Ptr nmea); int NewMessageHandler(NewMessageEventArgs::Ptr nmea);
void RegisterClient(JsonRpcClient::Ptr server);
void UnregisterClient(JsonRpcClient::Ptr server);
void RegisterServer(JsonRpcServer::Ptr server);
void UnregisterServer(JsonRpcServer::Ptr server);
public: public:
typedef shared_ptr<ConnectionManager> Ptr; typedef shared_ptr<ConnectionManager> Ptr;
typedef weak_ptr<ConnectionManager> WeakPtr; typedef weak_ptr<ConnectionManager> WeakPtr;
void RegisterServer(JsonRpcServer::Ptr server); void AddListener(unsigned short port);
void UnregisterServer(JsonRpcServer::Ptr server); void AddConnection(string host, short port);
void RegisterClient(JsonRpcClient::Ptr client);
void UnregisterClient(JsonRpcClient::Ptr client);
void RegisterMethod(string method, function<int (NewMessageEventArgs::Ptr)> callback); void RegisterMethod(string method, function<int (NewMessageEventArgs::Ptr)> callback);
void UnregisterMethod(string method, function<int (NewMessageEventArgs::Ptr)> callback); void UnregisterMethod(string method, function<int (NewMessageEventArgs::Ptr)> callback);