mirror of https://github.com/Icinga/icinga2.git
Move most of the socket I/O to a separate thread
fixes #8300 fixes #8243
This commit is contained in:
parent
26c9bcdac1
commit
e0bbfb175c
|
@ -28,7 +28,7 @@ set(base_SOURCES
|
|||
exception.cpp fifo.cpp filelogger.cpp filelogger.thpp initialize.cpp json.cpp json-script.cpp logger.cpp logger.thpp math-script.cpp
|
||||
netstring.cpp networkstream.cpp number.cpp number-script.cpp object.cpp object-script.cpp primitivetype.cpp process.cpp
|
||||
ringbuffer.cpp scriptframe.cpp function.cpp function-script.cpp functionwrapper.cpp scriptglobal.cpp
|
||||
scriptutils.cpp serializer.cpp socket.cpp stacktrace.cpp
|
||||
scriptutils.cpp serializer.cpp socket.cpp socketevents.cpp stacktrace.cpp
|
||||
statsfunction.cpp stdiostream.cpp stream.cpp streamlogger.cpp streamlogger.thpp string.cpp string-script.cpp
|
||||
sysloglogger.cpp sysloglogger.thpp tcpsocket.cpp thinmutex.cpp threadpool.cpp timer.cpp
|
||||
tlsstream.cpp tlsutility.cpp type.cpp unixsocket.cpp utility.cpp value.cpp
|
||||
|
@ -43,7 +43,7 @@ endif()
|
|||
|
||||
add_library(base SHARED ${base_SOURCES})
|
||||
|
||||
target_link_libraries(base ${CMAKE_DL_LIBS} ${Boost_LIBRARIES} ${OPENSSL_LIBRARIES} ${YAJL_LIBRARIES} mmatch)
|
||||
target_link_libraries(base ${CMAKE_DL_LIBS} ${Boost_LIBRARIES} ${OPENSSL_LIBRARIES} ${YAJL_LIBRARIES} mmatch socketpair)
|
||||
|
||||
if(HAVE_LIBEXECINFO)
|
||||
target_link_libraries(base execinfo)
|
||||
|
@ -55,6 +55,9 @@ link_directories(${icinga2_BINARY_DIR}/third-party/execvpe)
|
|||
include_directories(${icinga2_SOURCE_DIR}/third-party/mmatch)
|
||||
link_directories(${icinga2_BINARY_DIR}/third-party/mmatch)
|
||||
|
||||
include_directories(${icinga2_SOURCE_DIR}/third-party/socketpair)
|
||||
link_directories(${icinga2_BINARY_DIR}/third-party/socketpair)
|
||||
|
||||
if(UNIX OR CYGWIN)
|
||||
target_link_libraries(base execvpe)
|
||||
endif()
|
||||
|
|
|
@ -78,6 +78,17 @@ void FIFO::Optimize(void)
|
|||
}
|
||||
}
|
||||
|
||||
size_t FIFO::Peek(void *buffer, size_t count)
|
||||
{
|
||||
if (count > m_DataSize)
|
||||
count = m_DataSize;
|
||||
|
||||
if (buffer != NULL)
|
||||
std::memcpy(buffer, m_Buffer + m_Offset, count);
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
/**
|
||||
* Implements IOQueue::Read.
|
||||
*/
|
||||
|
|
|
@ -41,6 +41,7 @@ public:
|
|||
FIFO(void);
|
||||
~FIFO(void);
|
||||
|
||||
size_t Peek(void *buffer, size_t count);
|
||||
virtual size_t Read(void *buffer, size_t count);
|
||||
virtual void Write(const void *buffer, size_t count);
|
||||
virtual void Close(void);
|
||||
|
|
|
@ -26,6 +26,7 @@
|
|||
#include <iostream>
|
||||
#include <boost/exception/errinfo_api_function.hpp>
|
||||
#include <boost/exception/errinfo_errno.hpp>
|
||||
#include <socketpair.h>
|
||||
|
||||
#ifndef _WIN32
|
||||
# include <poll.h>
|
||||
|
@ -401,3 +402,12 @@ void Socket::MakeNonBlocking(void)
|
|||
Utility::SetNonBlocking(GetFD());
|
||||
#endif /* _WIN32 */
|
||||
}
|
||||
|
||||
void Socket::SocketPair(SOCKET s[2])
|
||||
{
|
||||
if (dumb_socketpair(s, 0) < 0)
|
||||
BOOST_THROW_EXCEPTION(socket_error()
|
||||
<< boost::errinfo_api_function("socketpair")
|
||||
<< boost::errinfo_errno(errno));
|
||||
}
|
||||
|
||||
|
|
|
@ -61,6 +61,8 @@ public:
|
|||
|
||||
void MakeNonBlocking(void);
|
||||
|
||||
static void SocketPair(SOCKET s[2]);
|
||||
|
||||
protected:
|
||||
void SetFD(SOCKET fd);
|
||||
|
||||
|
|
|
@ -0,0 +1,205 @@
|
|||
/******************************************************************************
|
||||
* Icinga 2 *
|
||||
* Copyright (C) 2012-2015 Icinga Development Team (http://www.icinga.org) *
|
||||
* *
|
||||
* This program is free software; you can redistribute it and/or *
|
||||
* modify it under the terms of the GNU General Public License *
|
||||
* as published by the Free Software Foundation; either version 2 *
|
||||
* of the License, or (at your option) any later version. *
|
||||
* *
|
||||
* This program is distributed in the hope that it will be useful, *
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
|
||||
* GNU General Public License for more details. *
|
||||
* *
|
||||
* You should have received a copy of the GNU General Public License *
|
||||
* along with this program; if not, write to the Free Software Foundation *
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
|
||||
******************************************************************************/
|
||||
|
||||
#include "base/socketevents.hpp"
|
||||
#include "base/exception.hpp"
|
||||
#include "base/logger.hpp"
|
||||
#include <boost/thread/once.hpp>
|
||||
#include <boost/foreach.hpp>
|
||||
#include <map>
|
||||
|
||||
#ifndef _WIN32
|
||||
# include <poll.h>
|
||||
#endif /* _WIN32 */
|
||||
|
||||
using namespace icinga;
|
||||
|
||||
struct SocketEventDescriptor
|
||||
{
|
||||
int Events;
|
||||
SocketEvents *EventInterface;
|
||||
|
||||
SocketEventDescriptor(void)
|
||||
: Events(0)
|
||||
{ }
|
||||
};
|
||||
|
||||
static boost::once_flag l_SocketIOOnceFlag = BOOST_ONCE_INIT;
|
||||
static SOCKET l_SocketIOEventFDs[2];
|
||||
static boost::mutex l_SocketIOMutex;
|
||||
static std::map<SOCKET, SocketEventDescriptor> l_SocketIOSockets;
|
||||
|
||||
void SocketEvents::InitializeThread(void)
|
||||
{
|
||||
Socket::SocketPair(l_SocketIOEventFDs);
|
||||
|
||||
Utility::SetNonBlockingSocket(l_SocketIOEventFDs[0]);
|
||||
Utility::SetNonBlockingSocket(l_SocketIOEventFDs[1]);
|
||||
|
||||
#ifndef _WIN32
|
||||
Utility::SetCloExec(l_SocketIOEventFDs[0]);
|
||||
Utility::SetCloExec(l_SocketIOEventFDs[1]);
|
||||
#endif /* _WIN32 */
|
||||
|
||||
SocketEventDescriptor sed;
|
||||
sed.Events = POLLIN;
|
||||
|
||||
l_SocketIOSockets[l_SocketIOEventFDs[0]] = sed;
|
||||
|
||||
boost::thread thread(&SocketEvents::ThreadProc);
|
||||
thread.detach();
|
||||
}
|
||||
|
||||
void SocketEvents::ThreadProc(void)
|
||||
{
|
||||
Utility::SetThreadName("SocketIO");
|
||||
|
||||
for (;;) {
|
||||
pollfd *pfds;
|
||||
int pfdcount;
|
||||
|
||||
typedef std::map<SOCKET, SocketEventDescriptor>::value_type SocketDesc;
|
||||
|
||||
{
|
||||
boost::mutex::scoped_lock lock(l_SocketIOMutex);
|
||||
|
||||
pfdcount = l_SocketIOSockets.size();
|
||||
pfds = new pollfd[pfdcount];
|
||||
|
||||
int i = 0;
|
||||
|
||||
BOOST_FOREACH(const SocketDesc& desc, l_SocketIOSockets) {
|
||||
pfds[i].fd = desc.first;
|
||||
pfds[i].events = desc.second.Events;
|
||||
pfds[i].revents = 0;
|
||||
|
||||
i++;
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef _WIN32
|
||||
(void) WSAPoll(pfds, pfdcount, -1);
|
||||
#else /* _WIN32 */
|
||||
(void) poll(pfds, pfdcount, -1);
|
||||
#endif /* _WIN32 */
|
||||
|
||||
for (int i = 0; i < pfdcount; i++) {
|
||||
if ((pfds[i].revents & (POLLIN | POLLOUT | POLLHUP | POLLERR)) == 0)
|
||||
continue;
|
||||
|
||||
if (pfds[i].fd == l_SocketIOEventFDs[0]) {
|
||||
char buffer[512];
|
||||
if (recv(l_SocketIOEventFDs[0], buffer, sizeof(buffer), 0) < 0)
|
||||
Log(LogCritical, "SocketEvents", "Read from event FD failed.");
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
SocketEventDescriptor desc;
|
||||
Object::Ptr ltref;
|
||||
|
||||
{
|
||||
boost::mutex::scoped_lock lock(l_SocketIOMutex);
|
||||
|
||||
std::map<SOCKET, SocketEventDescriptor>::const_iterator it = l_SocketIOSockets.find(pfds[i].fd);
|
||||
|
||||
if (it == l_SocketIOSockets.end())
|
||||
continue;
|
||||
|
||||
desc = it->second;
|
||||
|
||||
/* We must hold a ref-counted reference to the event object to keep it alive. */
|
||||
ltref = dynamic_cast<Object *>(desc.EventInterface);
|
||||
}
|
||||
|
||||
desc.EventInterface->OnEvent(pfds[i].revents);
|
||||
}
|
||||
|
||||
delete [] pfds;
|
||||
}
|
||||
}
|
||||
|
||||
void SocketEvents::WakeUpThread(void)
|
||||
{
|
||||
(void) send(l_SocketIOEventFDs[1], "T", 1, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor for the SocketEvents class.
|
||||
*/
|
||||
SocketEvents::SocketEvents(const Socket::Ptr& socket)
|
||||
: m_FD(socket->GetFD())
|
||||
{
|
||||
boost::call_once(l_SocketIOOnceFlag, &SocketEvents::InitializeThread);
|
||||
|
||||
Register();
|
||||
}
|
||||
|
||||
SocketEvents::~SocketEvents(void)
|
||||
{
|
||||
Unregister();
|
||||
}
|
||||
|
||||
void SocketEvents::Register(void)
|
||||
{
|
||||
SocketEventDescriptor desc;
|
||||
desc.Events = 0;
|
||||
desc.EventInterface = this;
|
||||
|
||||
{
|
||||
boost::mutex::scoped_lock lock(l_SocketIOMutex);
|
||||
|
||||
l_SocketIOSockets[m_FD] = desc;
|
||||
}
|
||||
|
||||
/* There's no need to wake up the I/O thread here. */
|
||||
}
|
||||
|
||||
void SocketEvents::Unregister(void)
|
||||
{
|
||||
{
|
||||
boost::mutex::scoped_lock lock(l_SocketIOMutex);
|
||||
|
||||
l_SocketIOSockets.erase(m_FD);
|
||||
}
|
||||
|
||||
/* There's no need to wake up the I/O thread here. */
|
||||
}
|
||||
|
||||
void SocketEvents::ChangeEvents(int events)
|
||||
{
|
||||
{
|
||||
boost::mutex::scoped_lock lock(l_SocketIOMutex);
|
||||
|
||||
std::map<SOCKET, SocketEventDescriptor>::iterator it = l_SocketIOSockets.find(m_FD);
|
||||
|
||||
if (it == l_SocketIOSockets.end())
|
||||
return;
|
||||
|
||||
it->second.Events = events;
|
||||
}
|
||||
|
||||
WakeUpThread();
|
||||
}
|
||||
|
||||
void SocketEvents::OnEvent(int revents)
|
||||
{
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,62 @@
|
|||
/******************************************************************************
|
||||
* Icinga 2 *
|
||||
* Copyright (C) 2012-2015 Icinga Development Team (http://www.icinga.org) *
|
||||
* *
|
||||
* This program is free software; you can redistribute it and/or *
|
||||
* modify it under the terms of the GNU General Public License *
|
||||
* as published by the Free Software Foundation; either version 2 *
|
||||
* of the License, or (at your option) any later version. *
|
||||
* *
|
||||
* This program is distributed in the hope that it will be useful, *
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
|
||||
* GNU General Public License for more details. *
|
||||
* *
|
||||
* You should have received a copy of the GNU General Public License *
|
||||
* along with this program; if not, write to the Free Software Foundation *
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
|
||||
******************************************************************************/
|
||||
|
||||
#ifndef SOCKETEVENTS_H
|
||||
#define SOCKETEVENTS_H
|
||||
|
||||
#include "base/i2-base.hpp"
|
||||
#include "base/socket.hpp"
|
||||
|
||||
namespace icinga
|
||||
{
|
||||
|
||||
/**
|
||||
* Socket event interface
|
||||
*
|
||||
* @ingroup base
|
||||
*/
|
||||
class I2_BASE_API SocketEvents
|
||||
{
|
||||
public:
|
||||
~SocketEvents(void);
|
||||
|
||||
virtual void OnEvent(int revents);
|
||||
|
||||
void Register(void);
|
||||
void Unregister(void);
|
||||
|
||||
void ChangeEvents(int events);
|
||||
|
||||
protected:
|
||||
SocketEvents(const Socket::Ptr& socket);
|
||||
|
||||
private:
|
||||
SOCKET m_FD;
|
||||
|
||||
static void InitializeThread(void);
|
||||
static void ThreadProc(void);
|
||||
|
||||
static void WakeUpThread(void);
|
||||
|
||||
int GetPollEvents(void) const;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#endif /* SOCKETEVENTS_H */
|
|
@ -24,6 +24,10 @@
|
|||
#include <boost/bind.hpp>
|
||||
#include <iostream>
|
||||
|
||||
#ifndef _WIN32
|
||||
# include <poll.h>
|
||||
#endif /* _WIN32 */
|
||||
|
||||
using namespace icinga;
|
||||
|
||||
int I2_EXPORT TlsStream::m_SSLIndex;
|
||||
|
@ -36,7 +40,9 @@ bool I2_EXPORT TlsStream::m_SSLIndexInitialized = false;
|
|||
* @param sslContext The SSL context for the client.
|
||||
*/
|
||||
TlsStream::TlsStream(const Socket::Ptr& socket, ConnectionRole role, const boost::shared_ptr<SSL_CTX>& sslContext)
|
||||
: m_Eof(false), m_VerifyOK(true), m_Socket(socket), m_Role(role)
|
||||
: SocketEvents(socket), m_Eof(false), m_HandshakeOK(false), m_VerifyOK(true), m_CloseOK(false), m_ErrorCode(0),
|
||||
m_ErrorOccurred(false), m_Socket(socket), m_Role(role), m_SendQ(new FIFO()), m_RecvQ(new FIFO()),
|
||||
m_CurrentAction(TlsActionNone), m_Retry(false)
|
||||
{
|
||||
std::ostringstream msgbuf;
|
||||
char errbuf[120];
|
||||
|
@ -92,7 +98,7 @@ bool TlsStream::IsVerifyOK(void) const
|
|||
*/
|
||||
boost::shared_ptr<X509> TlsStream::GetClientCertificate(void) const
|
||||
{
|
||||
boost::mutex::scoped_lock lock(m_SSLLock);
|
||||
boost::mutex::scoped_lock lock(m_Mutex);
|
||||
return boost::shared_ptr<X509>(SSL_get_certificate(m_SSL.get()), &Utility::NullDeleter);
|
||||
}
|
||||
|
||||
|
@ -103,53 +109,143 @@ boost::shared_ptr<X509> TlsStream::GetClientCertificate(void) const
|
|||
*/
|
||||
boost::shared_ptr<X509> TlsStream::GetPeerCertificate(void) const
|
||||
{
|
||||
boost::mutex::scoped_lock lock(m_SSLLock);
|
||||
boost::mutex::scoped_lock lock(m_Mutex);
|
||||
return boost::shared_ptr<X509>(SSL_get_peer_certificate(m_SSL.get()), X509_free);
|
||||
}
|
||||
|
||||
void TlsStream::OnEvent(int revents)
|
||||
{
|
||||
int rc, err;
|
||||
size_t count;
|
||||
|
||||
boost::mutex::scoped_lock lock(m_Mutex);
|
||||
|
||||
char buffer[512];
|
||||
|
||||
if (m_CurrentAction == TlsActionNone) {
|
||||
if (m_SendQ->GetAvailableBytes() > 0)
|
||||
m_CurrentAction = TlsActionWrite;
|
||||
else
|
||||
m_CurrentAction = TlsActionRead;
|
||||
}
|
||||
|
||||
switch (m_CurrentAction) {
|
||||
case TlsActionRead:
|
||||
do {
|
||||
rc = SSL_read(m_SSL.get(), buffer, sizeof(buffer));
|
||||
|
||||
if (rc > 0) {
|
||||
m_RecvQ->Write(buffer, rc);
|
||||
m_CV.notify_all();
|
||||
}
|
||||
} while (SSL_pending(m_SSL.get()));
|
||||
|
||||
break;
|
||||
case TlsActionWrite:
|
||||
count = m_SendQ->Peek(buffer, sizeof(buffer));
|
||||
|
||||
rc = SSL_write(m_SSL.get(), buffer, count);
|
||||
|
||||
if (rc > 0)
|
||||
m_SendQ->Read(NULL, rc);
|
||||
|
||||
break;
|
||||
case TlsActionHandshake:
|
||||
rc = SSL_do_handshake(m_SSL.get());
|
||||
|
||||
if (rc > 0) {
|
||||
m_HandshakeOK = true;
|
||||
m_CV.notify_all();
|
||||
}
|
||||
|
||||
break;
|
||||
case TlsActionClose:
|
||||
rc = SSL_shutdown(m_SSL.get());
|
||||
|
||||
if (rc > 0) {
|
||||
m_CloseOK = true;
|
||||
m_CV.notify_all();
|
||||
}
|
||||
|
||||
break;
|
||||
default:
|
||||
VERIFY(!"Invalid TlsAction");
|
||||
}
|
||||
|
||||
if (rc > 0) {
|
||||
if (m_SendQ->GetAvailableBytes() > 0) {
|
||||
m_CurrentAction = TlsActionWrite;
|
||||
ChangeEvents(POLLOUT);
|
||||
} else {
|
||||
m_CurrentAction = TlsActionNone;
|
||||
ChangeEvents(POLLIN);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
err = SSL_get_error(m_SSL.get(), rc);
|
||||
|
||||
std::ostringstream msgbuf;
|
||||
char errbuf[120];
|
||||
|
||||
switch (err) {
|
||||
case SSL_ERROR_WANT_READ:
|
||||
m_Retry = true;
|
||||
ChangeEvents(POLLIN);
|
||||
|
||||
break;
|
||||
case SSL_ERROR_WANT_WRITE:
|
||||
m_Retry = true;
|
||||
ChangeEvents(POLLOUT);
|
||||
|
||||
break;
|
||||
case SSL_ERROR_ZERO_RETURN:
|
||||
Unregister();
|
||||
|
||||
m_SSL.reset();
|
||||
m_Socket->Close();
|
||||
|
||||
m_Eof = true;
|
||||
|
||||
m_CV.notify_all();
|
||||
|
||||
break;
|
||||
default:
|
||||
Unregister();
|
||||
|
||||
m_SSL.reset();
|
||||
m_Socket->Close();
|
||||
|
||||
m_ErrorCode = ERR_peek_error();
|
||||
m_ErrorOccurred = true;
|
||||
|
||||
m_CV.notify_all();
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
void TlsStream::HandleError(void) const
|
||||
{
|
||||
if (m_ErrorOccurred) {
|
||||
BOOST_THROW_EXCEPTION(openssl_error()
|
||||
<< boost::errinfo_api_function("TlsStream::OnEvent")
|
||||
<< errinfo_openssl_error(m_ErrorCode));
|
||||
}
|
||||
}
|
||||
|
||||
void TlsStream::Handshake(void)
|
||||
{
|
||||
std::ostringstream msgbuf;
|
||||
char errbuf[120];
|
||||
boost::mutex::scoped_lock lock(m_Mutex);
|
||||
|
||||
boost::mutex::scoped_lock alock(m_IOActionLock);
|
||||
m_CurrentAction = TlsActionHandshake;
|
||||
ChangeEvents(POLLOUT);
|
||||
|
||||
for (;;) {
|
||||
int rc, err;
|
||||
while (!m_HandshakeOK && !m_ErrorOccurred)
|
||||
m_CV.wait(lock);
|
||||
|
||||
{
|
||||
boost::mutex::scoped_lock lock(m_SSLLock);
|
||||
rc = SSL_do_handshake(m_SSL.get());
|
||||
|
||||
if (rc > 0)
|
||||
break;
|
||||
|
||||
err = SSL_get_error(m_SSL.get(), rc);
|
||||
}
|
||||
|
||||
switch (err) {
|
||||
case SSL_ERROR_WANT_READ:
|
||||
try {
|
||||
m_Socket->Poll(true, false);
|
||||
} catch (const std::exception&) {}
|
||||
continue;
|
||||
case SSL_ERROR_WANT_WRITE:
|
||||
try {
|
||||
m_Socket->Poll(false, true);
|
||||
} catch (const std::exception&) {}
|
||||
continue;
|
||||
case SSL_ERROR_ZERO_RETURN:
|
||||
CloseUnlocked();
|
||||
return;
|
||||
default:
|
||||
msgbuf << "SSL_do_handshake() failed with code " << ERR_peek_error() << ", \"" << ERR_error_string(ERR_peek_error(), errbuf) << "\"";
|
||||
Log(LogCritical, "TlsStream", msgbuf.str());
|
||||
|
||||
BOOST_THROW_EXCEPTION(openssl_error()
|
||||
<< boost::errinfo_api_function("SSL_do_handshake")
|
||||
<< errinfo_openssl_error(ERR_peek_error()));
|
||||
}
|
||||
}
|
||||
HandleError();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -157,116 +253,23 @@ void TlsStream::Handshake(void)
|
|||
*/
|
||||
size_t TlsStream::Read(void *buffer, size_t count)
|
||||
{
|
||||
size_t left = count;
|
||||
std::ostringstream msgbuf;
|
||||
char errbuf[120];
|
||||
boost::mutex::scoped_lock lock(m_Mutex);
|
||||
|
||||
bool want_read;
|
||||
while (m_RecvQ->GetAvailableBytes() < count && !m_ErrorOccurred && !m_Eof)
|
||||
m_CV.wait(lock);
|
||||
|
||||
{
|
||||
boost::mutex::scoped_lock lock(m_SSLLock);
|
||||
want_read = !SSL_pending(m_SSL.get()) || SSL_want_read(m_SSL.get());
|
||||
}
|
||||
HandleError();
|
||||
|
||||
if (want_read)
|
||||
m_Socket->Poll(true, false);
|
||||
|
||||
boost::mutex::scoped_lock alock(m_IOActionLock);
|
||||
|
||||
while (left > 0) {
|
||||
int rc, err;
|
||||
|
||||
{
|
||||
boost::mutex::scoped_lock lock(m_SSLLock);
|
||||
rc = SSL_read(m_SSL.get(), ((char *)buffer) + (count - left), left);
|
||||
|
||||
if (rc <= 0)
|
||||
err = SSL_get_error(m_SSL.get(), rc);
|
||||
}
|
||||
|
||||
if (rc <= 0) {
|
||||
switch (err) {
|
||||
case SSL_ERROR_WANT_READ:
|
||||
try {
|
||||
m_Socket->Poll(true, false);
|
||||
} catch (const std::exception&) {}
|
||||
continue;
|
||||
case SSL_ERROR_WANT_WRITE:
|
||||
try {
|
||||
m_Socket->Poll(false, true);
|
||||
} catch (const std::exception&) {}
|
||||
continue;
|
||||
case SSL_ERROR_ZERO_RETURN:
|
||||
CloseUnlocked();
|
||||
return count - left;
|
||||
default:
|
||||
if (ERR_peek_error() != 0) {
|
||||
msgbuf << "SSL_read() failed with code " << ERR_peek_error() << ", \"" << ERR_error_string(ERR_peek_error(), errbuf) << "\"";
|
||||
Log(LogCritical, "TlsStream", msgbuf.str());
|
||||
}
|
||||
|
||||
BOOST_THROW_EXCEPTION(openssl_error()
|
||||
<< boost::errinfo_api_function("SSL_read")
|
||||
<< errinfo_openssl_error(ERR_peek_error()));
|
||||
}
|
||||
}
|
||||
|
||||
left -= rc;
|
||||
}
|
||||
|
||||
return count;
|
||||
return m_RecvQ->Read(buffer, count);
|
||||
}
|
||||
|
||||
void TlsStream::Write(const void *buffer, size_t count)
|
||||
{
|
||||
size_t left = count;
|
||||
std::ostringstream msgbuf;
|
||||
char errbuf[120];
|
||||
boost::mutex::scoped_lock lock(m_Mutex);
|
||||
|
||||
m_Socket->Poll(false, true);
|
||||
m_SendQ->Write(buffer, count);
|
||||
|
||||
boost::mutex::scoped_lock alock(m_IOActionLock);
|
||||
|
||||
while (left > 0) {
|
||||
int rc, err;
|
||||
|
||||
{
|
||||
boost::mutex::scoped_lock lock(m_SSLLock);
|
||||
rc = SSL_write(m_SSL.get(), ((const char *)buffer) + (count - left), left);
|
||||
|
||||
if (rc <= 0)
|
||||
err = SSL_get_error(m_SSL.get(), rc);
|
||||
}
|
||||
|
||||
if (rc <= 0) {
|
||||
switch (err) {
|
||||
case SSL_ERROR_WANT_READ:
|
||||
try {
|
||||
m_Socket->Poll(true, false);
|
||||
} catch (const std::exception&) {}
|
||||
continue;
|
||||
case SSL_ERROR_WANT_WRITE:
|
||||
try {
|
||||
m_Socket->Poll(false, true);
|
||||
} catch (const std::exception&) {}
|
||||
continue;
|
||||
case SSL_ERROR_ZERO_RETURN:
|
||||
CloseUnlocked();
|
||||
return;
|
||||
default:
|
||||
if (ERR_peek_error() != 0) {
|
||||
msgbuf << "SSL_write() failed with code " << ERR_peek_error() << ", \"" << ERR_error_string(ERR_peek_error(), errbuf) << "\"";
|
||||
Log(LogCritical, "TlsStream", msgbuf.str());
|
||||
}
|
||||
|
||||
BOOST_THROW_EXCEPTION(openssl_error()
|
||||
<< boost::errinfo_api_function("SSL_write")
|
||||
<< errinfo_openssl_error(ERR_peek_error()));
|
||||
}
|
||||
}
|
||||
|
||||
left -= rc;
|
||||
}
|
||||
ChangeEvents(POLLOUT);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -274,49 +277,14 @@ void TlsStream::Write(const void *buffer, size_t count)
|
|||
*/
|
||||
void TlsStream::Close(void)
|
||||
{
|
||||
boost::mutex::scoped_lock alock(m_IOActionLock);
|
||||
boost::mutex::scoped_lock lock(m_Mutex);
|
||||
m_CurrentAction = TlsActionClose;
|
||||
ChangeEvents(POLLOUT);
|
||||
|
||||
CloseUnlocked();
|
||||
}
|
||||
while (!m_CloseOK && !m_ErrorOccurred)
|
||||
m_CV.wait(lock);
|
||||
|
||||
void TlsStream::CloseUnlocked(void)
|
||||
{
|
||||
m_Eof = true;
|
||||
|
||||
for (int i = 0; i < 5; i++) {
|
||||
int rc, err;
|
||||
|
||||
{
|
||||
boost::mutex::scoped_lock lock(m_SSLLock);
|
||||
rc = SSL_shutdown(m_SSL.get());
|
||||
|
||||
if (rc == 0)
|
||||
continue;
|
||||
|
||||
if (rc > 0)
|
||||
break;
|
||||
|
||||
err = SSL_get_error(m_SSL.get(), rc);
|
||||
}
|
||||
|
||||
switch (err) {
|
||||
case SSL_ERROR_WANT_READ:
|
||||
try {
|
||||
m_Socket->Poll(true, false);
|
||||
} catch (const std::exception&) {}
|
||||
continue;
|
||||
case SSL_ERROR_WANT_WRITE:
|
||||
try {
|
||||
m_Socket->Poll(false, true);
|
||||
} catch (const std::exception&) {}
|
||||
continue;
|
||||
default:
|
||||
goto close_socket;
|
||||
}
|
||||
}
|
||||
|
||||
close_socket:
|
||||
m_Socket->Close();
|
||||
HandleError();
|
||||
}
|
||||
|
||||
bool TlsStream::IsEof(void) const
|
||||
|
|
|
@ -22,18 +22,29 @@
|
|||
|
||||
#include "base/i2-base.hpp"
|
||||
#include "base/socket.hpp"
|
||||
#include "base/socketevents.hpp"
|
||||
#include "base/stream.hpp"
|
||||
#include "base/tlsutility.hpp"
|
||||
#include "base/fifo.hpp"
|
||||
|
||||
namespace icinga
|
||||
{
|
||||
|
||||
enum TlsAction
|
||||
{
|
||||
TlsActionNone,
|
||||
TlsActionRead,
|
||||
TlsActionWrite,
|
||||
TlsActionHandshake,
|
||||
TlsActionClose
|
||||
};
|
||||
|
||||
/**
|
||||
* A TLS stream.
|
||||
*
|
||||
* @ingroup base
|
||||
*/
|
||||
class I2_BASE_API TlsStream : public Stream
|
||||
class I2_BASE_API TlsStream : public Stream, private SocketEvents
|
||||
{
|
||||
public:
|
||||
DECLARE_PTR_TYPEDEFS(TlsStream);
|
||||
|
@ -57,17 +68,29 @@ public:
|
|||
private:
|
||||
boost::shared_ptr<SSL> m_SSL;
|
||||
bool m_Eof;
|
||||
mutable boost::mutex m_SSLLock;
|
||||
mutable boost::mutex m_IOActionLock;
|
||||
mutable boost::mutex m_Mutex;
|
||||
mutable boost::condition_variable m_CV;
|
||||
bool m_HandshakeOK;
|
||||
bool m_VerifyOK;
|
||||
bool m_CloseOK;
|
||||
int m_ErrorCode;
|
||||
bool m_ErrorOccurred;
|
||||
|
||||
Socket::Ptr m_Socket;
|
||||
ConnectionRole m_Role;
|
||||
|
||||
FIFO::Ptr m_SendQ;
|
||||
FIFO::Ptr m_RecvQ;
|
||||
|
||||
TlsAction m_CurrentAction;
|
||||
bool m_Retry;
|
||||
|
||||
static int m_SSLIndex;
|
||||
static bool m_SSLIndexInitialized;
|
||||
|
||||
void CloseUnlocked(void);
|
||||
virtual void OnEvent(int revents);
|
||||
|
||||
void HandleError(void) const;
|
||||
|
||||
static int ValidateCertificate(int preverify_ok, X509_STORE_CTX *ctx);
|
||||
static void NullCertificateDeleter(X509 *certificate);
|
||||
|
|
|
@ -164,6 +164,11 @@ int PkiUtility::SaveCert(const String& host, const String& port, const String& k
|
|||
|
||||
boost::shared_ptr<X509> cert = stream->GetPeerCertificate();
|
||||
|
||||
if (!cert) {
|
||||
Log(LogCritical, "cli", "Peer did not present a valid certificate.");
|
||||
return 1;
|
||||
}
|
||||
|
||||
std::ofstream fpcert;
|
||||
fpcert.open(trustedfile.CStr());
|
||||
fpcert << CertificateToString(cert);
|
||||
|
|
|
@ -123,7 +123,11 @@ void ApiClient::DisconnectSync(void)
|
|||
listener->RemoveAnonymousClient(this);
|
||||
}
|
||||
|
||||
m_Stream->Close();
|
||||
try {
|
||||
m_Stream->Close();
|
||||
} catch (const std::exception&) {
|
||||
/* Ignore the exception. */
|
||||
}
|
||||
}
|
||||
|
||||
bool ApiClient::ProcessMessage(void)
|
||||
|
|
|
@ -24,3 +24,5 @@ endif()
|
|||
if(UNIX OR CYGWIN)
|
||||
add_subdirectory(execvpe)
|
||||
endif()
|
||||
|
||||
add_subdirectory(socketpair)
|
||||
|
|
|
@ -0,0 +1,34 @@
|
|||
# Icinga 2
|
||||
# Copyright (C) 2012-2015 Icinga Development Team (http://www.icinga.org)
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or
|
||||
# modify it under the terms of the GNU General Public License
|
||||
# as published by the Free Software Foundation; either version 2
|
||||
# of the License, or (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program; if not, write to the Free Software Foundation
|
||||
# Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
|
||||
add_library(socketpair SHARED socketpair.c socketpair.h)
|
||||
|
||||
set_target_properties (
|
||||
socketpair PROPERTIES
|
||||
DEFINE_SYMBOL I2_SOCKETPAIR_BUILD
|
||||
)
|
||||
|
||||
if(WIN32)
|
||||
target_link_libraries(socketpair ws2_32)
|
||||
endif()
|
||||
|
||||
install(
|
||||
TARGETS socketpair
|
||||
RUNTIME DESTINATION ${CMAKE_INSTALL_SBINDIR}
|
||||
LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}/icinga2
|
||||
)
|
||||
|
|
@ -0,0 +1,154 @@
|
|||
/* socketpair.c
|
||||
Copyright 2007, 2010 by Nathan C. Myers <ncm@cantrip.org>
|
||||
Redistribution and use in source and binary forms, with or without modification,
|
||||
are permitted provided that the following conditions are met:
|
||||
|
||||
Redistributions of source code must retain the above copyright notice, this
|
||||
list of conditions and the following disclaimer.
|
||||
|
||||
Redistributions in binary form must reproduce the above copyright notice,
|
||||
this list of conditions and the following disclaimer in the documentation
|
||||
and/or other materials provided with the distribution.
|
||||
|
||||
The name of the author must not be used to endorse or promote products
|
||||
derived from this software without specific prior written permission.
|
||||
|
||||
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||||
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
||||
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
|
||||
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
||||
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
||||
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
||||
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
|
||||
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
|
||||
/* Changes:
|
||||
* 2014-02-12: merge David Woodhouse, Ger Hobbelt improvements
|
||||
* git.infradead.org/users/dwmw2/openconnect.git/commitdiff/bdeefa54
|
||||
* github.com/GerHobbelt/selectable-socketpair
|
||||
* always init the socks[] to -1/INVALID_SOCKET on error, both on Win32/64
|
||||
* and UNIX/other platforms
|
||||
* 2013-07-18: Change to BSD 3-clause license
|
||||
* 2010-03-31:
|
||||
* set addr to 127.0.0.1 because win32 getsockname does not always set it.
|
||||
* 2010-02-25:
|
||||
* set SO_REUSEADDR option to avoid leaking some windows resource.
|
||||
* Windows System Error 10049, "Event ID 4226 TCP/IP has reached
|
||||
* the security limit imposed on the number of concurrent TCP connect
|
||||
* attempts." Bleah.
|
||||
* 2007-04-25:
|
||||
* preserve value of WSAGetLastError() on all error returns.
|
||||
* 2007-04-22: (Thanks to Matthew Gregan <kinetik@flim.org>)
|
||||
* s/EINVAL/WSAEINVAL/ fix trivial compile failure
|
||||
* s/socket/WSASocket/ enable creation of sockets suitable as stdin/stdout
|
||||
* of a child process.
|
||||
* add argument make_overlapped
|
||||
*/
|
||||
|
||||
#include <string.h>
|
||||
|
||||
#ifdef WIN32
|
||||
# include <ws2tcpip.h> /* socklen_t, et al (MSVC20xx) */
|
||||
# include <windows.h>
|
||||
# include <io.h>
|
||||
#else
|
||||
# include <sys/types.h>
|
||||
# include <sys/socket.h>
|
||||
# include <errno.h>
|
||||
#endif
|
||||
|
||||
#include "socketpair.h"
|
||||
|
||||
#ifdef WIN32
|
||||
|
||||
/* dumb_socketpair:
|
||||
* If make_overlapped is nonzero, both sockets created will be usable for
|
||||
* "overlapped" operations via WSASend etc. If make_overlapped is zero,
|
||||
* socks[0] (only) will be usable with regular ReadFile etc., and thus
|
||||
* suitable for use as stdin or stdout of a child process. Note that the
|
||||
* sockets must be closed with closesocket() regardless.
|
||||
*/
|
||||
|
||||
int dumb_socketpair(SOCKET socks[2], int make_overlapped)
|
||||
{
|
||||
union {
|
||||
struct sockaddr_in inaddr;
|
||||
struct sockaddr addr;
|
||||
} a;
|
||||
SOCKET listener;
|
||||
int e;
|
||||
socklen_t addrlen = sizeof(a.inaddr);
|
||||
DWORD flags = (make_overlapped ? WSA_FLAG_OVERLAPPED : 0);
|
||||
int reuse = 1;
|
||||
|
||||
if (socks == 0) {
|
||||
WSASetLastError(WSAEINVAL);
|
||||
return SOCKET_ERROR;
|
||||
}
|
||||
socks[0] = socks[1] = -1;
|
||||
|
||||
listener = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
|
||||
if (listener == -1)
|
||||
return SOCKET_ERROR;
|
||||
|
||||
memset(&a, 0, sizeof(a));
|
||||
a.inaddr.sin_family = AF_INET;
|
||||
a.inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
|
||||
a.inaddr.sin_port = 0;
|
||||
|
||||
for (;;) {
|
||||
if (setsockopt(listener, SOL_SOCKET, SO_REUSEADDR,
|
||||
(char*) &reuse, (socklen_t) sizeof(reuse)) == -1)
|
||||
break;
|
||||
if (bind(listener, &a.addr, sizeof(a.inaddr)) == SOCKET_ERROR)
|
||||
break;
|
||||
|
||||
memset(&a, 0, sizeof(a));
|
||||
if (getsockname(listener, &a.addr, &addrlen) == SOCKET_ERROR)
|
||||
break;
|
||||
// win32 getsockname may only set the port number, p=0.0005.
|
||||
// ( http://msdn.microsoft.com/library/ms738543.aspx ):
|
||||
a.inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
|
||||
a.inaddr.sin_family = AF_INET;
|
||||
|
||||
if (listen(listener, 1) == SOCKET_ERROR)
|
||||
break;
|
||||
|
||||
socks[0] = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, flags);
|
||||
if (socks[0] == -1)
|
||||
break;
|
||||
if (connect(socks[0], &a.addr, sizeof(a.inaddr)) == SOCKET_ERROR)
|
||||
break;
|
||||
|
||||
socks[1] = accept(listener, NULL, NULL);
|
||||
if (socks[1] == -1)
|
||||
break;
|
||||
|
||||
closesocket(listener);
|
||||
return 0;
|
||||
}
|
||||
|
||||
e = WSAGetLastError();
|
||||
closesocket(listener);
|
||||
closesocket(socks[0]);
|
||||
closesocket(socks[1]);
|
||||
WSASetLastError(e);
|
||||
socks[0] = socks[1] = -1;
|
||||
return SOCKET_ERROR;
|
||||
}
|
||||
#else
|
||||
int dumb_socketpair(int socks[2], int dummy)
|
||||
{
|
||||
if (socks == 0) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
dummy = socketpair(AF_LOCAL, SOCK_STREAM, 0, socks);
|
||||
if (dummy)
|
||||
socks[0] = socks[1] = -1;
|
||||
return dummy;
|
||||
}
|
||||
#endif
|
|
@ -0,0 +1,37 @@
|
|||
/* socketpair.h
|
||||
* Copyright 2007 by Nathan C. Myers <ncm@cantrip.org>; some rights reserved.
|
||||
* This code is Free Software. It may be copied freely, in original or
|
||||
* modified form, subject only to the restrictions that (1) the author is
|
||||
* relieved from all responsibilities for any use for any purpose, and (2)
|
||||
* this copyright notice must be retained, unchanged, in its entirety. If
|
||||
* for any reason the author might be held responsible for any consequences
|
||||
* of copying or use, license is withheld.
|
||||
*/
|
||||
|
||||
#ifndef SOCKETPAIR_H
|
||||
#define SOCKETPAIR_H
|
||||
|
||||
#include "base/visibility.hpp"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif /* __cplusplus */
|
||||
|
||||
#ifdef I2_SOCKETPAIR_BUILD
|
||||
# define I2_SOCKETPAIR_API I2_EXPORT
|
||||
#else
|
||||
# define I2_SOCKETPAIR_API I2_IMPORT
|
||||
#endif /* I2_SOCKETPAIR_BUILD */
|
||||
|
||||
#ifdef _WIN32
|
||||
I2_SOCKETPAIR_API int dumb_socketpair(SOCKET socks[2], int make_overlapped);
|
||||
#else /* _WIN32 */
|
||||
I2_SOCKETPAIR_API int dumb_socketpair(int socks[2], int dummy);
|
||||
#endif
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif /* __cplusplus */
|
||||
|
||||
#endif /* SOCKETPAIR_H */
|
||||
|
Loading…
Reference in New Issue