Refactored the socket classes.

This commit is contained in:
Gunnar Beutner 2012-11-22 12:04:32 +01:00
parent 482742f00e
commit 321d66023f
45 changed files with 1298 additions and 1490 deletions

View File

@ -4,11 +4,11 @@ pkglib_LTLIBRARIES = \
compatido.la
compatido_la_SOURCES = \
i2-compatido.h \
idosocket.cpp \
idosocket.h \
compatidocomponent.cpp \
compatidocomponent.h
compatidocomponent.h \
i2-compatido.h \
idoconnection.cpp \
idoconnection.h
compatido_la_CPPFLAGS = \
$(BOOST_CPPFLAGS) \

View File

@ -151,12 +151,12 @@
</ItemDefinitionGroup>
<ItemGroup>
<ClCompile Include="compatidocomponent.cpp" />
<ClCompile Include="idosocket.cpp" />
<ClCompile Include="idoconnection.cpp" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="compatidocomponent.h" />
<ClInclude Include="i2-compatido.h" />
<ClInclude Include="idosocket.h" />
<ClInclude Include="idoconnection.h" />
</ItemGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">

View File

@ -14,7 +14,7 @@
<ClCompile Include="compatidocomponent.cpp">
<Filter>Source Files</Filter>
</ClCompile>
<ClCompile Include="idosocket.cpp">
<ClCompile Include="idoconnection.cpp">
<Filter>Source Files</Filter>
</ClCompile>
</ItemGroup>
@ -25,7 +25,7 @@
<ClInclude Include="i2-compatido.h">
<Filter>Header Files</Filter>
</ClInclude>
<ClInclude Include="idosocket.h">
<ClInclude Include="idoconnection.h">
<Filter>Header Files</Filter>
</ClInclude>
</ItemGroup>

View File

@ -38,13 +38,13 @@ const int CompatIdoComponent::DefaultReconnectInterval = 15;
String CompatIdoComponent::GetSocketAddress(void) const
{
Value address = GetConfig()->Get("socket_address");
if (address.IsEmpty())
return DefaultSocketAddress;
else
return address;
}
/**
* Reads the socket port from the config
* @returns port The config option, or static default
@ -52,6 +52,7 @@ String CompatIdoComponent::GetSocketAddress(void) const
String CompatIdoComponent::GetSocketPort(void) const
{
Value port = GetConfig()->Get("socket_port");
if (port.IsEmpty())
return DefaultSocketPort;
else
@ -65,6 +66,7 @@ String CompatIdoComponent::GetSocketPort(void) const
String CompatIdoComponent::GetInstanceName(void) const
{
Value instance = GetConfig()->Get("instance_name");
if (instance.IsEmpty())
return DefaultInstanceName;
else
@ -75,34 +77,16 @@ String CompatIdoComponent::GetInstanceName(void) const
* Reads the reconnect interval from the config
* @returns reconnect_interval The config option, or static default
*/
int CompatIdoComponent::GetReconnectInterval(void) const
double CompatIdoComponent::GetReconnectInterval(void) const
{
Value interval = GetConfig()->Get("reconnect_interval");
if (interval.IsEmpty())
return DefaultReconnectInterval;
else
return interval;
}
/**
* Sets config dump in progress state
*/
void CompatIdoComponent::SetConfigDumpInProgress(bool state)
{
m_ConfigDumpInProgress = state;
}
/**
* Get state of config in progress
*
* @returns state bis config dump in progress.
*/
bool CompatIdoComponent::GetConfigDumpInProgress(void)
{
return m_ConfigDumpInProgress;
}
/**
* Starts the component.
*/
@ -111,7 +95,7 @@ void CompatIdoComponent::Start(void)
const int StatusTimerInterval = 60;
const int ConfigTimerInterval = 3600;
const int ProgramStatusTimerInterval = 15;
const int ReconnectTimerInterval = GetReconnectInterval();
const double ReconnectTimerInterval = GetReconnectInterval();
/* FIXME - make this a config option when unix sockets are realdy */
@ -125,12 +109,7 @@ void CompatIdoComponent::Start(void)
/*
* open ido socket once
*/
OpenIdoSocket(IdoSocketType);
/*
* tell ido2db that we just started
*/
SendStartProcess();
OpenIdoSocket();
/*
* ddump the config later (can't do that within start of the component)
@ -171,7 +150,6 @@ void CompatIdoComponent::Start(void)
m_ReconnectTimer->Start();
}
/**
* Stops the component.
*/
@ -180,24 +158,58 @@ void CompatIdoComponent::Stop(void)
CloseIdoSocket();
}
/**
* Opens the ido socket, and sends hello to ido2db
*/
void CompatIdoComponent::OpenIdoSocket(bool sockettype)
void CompatIdoComponent::OpenIdoSocket(void)
{
OpenSink(GetSocketAddress(), GetSocketPort());
SendHello(GetInstanceName(), sockettype);
TcpSocket::Ptr socket = boost::make_shared<TcpSocket>();
socket->Connect(GetSocketAddress(), GetSocketPort());
socket->Start();
m_IdoSocket->SetSocketType(sockettype);
/*
* if we're connected, do not reconnecte
*/
if(m_IdoSocket->IsConnected()) {
m_IdoSocket->SetReconnect(false);
} else {
m_IdoSocket->SetReconnect(true);
}
m_IdoConnection = boost::make_shared<IdoConnection>(socket);
m_IdoConnection->OnClosed.connect(boost::bind(&CompatIdoComponent::SocketDisconnectHandler, this));
/* FIXME */
#define COMPATIDO_PROTOCOL 2
#define COMPATIDO_NAME "ICINGA2 COMPATIDO"
#define COMPATIDO_RELEASE_VERSION "2.0"
/* connection is always TCP */
/* connecttype is always initial */
stringstream msgHello;
msgHello << "\n\n"
<< "HELLO" << "\n"
<< "PROTOCOL" << ": " << COMPATIDO_PROTOCOL<< "\n"
<< "AGENT" << ": " << COMPATIDO_NAME << "\n"
<< "AGENTVERSION" << ": " << VERSION << "\n"
<< "STARTTIME" << ": " << static_cast<int>(Utility::GetTime()) << "\n"
<< "DISPOSITION" << ": " << "REALTIME" << "\n"
<< "CONNECTION" << ": " << "TCPSOCKET" << "\n"
<< "INSTANCENAME" << ": " << GetInstanceName() << "\n"
<< "STARTDATADUMP"
<< "\n\n";
m_IdoConnection->SendMessage(msgHello.str());
/* TODO */
#define PROGRAM_MODIFICATION_DATE "10-17-2012"
#define PROGRAM_RELEASE_VERSION "2.0"
stringstream msgProcessData;
msgProcessData << "\n"
<< 200 << "\n" /* processdata */
<< 1 << "=" << 104 << "\n" /* type = pprocess prelaunch */
<< 2 << "=" << "" << "\n" /* flags */
<< 3 << "=" << "" << "\n" /* attributes */
<< 4 << "=" << std::setprecision(17) << Utility::GetTime() << "\n" /* timestamp */
<< 105 << "=" << "Icinga2" << "\n" /* progranname */
<< 107 << "=" << PROGRAM_RELEASE_VERSION << "\n" /* programversion */
<< 104 << "=" << PROGRAM_MODIFICATION_DATE << "\n" /* programdata */
<< 102 << "=" << Utility::GetPid() << "\n" /* process id */
<< 999 << "\n\n"; /* enddata */
m_IdoConnection->SendMessage(msgProcessData.str());
}
/*
@ -205,10 +217,20 @@ void CompatIdoComponent::OpenIdoSocket(bool sockettype)
*/
void CompatIdoComponent::CloseIdoSocket(void)
{
GoodByeSink();
CloseSink();
stringstream message;
message << "\n"
<< 1000 << "\n" /* enddatadump */
<< "ENDTIME" << ": " << static_cast<int>(Utility::GetTime()) << "\n" /* endtime */
<< "GOODBYE" /* goodbye */
<< "\n\n";
m_IdoConnection->SendMessage(message.str());
}
void CompatIdoComponent::SocketDisconnectHandler(void)
{
m_IdoConnection.reset();
}
/* TODO
* subscribe to all status updates and checkresults and dump them
@ -235,10 +257,7 @@ void CompatIdoComponent::ConfigTimerHandler(void)
{
Logger::Write(LogInformation, "compatido", "Writing compat ido config information");
/* protect the dump of status update messages */
SetConfigDumpInProgress(true);
DumpConfigObjects();
SetConfigDumpInProgress(false);
}
/**
@ -246,10 +265,6 @@ void CompatIdoComponent::ConfigTimerHandler(void)
*/
void CompatIdoComponent::ProgramStatusTimerHandler(void)
{
/* do not dump any data if config dump is still in progress */
if(GetConfigDumpInProgress())
return;
Logger::Write(LogInformation, "compatido", "Writing compat ido program status information");
DumpProgramStatusData();
@ -262,151 +277,9 @@ void CompatIdoComponent::ReconnectTimerHandler(void)
{
Logger::Write(LogDebug, "compatido", "Checking if ido socket requires reconnect");
if(m_IdoSocket->GetReconnect()) {
/* check if we aren't already connected */
if(m_IdoSocket->IsConnected()) {
Logger::Write(LogDebug, "compatido", "Already connected to ido socket ... no reconnect necessary");
return;
}
if (!m_IdoConnection)
/* socket was disconnected, recconnect */
OpenIdoSocket(m_IdoSocket->GetSocketType());
if(m_IdoSocket->IsConnected()) {
Logger::Write(LogInformation, "compatido", "Successfully reconnected to ido socket");
} else {
stringstream message;
message << "Unable to reconnect to ido socket. Trying again in " << GetReconnectInterval() << " sec";
Logger::Write(LogWarning, "compatido", message.str());
}
}
}
/**
* opens a tcp connection to the ido socket
*/
void CompatIdoComponent::OpenSink(String node, String service)
{
m_IdoSocket = boost::make_shared<IdoSocket>(RoleOutbound);
m_IdoSocket->Connect(node, service);
m_IdoSocket->Start();
}
/**
* sends hello msg to ido2b
*/
void CompatIdoComponent::SendHello(String instancename, bool sockettype)
{
/* FIXME */
#define COMPATIDO_PROTOCOL 2
#define COMPATIDO_NAME "ICINGA2 COMPATIDO"
#define COMPATIDO_RELEASE_VERSION "2.0"
String connection;
if(sockettype)
connection = "TCPSOCKET";
else
connection = "UNIXSOCKET";
/* connection is always TCP */
/* connecttype is always initial */
stringstream message;
message << "\n\n"
<< "HELLO" << "\n"
<< "PROTOCOL" << ": " << COMPATIDO_PROTOCOL<< "\n"
<< "AGENT" << ": " << COMPATIDO_NAME << "\n"
<< "AGENTVERSION" << ": " << VERSION << "\n"
<< "STARTTIME" << ": " << static_cast<int>(Utility::GetTime()) << "\n"
<< "DISPOSITION" << ": " << "REALTIME" << "\n"
<< "CONNECTION" << ": " << connection << "\n"
<< "INSTANCENAME" << ": " << instancename << "\n"
<< "STARTDATADUMP"
<< "\n\n";
m_IdoSocket->SendMessage(message.str());
}
/**
* sends goodbye msg to ido
*/
void CompatIdoComponent::GoodByeSink(void)
{
stringstream message;
message << "\n"
<< 1000 << "\n" /* enddatadump */
<< "ENDTIME" << ": " << static_cast<int>(Utility::GetTime()) << "\n" /* endtime */
<< "GOODBYE" /* goodbye */
<< "\n\n";
m_IdoSocket->SendMessage(message.str());
}
/**
* closes ido socket
*/
void CompatIdoComponent::CloseSink(void)
{
m_IdoSocket->Close();
}
/**
* tell ido2db that we are starting up (must be called before config dump)
*/
void CompatIdoComponent::SendStartProcess(void)
{
/* TODO */
#define PROGRAM_MODIFICATION_DATE "10-17-2012"
#define PROGRAM_RELEASE_VERSION "2.0"
stringstream message;
message << "\n"
<< 200 << "\n" /* processdata */
<< 1 << "=" << 104 << "\n" /* type = pprocess prelaunch */
<< 2 << "=" << "" << "\n" /* flags */
<< 3 << "=" << "" << "\n" /* attributes */
<< 4 << "=" << std::setprecision(17) << Utility::GetTime() << "\n" /* timestamp */
<< 105 << "=" << "Icinga2" << "\n" /* progranname */
<< 107 << "=" << PROGRAM_RELEASE_VERSION << "\n" /* programversion */
<< 104 << "=" << PROGRAM_MODIFICATION_DATE << "\n" /* programdata */
<< 102 << "=" << Utility::GetPid() << "\n" /* process id */
<< 999 << "\n\n"; /* enddata */
m_IdoSocket->SendMessage(message.str());
}
/**
* sends config dump start signal to ido
*/
void CompatIdoComponent::StartConfigDump(void)
{
/* configtype =1 (original), =2 (retained == default) */
stringstream message;
message << "\n\n"
<< 900 << ":" << "\n" /* startconfigdump */
<< 245 << "=" << "RETAINED" << "\n" /* configdumptype */
<< 4 << "=" << std::setprecision(17) << Utility::GetTime() << "\n" /* timestamp */
<< 999 /* enddata */
<< "\n\n";
m_IdoSocket->SendMessage(message.str());
}
/**
* sends config dump end signal to ido
*/
void CompatIdoComponent::EndConfigDump(void)
{
stringstream message;
message << "\n\n"
<< 901 << ":" << "\n" /* endconfigdump */
<< 4 << "=" << std::setprecision(17) << Utility::GetTime() << "\n" /* timestamp */
<< 999 /* enddata */
<< "\n\n";
m_IdoSocket->SendMessage(message.str());
OpenIdoSocket();
}
/**
@ -421,7 +294,7 @@ void CompatIdoComponent::EnableHostObject(const Host::Ptr& host)
<< 53 << "=" << host->GetName() << "\n" /* host */
<< 999 << "\n\n"; /* enddata */
m_IdoSocket->SendMessage(message.str());
m_IdoConnection->SendMessage(message.str());
}
/**
@ -437,7 +310,7 @@ void CompatIdoComponent::EnableServiceObject(const Service::Ptr& service)
<< 114 << "=" << service->GetAlias() << "\n" /* service */
<< 999 << "\n\n"; /* enddata */
m_IdoSocket->SendMessage(message.str());
m_IdoConnection->SendMessage(message.str());
}
/**
@ -452,7 +325,7 @@ void CompatIdoComponent::DisableHostObject(const Host::Ptr& host)
<< 53 << "=" << host->GetName() << "\n" /* host */
<< 999 << "\n\n"; /* enddata */
m_IdoSocket->SendMessage(message.str());
m_IdoConnection->SendMessage(message.str());
}
/**
@ -468,12 +341,9 @@ void CompatIdoComponent::DisableServiceObject(const Service::Ptr& service)
<< 114 << "=" << service->GetAlias() << "\n" /* service */
<< 999 << "\n\n"; /* enddata */
m_IdoSocket->SendMessage(message.str());
m_IdoConnection->SendMessage(message.str());
}
/**
* dump host config to ido
*
@ -553,7 +423,7 @@ void CompatIdoComponent::DumpHostObject(const Host::Ptr& host)
<< 262 << "=" << "i2_customvar" << ":" << 1 << ":" << "i2_custom_var_mod" << "\n" /* customvariable */
<< 999 << "\n\n"; /* enddata */
m_IdoSocket->SendMessage(message.str());
m_IdoConnection->SendMessage(message.str());
}
/**
@ -563,7 +433,6 @@ void CompatIdoComponent::DumpHostObject(const Host::Ptr& host)
*/
void CompatIdoComponent::DumpHostStatus(const Host::Ptr& host)
{
stringstream log;
log << "Dumping Host Status: " << host->GetName();
Logger::Write(LogDebug, "compatido", log.str());
@ -631,7 +500,7 @@ void CompatIdoComponent::DumpHostStatus(const Host::Ptr& host)
<< 262 << "=" << "i2_customvar" << ":" << "1" << ":" << "i2_customvarmod" << "\n" /* customvariable */
<< 999 << "\n\n"; /* enddata */
m_IdoSocket->SendMessage(message.str());
m_IdoConnection->SendMessage(message.str());
}
/**
@ -705,7 +574,7 @@ void CompatIdoComponent::DumpServiceObject(const Service::Ptr& service)
<< 262 << "=" << "i2_customvar" << ":" << 1 << ":" << "i2_custom_var_mod" << "\n" /* customvariable */
<< 999 << "\n\n"; /* enddata */
m_IdoSocket->SendMessage(message.str());
m_IdoConnection->SendMessage(message.str());
}
/**
@ -800,7 +669,7 @@ void CompatIdoComponent::DumpServiceStatus(const Service::Ptr& service)
<< 262 << "=" << "i2_customvar" << ":" << "1" << ":" << "i2_customvarmod" << "\n" /* customvariable */
<< 999 << "\n\n"; /* enddata */
m_IdoSocket->SendMessage(message.str());
m_IdoConnection->SendMessage(message.str());
}
@ -841,7 +710,7 @@ void CompatIdoComponent::DumpProgramStatusData(void)
<< 270 << "=" << static_cast<int>(Utility::GetTime()) << "\n" /* disablednotificationsexpiretime - supported in 1.8 XXX */
<< 999 << "\n\n"; /* enddata */
m_IdoSocket->SendMessage(message.str());
m_IdoConnection->SendMessage(message.str());
}
/**
@ -856,7 +725,16 @@ void CompatIdoComponent::DumpConfigObjects(void)
*/
/* tell ido2db that we start now */
StartConfigDump();
/* configtype =1 (original), =2 (retained == default) */
stringstream msgStartConfigDump;
msgStartConfigDump << "\n\n"
<< 900 << ":" << "\n" /* startconfigdump */
<< 245 << "=" << "RETAINED" << "\n" /* configdumptype */
<< 4 << "=" << std::setprecision(17) << Utility::GetTime() << "\n" /* timestamp */
<< 999 /* enddata */
<< "\n\n";
m_IdoConnection->SendMessage(msgStartConfigDump.str());
/* hosts and hostgroups */
map<String, vector<String> > hostgroups;
@ -896,11 +774,11 @@ void CompatIdoComponent::DumpConfigObjects(void)
<< 172 << "=" << name << "\n" /* hostgroupname */
<< 170 << "=" << hg->GetAlias() << "\n"; /* hostgroupalias */
CreateMessageList(message, hosts, 171); /* hostgroupmember */
SendMessageList(message, hosts, 171); /* hostgroupmember */
message << 999 << "\n\n"; /* enddata */
m_IdoSocket->SendMessage(message.str());
m_IdoConnection->SendMessage(message.str());
}
}
@ -950,16 +828,23 @@ void CompatIdoComponent::DumpConfigObjects(void)
sglist.push_back(service->GetAlias());
}
CreateMessageList(message, services, 219); /* servicegroupmember */
SendMessageList(message, services, 219); /* servicegroupmember */
message << 999 << "\n\n"; /* enddata */
m_IdoSocket->SendMessage(message.str());
m_IdoConnection->SendMessage(message.str());
}
}
/* tell ido2db that we ended dumping the config */
EndConfigDump();
stringstream msgEndConfigDump;
msgEndConfigDump << "\n\n"
<< 901 << ":" << "\n" /* endconfigdump */
<< 4 << "=" << std::setprecision(17) << Utility::GetTime() << "\n" /* timestamp */
<< 999 /* enddata */
<< "\n\n";
m_IdoConnection->SendMessage(msgEndConfigDump.str());
}
/**

View File

@ -38,33 +38,24 @@ private:
Timer::Ptr m_ProgramStatusTimer;
Timer::Ptr m_ReconnectTimer;
IdoSocket::Ptr m_IdoSocket;
bool m_ConfigDumpInProgress;
IdoConnection::Ptr m_IdoConnection;
String GetSocketAddress(void) const;
String GetSocketPort(void) const;
String GetInstanceName(void) const;
int GetReconnectInterval(void) const;
double GetReconnectInterval(void) const;
void SetConfigDumpInProgress(bool state);
bool GetConfigDumpInProgress(void);
void SocketDisconnectHandler(void);
void ConfigTimerHandler(void);
void StatusTimerHandler(void);
void ProgramStatusTimerHandler(void);
void ReconnectTimerHandler(void);
void OpenIdoSocket(bool sockettype);
void OpenIdoSocket(void);
void CloseIdoSocket(void);
void OpenSink(String node, String service);
void SendHello(String instancename, bool sockettype);
void GoodByeSink(void);
void CloseSink(void);
void SendStartProcess(void);
void StartConfigDump(void);
void EndConfigDump(void);
void EnableHostObject(const Host::Ptr& host);
void EnableServiceObject(const Service::Ptr& service);
@ -80,7 +71,7 @@ private:
void DumpProgramStatusData(void);
template<typename T>
void CreateMessageList(stringstream& msg, const T& list, int type)
void SendMessageList(stringstream& msg, const T& list, int type)
{
typename T::const_iterator it;
for (it = list.begin(); it != list.end(); it++) {

View File

@ -33,7 +33,7 @@
using std::stringstream;
#include "idosocket.h"
#include "idoconnection.h"
#include "compatidocomponent.h"

View File

@ -0,0 +1,59 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "i2-compatido.h"
using namespace icinga;
/**
* Constructor for the IdoSocket class.
*
* @param role The role of the underlying TCP client.
*/
IdoConnection::IdoConnection(const Stream::Ptr& stream)
: Connection(stream)
{ }
/**
* Sends a message to the ido socket
*
* @param message The message.
*/
void IdoConnection::SendMessage(const String& message)
{
/*
* write our message to the send queue
* as we inherit all the functionality
* of the tcpclient class
*/
GetStream()->Write(message.CStr(), message.GetLength());
}
/**
* Processes inbound data.
* Currently not used, as we do not receive data from ido sockets
*/
void IdoConnection::ProcessData(void)
{
// Just ignore whatever data the other side is sending
GetStream()->Read(NULL, GetStream()->GetAvailableBytes());
return;
}

