Fix file descriptors not getting closed properly

Refs #4865
This commit is contained in:
Gunnar Beutner 2014-04-12 08:16:57 +02:00
parent 2961364e97
commit 06fdaeb2b2
6 changed files with 4 additions and 340 deletions

View File

@ -32,7 +32,6 @@
#include "base/application.h" #include "base/application.h"
#include "base/stream.h" #include "base/stream.h"
#include "base/networkstream.h" #include "base/networkstream.h"
#include "base/bufferedstream.h"
#include "base/exception.h" #include "base/exception.h"
#include "base/statsfunction.h" #include "base/statsfunction.h"
#include <boost/algorithm/string.hpp> #include <boost/algorithm/string.hpp>
@ -90,8 +89,7 @@ void GraphiteWriter::ReconnectTimerHandler(void)
Log(LogDebug, "perfdata", "GraphiteWriter: Reconnect to tcp socket on host '" + GetHost() + "' port '" + GetPort() + "'."); Log(LogDebug, "perfdata", "GraphiteWriter: Reconnect to tcp socket on host '" + GetHost() + "' port '" + GetPort() + "'.");
socket->Connect(GetHost(), GetPort()); socket->Connect(GetHost(), GetPort());
NetworkStream::Ptr net_stream = make_shared<NetworkStream>(socket); m_Stream = make_shared<NetworkStream>(socket);
m_Stream = make_shared<BufferedStream>(net_stream);
} }
void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)

View File

