diff --git a/lib/base/bufferedstream.cpp b/lib/base/bufferedstream.cpp index 2f35a57f3..8d7fe4113 100644 --- a/lib/base/bufferedstream.cpp +++ b/lib/base/bufferedstream.cpp @@ -26,10 +26,10 @@ using namespace icinga; -BufferedStream::BufferedStream(const Stream::Ptr& innerStream) +BufferedStream::BufferedStream(const Stream::Ptr& innerStream, size_t maxBufferSize) : m_InnerStream(innerStream), m_Stopped(false), m_Eof(false), m_RecvQ(boost::make_shared()), m_SendQ(boost::make_shared()), - m_Blocking(true), m_Exception() + m_Blocking(true), m_MaxBufferSize(maxBufferSize), m_Exception() { m_ReadThread = boost::thread(boost::bind(&BufferedStream::ReadThreadProc, this)); m_WriteThread = boost::thread(boost::bind(&BufferedStream::WriteThreadProc, this)); @@ -111,6 +111,7 @@ void BufferedStream::WriteThreadProc(void) break; rc = m_SendQ->Read(buffer, sizeof(buffer)); + m_WriteCV.notify_all(); } m_InnerStream->Write(buffer, rc); @@ -158,12 +159,13 @@ size_t BufferedStream::Read(void *buffer, size_t count) * * @param buffer The data that is to be written. * @param count The number of bytes to write. - * @returns The number of bytes written */ void BufferedStream::Write(const void *buffer, size_t count) { boost::mutex::scoped_lock lock(m_Mutex); + InternalWaitWritable(count, lock); + if (m_Exception) boost::rethrow_exception(m_Exception); @@ -184,8 +186,18 @@ void BufferedStream::InternalWaitReadable(size_t count, boost::mutex::scoped_loc m_ReadCV.wait(lock); } -void BufferedStream::WaitWritable(size_t) -{ /* Nothing to do here. */ } +void BufferedStream::WaitWritable(size_t count) +{ + boost::mutex::scoped_lock lock(m_Mutex); + + InternalWaitWritable(count, lock); +} + +void BufferedStream::InternalWaitWritable(size_t count, boost::mutex::scoped_lock& lock) +{ + while (m_SendQ->GetAvailableBytes() + count > m_MaxBufferSize && !m_Exception && !m_Stopped) + m_WriteCV.wait(lock); +} void BufferedStream::MakeNonBlocking(void) { diff --git a/lib/base/bufferedstream.h b/lib/base/bufferedstream.h index a6028f3c9..6f1bb78ca 100644 --- a/lib/base/bufferedstream.h +++ b/lib/base/bufferedstream.h @@ -37,7 +37,7 @@ class I2_BASE_API BufferedStream : public Stream public: DECLARE_PTR_TYPEDEFS(BufferedStream); - BufferedStream(const Stream::Ptr& innerStream); + BufferedStream(const Stream::Ptr& innerStream, size_t maxBufferSize = 64 * 1024 * 1024); ~BufferedStream(void); virtual size_t Read(void *buffer, size_t count); @@ -62,6 +62,7 @@ private: FIFO::Ptr m_SendQ; bool m_Blocking; + size_t m_MaxBufferSize; boost::exception_ptr m_Exception; @@ -75,6 +76,7 @@ private: boost::thread m_ReadThread; boost::thread m_WriteThread; + void InternalWaitWritable(size_t count, boost::mutex::scoped_lock& lock); void InternalWaitReadable(size_t count, boost::mutex::scoped_lock& lock); };