View File

@ -17,16 +17,31 @@
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "i2-remoting.h"
#ifndef IDOCONNECTION_H
#define IDOCONNECTION_H
using namespace icinga;
namespace icinga
{
/**
* Constructor for the JsonRpcServer class.
* An IDO socket client.
*
* @param sslContext SSL context that should be used for client connections.
* @ingroup compatido
*/
JsonRpcServer::JsonRpcServer(shared_ptr<SSL_CTX> sslContext)
class IdoConnection : public Connection
{
SetClientFactory(boost::bind(&JsonRpcClientFactory, _1, RoleInbound, sslContext));
public:
typedef shared_ptr<IdoConnection> Ptr;
typedef weak_ptr<IdoConnection> WeakPtr;
IdoConnection(const Stream::Ptr& stream);
void SendMessage(const String& message);
protected:
virtual void ProcessData(void);
};
}
#endif /* IDOCONNECTION_H */

View File

@ -1,147 +0,0 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "i2-compatido.h"
using namespace icinga;
/**
* Constructor for the IdoSocket class.
*
* @param role The role of the underlying TCP client.
*/
IdoSocket::IdoSocket(TcpClientRole role)
: TcpClient(role)
{
/*
* we currently do not receive any data from the ido socket,
* this is just data output - so we do not need to bind
* a local instance of our datahandler in case of a new
* signal telling about new data
*/
OnDataAvailable.connect(boost::bind(&IdoSocket::DataAvailableHandler, this));
/*
* what to do on disconnect
*/
OnClosed.connect(boost::bind(&IdoSocket::ClientClosedHandler, this));
}
/**
* * Set the ido socket type
* *
* * @param type true=tcp, false=unix
* */
void IdoSocket::SetSocketType(bool type)
{
m_SocketType = type;
}
/*
* * Get the ido socket type
* *
* * @returns type true=tcp, false=unix
* */
bool IdoSocket::GetSocketType(void)
{
return m_SocketType;
}
/**
* Sends a message to the ido socket
*
* @param message The message.
*/
void IdoSocket::SendMessage(const String& message)
{
/*
* write our message to the send queue
* as we inherit all the functionality
* of the tcpclient class
*/
Write(message.CStr(), message.GetLength());
}
/**
* Handles closed client connect
*/
void IdoSocket::ClientClosedHandler(void)
{
try {
CheckException();
} catch (const exception& ex) {
stringstream message;
message << "Error occured for ido socket: " << ex.what();
Logger::Write(LogWarning, "compatido", message.str());
}
Logger::Write(LogWarning, "compatido", "Lost connection to ido socket");
SetReconnect(true);
OnDisconnected(GetSelf());
}
/**
* Set reconnect vstate
*
* @aparam enable Enables the reconnect.
*/
void IdoSocket::SetReconnect(bool reconnect)
{
m_Reconnect = reconnect;
}
/**
* Get reconnect state
*
* @returns reconnect The reconnect variable
*/
bool IdoSocket::GetReconnect(void)
{
return m_Reconnect;
}
/**
* Processes inbound data.
* Currently not used, as we do not receive data from ido sockets
*/
void IdoSocket::DataAvailableHandler(void)
{
return;
}
/**
* Factory function for ido socket clients.
*
* @param fd The file descriptor.
* @param role The role of the underlying TCP client.
* @returns A new ido socket client.
*/
IdoSocket::Ptr icinga::IdoSocketFactory(SOCKET fd, TcpClientRole role)
{
IdoSocket::Ptr client = boost::make_shared<IdoSocket>(role);
client->SetFD(fd);
return client;
}

