Made sockets multi-threaded.

This commit is contained in:
Gunnar Beutner 2012-06-24 02:56:48 +02:00
parent 145d85d148
commit 34d26d424f
33 changed files with 466 additions and 292 deletions

View File

@ -13,6 +13,8 @@ libbase_la_SOURCES = \
configobject.h \
dictionary.cpp \
dictionary.h \
event.cpp \
event.h \
exception.cpp \
exception.h \
fifo.cpp \

View File

@ -96,9 +96,6 @@ Application::Ptr Application::GetInstance(void)
void Application::RunEventLoop(void)
{
while (!m_ShuttingDown) {
fd_set readfds, writefds, exceptfds;
int nfds = -1;
Object::ClearHeldObjects();
long sleep = Timer::ProcessTimers();
@ -106,80 +103,13 @@ void Application::RunEventLoop(void)
if (m_ShuttingDown)
break;
FD_ZERO(&readfds);
FD_ZERO(&writefds);
FD_ZERO(&exceptfds);
vector<Event::Ptr> events;
Socket::CollectionType::iterator prev, i;
for (i = Socket::Sockets.begin();
i != Socket::Sockets.end(); ) {
Socket::Ptr socket = i->lock();
Event::Wait(&events, boost::get_system_time() + boost::posix_time::seconds(sleep));
prev = i;
i++;
if (!socket) {
Socket::Sockets.erase(prev);
continue;
}
int fd = socket->GetFD();
if (socket->WantsToWrite())
FD_SET(fd, &writefds);
if (socket->WantsToRead())
FD_SET(fd, &readfds);
FD_SET(fd, &exceptfds);
if (fd > nfds)
nfds = fd;
}
timeval tv;
tv.tv_sec = sleep;
tv.tv_usec = 0;
int ready;
if (nfds == -1) {
Sleep(tv.tv_sec * 1000 + tv.tv_usec);
ready = 0;
} else
ready = select(nfds + 1, &readfds, &writefds,
&exceptfds, &tv);
if (ready < 0)
break;
else if (ready == 0)
continue;
for (i = Socket::Sockets.begin();
i != Socket::Sockets.end(); ) {
Socket::Ptr socket = i->lock();
prev = i;
i++;
if (!socket) {
Socket::Sockets.erase(prev);
continue;
}
int fd;
fd = socket->GetFD();
if (fd != INVALID_SOCKET && FD_ISSET(fd, &writefds))
socket->OnWritable(socket);
fd = socket->GetFD();
if (fd != INVALID_SOCKET && FD_ISSET(fd, &readfds))
socket->OnReadable(socket);
fd = socket->GetFD();
if (fd != INVALID_SOCKET && FD_ISSET(fd, &exceptfds))
socket->OnException(socket);
for (vector<Event::Ptr>::iterator it = events.begin(); it != events.end(); it++) {
Event::Ptr ev = *it;
ev->OnEventDelivered();
}
}
}
@ -296,7 +226,7 @@ void Application::Log(LogSeverity severity, const string& facility, const string
char timestamp[100];
// TODO: make this configurable
if (!IsDebugging() && severity < LogInformation)
if (/*!IsDebugging() && */severity < LogInformation)
return;
string severityStr;

View File

@ -15,6 +15,7 @@
<ClCompile Include="component.cpp" />
<ClCompile Include="configobject.cpp" />
<ClCompile Include="dictionary.cpp" />
<ClCompile Include="event.cpp" />
<ClCompile Include="exception.cpp" />
<ClCompile Include="fifo.cpp" />
<ClCompile Include="object.cpp" />
@ -37,6 +38,7 @@
<ClInclude Include="component.h" />
<ClInclude Include="configobject.h" />
<ClInclude Include="dictionary.h" />
<ClInclude Include="event.h" />
<ClInclude Include="objectmap.h" />
<ClInclude Include="objectset.h" />
<ClInclude Include="exception.h" />
@ -97,6 +99,8 @@
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>_WINDLL;I2_BASE_BUILD;_DEBUG;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<WarningLevel>Level3</WarningLevel>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
<MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
@ -117,6 +121,8 @@
<PreprocessorDefinitions>_WINDLL;I2_BASE_BUILD;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<FavorSizeOrSpeed>Speed</FavorSizeOrSpeed>
<WarningLevel>Level3</WarningLevel>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
<MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>

49
base/event.cpp Normal file
View File

@ -0,0 +1,49 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "i2-base.h"
using namespace icinga;
deque<Event::Ptr> Event::m_Events;
condition_variable Event::m_EventAvailable;
mutex Event::m_Mutex;
bool Event::Wait(vector<Event::Ptr> *events, const system_time& wait_until)
{
mutex::scoped_lock lock(m_Mutex);
while (m_Events.empty()) {
if (!m_EventAvailable.timed_wait(lock, wait_until))
return false;
}
vector<Event::Ptr> result;
std::copy(m_Events.begin(), m_Events.end(), back_inserter(*events));
m_Events.clear();
return true;
}
void Event::Post(const Event::Ptr& ev)
{
mutex::scoped_lock lock(m_Mutex);
m_Events.push_back(ev);
m_EventAvailable.notify_all();
}

45
base/event.h Normal file
View File

@ -0,0 +1,45 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#ifndef EVENT_H
#define EVENT_H
namespace icinga
{
class Event : public Object
{
public:
typedef shared_ptr<Event> Ptr;
typedef weak_ptr<Event> WeakPtr;
static bool Wait(vector<Event::Ptr> *events, const system_time& wait_until);
static void Post(const Event::Ptr& ev);
boost::signal<void ()> OnEventDelivered;
private:
static deque<Event::Ptr> m_Events;
static condition_variable m_EventAvailable;
static mutex m_Mutex;
};
}
#endif /* EVENT_H */

View File

@ -128,6 +128,7 @@ using boost::thread;
using boost::thread_group;
using boost::mutex;
using boost::condition_variable;
using boost::system_time;
#if defined(__APPLE__) && defined(__MACH__)
# pragma GCC diagnostic ignored "-Wdeprecated-declarations"
@ -150,7 +151,7 @@ using boost::condition_variable;
#include "utility.h"
#include "object.h"
#include "exception.h"
#include "memory.h"
#include "event.h"
#include "variant.h"
#include "dictionary.h"
#include "timer.h"

View File

@ -21,12 +21,6 @@
using namespace icinga;
/**
* A collection of weak pointers to Socket objects which have been
* registered with the socket sub-system.
*/
Socket::CollectionType Socket::Sockets;
/**
* Constructor for the Socket class.
*/
@ -40,27 +34,23 @@ Socket::Socket(void)
*/
Socket::~Socket(void)
{
CloseInternal(true);
{
mutex::scoped_lock lock(m_Mutex);
CloseInternal(true);
}
}
/**
* Registers the socket and starts handling events for it.
*/
void Socket::Start(void)
{
assert(m_FD != INVALID_SOCKET);
assert(!m_ReadThread.joinable() && !m_WriteThread.joinable());
assert(GetFD() != INVALID_SOCKET);
OnException.connect(boost::bind(&Socket::ExceptionEventHandler, this));
m_ReadThread = thread(boost::bind(&Socket::ReadThreadProc, static_cast<Socket::Ptr>(GetSelf())));
m_ReadThread.detach();
Sockets.push_back(GetSelf());
}
/**
* Unregisters the sockets and stops handling events for it.
*/
void Socket::Stop(void)
{
Sockets.remove_if(WeakPtrEqual<Socket>(this));
m_WriteThread = thread(boost::bind(&Socket::WriteThreadProc, static_cast<Socket::Ptr>(GetSelf())));
m_WriteThread.detach();
}
/**
@ -70,8 +60,6 @@ void Socket::Stop(void)
*/
void Socket::SetFD(SOCKET fd)
{
unsigned long lTrue = 1;
/* mark the socket as non-blocking */
if (fd != INVALID_SOCKET) {
#ifdef F_GETFL
@ -83,6 +71,7 @@ void Socket::SetFD(SOCKET fd)
if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0)
throw PosixException("fcntl failed", errno);
#else /* F_GETFL */
unsigned long lTrue = 1;
ioctlsocket(fd, FIONBIO, &lTrue);
#endif /* F_GETFL */
}
@ -105,6 +94,8 @@ SOCKET Socket::GetFD(void) const
*/
void Socket::Close(void)
{
mutex::scoped_lock lock(m_Mutex);
CloseInternal(false);
}
@ -124,9 +115,9 @@ void Socket::CloseInternal(bool from_dtor)
/* nobody can possibly have a valid event subscription when the
destructor has been called */
if (!from_dtor) {
Stop();
OnClosed(GetSelf());
Event::Ptr ev = boost::make_shared<Event>();
ev->OnEventDelivered.connect(boost::bind(boost::ref(OnClosed), GetSelf()));
Event::Post(ev);
}
}
@ -171,9 +162,11 @@ int Socket::GetLastSocketError(void)
void Socket::HandleSocketError(const std::exception& ex)
{
if (!OnError.empty()) {
OnError(GetSelf(), ex);
Event::Ptr ev = boost::make_shared<Event>();
ev->OnEventDelivered.connect(boost::bind(boost::ref(OnError), GetSelf(), ex));
Event::Post(ev);
Close();
CloseInternal(false);
} else {
throw ex;
}
@ -181,10 +174,8 @@ void Socket::HandleSocketError(const std::exception& ex)
/**
* Processes errors that have occured for the socket.
*
* @param - Event arguments for the socket error.
*/
void Socket::ExceptionEventHandler(void)
void Socket::HandleException(void)
{
HandleSocketError(SocketException(
"select() returned fd in except fdset", GetError()));
@ -200,6 +191,9 @@ bool Socket::WantsToRead(void) const
return false;
}
void Socket::HandleReadable(void)
{ }
/**
* Checks whether data should be written for this socket object.
*
@ -210,6 +204,9 @@ bool Socket::WantsToWrite(void) const
return false;
}
void Socket::HandleWritable(void)
{ }
/**
* Formats a sockaddr in a human-readable way.
*
@ -236,6 +233,8 @@ string Socket::GetAddressFromSockaddr(sockaddr *address, socklen_t len)
*/
string Socket::GetClientAddress(void)
{
mutex::scoped_lock lock(m_Mutex);
sockaddr_storage sin;
socklen_t len = sizeof(sin);
@ -256,6 +255,8 @@ string Socket::GetClientAddress(void)
*/
string Socket::GetPeerAddress(void)
{
mutex::scoped_lock lock(m_Mutex);
sockaddr_storage sin;
socklen_t len = sizeof(sin);
@ -286,3 +287,91 @@ SocketException::SocketException(const string& message, int errorCode)
string msg = message + ": " + details;
SetMessage(msg.c_str());
}
void Socket::ReadThreadProc(void)
{
mutex::scoped_lock lock(m_Mutex);
for (;;) {
fd_set readfds, exceptfds;
FD_ZERO(&readfds);
FD_ZERO(&exceptfds);
int fd = GetFD();
if (fd == INVALID_SOCKET)
return;
if (WantsToRead())
FD_SET(fd, &readfds);
FD_SET(fd, &exceptfds);
lock.unlock();
timeval tv;
tv.tv_sec = 5;
tv.tv_usec = 0;
int rc = select(fd + 1, &readfds, NULL, &exceptfds, &tv);
lock.lock();
if (rc < 0) {
HandleSocketError(SocketException("select() failed", GetError()));
return;
}
if (FD_ISSET(fd, &readfds))
HandleReadable();
if (FD_ISSET(fd, &exceptfds))
HandleException();
if (WantsToWrite())
; /* notify Write thread */
}
}
void Socket::WriteThreadProc(void)
{
mutex::scoped_lock lock(m_Mutex);
for (;;) {
fd_set writefds;
FD_ZERO(&writefds);
int fd = GetFD();
while (!WantsToWrite()) {
if (GetFD() == INVALID_SOCKET)
return;
lock.unlock();
Sleep(500);
lock.lock();
}
FD_SET(fd, &writefds);
lock.unlock();
int rc = select(fd + 1, NULL, &writefds, NULL, NULL);
lock.lock();
if (rc < 0) {
HandleSocketError(SocketException("select() failed", GetError()));
return;
}
if (FD_ISSET(fd, &writefds))
HandleWritable();
}
}
mutex& Socket::GetMutex(void) const
{
return m_Mutex;
}

View File

@ -33,45 +33,59 @@ public:
typedef shared_ptr<Socket> Ptr;
typedef weak_ptr<Socket> WeakPtr;
typedef list<Socket::WeakPtr> CollectionType;
//typedef list<Socket::WeakPtr> CollectionType;
static Socket::CollectionType Sockets;
//static Socket::CollectionType Sockets;
~Socket(void);
void SetFD(SOCKET fd);
SOCKET GetFD(void) const;
boost::signal<void (const Socket::Ptr&)> OnReadable;
boost::signal<void (const Socket::Ptr&)> OnWritable;
boost::signal<void (const Socket::Ptr&)> OnException;
//boost::signal<void (const Socket::Ptr&)> OnReadable;
//boost::signal<void (const Socket::Ptr&)> OnWritable;
//boost::signal<void (const Socket::Ptr&)> OnException;
boost::signal<void (const Socket::Ptr&, const std::exception&)> OnError;
boost::signal<void (const Socket::Ptr&)> OnClosed;
virtual bool WantsToRead(void) const;
virtual bool WantsToWrite(void) const;
virtual void Start(void);
virtual void Stop(void);
//virtual void Stop(void);
void Close(void);
string GetClientAddress(void);
string GetPeerAddress(void);
mutex& GetMutex(void) const;
protected:
Socket(void);
void SetFD(SOCKET fd);
SOCKET GetFD(void) const;
int GetError(void) const;
static int GetLastSocketError(void);
void HandleSocketError(const std::exception& ex);
virtual bool WantsToRead(void) const;
virtual bool WantsToWrite(void) const;
virtual void HandleReadable(void);
virtual void HandleWritable(void);
virtual void HandleException(void);
virtual void CloseInternal(bool from_dtor);
mutable mutex m_Mutex;
private:
SOCKET m_FD; /**< The socket descriptor. */
thread m_ReadThread;
thread m_WriteThread;
void ReadThreadProc(void);
void WriteThreadProc(void);
void ExceptionEventHandler(void);
static string GetAddressFromSockaddr(sockaddr *address, socklen_t len);

View File

@ -44,17 +44,6 @@ TcpClientRole TcpClient::GetRole(void) const
return m_Role;
}
/**
* Registers the socket and starts processing events for it.
*/
void TcpClient::Start(void)
{
TcpSocket::Start();
OnReadable.connect(boost::bind(&TcpClient::ReadableEventHandler, this));
OnWritable.connect(boost::bind(&TcpClient::WritableEventHandler, this));
}
/**
* Creates a socket and connects to the specified node and service.
*
@ -124,7 +113,7 @@ FIFO::Ptr TcpClient::GetSendQueue(void)
return m_SendQueue;
}
size_t TcpClient::FlushSendQueue(void)
void TcpClient::HandleWritable(void)
{
int rc;
@ -132,7 +121,7 @@ size_t TcpClient::FlushSendQueue(void)
if (rc <= 0) {
HandleSocketError(SocketException("send() failed", GetError()));
return 0;
return;
}
m_SendQueue->Read(NULL, rc);
@ -148,46 +137,31 @@ FIFO::Ptr TcpClient::GetRecvQueue(void)
return m_RecvQueue;
}
size_t TcpClient::FillRecvQueue(void)
void TcpClient::HandleReadable(void)
{
int rc;
for (;;) {
size_t bufferSize = FIFO::BlockSize / 2;
char *buffer = (char *)m_RecvQueue->GetWriteBuffer(&bufferSize);
int rc = recv(GetFD(), buffer, bufferSize, 0);
size_t bufferSize = FIFO::BlockSize / 2;
char *buffer = (char *)m_RecvQueue->GetWriteBuffer(&bufferSize);
rc = recv(GetFD(), buffer, bufferSize, 0);
#ifdef _WIN32
if (rc < 0 && WSAGetLastError() == WSAEWOULDBLOCK)
#else /* _WIN32 */
if (rc < 0 && errno == EAGAIN)
#endif /* _WIN32 */
return;
#ifdef _WIN32
if (rc < 0 && WSAGetLastError() == WSAEWOULDBLOCK)
#else /* _WIN32 */
if (rc < 0 && errno == EAGAIN)
#endif /* _WIN32 */
return 0;
if (rc <= 0) {
HandleSocketError(SocketException("recv() failed", GetError()));
return;
}
if (rc <= 0) {
HandleSocketError(SocketException("recv() failed", GetError()));
return 0;
m_RecvQueue->Write(NULL, rc);
}
m_RecvQueue->Write(NULL, rc);
return rc;
}
/**
* Processes data that is available for this socket.
*/
void TcpClient::ReadableEventHandler(void)
{
if (FillRecvQueue() > 0)
OnDataAvailable(GetSelf());
}
/**
* Processes data that can be written for this socket.
*/
void TcpClient::WritableEventHandler(void)
{
FlushSendQueue();
Event::Ptr ev = boost::make_shared<Event>();
ev->OnEventDelivered.connect(boost::bind(boost::ref(OnDataAvailable), GetSelf()));
Event::Post(ev);
}
/**

View File

@ -51,29 +51,25 @@ public:
TcpClientRole GetRole(void) const;
virtual void Start(void);
void Connect(const string& node, const string& service);
FIFO::Ptr GetSendQueue(void);
FIFO::Ptr GetRecvQueue(void);
boost::signal<void (const TcpClient::Ptr&)> OnDataAvailable;
protected:
virtual bool WantsToRead(void) const;
virtual bool WantsToWrite(void) const;
boost::signal<void (const TcpClient::Ptr&)> OnDataAvailable;
virtual void HandleReadable(void);
virtual void HandleWritable(void);
private:
TcpClientRole m_Role;
FIFO::Ptr m_SendQueue;
FIFO::Ptr m_RecvQueue;
virtual size_t FillRecvQueue(void);
virtual size_t FlushSendQueue(void);
void ReadableEventHandler(void);
void WritableEventHandler(void);
};
/**

View File

@ -34,7 +34,7 @@ TcpServer::TcpServer(void)
*
* @param clientFactory The client factory function.
*/
void TcpServer::SetClientFactory(function<TcpClient::Ptr()> clientFactory)
void TcpServer::SetClientFactory(function<TcpClient::Ptr(SOCKET)> clientFactory)
{
m_ClientFactory = clientFactory;
}
@ -44,21 +44,11 @@ void TcpServer::SetClientFactory(function<TcpClient::Ptr()> clientFactory)
*
* @returns The client factory function.
*/
function<TcpClient::Ptr()> TcpServer::GetFactoryFunction(void) const
function<TcpClient::Ptr(SOCKET)> TcpServer::GetFactoryFunction(void) const
{
return m_ClientFactory;
}
/**
* Registers the TCP server and starts processing events for it.
*/
void TcpServer::Start(void)
{
TcpSocket::Start();
OnReadable.connect(boost::bind(&TcpServer::ReadableEventHandler, this));
}
/**
* Starts listening for incoming client connections.
*/
@ -71,11 +61,21 @@ void TcpServer::Listen(void)
}
}
/**
* Checks whether the TCP server wants to read (i.e. accept new clients).
*
* @returns true
*/
bool TcpServer::WantsToRead(void) const
{
return true;
}
/**
* Accepts a new client and creates a new client object for it
* using the client factory function.
*/
void TcpServer::ReadableEventHandler(void)
void TcpServer::HandleReadable(void)
{
int fd;
sockaddr_storage addr;
@ -89,19 +89,9 @@ void TcpServer::ReadableEventHandler(void)
return;
}
TcpClient::Ptr client = m_ClientFactory();
client->SetFD(fd);
client->Start();
TcpClient::Ptr client = m_ClientFactory(fd);
OnNewClient(GetSelf(), client);
}
/**
* Checks whether the TCP server wants to read (i.e. accept new clients).
*
* @returns true
*/
bool TcpServer::WantsToRead(void) const
{
return true;
Event::Ptr ev = boost::make_shared<Event>();
ev->OnEventDelivered.connect(boost::bind(boost::ref(OnNewClient), GetSelf(), client));
Event::Post(ev);
}

