From 34d26d424fdbb0f032e182951b8c5efc67d20b5d Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Sun, 24 Jun 2012 02:56:48 +0200 Subject: [PATCH] Made sockets multi-threaded. --- base/Makefile.am | 2 + base/application.cpp | 84 ++----------- base/base.vcxproj | 6 + base/event.cpp | 49 ++++++++ base/event.h | 45 +++++++ base/i2-base.h | 3 +- base/socket.cpp | 151 ++++++++++++++++++----- base/socket.h | 38 ++++-- base/tcpclient.cpp | 68 ++++------ base/tcpclient.h | 14 +-- base/tcpserver.cpp | 44 +++---- base/tcpserver.h | 13 +- base/tlsclient.cpp | 128 ++++++++++--------- base/tlsclient.h | 17 +-- components/checker/checker.vcxproj | 4 + components/configfile/configfile.vcxproj | 4 + components/configrpc/configrpc.vcxproj | 4 + components/delegation/delegation.vcxproj | 4 + components/demo/demo.vcxproj | 4 + components/discovery/discovery.vcxproj | 4 + dyn/dyn.vcxproj | 4 + dyntest/dyntest.vcxproj | 4 + icinga-app/icinga-app.vcxproj | 4 + icinga/endpointmanager.cpp | 1 + icinga/icinga.vcxproj | 4 + icinga/jsonrpcendpoint.cpp | 14 +-- icinga/jsonrpcendpoint.h | 2 +- jsonrpc/jsonrpc.vcxproj | 4 + jsonrpc/jsonrpcclient.cpp | 21 +++- jsonrpc/jsonrpcclient.h | 4 +- jsonrpc/jsonrpcserver.cpp | 2 +- third-party/cJSON/cJSON.vcxproj | 4 + third-party/mmatch/mmatch.vcxproj | 4 + 33 files changed, 466 insertions(+), 292 deletions(-) create mode 100644 base/event.cpp create mode 100644 base/event.h diff --git a/base/Makefile.am b/base/Makefile.am index 02fc2be3c..5ceedd730 100644 --- a/base/Makefile.am +++ b/base/Makefile.am @@ -13,6 +13,8 @@ libbase_la_SOURCES = \ configobject.h \ dictionary.cpp \ dictionary.h \ + event.cpp \ + event.h \ exception.cpp \ exception.h \ fifo.cpp \ diff --git a/base/application.cpp b/base/application.cpp index 8ecf9348a..80470aa94 100644 --- a/base/application.cpp +++ b/base/application.cpp @@ -96,9 +96,6 @@ Application::Ptr Application::GetInstance(void) void Application::RunEventLoop(void) { while (!m_ShuttingDown) { - fd_set readfds, writefds, exceptfds; - int nfds = -1; - Object::ClearHeldObjects(); long sleep = Timer::ProcessTimers(); @@ -106,80 +103,13 @@ void Application::RunEventLoop(void) if (m_ShuttingDown) break; - FD_ZERO(&readfds); - FD_ZERO(&writefds); - FD_ZERO(&exceptfds); + vector events; + + Event::Wait(&events, boost::get_system_time() + boost::posix_time::seconds(sleep)); - Socket::CollectionType::iterator prev, i; - for (i = Socket::Sockets.begin(); - i != Socket::Sockets.end(); ) { - Socket::Ptr socket = i->lock(); - - prev = i; - i++; - - if (!socket) { - Socket::Sockets.erase(prev); - continue; - } - - int fd = socket->GetFD(); - - if (socket->WantsToWrite()) - FD_SET(fd, &writefds); - - if (socket->WantsToRead()) - FD_SET(fd, &readfds); - - FD_SET(fd, &exceptfds); - - if (fd > nfds) - nfds = fd; - } - - timeval tv; - tv.tv_sec = sleep; - tv.tv_usec = 0; - - int ready; - - if (nfds == -1) { - Sleep(tv.tv_sec * 1000 + tv.tv_usec); - ready = 0; - } else - ready = select(nfds + 1, &readfds, &writefds, - &exceptfds, &tv); - - if (ready < 0) - break; - else if (ready == 0) - continue; - - for (i = Socket::Sockets.begin(); - i != Socket::Sockets.end(); ) { - Socket::Ptr socket = i->lock(); - - prev = i; - i++; - - if (!socket) { - Socket::Sockets.erase(prev); - continue; - } - - int fd; - - fd = socket->GetFD(); - if (fd != INVALID_SOCKET && FD_ISSET(fd, &writefds)) - socket->OnWritable(socket); - - fd = socket->GetFD(); - if (fd != INVALID_SOCKET && FD_ISSET(fd, &readfds)) - socket->OnReadable(socket); - - fd = socket->GetFD(); - if (fd != INVALID_SOCKET && FD_ISSET(fd, &exceptfds)) - socket->OnException(socket); + for (vector::iterator it = events.begin(); it != events.end(); it++) { + Event::Ptr ev = *it; + ev->OnEventDelivered(); } } } @@ -296,7 +226,7 @@ void Application::Log(LogSeverity severity, const string& facility, const string char timestamp[100]; // TODO: make this configurable - if (!IsDebugging() && severity < LogInformation) + if (/*!IsDebugging() && */severity < LogInformation) return; string severityStr; diff --git a/base/base.vcxproj b/base/base.vcxproj index 53ae7c6d0..d22ca8f05 100644 --- a/base/base.vcxproj +++ b/base/base.vcxproj @@ -15,6 +15,7 @@ + @@ -37,6 +38,7 @@ + @@ -97,6 +99,8 @@ Disabled _WINDLL;I2_BASE_BUILD;_DEBUG;%(PreprocessorDefinitions) Level3 + true + false Windows @@ -117,6 +121,8 @@ _WINDLL;I2_BASE_BUILD;%(PreprocessorDefinitions) Speed Level3 + true + false Windows diff --git a/base/event.cpp b/base/event.cpp new file mode 100644 index 000000000..284f727cd --- /dev/null +++ b/base/event.cpp @@ -0,0 +1,49 @@ +/****************************************************************************** + * Icinga 2 * + * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) * + * * + * This program is free software; you can redistribute it and/or * + * modify it under the terms of the GNU General Public License * + * as published by the Free Software Foundation; either version 2 * + * of the License, or (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the Free Software Foundation * + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * + ******************************************************************************/ + +#include "i2-base.h" + +using namespace icinga; + +deque Event::m_Events; +condition_variable Event::m_EventAvailable; +mutex Event::m_Mutex; + +bool Event::Wait(vector *events, const system_time& wait_until) +{ + mutex::scoped_lock lock(m_Mutex); + + while (m_Events.empty()) { + if (!m_EventAvailable.timed_wait(lock, wait_until)) + return false; + } + + vector result; + std::copy(m_Events.begin(), m_Events.end(), back_inserter(*events)); + m_Events.clear(); + + return true; +} + +void Event::Post(const Event::Ptr& ev) +{ + mutex::scoped_lock lock(m_Mutex); + m_Events.push_back(ev); + m_EventAvailable.notify_all(); +} diff --git a/base/event.h b/base/event.h new file mode 100644 index 000000000..bb586ef93 --- /dev/null +++ b/base/event.h @@ -0,0 +1,45 @@ +/****************************************************************************** + * Icinga 2 * + * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) * + * * + * This program is free software; you can redistribute it and/or * + * modify it under the terms of the GNU General Public License * + * as published by the Free Software Foundation; either version 2 * + * of the License, or (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the Free Software Foundation * + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * + ******************************************************************************/ + +#ifndef EVENT_H +#define EVENT_H + +namespace icinga +{ + +class Event : public Object +{ +public: + typedef shared_ptr Ptr; + typedef weak_ptr WeakPtr; + + static bool Wait(vector *events, const system_time& wait_until); + static void Post(const Event::Ptr& ev); + + boost::signal OnEventDelivered; + +private: + static deque m_Events; + static condition_variable m_EventAvailable; + static mutex m_Mutex; +}; + +} + +#endif /* EVENT_H */ diff --git a/base/i2-base.h b/base/i2-base.h index bdfb75729..f795e507e 100644 --- a/base/i2-base.h +++ b/base/i2-base.h @@ -128,6 +128,7 @@ using boost::thread; using boost::thread_group; using boost::mutex; using boost::condition_variable; +using boost::system_time; #if defined(__APPLE__) && defined(__MACH__) # pragma GCC diagnostic ignored "-Wdeprecated-declarations" @@ -150,7 +151,7 @@ using boost::condition_variable; #include "utility.h" #include "object.h" #include "exception.h" -#include "memory.h" +#include "event.h" #include "variant.h" #include "dictionary.h" #include "timer.h" diff --git a/base/socket.cpp b/base/socket.cpp index fc90011ce..b27459cb2 100644 --- a/base/socket.cpp +++ b/base/socket.cpp @@ -21,12 +21,6 @@ using namespace icinga; -/** - * A collection of weak pointers to Socket objects which have been - * registered with the socket sub-system. - */ -Socket::CollectionType Socket::Sockets; - /** * Constructor for the Socket class. */ @@ -40,27 +34,23 @@ Socket::Socket(void) */ Socket::~Socket(void) { - CloseInternal(true); + { + mutex::scoped_lock lock(m_Mutex); + + CloseInternal(true); + } } -/** - * Registers the socket and starts handling events for it. - */ void Socket::Start(void) { - assert(m_FD != INVALID_SOCKET); + assert(!m_ReadThread.joinable() && !m_WriteThread.joinable()); + assert(GetFD() != INVALID_SOCKET); - OnException.connect(boost::bind(&Socket::ExceptionEventHandler, this)); + m_ReadThread = thread(boost::bind(&Socket::ReadThreadProc, static_cast(GetSelf()))); + m_ReadThread.detach(); - Sockets.push_back(GetSelf()); -} - -/** - * Unregisters the sockets and stops handling events for it. - */ -void Socket::Stop(void) -{ - Sockets.remove_if(WeakPtrEqual(this)); + m_WriteThread = thread(boost::bind(&Socket::WriteThreadProc, static_cast(GetSelf()))); + m_WriteThread.detach(); } /** @@ -70,8 +60,6 @@ void Socket::Stop(void) */ void Socket::SetFD(SOCKET fd) { - unsigned long lTrue = 1; - /* mark the socket as non-blocking */ if (fd != INVALID_SOCKET) { #ifdef F_GETFL @@ -83,6 +71,7 @@ void Socket::SetFD(SOCKET fd) if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) throw PosixException("fcntl failed", errno); #else /* F_GETFL */ + unsigned long lTrue = 1; ioctlsocket(fd, FIONBIO, &lTrue); #endif /* F_GETFL */ } @@ -105,6 +94,8 @@ SOCKET Socket::GetFD(void) const */ void Socket::Close(void) { + mutex::scoped_lock lock(m_Mutex); + CloseInternal(false); } @@ -124,9 +115,9 @@ void Socket::CloseInternal(bool from_dtor) /* nobody can possibly have a valid event subscription when the destructor has been called */ if (!from_dtor) { - Stop(); - - OnClosed(GetSelf()); + Event::Ptr ev = boost::make_shared(); + ev->OnEventDelivered.connect(boost::bind(boost::ref(OnClosed), GetSelf())); + Event::Post(ev); } } @@ -171,9 +162,11 @@ int Socket::GetLastSocketError(void) void Socket::HandleSocketError(const std::exception& ex) { if (!OnError.empty()) { - OnError(GetSelf(), ex); + Event::Ptr ev = boost::make_shared(); + ev->OnEventDelivered.connect(boost::bind(boost::ref(OnError), GetSelf(), ex)); + Event::Post(ev); - Close(); + CloseInternal(false); } else { throw ex; } @@ -181,10 +174,8 @@ void Socket::HandleSocketError(const std::exception& ex) /** * Processes errors that have occured for the socket. - * - * @param - Event arguments for the socket error. */ -void Socket::ExceptionEventHandler(void) +void Socket::HandleException(void) { HandleSocketError(SocketException( "select() returned fd in except fdset", GetError())); @@ -200,6 +191,9 @@ bool Socket::WantsToRead(void) const return false; } +void Socket::HandleReadable(void) +{ } + /** * Checks whether data should be written for this socket object. * @@ -210,6 +204,9 @@ bool Socket::WantsToWrite(void) const return false; } +void Socket::HandleWritable(void) +{ } + /** * Formats a sockaddr in a human-readable way. * @@ -236,6 +233,8 @@ string Socket::GetAddressFromSockaddr(sockaddr *address, socklen_t len) */ string Socket::GetClientAddress(void) { + mutex::scoped_lock lock(m_Mutex); + sockaddr_storage sin; socklen_t len = sizeof(sin); @@ -256,6 +255,8 @@ string Socket::GetClientAddress(void) */ string Socket::GetPeerAddress(void) { + mutex::scoped_lock lock(m_Mutex); + sockaddr_storage sin; socklen_t len = sizeof(sin); @@ -286,3 +287,91 @@ SocketException::SocketException(const string& message, int errorCode) string msg = message + ": " + details; SetMessage(msg.c_str()); } + +void Socket::ReadThreadProc(void) +{ + mutex::scoped_lock lock(m_Mutex); + + for (;;) { + fd_set readfds, exceptfds; + + FD_ZERO(&readfds); + FD_ZERO(&exceptfds); + + int fd = GetFD(); + + if (fd == INVALID_SOCKET) + return; + + if (WantsToRead()) + FD_SET(fd, &readfds); + + FD_SET(fd, &exceptfds); + + lock.unlock(); + + timeval tv; + tv.tv_sec = 5; + tv.tv_usec = 0; + int rc = select(fd + 1, &readfds, NULL, &exceptfds, &tv); + + lock.lock(); + + if (rc < 0) { + HandleSocketError(SocketException("select() failed", GetError())); + return; + } + + if (FD_ISSET(fd, &readfds)) + HandleReadable(); + + if (FD_ISSET(fd, &exceptfds)) + HandleException(); + + if (WantsToWrite()) + ; /* notify Write thread */ + } +} + +void Socket::WriteThreadProc(void) +{ + mutex::scoped_lock lock(m_Mutex); + + for (;;) { + fd_set writefds; + + FD_ZERO(&writefds); + + int fd = GetFD(); + + while (!WantsToWrite()) { + if (GetFD() == INVALID_SOCKET) + return; + + lock.unlock(); + Sleep(500); + lock.lock(); + } + + FD_SET(fd, &writefds); + + lock.unlock(); + + int rc = select(fd + 1, NULL, &writefds, NULL, NULL); + + lock.lock(); + + if (rc < 0) { + HandleSocketError(SocketException("select() failed", GetError())); + return; + } + + if (FD_ISSET(fd, &writefds)) + HandleWritable(); + } +} + +mutex& Socket::GetMutex(void) const +{ + return m_Mutex; +} diff --git a/base/socket.h b/base/socket.h index aa920d44d..4a391e1dd 100644 --- a/base/socket.h +++ b/base/socket.h @@ -33,45 +33,59 @@ public: typedef shared_ptr Ptr; typedef weak_ptr WeakPtr; - typedef list CollectionType; + //typedef list CollectionType; - static Socket::CollectionType Sockets; + //static Socket::CollectionType Sockets; ~Socket(void); - void SetFD(SOCKET fd); - SOCKET GetFD(void) const; - - boost::signal OnReadable; - boost::signal OnWritable; - boost::signal OnException; + //boost::signal OnReadable; + //boost::signal OnWritable; + //boost::signal OnException; boost::signal OnError; boost::signal OnClosed; - virtual bool WantsToRead(void) const; - virtual bool WantsToWrite(void) const; - virtual void Start(void); - virtual void Stop(void); + //virtual void Stop(void); void Close(void); string GetClientAddress(void); string GetPeerAddress(void); + mutex& GetMutex(void) const; + protected: Socket(void); + void SetFD(SOCKET fd); + SOCKET GetFD(void) const; + int GetError(void) const; static int GetLastSocketError(void); void HandleSocketError(const std::exception& ex); + virtual bool WantsToRead(void) const; + virtual bool WantsToWrite(void) const; + + virtual void HandleReadable(void); + virtual void HandleWritable(void); + virtual void HandleException(void); + virtual void CloseInternal(bool from_dtor); + mutable mutex m_Mutex; + private: SOCKET m_FD; /**< The socket descriptor. */ + thread m_ReadThread; + thread m_WriteThread; + + void ReadThreadProc(void); + void WriteThreadProc(void); + void ExceptionEventHandler(void); static string GetAddressFromSockaddr(sockaddr *address, socklen_t len); diff --git a/base/tcpclient.cpp b/base/tcpclient.cpp index 70fa81406..2b0d18f52 100644 --- a/base/tcpclient.cpp +++ b/base/tcpclient.cpp @@ -44,17 +44,6 @@ TcpClientRole TcpClient::GetRole(void) const return m_Role; } -/** - * Registers the socket and starts processing events for it. - */ -void TcpClient::Start(void) -{ - TcpSocket::Start(); - - OnReadable.connect(boost::bind(&TcpClient::ReadableEventHandler, this)); - OnWritable.connect(boost::bind(&TcpClient::WritableEventHandler, this)); -} - /** * Creates a socket and connects to the specified node and service. * @@ -124,7 +113,7 @@ FIFO::Ptr TcpClient::GetSendQueue(void) return m_SendQueue; } -size_t TcpClient::FlushSendQueue(void) +void TcpClient::HandleWritable(void) { int rc; @@ -132,7 +121,7 @@ size_t TcpClient::FlushSendQueue(void) if (rc <= 0) { HandleSocketError(SocketException("send() failed", GetError())); - return 0; + return; } m_SendQueue->Read(NULL, rc); @@ -148,46 +137,31 @@ FIFO::Ptr TcpClient::GetRecvQueue(void) return m_RecvQueue; } -size_t TcpClient::FillRecvQueue(void) +void TcpClient::HandleReadable(void) { - int rc; + for (;;) { + size_t bufferSize = FIFO::BlockSize / 2; + char *buffer = (char *)m_RecvQueue->GetWriteBuffer(&bufferSize); + int rc = recv(GetFD(), buffer, bufferSize, 0); - size_t bufferSize = FIFO::BlockSize / 2; - char *buffer = (char *)m_RecvQueue->GetWriteBuffer(&bufferSize); - rc = recv(GetFD(), buffer, bufferSize, 0); + #ifdef _WIN32 + if (rc < 0 && WSAGetLastError() == WSAEWOULDBLOCK) + #else /* _WIN32 */ + if (rc < 0 && errno == EAGAIN) + #endif /* _WIN32 */ + return; -#ifdef _WIN32 - if (rc < 0 && WSAGetLastError() == WSAEWOULDBLOCK) -#else /* _WIN32 */ - if (rc < 0 && errno == EAGAIN) -#endif /* _WIN32 */ - return 0; + if (rc <= 0) { + HandleSocketError(SocketException("recv() failed", GetError())); + return; + } - if (rc <= 0) { - HandleSocketError(SocketException("recv() failed", GetError())); - return 0; + m_RecvQueue->Write(NULL, rc); } - m_RecvQueue->Write(NULL, rc); - - return rc; -} - -/** - * Processes data that is available for this socket. - */ -void TcpClient::ReadableEventHandler(void) -{ - if (FillRecvQueue() > 0) - OnDataAvailable(GetSelf()); -} - -/** - * Processes data that can be written for this socket. - */ -void TcpClient::WritableEventHandler(void) -{ - FlushSendQueue(); + Event::Ptr ev = boost::make_shared(); + ev->OnEventDelivered.connect(boost::bind(boost::ref(OnDataAvailable), GetSelf())); + Event::Post(ev); } /** diff --git a/base/tcpclient.h b/base/tcpclient.h index 048e8dad6..479d61734 100644 --- a/base/tcpclient.h +++ b/base/tcpclient.h @@ -51,29 +51,25 @@ public: TcpClientRole GetRole(void) const; - virtual void Start(void); - void Connect(const string& node, const string& service); FIFO::Ptr GetSendQueue(void); FIFO::Ptr GetRecvQueue(void); + boost::signal OnDataAvailable; + +protected: virtual bool WantsToRead(void) const; virtual bool WantsToWrite(void) const; - boost::signal OnDataAvailable; + virtual void HandleReadable(void); + virtual void HandleWritable(void); private: TcpClientRole m_Role; FIFO::Ptr m_SendQueue; FIFO::Ptr m_RecvQueue; - - virtual size_t FillRecvQueue(void); - virtual size_t FlushSendQueue(void); - - void ReadableEventHandler(void); - void WritableEventHandler(void); }; /** diff --git a/base/tcpserver.cpp b/base/tcpserver.cpp index fb559667b..c4b863e54 100644 --- a/base/tcpserver.cpp +++ b/base/tcpserver.cpp @@ -34,7 +34,7 @@ TcpServer::TcpServer(void) * * @param clientFactory The client factory function. */ -void TcpServer::SetClientFactory(function clientFactory) +void TcpServer::SetClientFactory(function clientFactory) { m_ClientFactory = clientFactory; } @@ -44,21 +44,11 @@ void TcpServer::SetClientFactory(function clientFactory) * * @returns The client factory function. */ -function TcpServer::GetFactoryFunction(void) const +function TcpServer::GetFactoryFunction(void) const { return m_ClientFactory; } -/** - * Registers the TCP server and starts processing events for it. - */ -void TcpServer::Start(void) -{ - TcpSocket::Start(); - - OnReadable.connect(boost::bind(&TcpServer::ReadableEventHandler, this)); -} - /** * Starts listening for incoming client connections. */ @@ -71,11 +61,21 @@ void TcpServer::Listen(void) } } +/** + * Checks whether the TCP server wants to read (i.e. accept new clients). + * + * @returns true + */ +bool TcpServer::WantsToRead(void) const +{ + return true; +} + /** * Accepts a new client and creates a new client object for it * using the client factory function. */ -void TcpServer::ReadableEventHandler(void) +void TcpServer::HandleReadable(void) { int fd; sockaddr_storage addr; @@ -89,19 +89,9 @@ void TcpServer::ReadableEventHandler(void) return; } - TcpClient::Ptr client = m_ClientFactory(); - client->SetFD(fd); - client->Start(); + TcpClient::Ptr client = m_ClientFactory(fd); - OnNewClient(GetSelf(), client); -} - -/** - * Checks whether the TCP server wants to read (i.e. accept new clients). - * - * @returns true - */ -bool TcpServer::WantsToRead(void) const -{ - return true; + Event::Ptr ev = boost::make_shared(); + ev->OnEventDelivered.connect(boost::bind(boost::ref(OnNewClient), GetSelf(), client)); + Event::Post(ev); } diff --git a/base/tcpserver.h b/base/tcpserver.h index 247f66a78..fa64a34e4 100644 --- a/base/tcpserver.h +++ b/base/tcpserver.h @@ -37,21 +37,20 @@ public: TcpServer(void); - void SetClientFactory(function function); - function GetFactoryFunction(void) const; - - virtual void Start(); + void SetClientFactory(function function); + function GetFactoryFunction(void) const; void Listen(void); boost::signal OnNewClient; +protected: virtual bool WantsToRead(void) const; -private: - void ReadableEventHandler(void); + virtual void HandleReadable(void); - function m_ClientFactory; +private: + function m_ClientFactory; }; } diff --git a/base/tlsclient.cpp b/base/tlsclient.cpp index 0c1ab6440..3ac84a96e 100644 --- a/base/tlsclient.cpp +++ b/base/tlsclient.cpp @@ -37,43 +37,8 @@ TlsClient::TlsClient(TcpClientRole role, shared_ptr sslContext) : TcpCl m_BlockWrite = false; } -/** - * Takes a certificate as an argument. Does nothing. - * - * @param certificate An X509 certificate. - */ -void TlsClient::NullCertificateDeleter(X509 *certificate) -{ - /* Nothing to do here. */ -} - -/** - * Retrieves the X509 certficate for this client. - * - * @returns The X509 certificate. - */ -shared_ptr TlsClient::GetClientCertificate(void) const -{ - return shared_ptr(SSL_get_certificate(m_SSL.get()), &TlsClient::NullCertificateDeleter); -} - -/** - * Retrieves the X509 certficate for the peer. - * - * @returns The X509 certificate. - */ -shared_ptr TlsClient::GetPeerCertificate(void) const -{ - return shared_ptr(SSL_get_peer_certificate(m_SSL.get()), X509_free); -} - -/** - * Registers the TLS socket and starts processing events for it. - */ void TlsClient::Start(void) { - TcpClient::Start(); - m_SSL = shared_ptr(SSL_new(m_SSLContext.get()), SSL_free); if (!m_SSL) @@ -101,12 +66,48 @@ void TlsClient::Start(void) SSL_set_connect_state(m_SSL.get()); SSL_do_handshake(m_SSL.get()); + + Socket::Start(); +} + +/** + * Takes a certificate as an argument. Does nothing. + * + * @param certificate An X509 certificate. + */ +void TlsClient::NullCertificateDeleter(X509 *certificate) +{ + /* Nothing to do here. */ +} + +/** + * Retrieves the X509 certficate for this client. + * + * @returns The X509 certificate. + */ +shared_ptr TlsClient::GetClientCertificate(void) const +{ + mutex::scoped_lock lock(GetMutex()); + + return shared_ptr(SSL_get_certificate(m_SSL.get()), &TlsClient::NullCertificateDeleter); +} + +/** + * Retrieves the X509 certficate for the peer. + * + * @returns The X509 certificate. + */ +shared_ptr TlsClient::GetPeerCertificate(void) const +{ + mutex::scoped_lock lock(GetMutex()); + + return shared_ptr(SSL_get_peer_certificate(m_SSL.get()), X509_free); } /** * Processes data that is available for this socket. */ -size_t TlsClient::FillRecvQueue(void) +void TlsClient::HandleReadable(void) { int result; @@ -116,10 +117,9 @@ size_t TlsClient::FillRecvQueue(void) result = 0; for (;;) { - int rc; size_t bufferSize = FIFO::BlockSize / 2; char *buffer = (char *)GetRecvQueue()->GetWriteBuffer(&bufferSize); - rc = SSL_read(m_SSL.get(), buffer, bufferSize); + int rc = SSL_read(m_SSL.get(), buffer, bufferSize); if (rc <= 0) { switch (SSL_get_error(m_SSL.get(), rc)) { @@ -127,36 +127,35 @@ size_t TlsClient::FillRecvQueue(void) m_BlockRead = true; /* fall through */ case SSL_ERROR_WANT_READ: - return result; + goto post_event; case SSL_ERROR_ZERO_RETURN: - Close(); - return result; + CloseInternal(false); + goto post_event; default: HandleSocketError(OpenSSLException( "SSL_read failed", ERR_get_error())); - return result; + goto post_event; } } GetRecvQueue()->Write(NULL, rc); - - result += rc; } - return result; +post_event: + Event::Ptr ev = boost::make_shared(); + ev->OnEventDelivered.connect(boost::bind(boost::ref(OnDataAvailable), GetSelf())); + Event::Post(ev); } /** * Processes data that can be written for this socket. */ -size_t TlsClient::FlushSendQueue(void) +void TlsClient::HandleWritable(void) { - int rc; - m_BlockRead = false; m_BlockWrite = false; - rc = SSL_write(m_SSL.get(), (const char *)GetSendQueue()->GetReadBuffer(), GetSendQueue()->GetSize()); + int rc = SSL_write(m_SSL.get(), (const char *)GetSendQueue()->GetReadBuffer(), GetSendQueue()->GetSize()); if (rc <= 0) { switch (SSL_get_error(m_SSL.get(), rc)) { @@ -164,20 +163,18 @@ size_t TlsClient::FlushSendQueue(void) m_BlockWrite = true; /* fall through */ case SSL_ERROR_WANT_WRITE: - return 0; + return; case SSL_ERROR_ZERO_RETURN: - Close(); - return 0; + CloseInternal(false); + return; default: HandleSocketError(OpenSSLException( "SSL_write failed", ERR_get_error())); - return 0; + return; } } GetSendQueue()->Read(NULL, rc); - - return rc; } /** @@ -249,12 +246,29 @@ int TlsClient::SSLVerifyCertificate(int ok, X509_STORE_CTX *x509Context) SSL *ssl = (SSL *)X509_STORE_CTX_get_ex_data(x509Context, SSL_get_ex_data_X509_STORE_CTX_idx()); TlsClient *client = (TlsClient *)SSL_get_ex_data(ssl, m_SSLIndex); + assert(client->GetMutex().active_count); + if (client == NULL) return 0; - bool valid = (ok != 0); + return client->ValidateCertificateInternal(ok, x509Context); +} + +int TlsClient::ValidateCertificateInternal(int ok, X509_STORE_CTX *x509Context) +{ shared_ptr x509Certificate = shared_ptr(x509Context->cert, &TlsClient::NullCertificateDeleter); - client->OnVerifyCertificate(client->GetSelf(), &valid, x509Context, x509Certificate); + bool valid = ValidateCertificate((ok != 0), x509Context, x509Certificate); + + if (valid) { + Event::Ptr ev = boost::make_shared(); + ev->OnEventDelivered.connect(boost::bind(boost::ref(OnCertificateValidated), GetSelf())); + Event::Post(ev); + } return valid ? 1 : 0; } + +bool TlsClient::ValidateCertificate(bool ok, X509_STORE_CTX *x509Context, const shared_ptr& x509Certificate) +{ + return ok; +} diff --git a/base/tlsclient.h b/base/tlsclient.h index ff44cc813..87de66ef8 100644 --- a/base/tlsclient.h +++ b/base/tlsclient.h @@ -33,18 +33,23 @@ class I2_BASE_API TlsClient : public TcpClient public: TlsClient(TcpClientRole role, shared_ptr sslContext); + virtual void Start(void); + shared_ptr GetClientCertificate(void) const; shared_ptr GetPeerCertificate(void) const; - virtual void Start(void); + boost::signal OnCertificateValidated; + +protected: + void HandleSSLError(void); virtual bool WantsToRead(void) const; virtual bool WantsToWrite(void) const; - boost::signal&)> OnVerifyCertificate; + virtual void HandleReadable(void); + virtual void HandleWritable(void); -protected: - void HandleSSLError(void); + virtual bool ValidateCertificate(bool ok, X509_STORE_CTX *x509Context, const shared_ptr& x509Certificate); private: shared_ptr m_SSLContext; @@ -56,14 +61,12 @@ private: static int m_SSLIndex; static bool m_SSLIndexInitialized; - virtual size_t FillRecvQueue(void); - virtual size_t FlushSendQueue(void); - virtual void CloseInternal(bool from_dtor); static void NullCertificateDeleter(X509 *certificate); static int SSLVerifyCertificate(int ok, X509_STORE_CTX *x509Context); + int ValidateCertificateInternal(int ok, X509_STORE_CTX *x509Context); }; TcpClient::Ptr TlsClientFactory(TcpClientRole role, shared_ptr sslContext); diff --git a/components/checker/checker.vcxproj b/components/checker/checker.vcxproj index 6e894d9de..67d311df7 100644 --- a/components/checker/checker.vcxproj +++ b/components/checker/checker.vcxproj @@ -54,6 +54,8 @@ Level3 Disabled WIN32;_DEBUG;_WINDOWS;_USRDLL;CHECKER_EXPORTS;%(PreprocessorDefinitions) + true + false Windows @@ -70,6 +72,8 @@ true true WIN32;NDEBUG;_WINDOWS;_USRDLL;CHECKER_EXPORTS;%(PreprocessorDefinitions) + true + false Windows diff --git a/components/configfile/configfile.vcxproj b/components/configfile/configfile.vcxproj index 01e6e01d4..a4061b5a1 100644 --- a/components/configfile/configfile.vcxproj +++ b/components/configfile/configfile.vcxproj @@ -59,6 +59,8 @@ Disabled WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions) Level3 + true + false Windows @@ -80,6 +82,8 @@ WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions) Speed Level3 + true + false Windows diff --git a/components/configrpc/configrpc.vcxproj b/components/configrpc/configrpc.vcxproj index 47805d16a..eae5056e5 100644 --- a/components/configrpc/configrpc.vcxproj +++ b/components/configrpc/configrpc.vcxproj @@ -61,6 +61,8 @@ Disabled WIN32;_DEBUG;_WINDOWS;_USRDLL;CONFIGCOMPONENT_EXPORTS;%(PreprocessorDefinitions) Level3 + true + false Windows @@ -78,6 +80,8 @@ WIN32;NDEBUG;_WINDOWS;_USRDLL;CONFIGCOMPONENT_EXPORTS;%(PreprocessorDefinitions) Speed Level3 + true + false Windows diff --git a/components/delegation/delegation.vcxproj b/components/delegation/delegation.vcxproj index cfe75bea3..8cb021043 100644 --- a/components/delegation/delegation.vcxproj +++ b/components/delegation/delegation.vcxproj @@ -54,6 +54,8 @@ Level3 Disabled WIN32;_DEBUG;_WINDOWS;_USRDLL;DELEGATION_EXPORTS;%(PreprocessorDefinitions) + true + false Windows @@ -70,6 +72,8 @@ true true WIN32;NDEBUG;_WINDOWS;_USRDLL;DELEGATION_EXPORTS;%(PreprocessorDefinitions) + true + false Windows diff --git a/components/demo/demo.vcxproj b/components/demo/demo.vcxproj index db011539a..653970c34 100644 --- a/components/demo/demo.vcxproj +++ b/components/demo/demo.vcxproj @@ -54,6 +54,8 @@ Disabled WIN32;_DEBUG;_WINDOWS;_USRDLL;DEMO_EXPORTS;%(PreprocessorDefinitions) Level3 + true + false Windows @@ -70,6 +72,8 @@ true WIN32;NDEBUG;_WINDOWS;_USRDLL;DEMO_EXPORTS;%(PreprocessorDefinitions) Level3 + true + false Windows diff --git a/components/discovery/discovery.vcxproj b/components/discovery/discovery.vcxproj index 4e6a6ed42..ee8f0c6b8 100644 --- a/components/discovery/discovery.vcxproj +++ b/components/discovery/discovery.vcxproj @@ -54,6 +54,8 @@ Disabled WIN32;_DEBUG;_WINDOWS;_USRDLL;DISCOVERY_EXPORTS;%(PreprocessorDefinitions) Level3 + true + false Windows @@ -70,6 +72,8 @@ true WIN32;NDEBUG;_WINDOWS;_USRDLL;DISCOVERY_EXPORTS;%(PreprocessorDefinitions) Level3 + true + false Windows diff --git a/dyn/dyn.vcxproj b/dyn/dyn.vcxproj index 4d4f244d3..f9ae0a29b 100644 --- a/dyn/dyn.vcxproj +++ b/dyn/dyn.vcxproj @@ -87,6 +87,8 @@ Level3 Disabled _WINDLL;I2_DYN_BUILD;_DEBUG;%(PreprocessorDefinitions) + true + false Windows @@ -103,6 +105,8 @@ true true _WINDLL;I2_DYN_BUILD;%(PreprocessorDefinitions) + true + false Windows diff --git a/dyntest/dyntest.vcxproj b/dyntest/dyntest.vcxproj index 4a7d9f799..495ff32a8 100644 --- a/dyntest/dyntest.vcxproj +++ b/dyntest/dyntest.vcxproj @@ -53,6 +53,8 @@ Level3 Disabled + true + false Console @@ -69,6 +71,8 @@ true true WIN32;NDEBUG;_CONSOLE;%(PreprocessorDefinitions) + true + false Console diff --git a/icinga-app/icinga-app.vcxproj b/icinga-app/icinga-app.vcxproj index 16a24dedf..86bc63b18 100644 --- a/icinga-app/icinga-app.vcxproj +++ b/icinga-app/icinga-app.vcxproj @@ -58,6 +58,8 @@ Disabled WIN32;I2_ICINGALAUNCHER_BUILD;_DEBUG;_CONSOLE;%(PreprocessorDefinitions) Level3 + true + false Console @@ -75,6 +77,8 @@ WIN32;I2_ICINGALAUNCHER_BUILD;NDEBUG;_CONSOLE;%(PreprocessorDefinitions) Speed Level3 + true + false Console diff --git a/icinga/endpointmanager.cpp b/icinga/endpointmanager.cpp index b4d5c9123..84cb4463c 100644 --- a/icinga/endpointmanager.cpp +++ b/icinga/endpointmanager.cpp @@ -136,6 +136,7 @@ void EndpointManager::NewClientHandler(const TcpClient::Ptr& client) JsonRpcEndpoint::Ptr endpoint = boost::make_shared(); endpoint->SetClient(static_pointer_cast(client)); + client->Start(); RegisterEndpoint(endpoint); } diff --git a/icinga/icinga.vcxproj b/icinga/icinga.vcxproj index 63ead4e0b..78bb39497 100644 --- a/icinga/icinga.vcxproj +++ b/icinga/icinga.vcxproj @@ -84,6 +84,8 @@ Disabled WIN32;I2_ICINGA_BUILD;_DEBUG;_CONSOLE;%(PreprocessorDefinitions) Level3 + true + false Console @@ -101,6 +103,8 @@ WIN32;I2_ICINGA_BUILD;NDEBUG;_CONSOLE;%(PreprocessorDefinitions) Speed Level3 + true + false Console diff --git a/icinga/jsonrpcendpoint.cpp b/icinga/jsonrpcendpoint.cpp index 6aa1dbd6b..881516451 100644 --- a/icinga/jsonrpcendpoint.cpp +++ b/icinga/jsonrpcendpoint.cpp @@ -53,7 +53,7 @@ void JsonRpcEndpoint::SetClient(JsonRpcClient::Ptr client) client->OnNewMessage.connect(boost::bind(&JsonRpcEndpoint::NewMessageHandler, this, _2)); client->OnClosed.connect(boost::bind(&JsonRpcEndpoint::ClientClosedHandler, this)); client->OnError.connect(boost::bind(&JsonRpcEndpoint::ClientErrorHandler, this, _2)); - client->OnVerifyCertificate.connect(boost::bind(&JsonRpcEndpoint::VerifyCertificateHandler, this, _2, _4)); + client->OnCertificateValidated.connect(boost::bind(&JsonRpcEndpoint::CertificateValidatedHandler, this)); } bool JsonRpcEndpoint::IsLocal(void) const @@ -135,15 +135,13 @@ void JsonRpcEndpoint::ClientErrorHandler(const std::exception& ex) Application::Log(LogWarning, "jsonrpc", message.str()); } -void JsonRpcEndpoint::VerifyCertificateHandler(bool *valid, const shared_ptr& certificate) +void JsonRpcEndpoint::CertificateValidatedHandler(void) { - if (certificate && *valid) { - string identity = Utility::GetCertificateCN(certificate); + string identity = Utility::GetCertificateCN(m_Client->GetPeerCertificate()); - if (GetIdentity().empty() && !identity.empty()) { - m_Identity = identity; - GetEndpointManager()->RegisterEndpoint(GetSelf()); - } + if (GetIdentity().empty() && !identity.empty()) { + m_Identity = identity; + GetEndpointManager()->RegisterEndpoint(GetSelf()); } } diff --git a/icinga/jsonrpcendpoint.h b/icinga/jsonrpcendpoint.h index 852dadf86..338bd2b45 100644 --- a/icinga/jsonrpcendpoint.h +++ b/icinga/jsonrpcendpoint.h @@ -64,7 +64,7 @@ private: void NewMessageHandler(const MessagePart& message); void ClientClosedHandler(void); void ClientErrorHandler(const std::exception& ex); - void VerifyCertificateHandler(bool *valid, const shared_ptr& certificate); + void CertificateValidatedHandler(void); }; } diff --git a/jsonrpc/jsonrpc.vcxproj b/jsonrpc/jsonrpc.vcxproj index de1f80346..7ea643878 100644 --- a/jsonrpc/jsonrpc.vcxproj +++ b/jsonrpc/jsonrpc.vcxproj @@ -69,6 +69,8 @@ Disabled WIN32;I2_JSONRPC_BUILD;_DEBUG;_LIB;%(PreprocessorDefinitions) Level3 + true + false Windows @@ -90,6 +92,8 @@ WIN32;I2_JSONRPC_BUILD;NDEBUG;_LIB;%(PreprocessorDefinitions) Speed Level3 + true + false Windows diff --git a/jsonrpc/jsonrpcclient.cpp b/jsonrpc/jsonrpcclient.cpp index c95b049b1..e457b9617 100644 --- a/jsonrpc/jsonrpcclient.cpp +++ b/jsonrpc/jsonrpcclient.cpp @@ -40,6 +40,8 @@ JsonRpcClient::JsonRpcClient(TcpClientRole role, shared_ptr sslContext) */ void JsonRpcClient::SendMessage(const MessagePart& message) { + mutex::scoped_lock lock(GetMutex()); + Netstring::WriteStringToFIFO(GetSendQueue(), message.ToJsonString()); } @@ -53,13 +55,17 @@ void JsonRpcClient::DataAvailableHandler(void) string jsonString; MessagePart message; - if (!Netstring::ReadStringFromFIFO(GetRecvQueue(), &jsonString)) - return; + { + mutex::scoped_lock lock(GetMutex()); + + if (!Netstring::ReadStringFromFIFO(GetRecvQueue(), &jsonString)) + return; + } message = MessagePart(jsonString); OnNewMessage(GetSelf(), message); - } catch (const Exception& ex) { - Application::Log(LogCritical, "jsonrpc", "Exception while processing message from JSON-RPC client: " + string(ex.GetMessage())); + } catch (const std::exception& ex) { + Application::Log(LogCritical, "jsonrpc", "Exception while processing message from JSON-RPC client: " + string(ex.what())); Close(); return; @@ -70,11 +76,14 @@ void JsonRpcClient::DataAvailableHandler(void) /** * Factory function for JSON-RPC clients. * + * @param fd The file descriptor. * @param role The role of the underlying TCP client. * @param sslContext SSL context for the TLS connection. * @returns A new JSON-RPC client. */ -JsonRpcClient::Ptr icinga::JsonRpcClientFactory(TcpClientRole role, shared_ptr sslContext) +JsonRpcClient::Ptr icinga::JsonRpcClientFactory(SOCKET fd, TcpClientRole role, shared_ptr sslContext) { - return boost::make_shared(role, sslContext); + JsonRpcClient::Ptr client = boost::make_shared(role, sslContext); + client->SetFD(fd); + return client; } diff --git a/jsonrpc/jsonrpcclient.h b/jsonrpc/jsonrpcclient.h index 00a16ada1..6df837d2f 100644 --- a/jsonrpc/jsonrpcclient.h +++ b/jsonrpc/jsonrpcclient.h @@ -42,9 +42,11 @@ public: private: void DataAvailableHandler(void); + + friend JsonRpcClient::Ptr JsonRpcClientFactory(SOCKET fd, TcpClientRole role, shared_ptr sslContext); }; -JsonRpcClient::Ptr JsonRpcClientFactory(TcpClientRole role, shared_ptr sslContext); +JsonRpcClient::Ptr JsonRpcClientFactory(SOCKET fd, TcpClientRole role, shared_ptr sslContext); } diff --git a/jsonrpc/jsonrpcserver.cpp b/jsonrpc/jsonrpcserver.cpp index a39d2c594..c5f7e504e 100644 --- a/jsonrpc/jsonrpcserver.cpp +++ b/jsonrpc/jsonrpcserver.cpp @@ -28,5 +28,5 @@ using namespace icinga; */ JsonRpcServer::JsonRpcServer(shared_ptr sslContext) { - SetClientFactory(boost::bind(&JsonRpcClientFactory, RoleInbound, sslContext)); + SetClientFactory(boost::bind(&JsonRpcClientFactory, _1, RoleInbound, sslContext)); } diff --git a/third-party/cJSON/cJSON.vcxproj b/third-party/cJSON/cJSON.vcxproj index 3568a5482..b045931a1 100644 --- a/third-party/cJSON/cJSON.vcxproj +++ b/third-party/cJSON/cJSON.vcxproj @@ -51,6 +51,8 @@ Disabled WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions) Level3 + true + false Windows @@ -67,6 +69,8 @@ WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions) Speed Level3 + true + false Windows diff --git a/third-party/mmatch/mmatch.vcxproj b/third-party/mmatch/mmatch.vcxproj index 632e28d77..b6766c1e9 100644 --- a/third-party/mmatch/mmatch.vcxproj +++ b/third-party/mmatch/mmatch.vcxproj @@ -51,6 +51,8 @@ Disabled WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions) Level3 + true + false Windows @@ -66,6 +68,8 @@ true WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions) Level3 + true + false Windows