View File

@ -1,68 +0,0 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#ifndef IDOSOCKET_H
#define IDOSOCKET_H
#include "i2-compatido.h"
namespace icinga
{
/**
* An IDO socket client.
*
* @ingroup compatido
*/
class IdoSocket : public TcpClient
{
public:
typedef shared_ptr<IdoSocket> Ptr;
typedef weak_ptr<IdoSocket> WeakPtr;
IdoSocket(TcpClientRole role);
void SetSocketType(bool);
bool GetSocketType(void);
void SendMessage(const String& message);
void SetReconnect(bool reconnect);
bool GetReconnect(void);
boost::signal<void (const IdoSocket::Ptr&, const stringstream&)> OnNewMessage;
boost::signal<void (const IdoSocket::Ptr&)> OnConnected;
boost::signal<void (const IdoSocket::Ptr&)> OnDisconnected;
private:
void DataAvailableHandler(void);
void ClientClosedHandler(void);
bool m_Reconnect;
bool m_SocketType;
friend IdoSocket::Ptr IdoSocketFactory(SOCKET fd, TcpClientRole role);
};
IdoSocket::Ptr IdoSocketFactory(SOCKET fd, TcpClientRole role);
}
#endif /* JSONRPCCLIENT_H */

View File

@ -68,8 +68,7 @@ void DemoComponent::DemoTimerHandler(void)
void DemoComponent::HelloWorldRequestHandler(const Endpoint::Ptr& sender,
const RequestMessage& request)
{
Logger::Write(LogInformation, "demo", "Got 'hello world' from"
" address=" + sender->GetAddress() + ", identity=" +
Logger::Write(LogInformation, "demo", "Got 'hello world' from identity=" +
sender->GetName());
}

View File

@ -10,6 +10,8 @@ libbase_la_SOURCES = \
asynctask.h \
component.cpp \
component.h \
connection.cpp \
connection.h \
dictionary.cpp \
dictionary.h \
dynamicobject.cpp \
@ -21,7 +23,6 @@ libbase_la_SOURCES = \
fifo.cpp \
fifo.h \
i2-base.h \
ioqueue.h \
logger.cpp \
logger.h \
netstring.cpp \
@ -40,20 +41,20 @@ libbase_la_SOURCES = \
scripttask.h \
socket.cpp \
socket.h \
stream.cpp \
stream.h \
stream_bio.cpp \
stream_bio.h \
streamlogger.cpp \
streamlogger.h \
sysloglogger.cpp \
sysloglogger.h \
tcpclient.cpp \
tcpclient.h \
tcpserver.cpp \
tcpserver.h \
tcpsocket.cpp \
tcpsocket.h \
timer.cpp \
timer.h \
tlsclient.cpp \
tlsclient.h \
tlsstream.cpp \
tlsstream.h \
unix.h \
utility.cpp \
utility.h \

View File

@ -21,6 +21,7 @@
<ItemGroup>
<ClCompile Include="application.cpp" />
<ClCompile Include="component.cpp" />
<ClCompile Include="connection.cpp" />
<ClCompile Include="dynamicobject.cpp" />
<ClCompile Include="dictionary.cpp" />
<ClCompile Include="event.cpp" />
@ -41,13 +42,13 @@
<ClCompile Include="scriptfunction.cpp" />
<ClCompile Include="scripttask.cpp" />
<ClCompile Include="socket.cpp" />
<ClCompile Include="stream.cpp" />
<ClCompile Include="streamlogger.cpp" />
<ClCompile Include="stream_bio.cpp" />
<ClCompile Include="sysloglogger.cpp" />
<ClCompile Include="tcpclient.cpp" />
<ClCompile Include="tcpserver.cpp" />
<ClCompile Include="tcpsocket.cpp" />
<ClCompile Include="timer.cpp" />
<ClCompile Include="tlsclient.cpp" />
<ClCompile Include="tlsstream.cpp" />
<ClCompile Include="utility.cpp" />
<ClCompile Include="value.cpp" />
</ItemGroup>
@ -55,11 +56,12 @@
<ClInclude Include="application.h" />
<ClInclude Include="asynctask.h" />
<ClInclude Include="component.h" />
<ClInclude Include="connection.h" />
<ClInclude Include="dynamicobject.h" />
<ClInclude Include="dictionary.h" />
<ClInclude Include="event.h" />
<ClInclude Include="fifo.h" />
<ClInclude Include="ioqueue.h" />
<ClInclude Include="stream.h" />
<ClInclude Include="netstring.h" />
<ClInclude Include="qstring.h" />
<ClInclude Include="scriptfunction.h" />
@ -72,12 +74,11 @@
<ClInclude Include="ringbuffer.h" />
<ClInclude Include="socket.h" />
<ClInclude Include="streamlogger.h" />
<ClInclude Include="stream_bio.h" />
<ClInclude Include="sysloglogger.h" />
<ClInclude Include="tcpclient.h" />
<ClInclude Include="tcpserver.h" />
<ClInclude Include="tcpsocket.h" />
<ClInclude Include="timer.h" />
<ClInclude Include="tlsclient.h" />
<ClInclude Include="tlsstream.h" />
<ClInclude Include="unix.h" />
<ClInclude Include="utility.h" />
<ClInclude Include="value.h" />

View File

@ -25,30 +25,15 @@
<ClCompile Include="socket.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
<ClCompile Include="tcpclient.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
<ClCompile Include="tcpserver.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
<ClCompile Include="tcpsocket.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
<ClCompile Include="timer.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
<ClCompile Include="tlsclient.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
<ClCompile Include="unix.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
<ClCompile Include="utility.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
<ClCompile Include="win32.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
<ClCompile Include="logger.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
@ -85,6 +70,18 @@
<ClCompile Include="qstring.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
<ClCompile Include="tlsstream.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
<ClCompile Include="stream.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
<ClCompile Include="stream_bio.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
<ClCompile Include="connection.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="application.h">
@ -117,21 +114,12 @@
<ClInclude Include="socket.h">
<Filter>Headerdateien</Filter>
</ClInclude>
<ClInclude Include="tcpclient.h">
<Filter>Headerdateien</Filter>
</ClInclude>
<ClInclude Include="tcpserver.h">
<Filter>Headerdateien</Filter>
</ClInclude>
<ClInclude Include="tcpsocket.h">
<Filter>Headerdateien</Filter>
</ClInclude>
<ClInclude Include="timer.h">
<Filter>Headerdateien</Filter>
</ClInclude>
<ClInclude Include="tlsclient.h">
<Filter>Headerdateien</Filter>
</ClInclude>
<ClInclude Include="unix.h">
<Filter>Headerdateien</Filter>
</ClInclude>
@ -159,9 +147,6 @@
<ClInclude Include="scriptfunction.h">
<Filter>Headerdateien</Filter>
</ClInclude>
<ClInclude Include="ioqueue.h">
<Filter>Headerdateien</Filter>
</ClInclude>
<ClInclude Include="fifo.h">
<Filter>Headerdateien</Filter>
</ClInclude>
@ -177,6 +162,18 @@
<ClInclude Include="qstring.h">
<Filter>Headerdateien</Filter>
</ClInclude>
<ClInclude Include="stream.h">
<Filter>Headerdateien</Filter>
</ClInclude>
<ClInclude Include="tlsstream.h">
<Filter>Headerdateien</Filter>
</ClInclude>
<ClInclude Include="stream_bio.h">
<Filter>Headerdateien</Filter>
</ClInclude>
<ClInclude Include="connection.h">
<Filter>Headerdateien</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<Filter Include="Quelldateien">

44
lib/base/connection.cpp Normal file
View File

@ -0,0 +1,44 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "i2-base.h"
using namespace icinga;
Connection::Connection(const Stream::Ptr& stream)
: m_Stream(stream)
{
m_Stream->OnDataAvailable.connect(boost::bind(&Connection::ProcessData, this));
m_Stream->OnClosed.connect(boost::bind(&Connection::ClosedHandler, this));
}
Stream::Ptr Connection::GetStream(void) const
{
return m_Stream;
}
void Connection::ClosedHandler(void)
{
OnClosed(GetSelf());
}
void Connection::Close(void)
{
m_Stream->Close();
}

View File

@ -17,44 +17,35 @@
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#ifndef TCPSERVER_H
#define TCPSERVER_H
#ifndef CONNECTION_H
#define CONNECTION_H
namespace icinga
{
/**
* A TCP server that listens on a TCP port and accepts incoming
* client connections.
*
* @ingroup base
*/
class I2_BASE_API TcpServer : public TcpSocket
class I2_BASE_API Connection : public Object
{
public:
typedef shared_ptr<TcpServer> Ptr;
typedef weak_ptr<TcpServer> WeakPtr;
typedef shared_ptr<Connection> Ptr;
typedef weak_ptr<Connection> WeakPtr;
typedef function<TcpClient::Ptr(SOCKET)> ClientFactory;
Connection(const Stream::Ptr& stream);
TcpServer(void);
Stream::Ptr GetStream(void) const;
void SetClientFactory(const ClientFactory& clientFactory);
ClientFactory GetFactoryFunction(void) const;
void Close(void);
void Listen(void);
boost::signal<void (const TcpServer::Ptr&, const TcpClient::Ptr&)> OnNewClient;
boost::signal<void (const Connection::Ptr&)> OnClosed;
protected:
virtual bool WantsToRead(void) const;
virtual void HandleReadable(void);
virtual void ProcessData(void) = 0;
private:
ClientFactory m_ClientFactory;
Stream::Ptr m_Stream;
void ClosedHandler(void);
};
}
#endif /* TCPSERVER_H */
#endif /* CONNECTION_H */

View File

@ -368,6 +368,7 @@ void DynamicObject::DumpObjects(const String& filename)
throw_exception(runtime_error("Could not open '" + filename + "' file"));
FIFO::Ptr fifo = boost::make_shared<FIFO>();
fifo->Start();
DynamicObject::TypeMap::iterator tt;
for (tt = GetAllObjects().begin(); tt != GetAllObjects().end(); tt++) {
@ -401,7 +402,7 @@ void DynamicObject::DumpObjects(const String& filename)
String json = value.Serialize();
/* This is quite ugly, unfortunatelly NetString requires an IOQueue object */
NetString::WriteStringToIOQueue(fifo.get(), json);
NetString::WriteStringToStream(fifo, json);
size_t count;
while ((count = fifo->GetAvailableBytes()) > 0) {
@ -416,6 +417,8 @@ void DynamicObject::DumpObjects(const String& filename)
}
}
fifo->Close();
fp.close();
#ifdef _WIN32
@ -436,6 +439,8 @@ void DynamicObject::RestoreObjects(const String& filename)
/* TODO: Fix this horrible mess by implementing a class that provides
* IOQueue functionality for files. */
FIFO::Ptr fifo = boost::make_shared<FIFO>();
fifo->Start();
while (fp) {
char buffer[1024];
@ -444,7 +449,7 @@ void DynamicObject::RestoreObjects(const String& filename)
}
String message;
while (NetString::ReadStringFromIOQueue(fifo.get(), &message)) {
while (NetString::ReadStringFromStream(fifo, &message)) {
Dictionary::Ptr persistentObject = Value::Deserialize(message);
String type = persistentObject->Get("type");
@ -462,6 +467,8 @@ void DynamicObject::RestoreObjects(const String& filename)
object->ApplyUpdate(update, Attribute_All);
}
}
fifo->Close();
}
void DynamicObject::DeactivateObjects(void)