@ -23,7 +23,7 @@ mkclass_target(streamlogger.ti streamlogger.th)
mkclass_target(sysloglogger.ti sysloglogger.th) mkclass_target(sysloglogger.ti sysloglogger.th)
add_library(base SHARED add_library(base SHARED
application.cpp application.th array.cpp bufferedstream.cpp context.cpp application.cpp application.th array.cpp context.cpp
convert.cpp dictionary.cpp dynamicobject.cpp dynamicobject.th dynamictype.cpp convert.cpp dictionary.cpp dynamicobject.cpp dynamicobject.th dynamictype.cpp
exception.cpp fifo.cpp filelogger.cpp filelogger.th logger.cpp logger.th exception.cpp fifo.cpp filelogger.cpp filelogger.th logger.cpp logger.th
netstring.cpp networkstream.cpp object.cpp objectlock.cpp process.cpp netstring.cpp networkstream.cpp object.cpp objectlock.cpp process.cpp

View File

@ -1,223 +0,0 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012-2014 Icinga Development Team (http://www.icinga.org) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "base/bufferedstream.h"
#include "base/objectlock.h"
#include "base/utility.h"
#include "base/logger_fwd.h"
#include <sstream>
using namespace icinga;
BufferedStream::BufferedStream(const Stream::Ptr& innerStream, size_t maxBufferSize)
: m_InnerStream(innerStream), m_Stopped(false), m_Eof(false),
m_RecvQ(make_shared<FIFO>()), m_SendQ(make_shared<FIFO>()),
m_Blocking(true), m_MaxBufferSize(maxBufferSize), m_Exception()
{
m_ReadThread = boost::thread(boost::bind(&BufferedStream::ReadThreadProc, this));
m_WriteThread = boost::thread(boost::bind(&BufferedStream::WriteThreadProc, this));
}
BufferedStream::~BufferedStream(void)
{
{
boost::mutex::scoped_lock lock(m_Mutex);
m_Stopped = true;
}
m_InnerStream->Close();
{
boost::mutex::scoped_lock lock(m_Mutex);
m_ReadCV.notify_all();
m_WriteCV.notify_all();
}
m_ReadThread.join();
m_WriteThread.join();
}
void BufferedStream::ReadThreadProc(void)
{
char buffer[512];
Utility::SetThreadName("BufS Read");
try {
for (;;) {
size_t rc = m_InnerStream->Read(buffer, sizeof(buffer));
if (rc == 0) {
boost::mutex::scoped_lock lock(m_Mutex);
m_Eof = true;
m_Stopped = true;
m_ReadCV.notify_all();
m_WriteCV.notify_all();
break;
}
boost::mutex::scoped_lock lock(m_Mutex);
m_RecvQ->Write(buffer, rc);
m_ReadCV.notify_all();
if (m_Stopped)
break;
}
} catch (const std::exception& ex) {
{
boost::mutex::scoped_lock lock(m_Mutex);
if (!m_Exception)
m_Exception = boost::current_exception();
m_ReadCV.notify_all();
}
}
}
void BufferedStream::WriteThreadProc(void)
{
char buffer[512];
Utility::SetThreadName("BufS Write");
try {
for (;;) {
size_t rc;
{
boost::mutex::scoped_lock lock(m_Mutex);
while (m_SendQ->GetAvailableBytes() == 0 && !m_Stopped)
m_WriteCV.wait(lock);
if (m_Stopped)
break;
rc = m_SendQ->Read(buffer, sizeof(buffer));
m_WriteCV.notify_all();
}
m_InnerStream->Write(buffer, rc);
}
} catch (const std::exception& ex) {
{
boost::mutex::scoped_lock lock(m_Mutex);
if (!m_Exception)
m_Exception = boost::current_exception();
m_WriteCV.notify_all();
}
}
}
void BufferedStream::Close(void)
{
m_InnerStream->Close();
}
/**
* Reads data from the stream.
*
* @param buffer The buffer where data should be stored. May be NULL if you're
* not actually interested in the data.
* @param count The number of bytes to read from the queue.
* @returns The number of bytes actually read.
*/
size_t BufferedStream::Read(void *buffer, size_t count)
{
boost::mutex::scoped_lock lock(m_Mutex);
if (m_Blocking)
InternalWaitReadable(count, lock);
if (m_Exception)
boost::rethrow_exception(m_Exception);
if (m_Eof)
BOOST_THROW_EXCEPTION(std::invalid_argument("Tried to read from closed socket."));
return m_RecvQ->Read(buffer, count);
}
/**
* Writes data to the stream.
*
* @param buffer The data that is to be written.
* @param count The number of bytes to write.
*/
void BufferedStream::Write(const void *buffer, size_t count)
{
boost::mutex::scoped_lock lock(m_Mutex);
InternalWaitWritable(count, lock);
if (m_Exception)
boost::rethrow_exception(m_Exception);
if (m_Eof)
BOOST_THROW_EXCEPTION(std::invalid_argument("Tried to write to closed socket."));
m_SendQ->Write(buffer, count);
m_WriteCV.notify_all();
}
void BufferedStream::WaitReadable(size_t count)
{
boost::mutex::scoped_lock lock(m_Mutex);
InternalWaitReadable(count, lock);
}
void BufferedStream::InternalWaitReadable(size_t count, boost::mutex::scoped_lock& lock)
{
while (m_RecvQ->GetAvailableBytes() < count && !m_Exception && !m_Stopped)
m_ReadCV.wait(lock);
}
void BufferedStream::WaitWritable(size_t count)
{
boost::mutex::scoped_lock lock(m_Mutex);
InternalWaitWritable(count, lock);
}
void BufferedStream::InternalWaitWritable(size_t count, boost::mutex::scoped_lock& lock)
{
while (m_SendQ->GetAvailableBytes() + count > m_MaxBufferSize && !m_Exception && !m_Stopped)
m_WriteCV.wait(lock);
}
void BufferedStream::MakeNonBlocking(void)
{
boost::mutex::scoped_lock lock(m_Mutex);
m_Blocking = false;
}
bool BufferedStream::IsEof(void) const
{
boost::mutex::scoped_lock lock(m_Mutex);
return m_InnerStream->IsEof() && m_RecvQ->GetAvailableBytes() == 0;
}

View File

@ -1,85 +0,0 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012-2014 Icinga Development Team (http://www.icinga.org) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#ifndef BUFFEREDSTREAM_H
#define BUFFEREDSTREAM_H
#include "base/i2-base.h"
#include "base/stream.h"
#include "base/fifo.h"
namespace icinga
{
/**
* A buffered stream.
*
* @ingroup base
*/
class I2_BASE_API BufferedStream : public Stream
{
public:
DECLARE_PTR_TYPEDEFS(BufferedStream);
BufferedStream(const Stream::Ptr& innerStream, size_t maxBufferSize = 64 * 1024 * 1024);
~BufferedStream(void);
virtual size_t Read(void *buffer, size_t count);
virtual void Write(const void *buffer, size_t count);
virtual void Close(void);
virtual bool IsEof(void) const;
void WaitReadable(size_t count);
void WaitWritable(size_t count);
void MakeNonBlocking(void);
private:
Stream::Ptr m_InnerStream;
bool m_Stopped;
bool m_Eof;
FIFO::Ptr m_RecvQ;
FIFO::Ptr m_SendQ;
bool m_Blocking;
size_t m_MaxBufferSize;
boost::exception_ptr m_Exception;
mutable boost::mutex m_Mutex;
boost::condition_variable m_ReadCV;
boost::condition_variable m_WriteCV;
void ReadThreadProc(void);
void WriteThreadProc(void);
boost::thread m_ReadThread;
boost::thread m_WriteThread;
void InternalWaitWritable(size_t count, boost::mutex::scoped_lock& lock);
void InternalWaitReadable(size_t count, boost::mutex::scoped_lock& lock);
};
}
#endif /* BUFFEREDSTREAM_H */

View File

@ -38,15 +38,8 @@ bool I2_EXPORT TlsStream::m_SSLIndexInitialized = false;
* @param sslContext The SSL context for the client. * @param sslContext The SSL context for the client.
*/ */
TlsStream::TlsStream(const Stream::Ptr& innerStream, TlsRole role, shared_ptr<SSL_CTX> sslContext) TlsStream::TlsStream(const Stream::Ptr& innerStream, TlsRole role, shared_ptr<SSL_CTX> sslContext)
: m_SSLContext(sslContext), m_Role(role) : m_InnerStream(innerStream), m_SSLContext(sslContext), m_Role(role)
{ {
m_InnerStream = dynamic_pointer_cast<BufferedStream>(innerStream);
if (!m_InnerStream)
m_InnerStream = make_shared<BufferedStream>(innerStream);
m_InnerStream->MakeNonBlocking();
m_SSL = shared_ptr<SSL>(SSL_new(m_SSLContext.get()), SSL_free); m_SSL = shared_ptr<SSL>(SSL_new(m_SSLContext.get()), SSL_free);
m_SSLContext.reset(); m_SSLContext.reset();
@ -106,14 +99,8 @@ void TlsStream::Handshake(void)
while ((rc = SSL_do_handshake(m_SSL.get())) <= 0) { while ((rc = SSL_do_handshake(m_SSL.get())) <= 0) {
switch (SSL_get_error(m_SSL.get(), rc)) { switch (SSL_get_error(m_SSL.get(), rc)) {
case SSL_ERROR_WANT_READ: case SSL_ERROR_WANT_READ:
olock.Unlock();
m_InnerStream->WaitReadable(1);
olock.Lock();
continue; continue;
case SSL_ERROR_WANT_WRITE: case SSL_ERROR_WANT_WRITE:
olock.Unlock();
m_InnerStream->WaitWritable(1);
olock.Lock();
continue; continue;
case SSL_ERROR_ZERO_RETURN: case SSL_ERROR_ZERO_RETURN:
Close(); Close();
@ -144,14 +131,8 @@ size_t TlsStream::Read(void *buffer, size_t count)
if (rc <= 0) { if (rc <= 0) {
switch (SSL_get_error(m_SSL.get(), rc)) { switch (SSL_get_error(m_SSL.get(), rc)) {
case SSL_ERROR_WANT_READ: case SSL_ERROR_WANT_READ:
olock.Unlock();
m_InnerStream->WaitReadable(1);
olock.Lock();
continue; continue;
case SSL_ERROR_WANT_WRITE: case SSL_ERROR_WANT_WRITE:
olock.Unlock();
m_InnerStream->WaitWritable(1);
olock.Lock();
continue; continue;
case SSL_ERROR_ZERO_RETURN: case SSL_ERROR_ZERO_RETURN:
Close(); Close();
@ -184,14 +165,8 @@ void TlsStream::Write(const void *buffer, size_t count)
if (rc <= 0) { if (rc <= 0) {
switch (SSL_get_error(m_SSL.get(), rc)) { switch (SSL_get_error(m_SSL.get(), rc)) {
case SSL_ERROR_WANT_READ: case SSL_ERROR_WANT_READ:
olock.Unlock();
m_InnerStream->WaitReadable(1);
olock.Lock();
continue; continue;
case SSL_ERROR_WANT_WRITE: case SSL_ERROR_WANT_WRITE:
olock.Unlock();
m_InnerStream->WaitWritable(1);
olock.Lock();
continue; continue;
case SSL_ERROR_ZERO_RETURN: case SSL_ERROR_ZERO_RETURN:
Close(); Close();

View File

@ -21,7 +21,6 @@
#define TLSSTREAM_H #define TLSSTREAM_H
#include "base/i2-base.h" #include "base/i2-base.h"
#include "base/bufferedstream.h"
#include "base/stream.h" #include "base/stream.h"
#include "base/fifo.h" #include "base/fifo.h"
#include "base/tlsutility.h" #include "base/tlsutility.h"
@ -64,7 +63,7 @@ private:
shared_ptr<SSL> m_SSL; shared_ptr<SSL> m_SSL;
BIO *m_BIO; BIO *m_BIO;
BufferedStream::Ptr m_InnerStream; Stream::Ptr m_InnerStream;
TlsRole m_Role; TlsRole m_Role;
static int m_SSLIndex; static int m_SSLIndex;