View File

@ -37,21 +37,20 @@ public:
TcpServer(void);
void SetClientFactory(function<TcpClient::Ptr()> function);
function<TcpClient::Ptr()> GetFactoryFunction(void) const;
virtual void Start();
void SetClientFactory(function<TcpClient::Ptr(SOCKET)> function);
function<TcpClient::Ptr(SOCKET)> GetFactoryFunction(void) const;
void Listen(void);
boost::signal<void (const TcpServer::Ptr&, const TcpClient::Ptr&)> OnNewClient;
protected:
virtual bool WantsToRead(void) const;
private:
void ReadableEventHandler(void);
virtual void HandleReadable(void);
function<TcpClient::Ptr()> m_ClientFactory;
private:
function<TcpClient::Ptr(SOCKET)> m_ClientFactory;
};
}

View File

@ -37,43 +37,8 @@ TlsClient::TlsClient(TcpClientRole role, shared_ptr<SSL_CTX> sslContext) : TcpCl
m_BlockWrite = false;
}
/**
* Takes a certificate as an argument. Does nothing.
*
* @param certificate An X509 certificate.
*/
void TlsClient::NullCertificateDeleter(X509 *certificate)
{
/* Nothing to do here. */
}
/**
* Retrieves the X509 certficate for this client.
*
* @returns The X509 certificate.
*/
shared_ptr<X509> TlsClient::GetClientCertificate(void) const
{
return shared_ptr<X509>(SSL_get_certificate(m_SSL.get()), &TlsClient::NullCertificateDeleter);
}
/**
* Retrieves the X509 certficate for the peer.
*
* @returns The X509 certificate.
*/
shared_ptr<X509> TlsClient::GetPeerCertificate(void) const
{
return shared_ptr<X509>(SSL_get_peer_certificate(m_SSL.get()), X509_free);
}
/**
* Registers the TLS socket and starts processing events for it.
*/
void TlsClient::Start(void)
{
TcpClient::Start();
m_SSL = shared_ptr<SSL>(SSL_new(m_SSLContext.get()), SSL_free);
if (!m_SSL)
@ -101,12 +66,48 @@ void TlsClient::Start(void)
SSL_set_connect_state(m_SSL.get());
SSL_do_handshake(m_SSL.get());
Socket::Start();
}
/**
* Takes a certificate as an argument. Does nothing.
*
* @param certificate An X509 certificate.
*/
void TlsClient::NullCertificateDeleter(X509 *certificate)
{
/* Nothing to do here. */
}
/**
* Retrieves the X509 certficate for this client.
*
* @returns The X509 certificate.
*/
shared_ptr<X509> TlsClient::GetClientCertificate(void) const
{
mutex::scoped_lock lock(GetMutex());
return shared_ptr<X509>(SSL_get_certificate(m_SSL.get()), &TlsClient::NullCertificateDeleter);
}
/**
* Retrieves the X509 certficate for the peer.
*
* @returns The X509 certificate.
*/
shared_ptr<X509> TlsClient::GetPeerCertificate(void) const
{
mutex::scoped_lock lock(GetMutex());
return shared_ptr<X509>(SSL_get_peer_certificate(m_SSL.get()), X509_free);
}
/**
* Processes data that is available for this socket.
*/
size_t TlsClient::FillRecvQueue(void)
void TlsClient::HandleReadable(void)
{
int result;
@ -116,10 +117,9 @@ size_t TlsClient::FillRecvQueue(void)
result = 0;
for (;;) {
int rc;
size_t bufferSize = FIFO::BlockSize / 2;
char *buffer = (char *)GetRecvQueue()->GetWriteBuffer(&bufferSize);
rc = SSL_read(m_SSL.get(), buffer, bufferSize);
int rc = SSL_read(m_SSL.get(), buffer, bufferSize);
if (rc <= 0) {
switch (SSL_get_error(m_SSL.get(), rc)) {
@ -127,36 +127,35 @@ size_t TlsClient::FillRecvQueue(void)
m_BlockRead = true;
/* fall through */
case SSL_ERROR_WANT_READ:
return result;
goto post_event;
case SSL_ERROR_ZERO_RETURN:
Close();
return result;
CloseInternal(false);
goto post_event;
default:
HandleSocketError(OpenSSLException(
"SSL_read failed", ERR_get_error()));
return result;
goto post_event;
}
}
GetRecvQueue()->Write(NULL, rc);
result += rc;
}
return result;
post_event:
Event::Ptr ev = boost::make_shared<Event>();
ev->OnEventDelivered.connect(boost::bind(boost::ref(OnDataAvailable), GetSelf()));
Event::Post(ev);
}
/**
* Processes data that can be written for this socket.
*/
size_t TlsClient::FlushSendQueue(void)
void TlsClient::HandleWritable(void)
{
int rc;
m_BlockRead = false;
m_BlockWrite = false;
rc = SSL_write(m_SSL.get(), (const char *)GetSendQueue()->GetReadBuffer(), GetSendQueue()->GetSize());
int rc = SSL_write(m_SSL.get(), (const char *)GetSendQueue()->GetReadBuffer(), GetSendQueue()->GetSize());
if (rc <= 0) {
switch (SSL_get_error(m_SSL.get(), rc)) {
@ -164,20 +163,18 @@ size_t TlsClient::FlushSendQueue(void)
m_BlockWrite = true;
/* fall through */
case SSL_ERROR_WANT_WRITE:
return 0;
return;
case SSL_ERROR_ZERO_RETURN:
Close();
return 0;
CloseInternal(false);
return;
default:
HandleSocketError(OpenSSLException(
"SSL_write failed", ERR_get_error()));
return 0;
return;
}
}
GetSendQueue()->Read(NULL, rc);
return rc;
}
/**
@ -249,12 +246,29 @@ int TlsClient::SSLVerifyCertificate(int ok, X509_STORE_CTX *x509Context)
SSL *ssl = (SSL *)X509_STORE_CTX_get_ex_data(x509Context, SSL_get_ex_data_X509_STORE_CTX_idx());
TlsClient *client = (TlsClient *)SSL_get_ex_data(ssl, m_SSLIndex);
assert(client->GetMutex().active_count);
if (client == NULL)
return 0;
bool valid = (ok != 0);
return client->ValidateCertificateInternal(ok, x509Context);
}
int TlsClient::ValidateCertificateInternal(int ok, X509_STORE_CTX *x509Context)
{
shared_ptr<X509> x509Certificate = shared_ptr<X509>(x509Context->cert, &TlsClient::NullCertificateDeleter);
client->OnVerifyCertificate(client->GetSelf(), &valid, x509Context, x509Certificate);
bool valid = ValidateCertificate((ok != 0), x509Context, x509Certificate);
if (valid) {
Event::Ptr ev = boost::make_shared<Event>();
ev->OnEventDelivered.connect(boost::bind(boost::ref(OnCertificateValidated), GetSelf()));
Event::Post(ev);
}
return valid ? 1 : 0;
}
bool TlsClient::ValidateCertificate(bool ok, X509_STORE_CTX *x509Context, const shared_ptr<X509>& x509Certificate)
{
return ok;
}

View File

@ -33,18 +33,23 @@ class I2_BASE_API TlsClient : public TcpClient
public:
TlsClient(TcpClientRole role, shared_ptr<SSL_CTX> sslContext);
virtual void Start(void);
shared_ptr<X509> GetClientCertificate(void) const;
shared_ptr<X509> GetPeerCertificate(void) const;
virtual void Start(void);
boost::signal<void (const TlsClient::Ptr&)> OnCertificateValidated;
protected:
void HandleSSLError(void);
virtual bool WantsToRead(void) const;
virtual bool WantsToWrite(void) const;
boost::signal<void (const TlsClient::Ptr&, bool *, X509_STORE_CTX *, const shared_ptr<X509>&)> OnVerifyCertificate;
virtual void HandleReadable(void);
virtual void HandleWritable(void);
protected:
void HandleSSLError(void);
virtual bool ValidateCertificate(bool ok, X509_STORE_CTX *x509Context, const shared_ptr<X509>& x509Certificate);
private:
shared_ptr<SSL_CTX> m_SSLContext;
@ -56,14 +61,12 @@ private:
static int m_SSLIndex;
static bool m_SSLIndexInitialized;
virtual size_t FillRecvQueue(void);
virtual size_t FlushSendQueue(void);
virtual void CloseInternal(bool from_dtor);
static void NullCertificateDeleter(X509 *certificate);
static int SSLVerifyCertificate(int ok, X509_STORE_CTX *x509Context);
int ValidateCertificateInternal(int ok, X509_STORE_CTX *x509Context);
};
TcpClient::Ptr TlsClientFactory(TcpClientRole role, shared_ptr<SSL_CTX> sslContext);

View File

@ -54,6 +54,8 @@
<WarningLevel>Level3</WarningLevel>
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>WIN32;_DEBUG;_WINDOWS;_USRDLL;CHECKER_EXPORTS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
<MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
@ -70,6 +72,8 @@
<FunctionLevelLinking>true</FunctionLevelLinking>
<IntrinsicFunctions>true</IntrinsicFunctions>
<PreprocessorDefinitions>WIN32;NDEBUG;_WINDOWS;_USRDLL;CHECKER_EXPORTS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
<MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>

View File

@ -59,6 +59,8 @@
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<WarningLevel>Level3</WarningLevel>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
<MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
@ -80,6 +82,8 @@
<PreprocessorDefinitions>WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<FavorSizeOrSpeed>Speed</FavorSizeOrSpeed>
<WarningLevel>Level3</WarningLevel>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
<MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>

View File

@ -61,6 +61,8 @@
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>WIN32;_DEBUG;_WINDOWS;_USRDLL;CONFIGCOMPONENT_EXPORTS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<WarningLevel>Level3</WarningLevel>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
<MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
@ -78,6 +80,8 @@
<PreprocessorDefinitions>WIN32;NDEBUG;_WINDOWS;_USRDLL;CONFIGCOMPONENT_EXPORTS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<FavorSizeOrSpeed>Speed</FavorSizeOrSpeed>
<WarningLevel>Level3</WarningLevel>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
<MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>

View File

@ -54,6 +54,8 @@
<WarningLevel>Level3</WarningLevel>
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>WIN32;_DEBUG;_WINDOWS;_USRDLL;DELEGATION_EXPORTS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
<MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
@ -70,6 +72,8 @@
<FunctionLevelLinking>true</FunctionLevelLinking>
<IntrinsicFunctions>true</IntrinsicFunctions>
<PreprocessorDefinitions>WIN32;NDEBUG;_WINDOWS;_USRDLL;DELEGATION_EXPORTS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
<MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>

View File

@ -54,6 +54,8 @@
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>WIN32;_DEBUG;_WINDOWS;_USRDLL;DEMO_EXPORTS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<WarningLevel>Level3</WarningLevel>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
<MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
@ -70,6 +72,8 @@
<IntrinsicFunctions>true</IntrinsicFunctions>
<PreprocessorDefinitions>WIN32;NDEBUG;_WINDOWS;_USRDLL;DEMO_EXPORTS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<WarningLevel>Level3</WarningLevel>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
<MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>

View File

@ -54,6 +54,8 @@
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>WIN32;_DEBUG;_WINDOWS;_USRDLL;DISCOVERY_EXPORTS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<WarningLevel>Level3</WarningLevel>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
<MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
@ -70,6 +72,8 @@
<IntrinsicFunctions>true</IntrinsicFunctions>
<PreprocessorDefinitions>WIN32;NDEBUG;_WINDOWS;_USRDLL;DISCOVERY_EXPORTS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<WarningLevel>Level3</WarningLevel>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
<MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>

View File

@ -87,6 +87,8 @@
<WarningLevel>Level3</WarningLevel>
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>_WINDLL;I2_DYN_BUILD;_DEBUG;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
<MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
@ -103,6 +105,8 @@
<FunctionLevelLinking>true</FunctionLevelLinking>
<IntrinsicFunctions>true</IntrinsicFunctions>
<PreprocessorDefinitions>_WINDLL;I2_DYN_BUILD;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
<MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>

View File

@ -53,6 +53,8 @@
</PrecompiledHeader>
<WarningLevel>Level3</WarningLevel>
<Optimization>Disabled</Optimization>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
<MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
@ -69,6 +71,8 @@
<FunctionLevelLinking>true</FunctionLevelLinking>
<IntrinsicFunctions>true</IntrinsicFunctions>
<PreprocessorDefinitions>WIN32;NDEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
<MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>

View File

@ -58,6 +58,8 @@
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>WIN32;I2_ICINGALAUNCHER_BUILD;_DEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<WarningLevel>Level3</WarningLevel>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
<MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
@ -75,6 +77,8 @@
<PreprocessorDefinitions>WIN32;I2_ICINGALAUNCHER_BUILD;NDEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<FavorSizeOrSpeed>Speed</FavorSizeOrSpeed>
<WarningLevel>Level3</WarningLevel>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
<MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>

View File

@ -136,6 +136,7 @@ void EndpointManager::NewClientHandler(const TcpClient::Ptr& client)
JsonRpcEndpoint::Ptr endpoint = boost::make_shared<JsonRpcEndpoint>();
endpoint->SetClient(static_pointer_cast<JsonRpcClient>(client));
client->Start();
RegisterEndpoint(endpoint);
}

View File

@ -84,6 +84,8 @@
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>WIN32;I2_ICINGA_BUILD;_DEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<WarningLevel>Level3</WarningLevel>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
<MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
@ -101,6 +103,8 @@
<PreprocessorDefinitions>WIN32;I2_ICINGA_BUILD;NDEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<FavorSizeOrSpeed>Speed</FavorSizeOrSpeed>
<WarningLevel>Level3</WarningLevel>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
<MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>

View File

@ -53,7 +53,7 @@ void JsonRpcEndpoint::SetClient(JsonRpcClient::Ptr client)
client->OnNewMessage.connect(boost::bind(&JsonRpcEndpoint::NewMessageHandler, this, _2));
client->OnClosed.connect(boost::bind(&JsonRpcEndpoint::ClientClosedHandler, this));
client->OnError.connect(boost::bind(&JsonRpcEndpoint::ClientErrorHandler, this, _2));
client->OnVerifyCertificate.connect(boost::bind(&JsonRpcEndpoint::VerifyCertificateHandler, this, _2, _4));
client->OnCertificateValidated.connect(boost::bind(&JsonRpcEndpoint::CertificateValidatedHandler, this));
}
bool JsonRpcEndpoint::IsLocal(void) const
@ -135,15 +135,13 @@ void JsonRpcEndpoint::ClientErrorHandler(const std::exception& ex)
Application::Log(LogWarning, "jsonrpc", message.str());
}
void JsonRpcEndpoint::VerifyCertificateHandler(bool *valid, const shared_ptr<X509>& certificate)
void JsonRpcEndpoint::CertificateValidatedHandler(void)
{
if (certificate && *valid) {
string identity = Utility::GetCertificateCN(certificate);
string identity = Utility::GetCertificateCN(m_Client->GetPeerCertificate());
if (GetIdentity().empty() && !identity.empty()) {
m_Identity = identity;
GetEndpointManager()->RegisterEndpoint(GetSelf());
}
if (GetIdentity().empty() && !identity.empty()) {
m_Identity = identity;
GetEndpointManager()->RegisterEndpoint(GetSelf());
}
}

View File

@ -64,7 +64,7 @@ private:
void NewMessageHandler(const MessagePart& message);
void ClientClosedHandler(void);
void ClientErrorHandler(const std::exception& ex);
void VerifyCertificateHandler(bool *valid, const shared_ptr<X509>& certificate);
void CertificateValidatedHandler(void);
};
}

View File

@ -69,6 +69,8 @@
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>WIN32;I2_JSONRPC_BUILD;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<WarningLevel>Level3</WarningLevel>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
<MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
@ -90,6 +92,8 @@
<PreprocessorDefinitions>WIN32;I2_JSONRPC_BUILD;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<FavorSizeOrSpeed>Speed</FavorSizeOrSpeed>
<WarningLevel>Level3</WarningLevel>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
<MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>

View File

@ -40,6 +40,8 @@ JsonRpcClient::JsonRpcClient(TcpClientRole role, shared_ptr<SSL_CTX> sslContext)
*/
void JsonRpcClient::SendMessage(const MessagePart& message)
{
mutex::scoped_lock lock(GetMutex());
Netstring::WriteStringToFIFO(GetSendQueue(), message.ToJsonString());
}
@ -53,13 +55,17 @@ void JsonRpcClient::DataAvailableHandler(void)
string jsonString;
MessagePart message;
if (!Netstring::ReadStringFromFIFO(GetRecvQueue(), &jsonString))
return;
{
mutex::scoped_lock lock(GetMutex());
if (!Netstring::ReadStringFromFIFO(GetRecvQueue(), &jsonString))
return;
}
message = MessagePart(jsonString);
OnNewMessage(GetSelf(), message);
} catch (const Exception& ex) {
Application::Log(LogCritical, "jsonrpc", "Exception while processing message from JSON-RPC client: " + string(ex.GetMessage()));
} catch (const std::exception& ex) {
Application::Log(LogCritical, "jsonrpc", "Exception while processing message from JSON-RPC client: " + string(ex.what()));
Close();
return;
@ -70,11 +76,14 @@ void JsonRpcClient::DataAvailableHandler(void)
/**
* Factory function for JSON-RPC clients.
*
* @param fd The file descriptor.
* @param role The role of the underlying TCP client.
* @param sslContext SSL context for the TLS connection.
* @returns A new JSON-RPC client.
*/
JsonRpcClient::Ptr icinga::JsonRpcClientFactory(TcpClientRole role, shared_ptr<SSL_CTX> sslContext)
JsonRpcClient::Ptr icinga::JsonRpcClientFactory(SOCKET fd, TcpClientRole role, shared_ptr<SSL_CTX> sslContext)
{
return boost::make_shared<JsonRpcClient>(role, sslContext);
JsonRpcClient::Ptr client = boost::make_shared<JsonRpcClient>(role, sslContext);
client->SetFD(fd);
return client;
}

View File

@ -42,9 +42,11 @@ public:
private:
void DataAvailableHandler(void);
friend JsonRpcClient::Ptr JsonRpcClientFactory(SOCKET fd, TcpClientRole role, shared_ptr<SSL_CTX> sslContext);
};
JsonRpcClient::Ptr JsonRpcClientFactory(TcpClientRole role, shared_ptr<SSL_CTX> sslContext);
JsonRpcClient::Ptr JsonRpcClientFactory(SOCKET fd, TcpClientRole role, shared_ptr<SSL_CTX> sslContext);
}

View File

@ -28,5 +28,5 @@ using namespace icinga;
*/
JsonRpcServer::JsonRpcServer(shared_ptr<SSL_CTX> sslContext)
{
SetClientFactory(boost::bind(&JsonRpcClientFactory, RoleInbound, sslContext));
SetClientFactory(boost::bind(&JsonRpcClientFactory, _1, RoleInbound, sslContext));
}

View File

@ -51,6 +51,8 @@
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<WarningLevel>Level3</WarningLevel>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
<MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
@ -67,6 +69,8 @@
<PreprocessorDefinitions>WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<FavorSizeOrSpeed>Speed</FavorSizeOrSpeed>
<WarningLevel>Level3</WarningLevel>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
<MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>

View File

@ -51,6 +51,8 @@
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<WarningLevel>Level3</WarningLevel>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
<MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
@ -66,6 +68,8 @@
<IntrinsicFunctions>true</IntrinsicFunctions>
<PreprocessorDefinitions>WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<WarningLevel>Level3</WarningLevel>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
<MinimalRebuild>false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>