View File

@ -38,6 +38,13 @@ FIFO::~FIFO(void)
free(m_Buffer);
}
void FIFO::Start(void)
{
SetConnected(true);
Stream::Start();
}
/**
* Resizes the FIFO's buffer so that it is at least newSize bytes long.
*
@ -109,25 +116,32 @@ size_t FIFO::GetAvailableBytes(void) const
/**
* Implements IOQueue::Peek.
*/
void FIFO::Peek(void *buffer, size_t count)
size_t FIFO::Peek(void *buffer, size_t count)
{
assert(m_DataSize >= count);
assert(IsConnected());
if (count > m_DataSize)
count = m_DataSize;
if (buffer != NULL)
memcpy(buffer, m_Buffer + m_Offset, count);
return count;
}
/**
* Implements IOQueue::Read.
*/
void FIFO::Read(void *buffer, size_t count)
size_t FIFO::Read(void *buffer, size_t count)
{
Peek(buffer, count);
count = Peek(buffer, count);
m_DataSize -= count;
m_Offset += count;
Optimize();
return count;
}
/**
@ -135,6 +149,8 @@ void FIFO::Read(void *buffer, size_t count)
*/
void FIFO::Write(const void *buffer, size_t count)
{
assert(IsConnected());
ResizeBuffer(m_Offset + m_DataSize + count);
memcpy(m_Buffer + m_Offset + m_DataSize, buffer, count);
m_DataSize += count;

View File

@ -28,7 +28,7 @@ namespace icinga
*
* @ingroup base
*/
class I2_BASE_API FIFO : public Object, public IOQueue
class I2_BASE_API FIFO : public Stream
{
public:
static const size_t BlockSize = 16 * 1024;
@ -39,13 +39,15 @@ public:
FIFO(void);
~FIFO(void);
void Start(void);
/*const void *GetReadBuffer(void) const;
void *GetWriteBuffer(size_t *count);*/
virtual size_t GetAvailableBytes(void) const;
virtual void Peek(void *buffer, size_t count);
virtual void Read(void *buffer, size_t count);
virtual void Write(const void *buffer, size_t count);
size_t GetAvailableBytes(void) const;
size_t Peek(void *buffer, size_t count);
size_t Read(void *buffer, size_t count);
void Write(const void *buffer, size_t count);
private:
char *m_Buffer;

View File

@ -175,14 +175,14 @@ namespace tuples = boost::tuples;
#include "dictionary.h"
#include "ringbuffer.h"
#include "timer.h"
#include "ioqueue.h"
#include "stream.h"
#include "stream_bio.h"
#include "connection.h"
#include "netstring.h"
#include "fifo.h"
#include "socket.h"
#include "tcpsocket.h"
#include "tcpclient.h"
#include "tcpserver.h"
#include "tlsclient.h"
#include "tlsstream.h"
#include "asynctask.h"
#include "process.h"
#include "scriptfunction.h"

View File

@ -22,17 +22,17 @@
using namespace icinga;
/**
* Reads data from an IOQueue in netString format.
* Reads data from a stream in netString format.
*
* @param queue The IOQueue to read from.
* @param stream The stream to read from.
* @param[out] str The String that has been read from the IOQueue.
* @returns true if a complete String was read from the IOQueue, false otherwise.
* @exception invalid_argument The input stream is invalid.
* @see https://github.com/PeterScott/netString-c/blob/master/netString.c
*/
bool NetString::ReadStringFromIOQueue(IOQueue *queue, String *str)
bool NetString::ReadStringFromStream(const Stream::Ptr& stream, String *str)
{
size_t buffer_length = queue->GetAvailableBytes();
size_t buffer_length = stream->GetAvailableBytes();
/* minimum netString length is 3 */
if (buffer_length < 3)
@ -47,7 +47,7 @@ bool NetString::ReadStringFromIOQueue(IOQueue *queue, String *str)
if (buffer == NULL && buffer_length > 0)
throw_exception(bad_alloc());
queue->Peek(buffer, buffer_length);
stream->Peek(buffer, buffer_length);
/* no leading zeros allowed */
if (buffer[0] == '0' && isdigit(buffer[1])) {
@ -68,7 +68,7 @@ bool NetString::ReadStringFromIOQueue(IOQueue *queue, String *str)
len = len * 10 + (buffer[i] - '0');
}
buffer_length = queue->GetAvailableBytes();
buffer_length = stream->GetAvailableBytes();
/* make sure the buffer is large enough */
if (i + len + 1 >= buffer_length)
@ -86,7 +86,7 @@ bool NetString::ReadStringFromIOQueue(IOQueue *queue, String *str)
buffer = new_buffer;
queue->Peek(buffer, buffer_length);
stream->Peek(buffer, buffer_length);
/* check for the colon delimiter */
if (buffer[i] != ':') {
@ -104,25 +104,25 @@ bool NetString::ReadStringFromIOQueue(IOQueue *queue, String *str)
free(buffer);
/* remove the data from the IOQueue */
queue->Read(NULL, buffer_length);
/* remove the data from the stream */
stream->Read(NULL, buffer_length);
return true;
}
/**
* Writes data into an IOQueue using the netString format.
* Writes data into a stream using the netString format.
*
* @param queue The IOQueue.
* @param stream The stream.
* @param str The String that is to be written.
*/
void NetString::WriteStringToIOQueue(IOQueue *queue, const String& str)
void NetString::WriteStringToStream(const Stream::Ptr& stream, const String& str)
{
stringstream prefixbuf;
prefixbuf << str.GetLength() << ":";
String prefix = prefixbuf.str();
queue->Write(prefix.CStr(), prefix.GetLength());
queue->Write(str.CStr(), str.GetLength());
queue->Write(",", 1);
stream->Write(prefix.CStr(), prefix.GetLength());
stream->Write(str.CStr(), str.GetLength());
stream->Write(",", 1);
}

View File

@ -33,8 +33,8 @@ namespace icinga
class I2_BASE_API NetString
{
public:
static bool ReadStringFromIOQueue(IOQueue *queue, String *message);
static void WriteStringToIOQueue(IOQueue *queue, const String& message);
static bool ReadStringFromStream(const Stream::Ptr& stream, String *message);
static void WriteStringToStream(const Stream::Ptr& stream, const String& message);
private:
NetString(void);

View File

@ -25,16 +25,19 @@ using namespace icinga;
* Constructor for the Socket class.
*/
Socket::Socket(void)
: m_FD(INVALID_SOCKET), m_Connected(false)
{ }
: m_FD(INVALID_SOCKET), m_Connected(false), m_Listening(false),
m_SendQueue(boost::make_shared<FIFO>()), m_RecvQueue(boost::make_shared<FIFO>())
{
m_SendQueue->Start();
m_RecvQueue->Start();
}
/**
* Destructor for the Socket class.
*/
Socket::~Socket(void)
{
boost::mutex::scoped_lock lock(m_SocketMutex);
CloseInternal(true);
Close();
}
/**
@ -50,6 +53,8 @@ void Socket::Start(void)
m_WriteThread = thread(boost::bind(&Socket::WriteThreadProc, static_cast<Socket::Ptr>(GetSelf())));
m_WriteThread.detach();
Stream::Start();
}
/**
@ -88,35 +93,24 @@ SOCKET Socket::GetFD(void) const
return m_FD;
}
void Socket::CloseUnlocked(void)
{
if (m_FD == INVALID_SOCKET)
return;
closesocket(m_FD);
m_FD = INVALID_SOCKET;
Stream::Close();
}
/**
* Closes the socket.
*/
void Socket::Close(void)
{
boost::mutex::scoped_lock lock(m_SocketMutex);
CloseInternal(false);
}
/**
* Closes the socket.
*
* @param from_dtor Whether this method was called from the destructor.
*/
void Socket::CloseInternal(bool from_dtor)
{
if (m_FD == INVALID_SOCKET)
return;
SetConnected(false);
closesocket(m_FD);
m_FD = INVALID_SOCKET;
/* nobody can possibly have a valid event subscription when the
destructor has been called */
if (!from_dtor)
Event::Post(boost::bind(boost::ref(OnClosed), GetSelf()));
CloseUnlocked();
}
/**
@ -159,32 +153,6 @@ void Socket::HandleException(void)
throw_exception(SocketException("select() returned fd in except fdset", GetError()));
}
/**
* Checks whether data should be read for this socket object.
*
* @returns true if the socket should be registered for reading, false otherwise.
*/
bool Socket::WantsToRead(void) const
{
return false;
}
void Socket::HandleReadable(void)
{ }
/**
* Checks whether data should be written for this socket object.
*
* @returns true if the socket should be registered for writing, false otherwise.
*/
bool Socket::WantsToWrite(void) const
{
return false;
}
void Socket::HandleWritable(void)
{ }
/**
* Formats a sockaddr in a human-readable way.
*
@ -305,9 +273,9 @@ void Socket::ReadThreadProc(void)
if (FD_ISSET(fd, &exceptfds))
HandleException();
} catch (...) {
m_Exception = boost::current_exception();
SetException(boost::current_exception());
CloseInternal(false);
CloseUnlocked();
break;
}
@ -360,9 +328,9 @@ void Socket::WriteThreadProc(void)
if (FD_ISSET(fd, &writefds))
HandleWritable();
} catch (...) {
m_Exception = boost::current_exception();
SetException(boost::current_exception());
CloseInternal(false);
CloseUnlocked();
break;
}
@ -390,11 +358,281 @@ bool Socket::IsConnected(void) const
}
/**
* Checks whether an exception is available for this socket. Should be called
* by user-supplied handlers for the OnClosed signal.
* Returns how much data is available for reading.
*
* @returns The number of bytes available.
*/
void Socket::CheckException(void)
size_t Socket::GetAvailableBytes(void) const
{
if (m_Exception)
rethrow_exception(m_Exception);
if (m_Listening)
throw new logic_error("Socket does not support GetAvailableBytes().");
{
boost::mutex::scoped_lock lock(m_QueueMutex);
return m_RecvQueue->GetAvailableBytes();
}
}
/**
* Reads data from the socket.
*
* @param buffer The buffer where the data should be stored.
* @param size The size of the buffer.
* @returns The number of bytes read.
*/
size_t Socket::Read(void *buffer, size_t size)
{
if (m_Listening)
throw new logic_error("Socket does not support Read().");
{
boost::mutex::scoped_lock lock(m_QueueMutex);
if (m_RecvQueue->GetAvailableBytes() == 0)
CheckException();
return m_RecvQueue->Read(buffer, size);
}
}
/**
* Peeks at data for the socket.
*
* @param buffer The buffer where the data should be stored.
* @param size The size of the buffer.
* @returns The number of bytes read.
*/
size_t Socket::Peek(void *buffer, size_t size)
{
if (m_Listening)
throw new logic_error("Socket does not support Peek().");
{
boost::mutex::scoped_lock lock(m_QueueMutex);
if (m_RecvQueue->GetAvailableBytes() == 0)
CheckException();
return m_RecvQueue->Peek(buffer, size);
}
}
/**
* Writes data to the socket.
*
* @param buffer The buffer that should be sent.
* @param size The size of the buffer.
*/
void Socket::Write(const void *buffer, size_t size)
{
if (m_Listening)
throw new logic_error("Socket does not support Write().");
{
boost::mutex::scoped_lock lock(m_QueueMutex);
m_SendQueue->Write(buffer, size);
}
}
/**
* Starts listening for incoming client connections.
*/
void Socket::Listen(void)
{
if (listen(GetFD(), SOMAXCONN) < 0)
throw_exception(SocketException("listen() failed", GetError()));
m_Listening = true;
}
void Socket::HandleWritable(void)
{
if (m_Listening)
HandleWritableServer();
else
HandleWritableClient();
}
void Socket::HandleReadable(void)
{
if (m_Listening)
HandleReadableServer();
else
HandleReadableClient();
}
/**
* Processes data that is available for this socket.
*/
void Socket::HandleWritableClient(void)
{
int rc;
char data[1024];
size_t count;
if (!IsConnected())
SetConnected(true);
for (;;) {
{
boost::mutex::scoped_lock lock(m_QueueMutex);
count = m_SendQueue->GetAvailableBytes();
if (count == 0)
break;
if (count > sizeof(data))
count = sizeof(data);
m_SendQueue->Peek(data, count);
}
rc = send(GetFD(), data, count, 0);
if (rc <= 0)
throw_exception(SocketException("send() failed", GetError()));
{
boost::mutex::scoped_lock lock(m_QueueMutex);
m_SendQueue->Read(NULL, rc);
}
}
}
/**
* Processes data that can be written for this socket.
*/
void Socket::HandleReadableClient(void)
{
if (!IsConnected())
SetConnected(true);
bool new_data = false;
for (;;) {
char data[1024];
int rc = recv(GetFD(), data, sizeof(data), 0);
#ifdef _WIN32
if (rc < 0 && WSAGetLastError() == WSAEWOULDBLOCK)
#else /* _WIN32 */
if (rc < 0 && errno == EAGAIN)
#endif /* _WIN32 */
break;
if (rc <= 0)
throw_exception(SocketException("recv() failed", GetError()));
{
boost::mutex::scoped_lock lock(m_QueueMutex);
m_RecvQueue->Write(data, rc);
}
new_data = true;
}
if (new_data)
Event::Post(boost::bind(boost::ref(OnDataAvailable), GetSelf()));
}
void Socket::HandleWritableServer(void)
{
throw logic_error("This should never happen.");
}
/**
* Accepts a new client and creates a new client object for it
* using the client factory function.
*/
void Socket::HandleReadableServer(void)
{
int fd;
sockaddr_storage addr;
socklen_t addrlen = sizeof(addr);
fd = accept(GetFD(), (sockaddr *)&addr, &addrlen);
if (fd < 0)
throw_exception(SocketException("accept() failed", GetError()));
TcpSocket::Ptr client = boost::make_shared<TcpSocket>();
client->SetFD(fd);
Event::Post(boost::bind(boost::ref(OnNewClient), GetSelf(), client));
}
/**
* Checks whether data should be written for this socket object.
*
* @returns true if the socket should be registered for writing, false otherwise.
*/
bool Socket::WantsToWrite(void) const
{
if (m_Listening)
return WantsToWriteServer();
else
return WantsToWriteClient();
}
/**
* Checks whether data should be read for this socket object.
*
* @returns true if the socket should be registered for reading, false otherwise.
*/
bool Socket::WantsToRead(void) const
{
if (m_Listening)
return WantsToReadServer();
else
return WantsToReadClient();
}
/**
* Checks whether data should be read for this socket.
*
* @returns true
*/
bool Socket::WantsToReadClient(void) const
{
return true;
}
/**
* Checks whether data should be written for this socket.
*
* @returns true if data should be written, false otherwise.
*/
bool Socket::WantsToWriteClient(void) const
{
{
boost::mutex::scoped_lock lock(m_QueueMutex);
if (m_SendQueue->GetAvailableBytes() > 0)
return true;
}
return (!IsConnected());
}
/**
* Checks whether the TCP server wants to write.
*
* @returns false
*/
bool Socket::WantsToWriteServer(void) const
{
return false;
}
/**
* Checks whether the TCP server wants to read (i.e. accept new clients).
*
* @returns true
*/
bool Socket::WantsToReadServer(void) const
{
return true;
}

View File

@ -23,11 +23,11 @@
namespace icinga {
/**
* Base class for sockets.
* Base class for connection-oriented sockets.
*
* @ingroup base
*/
class I2_BASE_API Socket : public Object
class I2_BASE_API Socket : public Stream
{
public:
typedef shared_ptr<Socket> Ptr;
@ -35,18 +35,23 @@ public:
~Socket(void);
boost::signal<void (const Socket::Ptr&)> OnClosed;
virtual void Start(void);
void Close(void);
virtual void Close(void);
String GetClientAddress(void);
String GetPeerAddress(void);
bool IsConnected(void) const;
void CheckException(void);
size_t GetAvailableBytes(void) const;
size_t Read(void *buffer, size_t size);
size_t Peek(void *buffer, size_t size);
void Write(const void *buffer, size_t size);
void Listen(void);
boost::signal<void (const Socket::Ptr&, const Socket::Ptr&)> OnNewClient;
protected:
Socket(void);
@ -59,34 +64,49 @@ protected:
int GetError(void) const;
static int GetLastSocketError(void);
virtual bool WantsToRead(void) const;
virtual bool WantsToWrite(void) const;
virtual void HandleReadable(void);
virtual void HandleWritable(void);
virtual void HandleException(void);
virtual void CloseInternal(bool from_dtor);
mutable boost::mutex m_SocketMutex;
private:
SOCKET m_FD; /**< The socket descriptor. */
bool m_Connected;
bool m_Listening;
thread m_ReadThread;
thread m_WriteThread;
condition_variable m_WriteCV;
boost::exception_ptr m_Exception;
void ReadThreadProc(void);
void WriteThreadProc(void);
void ExceptionEventHandler(void);
static String GetAddressFromSockaddr(sockaddr *address, socklen_t len);
mutable boost::mutex m_QueueMutex;
FIFO::Ptr m_SendQueue;
FIFO::Ptr m_RecvQueue;
void HandleWritableClient(void);
void HandleReadableClient(void);
void HandleWritableServer(void);
void HandleReadableServer(void);
void HandleReadable(void);
void HandleWritable(void);
void HandleException(void);
bool WantsToWriteClient(void) const;
bool WantsToReadClient(void) const;
bool WantsToWriteServer(void) const;
bool WantsToReadServer(void) const;
bool WantsToWrite(void) const;
bool WantsToRead(void) const;
void CloseUnlocked(void);
};
/**

79
lib/base/stream.cpp Normal file
View File

@ -0,0 +1,79 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "i2-base.h"
using namespace icinga;
Stream::Stream(void)
: m_Connected(false)
{ }
Stream::~Stream(void)
{
assert(!m_Running);
}
bool Stream::IsConnected(void) const
{
return m_Connected;
}
void Stream::SetConnected(bool connected)
{
m_Connected = connected;
if (m_Connected)
Event::Post(boost::bind(boost::ref(OnConnected), GetSelf()));
else
Event::Post(boost::bind(boost::ref(OnClosed), GetSelf()));
}
/**
* Checks whether an exception is available for this socket and re-throws
* the exception if there is one.
*/
void Stream::CheckException(void)
{
if (m_Exception)
rethrow_exception(m_Exception);
}
void Stream::SetException(boost::exception_ptr exception)
{
m_Exception = exception;
}
boost::exception_ptr Stream::GetException(void)
{
return m_Exception;
}
void Stream::Start(void)
{
m_Running = true;
}
void Stream::Close(void)
{
assert(m_Running);
m_Running = false;
SetConnected(false);
}

View File

@ -17,20 +17,28 @@
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#ifndef IOQUEUE_H
#define IOQUEUE_H
#ifndef STREAM_H
#define STREAM_H
namespace icinga
{
/**
* An I/O queue.
* A stream.
*
* @ingroup base
*/
class IOQueue
class I2_BASE_API Stream : public Object
{
public:
typedef shared_ptr<Stream> Ptr;
typedef weak_ptr<Stream> WeakPtr;
Stream(void);
~Stream(void);
virtual void Start(void);
/**
* Retrieves the number of bytes available for reading.
*
@ -39,37 +47,61 @@ public:
virtual size_t GetAvailableBytes(void) const = 0;
/**
* Reads data from the queue without advancing the read pointer. Trying
* to read more data than is available in the queue is a programming error.
* Use GetBytesAvailable() to check how much data is available.
* Reads data from the stream without advancing the read pointer.
*
* @param buffer The buffer where data should be stored. May be NULL if
* you're not actually interested in the data.
* @param count The number of bytes to read from the queue.
* @returns The number of bytes actually read.
*/
virtual void Peek(void *buffer, size_t count) = 0;
virtual size_t Peek(void *buffer, size_t count) = 0;
/**
* Reads data from the queue. Trying to read more data than is
* available in the queue is a programming error. Use GetBytesAvailable()
* to check how much data is available.
* Reads data from the stream.
*
* @param buffer The buffer where data should be stored. May be NULL if you're
* not actually interested in the data.
* @param count The number of bytes to read from the queue.
* @returns The number of bytes actually read.
*/
virtual void Read(void *buffer, size_t count) = 0;
virtual size_t Read(void *buffer, size_t count) = 0;
/**
* Writes data to the queue.
* Writes data to the stream.
*
* @param buffer The data that is to be written.
* @param count The number of bytes to write.
* @returns The number of bytes written
*/
virtual void Write(const void *buffer, size_t count) = 0;
/**
* Closes the stream and releases resources.
*/
virtual void Close(void);
bool IsConnected(void) const;
boost::exception_ptr GetException(void);
void CheckException(void);
boost::signal<void (const Stream::Ptr&)> OnConnected;
boost::signal<void (const Stream::Ptr&)> OnDataAvailable;
boost::signal<void (const Stream::Ptr&)> OnClosed;
protected:
void SetConnected(bool connected);
void SetException(boost::exception_ptr exception);
private:
bool m_Running;
bool m_Connected;
boost::exception_ptr m_Exception;
};
BIO *BIO_Stream_new(const Stream::Ptr& stream);
}
#endif /* IOQUEUE_H */
#endif /* STREAM_H */

121
lib/base/stream_bio.cpp Normal file
View File

@ -0,0 +1,121 @@
#include "i2-base.h"
using namespace icinga;
int I2Stream_new(BIO *bi);
int I2Stream_free(BIO *bi);
int I2Stream_read(BIO *bi, char *out, int outl);
int I2Stream_write(BIO *bi, const char *in, int inl);
long I2Stream_ctrl(BIO *bi, int cmd, long num, void *ptr);
int I2Stream_gets(BIO *bi, char *buf, int size);
int I2Stream_puts(BIO *bi, const char *str);
#define BIO_TYPE_I2STREAM (99|0x0400|0x0100)
static BIO_METHOD I2Stream_method =
{
BIO_TYPE_I2STREAM,
"Icinga Stream",
I2Stream_write,
I2Stream_read,
NULL,
NULL,
I2Stream_ctrl,
I2Stream_new,
I2Stream_free,
NULL,
};
typedef struct I2Stream_bio_s
{
Stream::Ptr Stream;
boost::exception_ptr Exception;
} I2Stream_bio_t;
BIO_METHOD *BIO_s_I2Stream(void)
{
return &I2Stream_method;
}
BIO *icinga::BIO_new_I2Stream(const Stream::Ptr& stream)
{
BIO *bi = BIO_new(BIO_s_I2Stream());
if (bi == NULL)
return NULL;
I2Stream_bio_t *bp = (I2Stream_bio_t *)bi->ptr;
bp->Stream = stream;
return bi;
}
void icinga::I2Stream_check_exception(BIO *bi) {
I2Stream_bio_t *bp = (I2Stream_bio_t *)bi->ptr;
if (bp->Exception) {
boost::exception_ptr ptr = bp->Exception;
bp->Exception = boost::exception_ptr();
rethrow_exception(ptr);
}
}
static int I2Stream_new(BIO *bi)
{
bi->shutdown = 0;
bi->init = 1;
bi->num = -1;
bi->ptr = new I2Stream_bio_t;
return 1;
}
static int I2Stream_free(BIO *bi)
{
I2Stream_bio_t *bp = (I2Stream_bio_t *)bi->ptr;
delete bp;
return 1;
}
static int I2Stream_read(BIO *bi, char *out, int outl)
{
I2Stream_bio_t *bp = (I2Stream_bio_t *)bi->ptr;
size_t data_read;
BIO_clear_retry_flags(bi);
try {
data_read = bp->Stream->Read(out, outl);
} catch (...) {
bp->Exception = boost::current_exception();
return -1;
}
if (data_read == 0) {
BIO_set_retry_read(bi);
return -1;
}
return data_read;
}
static int I2Stream_write(BIO *bi, const char *in, int inl)
{
I2Stream_bio_t *bp = (I2Stream_bio_t *)bi->ptr;
bp->Stream->Write(in, inl);
return inl;
}
static long I2Stream_ctrl(BIO *bi, int cmd, long num, void *ptr)
{
switch (cmd) {
case BIO_CTRL_FLUSH:
return 1;
default:
return 0;
}
}

View File

@ -17,26 +17,15 @@
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#ifndef JSONRPCSERVER_H
#define JSONRPCSERVER_H
#ifndef STREAMBIO_H
#define STREAMBIO_H
namespace icinga
{
/**
* A JSON-RPC server.
*
* @ingroup remoting
*/
class I2_REMOTING_API JsonRpcServer : public TcpServer
{
public:
typedef shared_ptr<JsonRpcServer> Ptr;
typedef weak_ptr<JsonRpcServer> WeakPtr;
JsonRpcServer(shared_ptr<SSL_CTX> sslContext);
};
BIO *BIO_new_I2Stream(const Stream::Ptr& stream);
void I2Stream_check_exception(BIO *bi);
}
#endif /* JSONRPCSERVER_H */
#endif /* STREAMBIO_H */

View File

@ -1,256 +0,0 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "i2-base.h"
using namespace icinga;
/**
* Constructor for the TcpClient class.
*
* @param role The role of the TCP client socket.
*/
TcpClient::TcpClient(TcpClientRole role)
: m_SendQueue(boost::make_shared<FIFO>()),
m_RecvQueue(boost::make_shared<FIFO>()),
m_Role(role)
{ }
/**
* Retrieves the role of the socket.
*
* @returns The role.
*/
TcpClientRole TcpClient::GetRole(void) const
{
return m_Role;
}
/**
* Creates a socket and connects to the specified node and service.
*
* @param node The node.
* @param service The service.
*/
void TcpClient::Connect(const String& node, const String& service)
{
m_Role = RoleOutbound;
addrinfo hints;
addrinfo *result;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
int rc = getaddrinfo(node.CStr(), service.CStr(), &hints, &result);
if (rc < 0)
throw_exception(SocketException("getaddrinfo() failed", GetLastSocketError()));
int fd = INVALID_SOCKET;
for (addrinfo *info = result; info != NULL; info = info->ai_next) {
fd = socket(info->ai_family, info->ai_socktype, info->ai_protocol);
if (fd == INVALID_SOCKET)
continue;
SetFD(fd);
rc = connect(fd, info->ai_addr, info->ai_addrlen);
#ifdef _WIN32
if (rc < 0 && WSAGetLastError() != WSAEWOULDBLOCK) {
#else /* _WIN32 */
if (rc < 0 && errno != EINPROGRESS) {
#endif /* _WIN32 */
closesocket(fd);
SetFD(INVALID_SOCKET);
continue;
}
if (rc >= 0) {
SetConnected(true);
OnConnected(GetSelf());
}
break;
}
freeaddrinfo(result);
if (fd == INVALID_SOCKET)
throw_exception(runtime_error("Could not create a suitable socket."));
}
/**
* Processes data that is available for this socket.
*/
void TcpClient::HandleWritable(void)
{
int rc;
char data[1024];
size_t count;
if (!IsConnected()) {
SetConnected(true);
Event::Post(boost::bind(boost::cref(OnConnected), GetSelf()));
}
for (;;) {
{
boost::mutex::scoped_lock lock(m_QueueMutex);
count = m_SendQueue->GetAvailableBytes();
if (count == 0)
break;
if (count > sizeof(data))
count = sizeof(data);
m_SendQueue->Peek(data, count);
}
rc = send(GetFD(), data, count, 0);
if (rc <= 0)
throw_exception(SocketException("send() failed", GetError()));
{
boost::mutex::scoped_lock lock(m_QueueMutex);
m_SendQueue->Read(NULL, rc);
}
}
}
/**
* Implements IOQueue::GetAvailableBytes.
*/
size_t TcpClient::GetAvailableBytes(void) const
{
boost::mutex::scoped_lock lock(m_QueueMutex);
return m_RecvQueue->GetAvailableBytes();
}
/**
* Implements IOQueue::Peek.
*/
void TcpClient::Peek(void *buffer, size_t count)
{
boost::mutex::scoped_lock lock(m_QueueMutex);
m_RecvQueue->Peek(buffer, count);
}
/**
* Implements IOQueue::Read.
*/
void TcpClient::Read(void *buffer, size_t count)
{
boost::mutex::scoped_lock lock(m_QueueMutex);
m_RecvQueue->Read(buffer, count);
}
/**
* Implements IOQueue::Write.
*/
void TcpClient::Write(const void *buffer, size_t count)
{
boost::mutex::scoped_lock lock(m_QueueMutex);
m_SendQueue->Write(buffer, count);
}
/**
* Processes data that can be written for this socket.
*/
void TcpClient::HandleReadable(void)
{
if (!IsConnected()) {
SetConnected(true);
Event::Post(boost::bind(boost::cref(OnConnected), GetSelf()));
}
for (;;) {
char data[1024];
int rc = recv(GetFD(), data, sizeof(data), 0);
#ifdef _WIN32
if (rc < 0 && WSAGetLastError() == WSAEWOULDBLOCK)
#else /* _WIN32 */
if (rc < 0 && errno == EAGAIN)
#endif /* _WIN32 */
return;
if (rc <= 0)
throw_exception(SocketException("recv() failed", GetError()));
{
boost::mutex::scoped_lock lock(m_QueueMutex);
m_RecvQueue->Write(data, rc);
}
}
Event::Post(boost::bind(boost::ref(OnDataAvailable), GetSelf()));
}
/**
* Checks whether data should be read for this socket.
*
* @returns true
*/
bool TcpClient::WantsToRead(void) const
{
return true;
}
/**
* Checks whether data should be written for this socket.
*
* @returns true if data should be written, false otherwise.
*/
bool TcpClient::WantsToWrite(void) const
{
{
boost::mutex::scoped_lock lock(m_QueueMutex);
if (m_SendQueue->GetAvailableBytes() > 0)
return true;
}
return (!IsConnected());
}
/**
* Default factory function for TCP clients.
*
* @param role The role of the new client.
* @returns The new client.
*/
TcpClient::Ptr icinga::TcpClientFactory(TcpClientRole role)
{
return boost::make_shared<TcpClient>(role);
}

View File

@ -1,90 +0,0 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#ifndef TCPCLIENT_H
#define TCPCLIENT_H
namespace icinga
{
/**
* The role of a TCP client object.
*
* @ingroup base
*/
enum TcpClientRole
{
RoleInbound, /**< Inbound socket, i.e. one that was returned
from accept(). */
RoleOutbound /**< Outbound socket, i.e. one that is connect()'d to a
remote socket. */
};
/**
* A TCP client connection.
*
* @ingroup base
*/
class I2_BASE_API TcpClient : public TcpSocket, public IOQueue
{
public:
typedef shared_ptr<TcpClient> Ptr;
typedef weak_ptr<TcpClient> WeakPtr;
TcpClient(TcpClientRole role);
TcpClientRole GetRole(void) const;
void Connect(const String& node, const String& service);
boost::signal<void (const TcpClient::Ptr&)> OnConnected;
boost::signal<void (const TcpClient::Ptr&)> OnDataAvailable;
virtual size_t GetAvailableBytes(void) const;
virtual void Peek(void *buffer, size_t count);
virtual void Read(void *buffer, size_t count);
virtual void Write(const void *buffer, size_t count);
protected:
virtual bool WantsToRead(void) const;
virtual bool WantsToWrite(void) const;
virtual void HandleReadable(void);
virtual void HandleWritable(void);
mutable boost::mutex m_QueueMutex;
FIFO::Ptr m_SendQueue;
FIFO::Ptr m_RecvQueue;
private:
TcpClientRole m_Role;
};
/**
* Returns a new unconnected TcpClient object that has the specified
* connection role.
*
* @param role The role of the new object.
* @returns A new TcpClient object.
*/
TcpClient::Ptr TcpClientFactory(TcpClientRole role);
}
#endif /* TCPCLIENT_H */

View File

@ -1,89 +0,0 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "i2-base.h"
using namespace icinga;
/**
* Constructor for the TcpServer class.
*/
TcpServer::TcpServer(void)
: m_ClientFactory(boost::bind(&TcpClientFactory, RoleInbound))
{ }
/**
* Sets the client factory.
*
* @param clientFactory The client factory function.
*/
void TcpServer::SetClientFactory(const TcpServer::ClientFactory& clientFactory)
{
m_ClientFactory = clientFactory;
}
/**
* Retrieves the client factory.
*
* @returns The client factory function.
*/
TcpServer::ClientFactory TcpServer::GetFactoryFunction(void) const
{
return m_ClientFactory;
}
/**
* Starts listening for incoming client connections.
*/
void TcpServer::Listen(void)
{
if (listen(GetFD(), SOMAXCONN) < 0)
throw_exception(SocketException("listen() failed", GetError()));
}
/**
* Checks whether the TCP server wants to read (i.e. accept new clients).
*
* @returns true
*/
bool TcpServer::WantsToRead(void) const
{
return true;
}
/**
* Accepts a new client and creates a new client object for it
* using the client factory function.
*/
void TcpServer::HandleReadable(void)
{
int fd;
sockaddr_storage addr;
socklen_t addrlen = sizeof(addr);
fd = accept(GetFD(), (sockaddr *)&addr, &addrlen);
if (fd < 0)
throw_exception(SocketException("accept() failed", GetError()));
TcpClient::Ptr client = m_ClientFactory(fd);
Event::Post(boost::bind(boost::ref(OnNewClient), GetSelf(), client));
}

View File

@ -21,23 +21,6 @@
using namespace icinga;
/**
* Creates a socket.
*
* @param family The socket family for the new socket.
*/
void TcpSocket::MakeSocket(int family)
{
assert(GetFD() == INVALID_SOCKET);
int fd = socket(family, SOCK_STREAM, 0);
if (fd == INVALID_SOCKET)
throw_exception(SocketException("socket() failed", GetLastSocketError()));
SetFD(fd);
}
/**
* Creates a socket and binds it to the specified service.
*
@ -110,3 +93,61 @@ void TcpSocket::Bind(String node, String service, int family)
if (fd == INVALID_SOCKET)
throw_exception(runtime_error("Could not create a suitable socket."));
}
/**
* Creates a socket and connects to the specified node and service.
*
* @param node The node.
* @param service The service.
*/
void TcpSocket::Connect(const String& node, const String& service)
{
addrinfo hints;
addrinfo *result;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
int rc = getaddrinfo(node.CStr(), service.CStr(), &hints, &result);
if (rc < 0)
throw_exception(SocketException("getaddrinfo() failed", GetLastSocketError()));
int fd = INVALID_SOCKET;
for (addrinfo *info = result; info != NULL; info = info->ai_next) {
fd = socket(info->ai_family, info->ai_socktype, info->ai_protocol);
if (fd == INVALID_SOCKET)
continue;
SetFD(fd);
rc = connect(fd, info->ai_addr, info->ai_addrlen);
#ifdef _WIN32
if (rc < 0 && WSAGetLastError() != WSAEWOULDBLOCK) {
#else /* _WIN32 */
if (rc < 0 && errno != EINPROGRESS) {
#endif /* _WIN32 */
closesocket(fd);
SetFD(INVALID_SOCKET);
continue;
}
if (rc >= 0) {
SetConnected(true);
OnConnected(GetSelf());
}
break;
}
freeaddrinfo(result);
if (fd == INVALID_SOCKET)
throw_exception(runtime_error("Could not create a suitable socket."));
}

View File

@ -37,8 +37,7 @@ public:
void Bind(String service, int family);
void Bind(String node, String service, int family);
private:
void MakeSocket(int family);
void Connect(const String& node, const String& service);
};
}

View File

@ -21,21 +21,26 @@
using namespace icinga;
int I2_EXPORT TlsClient::m_SSLIndex;
bool I2_EXPORT TlsClient::m_SSLIndexInitialized = false;
int I2_EXPORT TlsStream::m_SSLIndex;
bool I2_EXPORT TlsStream::m_SSLIndexInitialized = false;
/**
* Constructor for the TlsClient class.
* Constructor for the TlsStream class.
*
* @param role The role of the client.
* @param sslContext The SSL context for the client.
*/
TlsClient::TlsClient(TcpClientRole role, shared_ptr<SSL_CTX> sslContext)
: TcpClient(role), m_SSLContext(sslContext),
m_BlockRead(false), m_BlockWrite(false)
{ }
TlsStream::TlsStream(const Stream::Ptr& innerStream, TlsRole role, shared_ptr<SSL_CTX> sslContext)
: m_InnerStream(innerStream), m_Role(role), m_SSLContext(sslContext),
m_SendQueue(boost::make_shared<FIFO>()), m_RecvQueue(boost::make_shared<FIFO>())
{
m_InnerStream->OnDataAvailable.connect(boost::bind(&TlsStream::DataAvailableHandler, this));
m_InnerStream->OnClosed.connect(boost::bind(&TlsStream::ClosedHandler, this));
m_SendQueue->Start();
m_RecvQueue->Start();
}
void TlsClient::Start(void)
void TlsStream::Start(void)
{
m_SSL = shared_ptr<SSL>(SSL_new(m_SSLContext.get()), SSL_free);
@ -48,7 +53,7 @@ void TlsClient::Start(void)
throw_exception(logic_error("No X509 client certificate was specified."));
if (!m_SSLIndexInitialized) {
m_SSLIndex = SSL_get_ex_new_index(0, const_cast<char *>("TlsClient"), NULL, NULL, NULL);
m_SSLIndex = SSL_get_ex_new_index(0, const_cast<char *>("TlsStream"), NULL, NULL, NULL);
m_SSLIndexInitialized = true;
}
@ -56,21 +61,24 @@ void TlsClient::Start(void)
SSL_set_verify(m_SSL.get(), SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT, NULL);
SSL_set_fd(m_SSL.get(), GetFD());
m_BIO = BIO_new_I2Stream(m_InnerStream);
SSL_set_bio(m_SSL.get(), m_BIO, m_BIO);
if (GetRole() == RoleInbound)
if (m_Role == TlsRoleServer)
SSL_set_accept_state(m_SSL.get());
else
SSL_set_connect_state(m_SSL.get());
int rc = SSL_do_handshake(m_SSL.get());
/*int rc = SSL_do_handshake(m_SSL.get());
if (rc == 1) {
SetConnected(true);
OnConnected(GetSelf());
}
}*/
Socket::Start();
Stream::Start();
HandleIO();
}
/**
@ -78,10 +86,8 @@ void TlsClient::Start(void)
*
* @returns The X509 certificate.
*/
shared_ptr<X509> TlsClient::GetClientCertificate(void) const
shared_ptr<X509> TlsStream::GetClientCertificate(void) const
{
boost::mutex::scoped_lock lock(m_SocketMutex);
return shared_ptr<X509>(SSL_get_certificate(m_SSL.get()), &Utility::NullDeleter);
}
@ -90,82 +96,87 @@ shared_ptr<X509> TlsClient::GetClientCertificate(void) const
*
* @returns The X509 certificate.
*/
shared_ptr<X509> TlsClient::GetPeerCertificate(void) const
shared_ptr<X509> TlsStream::GetPeerCertificate(void) const
{
boost::mutex::scoped_lock lock(m_SocketMutex);
return shared_ptr<X509>(SSL_get_peer_certificate(m_SSL.get()), X509_free);
}
/**
* Processes data that is available for this socket.
*/
void TlsClient::HandleReadable(void)
void TlsStream::DataAvailableHandler(void)
{
m_BlockRead = false;
m_BlockWrite = false;
try {
HandleIO();
} catch (...) {
SetException(boost::current_exception());
for (;;) {
char data[1024];
Close();
}
}
void TlsStream::ClosedHandler(void)
{
SetException(m_InnerStream->GetException());
Close();
}
/**
* Processes data for the stream.
*/
void TlsStream::HandleIO(void)
{
char data[16 * 1024];
int rc;
if (IsConnected()) {
rc = SSL_read(m_SSL.get(), data, sizeof(data));
} else {
if (!IsConnected()) {
rc = SSL_do_handshake(m_SSL.get());
if (rc == 1) {
SetConnected(true);
Event::Post(boost::bind(boost::cref(OnConnected), GetSelf()));
} else {
switch (SSL_get_error(m_SSL.get(), rc)) {
case SSL_ERROR_WANT_WRITE:
/* fall through */
case SSL_ERROR_WANT_READ:
return;
case SSL_ERROR_ZERO_RETURN:
Close();
return;
default:
I2Stream_check_exception(m_BIO);
throw_exception(OpenSSLException("SSL_do_handshake failed", ERR_get_error()));
}
}
}
if (rc <= 0) {
bool new_data = false, read_ok = true;
while (read_ok) {
rc = SSL_read(m_SSL.get(), data, sizeof(data));
if (rc > 0) {
m_RecvQueue->Write(data, rc);
new_data = true;
} else {
switch (SSL_get_error(m_SSL.get(), rc)) {
case SSL_ERROR_WANT_WRITE:
m_BlockRead = true;
/* fall through */
case SSL_ERROR_WANT_READ:
goto post_event;
read_ok = false;
break;
case SSL_ERROR_ZERO_RETURN:
CloseInternal(false);
goto post_event;
Close();
return;
default:
I2Stream_check_exception(m_BIO);
throw_exception(OpenSSLException("SSL_read failed", ERR_get_error()));
}
}
if (IsConnected()) {
boost::mutex::scoped_lock lock(m_QueueMutex);
m_RecvQueue->Write(data, rc);
}
}
post_event:
Event::Post(boost::bind(boost::ref(OnDataAvailable), GetSelf()));
}
if (new_data)
OnDataAvailable(GetSelf());
/**
* Processes data that can be written for this socket.
*/
void TlsClient::HandleWritable(void)
{
m_BlockRead = false;
m_BlockWrite = false;
char data[1024];
size_t count;
for (;;) {
int rc;
if (IsConnected()) {
{
boost::mutex::scoped_lock lock(m_QueueMutex);
count = m_SendQueue->GetAvailableBytes();
while (m_SendQueue->GetAvailableBytes() > 0) {
size_t count = m_SendQueue->GetAvailableBytes();
if (count == 0)
break;
@ -174,96 +185,57 @@ void TlsClient::HandleWritable(void)
count = sizeof(data);
m_SendQueue->Peek(data, count);
}
rc = SSL_write(m_SSL.get(), (const char *)data, count);
if (rc > 0) {
m_SendQueue->Read(NULL, rc);
} else {
rc = SSL_do_handshake(m_SSL.get());
if (rc == 1) {
SetConnected(true);
Event::Post(boost::bind(boost::cref(OnConnected), GetSelf()));
return;
}
}
if (rc <= 0) {
switch (SSL_get_error(m_SSL.get(), rc)) {
case SSL_ERROR_WANT_READ:
m_BlockWrite = true;
/* fall through */
case SSL_ERROR_WANT_WRITE:
return;
case SSL_ERROR_ZERO_RETURN:
CloseInternal(false);
Close();
return;
default:
I2Stream_check_exception(m_BIO);
throw_exception(OpenSSLException("SSL_write failed", ERR_get_error()));
}
}
if (IsConnected()) {
boost::mutex::scoped_lock lock(m_QueueMutex);
m_SendQueue->Read(NULL, rc);
}
}
}
/**
* Checks whether data should be read for this socket.
*
* @returns true if data should be read, false otherwise.
* Closes the stream.
*/
bool TlsClient::WantsToRead(void) const
{
if (SSL_want_read(m_SSL.get()))
return true;
if (m_BlockRead)
return false;
return TcpClient::WantsToRead();
}
/**
* Checks whether data should be written for this socket.
*
* @returns true if data should be written, false otherwise.
*/
bool TlsClient::WantsToWrite(void) const
{
if (SSL_want_write(m_SSL.get()))
return true;
if (m_BlockWrite)
return false;
return TcpClient::WantsToWrite();
}
/**
* Closes the socket.
*
* @param from_dtor Whether this method was invoked from the destructor.
*/
void TlsClient::CloseInternal(bool from_dtor)
void TlsStream::Close(void)
{
if (m_SSL)
SSL_shutdown(m_SSL.get());
TcpClient::CloseInternal(from_dtor);
Stream::Close();
}
/**
* Factory function for the TlsClient class.
*
* @param role The role of the TLS socket.
* @param sslContext The SSL context for the socket.
* @returns A new TLS socket.
*/
TcpClient::Ptr icinga::TlsClientFactory(TcpClientRole role, shared_ptr<SSL_CTX> sslContext)
size_t TlsStream::GetAvailableBytes(void) const
{
return boost::make_shared<TlsClient>(role, sslContext);
return m_RecvQueue->GetAvailableBytes();
}
size_t TlsStream::Peek(void *buffer, size_t count)
{
return m_RecvQueue->Peek(buffer, count);
}
size_t TlsStream::Read(void *buffer, size_t count)
{
return m_RecvQueue->Read(buffer, count);
}
void TlsStream::Write(const void *buffer, size_t count)
{
m_SendQueue->Write(buffer, count);
HandleIO();
}

View File

@ -17,53 +17,66 @@
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#ifndef TLSCLIENT_H
#define TLSCLIENT_H
#ifndef TLSSTREAM_H
#define TLSSTREAM_H
namespace icinga
{
typedef enum
{
TlsRoleClient,
TlsRoleServer
} TlsRole;
/**
* A TLS client connection.
* A TLS stream.
*
* @ingroup base
*/
class I2_BASE_API TlsClient : public TcpClient
class I2_BASE_API TlsStream : public Stream
{
public:
TlsClient(TcpClientRole role, shared_ptr<SSL_CTX> sslContext);
typedef shared_ptr<TlsStream> Ptr;
typedef weak_ptr<TlsStream> WeakPtr;
virtual void Start(void);
TlsStream(const Stream::Ptr& innerStream, TlsRole role, shared_ptr<SSL_CTX> sslContext);
shared_ptr<X509> GetClientCertificate(void) const;
shared_ptr<X509> GetPeerCertificate(void) const;
void Start(void);
virtual void Close(void);
virtual size_t GetAvailableBytes(void) const;
virtual size_t Peek(void *buffer, size_t count);
virtual size_t Read(void *buffer, size_t count);
virtual void Write(const void *buffer, size_t count);
protected:
void HandleSSLError(void);
void DataAvailableHandler(void);
void ClosedHandler(void);
virtual bool WantsToRead(void) const;
virtual bool WantsToWrite(void) const;
virtual void HandleReadable(void);
virtual void HandleWritable(void);
void HandleIO(void);
private:
shared_ptr<SSL_CTX> m_SSLContext;
shared_ptr<SSL> m_SSL;
BIO *m_BIO;
bool m_BlockRead;
bool m_BlockWrite;
FIFO::Ptr m_SendQueue;
FIFO::Ptr m_RecvQueue;
Stream::Ptr m_InnerStream;
TlsRole m_Role;
static int m_SSLIndex;
static bool m_SSLIndexInitialized;
virtual void CloseInternal(bool from_dtor);
static void NullCertificateDeleter(X509 *certificate);
};
TcpClient::Ptr TlsClientFactory(TcpClientRole role, shared_ptr<SSL_CTX> sslContext);
};
}
#endif /* TLSCLIENT_H */
#endif /* TLSSTREAM_H */

View File

@ -10,10 +10,8 @@ libremoting_la_SOURCES = \
endpointmanager.cpp \
endpointmanager.h \
i2-remoting.h \
jsonrpcclient.cpp \
jsonrpcclient.h \
jsonrpcserver.cpp \
jsonrpcserver.h \
jsonrpcconnection.cpp \
jsonrpcconnection.h \
messagepart.cpp \
messagepart.h \
requestmessage.cpp \

View File

@ -111,37 +111,18 @@ bool Endpoint::IsConnected(void) const
if (IsLocalEndpoint()) {
return true;
} else {
JsonRpcClient::Ptr client = GetClient();
JsonRpcConnection::Ptr client = GetClient();
return (client && client->IsConnected());
return (client && client->GetStream()->IsConnected());
}
}
/**
* Retrieves the address for the endpoint.
*
* @returns The endpoint's address.
*/
String Endpoint::GetAddress(void) const
{
if (IsLocalEndpoint()) {
return "local:" + GetName();
} else {
JsonRpcClient::Ptr client = GetClient();
if (!client || !client->IsConnected())
return "<disconnected endpoint>";
return client->GetPeerAddress();
}
}
JsonRpcClient::Ptr Endpoint::GetClient(void) const
JsonRpcConnection::Ptr Endpoint::GetClient(void) const
{
return Get("client");
}
void Endpoint::SetClient(const JsonRpcClient::Ptr& client)
void Endpoint::SetClient(const JsonRpcConnection::Ptr& client)
{
Set("client", client);
client->OnNewMessage.connect(boost::bind(&Endpoint::NewMessageHandler, this, _2));
@ -337,14 +318,14 @@ void Endpoint::NewMessageHandler(const MessagePart& message)
void Endpoint::ClientClosedHandler(void)
{
try {
/*try {
GetClient()->CheckException();
} catch (const exception& ex) {
stringstream message;
message << "Error occured for JSON-RPC socket: Message=" << ex.what();
Logger::Write(LogWarning, "jsonrpc", message.str());
}
}*/
Logger::Write(LogWarning, "jsonrpc", "Lost connection to endpoint: identity=" + GetName());

View File

@ -43,10 +43,8 @@ public:
static bool Exists(const String& name);
static Endpoint::Ptr GetByName(const String& name);
String GetAddress(void) const;
JsonRpcClient::Ptr GetClient(void) const;
void SetClient(const JsonRpcClient::Ptr& client);
JsonRpcConnection::Ptr GetClient(void) const;
void SetClient(const JsonRpcConnection::Ptr& client);
void RegisterSubscription(const String& topic);
void UnregisterSubscription(const String& topic);

View File

@ -110,11 +110,11 @@ void EndpointManager::AddListener(const String& service)
s << "Adding new listener: port " << service;
Logger::Write(LogInformation, "icinga", s.str());
JsonRpcServer::Ptr server = boost::make_shared<JsonRpcServer>(sslContext);
TcpSocket::Ptr server = boost::make_shared<TcpSocket>();
m_Servers.insert(server);
server->OnNewClient.connect(boost::bind(&EndpointManager::NewClientHandler,
this, _2));
this, _2, TlsRoleServer));
server->Bind(service, AF_INET6);
server->Listen();
@ -133,9 +133,9 @@ void EndpointManager::AddConnection(const String& node, const String& service) {
if (!sslContext)
throw_exception(logic_error("SSL context is required for AddConnection()"));
JsonRpcClient::Ptr client = boost::make_shared<JsonRpcClient>(RoleOutbound, sslContext);
TcpSocket::Ptr client = boost::make_shared<TcpSocket>();
client->Connect(node, service);
NewClientHandler(client);
NewClientHandler(client, TlsRoleClient);
}
/**
@ -143,27 +143,30 @@ void EndpointManager::AddConnection(const String& node, const String& service) {
*
* @param client The new client.
*/
void EndpointManager::NewClientHandler(const TcpClient::Ptr& client)
void EndpointManager::NewClientHandler(const Socket::Ptr& client, TlsRole role)
{
JsonRpcClient::Ptr jclient = static_pointer_cast<JsonRpcClient>(client);
String peerAddress = client->GetPeerAddress();
TlsStream::Ptr tlsStream = boost::make_shared<TlsStream>(client, role, GetSSLContext());
tlsStream->Start();
m_PendingClients.insert(jclient);
jclient->OnConnected.connect(boost::bind(&EndpointManager::ClientConnectedHandler, this, _1));
jclient->Start();
m_PendingClients.insert(tlsStream);
tlsStream->OnConnected.connect(boost::bind(&EndpointManager::ClientConnectedHandler, this, _1, peerAddress));
client->Start();
}
void EndpointManager::ClientConnectedHandler(const TcpClient::Ptr& client)
void EndpointManager::ClientConnectedHandler(const Stream::Ptr& client, const String& peerAddress)
{
JsonRpcClient::Ptr jclient = static_pointer_cast<JsonRpcClient>(client);
TlsStream::Ptr tlsStream = static_pointer_cast<TlsStream>(client);
JsonRpcConnection::Ptr jclient = boost::make_shared<JsonRpcConnection>(tlsStream);
Logger::Write(LogInformation, "icinga", "New client connection for " + jclient->GetPeerAddress());
m_PendingClients.erase(jclient);
shared_ptr<X509> cert = jclient->GetPeerCertificate();
m_PendingClients.erase(tlsStream);
shared_ptr<X509> cert = tlsStream->GetPeerCertificate();
String identity = Utility::GetCertificateCN(cert);
Logger::Write(LogInformation, "icinga", "New client connection at " + peerAddress + " for identity '" + identity + "'");
Endpoint::Ptr endpoint;
if (Endpoint::Exists(identity))

View File

@ -70,8 +70,8 @@ private:
Timer::Ptr m_ReconnectTimer;
set<JsonRpcServer::Ptr> m_Servers;
set<JsonRpcClient::Ptr> m_PendingClients;
set<TcpSocket::Ptr> m_Servers;
set<TlsStream::Ptr> m_PendingClients;
/**
* Information about a pending API request.
@ -101,8 +101,8 @@ private:
void ReconnectTimerHandler(void);
void NewClientHandler(const TcpClient::Ptr& client);
void ClientConnectedHandler(const TcpClient::Ptr& client);
void NewClientHandler(const Socket::Ptr& client, TlsRole rol);
void ClientConnectedHandler(const Stream::Ptr& client, const String& peerAddress);
};
}

View File

@ -39,8 +39,7 @@
#include "messagepart.h"
#include "requestmessage.h"
#include "responsemessage.h"
#include "jsonrpcclient.h"
#include "jsonrpcserver.h"
#include "jsonrpcconnection.h"
#include "endpoint.h"
#include "endpointmanager.h"

View File

@ -22,39 +22,35 @@
using namespace icinga;
/**
* Constructor for the JsonRpcClient class.
* Constructor for the JsonRpcConnection class.
*
* @param role The role of the underlying TCP client.
* @param sslContext SSL context for the TLS connection.
* @param stream The stream.
*/
JsonRpcClient::JsonRpcClient(TcpClientRole role, shared_ptr<SSL_CTX> sslContext)
: TlsClient(role, sslContext)
{
OnDataAvailable.connect(boost::bind(&JsonRpcClient::DataAvailableHandler,
this));
}
JsonRpcConnection::JsonRpcConnection(const Stream::Ptr& stream)
: Connection(stream)
{ }
/**
* Sends a message to the connected peer.
*
* @param message The message.
*/
void JsonRpcClient::SendMessage(const MessagePart& message)
void JsonRpcConnection::SendMessage(const MessagePart& message)
{
Value value = message.GetDictionary();
String json = value.Serialize();
//std::cerr << ">> " << json << std::endl;
NetString::WriteStringToIOQueue(this, json);
NetString::WriteStringToStream(GetStream(), json);
}
/**
* Processes inbound data.
*/
void JsonRpcClient::DataAvailableHandler(void)
void JsonRpcConnection::ProcessData(void)
{
String jsonString;
while (NetString::ReadStringFromIOQueue(this, &jsonString)) {
while (NetString::ReadStringFromStream(GetStream(), &jsonString)) {
//std::cerr << "<< " << jsonString << std::endl;
try {
@ -73,20 +69,3 @@ void JsonRpcClient::DataAvailableHandler(void)
}
}
}
/**
* Factory function for JSON-RPC clients.
*
* @param fd The file descriptor.
* @param role The role of the underlying TCP client.
* @param sslContext SSL context for the TLS connection.
* @returns A new JSON-RPC client.
*/
JsonRpcClient::Ptr icinga::JsonRpcClientFactory(SOCKET fd, TcpClientRole role,
shared_ptr<SSL_CTX> sslContext)
{
JsonRpcClient::Ptr client = boost::make_shared<JsonRpcClient>(role,
sslContext);
client->SetFD(fd);
return client;
}

View File

@ -17,37 +17,33 @@
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#ifndef JSONRPCCLIENT_H
#define JSONRPCCLIENT_H
#ifndef JSONRPCCONNECTION_H
#define JSONRPCCONNECTION_H
namespace icinga
{
/**
* A JSON-RPC client.
* A JSON-RPC connection.
*
* @ingroup remoting
*/
class I2_REMOTING_API JsonRpcClient : public TlsClient
class I2_REMOTING_API JsonRpcConnection : public Connection
{
public:
typedef shared_ptr<JsonRpcClient> Ptr;
typedef weak_ptr<JsonRpcClient> WeakPtr;
typedef shared_ptr<JsonRpcConnection> Ptr;
typedef weak_ptr<JsonRpcConnection> WeakPtr;
JsonRpcClient(TcpClientRole role, shared_ptr<SSL_CTX> sslContext);
JsonRpcConnection(const Stream::Ptr& stream);
void SendMessage(const MessagePart& message);
boost::signal<void (const JsonRpcClient::Ptr&, const MessagePart&)> OnNewMessage;
boost::signal<void (const JsonRpcConnection::Ptr&, const MessagePart&)> OnNewMessage;
private:
void DataAvailableHandler(void);
friend JsonRpcClient::Ptr JsonRpcClientFactory(SOCKET fd, TcpClientRole role, shared_ptr<SSL_CTX> sslContext);
protected:
virtual void ProcessData(void);
};
JsonRpcClient::Ptr JsonRpcClientFactory(SOCKET fd, TcpClientRole role, shared_ptr<SSL_CTX> sslContext);
}
#endif /* JSONRPCCLIENT_H */
#endif /* JSONRPCCONNECTION_H */

View File

@ -22,10 +22,9 @@
<ClInclude Include="endpoint.h" />
<ClInclude Include="endpointmanager.h" />
<ClInclude Include="i2-remoting.h" />
<ClInclude Include="jsonrpcclient.h" />
<ClInclude Include="jsonrpcconnection.h" />
<ClInclude Include="requestmessage.h" />
<ClInclude Include="responsemessage.h" />
<ClInclude Include="jsonrpcserver.h" />
<ClInclude Include="messagepart.h" />
</ItemGroup>
<ItemGroup>
@ -37,10 +36,9 @@
<PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">Create</PrecompiledHeader>
<PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">Create</PrecompiledHeader>
</ClCompile>
<ClCompile Include="jsonrpcclient.cpp" />
<ClCompile Include="jsonrpcconnection.cpp" />
<ClCompile Include="requestmessage.cpp" />
<ClCompile Include="responsemessage.cpp" />
<ClCompile Include="jsonrpcserver.cpp" />
<ClCompile Include="messagepart.cpp" />
</ItemGroup>
<PropertyGroup Label="Globals">

View File

@ -1,15 +1,9 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ItemGroup>
<ClCompile Include="jsonrpcclient.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
<ClCompile Include="responsemessage.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
<ClCompile Include="jsonrpcserver.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
<ClCompile Include="messagepart.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
@ -25,14 +19,11 @@
<ClCompile Include="endpointmanager.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
<ClCompile Include="jsonrpcconnection.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="jsonrpcclient.h">
<Filter>Headerdateien</Filter>
</ClInclude>
<ClInclude Include="jsonrpcserver.h">
<Filter>Headerdateien</Filter>
</ClInclude>
<ClInclude Include="messagepart.h">
<Filter>Headerdateien</Filter>
</ClInclude>
@ -51,6 +42,9 @@
<ClInclude Include="endpointmanager.h">
<Filter>Headerdateien</Filter>
</ClInclude>
<ClInclude Include="jsonrpcconnection.h">
<Filter>Headerdateien</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<Filter Include="Headerdateien">