mirror of
https://github.com/Icinga/icinga2.git
synced 2025-07-25 06:34:42 +02:00
base: Limit buffer size for BufferedStream objects.
This commit is contained in:
parent
c519deb257
commit
132695a460
@ -26,10 +26,10 @@
|
|||||||
|
|
||||||
using namespace icinga;
|
using namespace icinga;
|
||||||
|
|
||||||
BufferedStream::BufferedStream(const Stream::Ptr& innerStream)
|
BufferedStream::BufferedStream(const Stream::Ptr& innerStream, size_t maxBufferSize)
|
||||||
: m_InnerStream(innerStream), m_Stopped(false), m_Eof(false),
|
: m_InnerStream(innerStream), m_Stopped(false), m_Eof(false),
|
||||||
m_RecvQ(boost::make_shared<FIFO>()), m_SendQ(boost::make_shared<FIFO>()),
|
m_RecvQ(boost::make_shared<FIFO>()), m_SendQ(boost::make_shared<FIFO>()),
|
||||||
m_Blocking(true), m_Exception()
|
m_Blocking(true), m_MaxBufferSize(maxBufferSize), m_Exception()
|
||||||
{
|
{
|
||||||
m_ReadThread = boost::thread(boost::bind(&BufferedStream::ReadThreadProc, this));
|
m_ReadThread = boost::thread(boost::bind(&BufferedStream::ReadThreadProc, this));
|
||||||
m_WriteThread = boost::thread(boost::bind(&BufferedStream::WriteThreadProc, this));
|
m_WriteThread = boost::thread(boost::bind(&BufferedStream::WriteThreadProc, this));
|
||||||
@ -111,6 +111,7 @@ void BufferedStream::WriteThreadProc(void)
|
|||||||
break;
|
break;
|
||||||
|
|
||||||
rc = m_SendQ->Read(buffer, sizeof(buffer));
|
rc = m_SendQ->Read(buffer, sizeof(buffer));
|
||||||
|
m_WriteCV.notify_all();
|
||||||
}
|
}
|
||||||
|
|
||||||
m_InnerStream->Write(buffer, rc);
|
m_InnerStream->Write(buffer, rc);
|
||||||
@ -158,12 +159,13 @@ size_t BufferedStream::Read(void *buffer, size_t count)
|
|||||||
*
|
*
|
||||||
* @param buffer The data that is to be written.
|
* @param buffer The data that is to be written.
|
||||||
* @param count The number of bytes to write.
|
* @param count The number of bytes to write.
|
||||||
* @returns The number of bytes written
|
|
||||||
*/
|
*/
|
||||||
void BufferedStream::Write(const void *buffer, size_t count)
|
void BufferedStream::Write(const void *buffer, size_t count)
|
||||||
{
|
{
|
||||||
boost::mutex::scoped_lock lock(m_Mutex);
|
boost::mutex::scoped_lock lock(m_Mutex);
|
||||||
|
|
||||||
|
InternalWaitWritable(count, lock);
|
||||||
|
|
||||||
if (m_Exception)
|
if (m_Exception)
|
||||||
boost::rethrow_exception(m_Exception);
|
boost::rethrow_exception(m_Exception);
|
||||||
|
|
||||||
@ -184,8 +186,18 @@ void BufferedStream::InternalWaitReadable(size_t count, boost::mutex::scoped_loc
|
|||||||
m_ReadCV.wait(lock);
|
m_ReadCV.wait(lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
void BufferedStream::WaitWritable(size_t)
|
void BufferedStream::WaitWritable(size_t count)
|
||||||
{ /* Nothing to do here. */ }
|
{
|
||||||
|
boost::mutex::scoped_lock lock(m_Mutex);
|
||||||
|
|
||||||
|
InternalWaitWritable(count, lock);
|
||||||
|
}
|
||||||
|
|
||||||
|
void BufferedStream::InternalWaitWritable(size_t count, boost::mutex::scoped_lock& lock)
|
||||||
|
{
|
||||||
|
while (m_SendQ->GetAvailableBytes() + count > m_MaxBufferSize && !m_Exception && !m_Stopped)
|
||||||
|
m_WriteCV.wait(lock);
|
||||||
|
}
|
||||||
|
|
||||||
void BufferedStream::MakeNonBlocking(void)
|
void BufferedStream::MakeNonBlocking(void)
|
||||||
{
|
{
|
||||||
|
@ -37,7 +37,7 @@ class I2_BASE_API BufferedStream : public Stream
|
|||||||
public:
|
public:
|
||||||
DECLARE_PTR_TYPEDEFS(BufferedStream);
|
DECLARE_PTR_TYPEDEFS(BufferedStream);
|
||||||
|
|
||||||
BufferedStream(const Stream::Ptr& innerStream);
|
BufferedStream(const Stream::Ptr& innerStream, size_t maxBufferSize = 64 * 1024 * 1024);
|
||||||
~BufferedStream(void);
|
~BufferedStream(void);
|
||||||
|
|
||||||
virtual size_t Read(void *buffer, size_t count);
|
virtual size_t Read(void *buffer, size_t count);
|
||||||
@ -62,6 +62,7 @@ private:
|
|||||||
FIFO::Ptr m_SendQ;
|
FIFO::Ptr m_SendQ;
|
||||||
|
|
||||||
bool m_Blocking;
|
bool m_Blocking;
|
||||||
|
size_t m_MaxBufferSize;
|
||||||
|
|
||||||
boost::exception_ptr m_Exception;
|
boost::exception_ptr m_Exception;
|
||||||
|
|
||||||
@ -75,6 +76,7 @@ private:
|
|||||||
boost::thread m_ReadThread;
|
boost::thread m_ReadThread;
|
||||||
boost::thread m_WriteThread;
|
boost::thread m_WriteThread;
|
||||||
|
|
||||||
|
void InternalWaitWritable(size_t count, boost::mutex::scoped_lock& lock);
|
||||||
void InternalWaitReadable(size_t count, boost::mutex::scoped_lock& lock);
|
void InternalWaitReadable(size_t count, boost::mutex::scoped_lock& lock);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user