Bugfixes for the BufferedStream class.

This commit is contained in:
Gunnar Beutner 2013-04-19 14:47:41 +02:00
parent 09d97c551e
commit fe4fd67405
4 changed files with 66 additions and 25 deletions

View File

@ -27,30 +27,52 @@
using namespace icinga; using namespace icinga;
BufferedStream::BufferedStream(const Stream::Ptr& innerStream) BufferedStream::BufferedStream(const Stream::Ptr& innerStream)
: m_InnerStream(innerStream), m_RecvQ(boost::make_shared<FIFO>()), m_SendQ(boost::make_shared<FIFO>()), : m_InnerStream(innerStream), m_Stopped(false),
m_Exception(), m_Blocking(true) m_RecvQ(boost::make_shared<FIFO>()), m_SendQ(boost::make_shared<FIFO>()),
m_Blocking(true), m_Exception()
{ {
boost::thread readThread(boost::bind(&BufferedStream::ReadThreadProc, this)); m_ReadThread = boost::thread(boost::bind(&BufferedStream::ReadThreadProc, this));
readThread.detach(); m_WriteThread = boost::thread(boost::bind(&BufferedStream::WriteThreadProc, this));
}
boost::thread writeThread(boost::bind(&BufferedStream::WriteThreadProc, this));
writeThread.detach(); BufferedStream::~BufferedStream(void)
{
{
boost::mutex::scoped_lock lock(m_Mutex);
m_Stopped = true;
}
m_InnerStream->Close();
{
boost::mutex::scoped_lock lock(m_Mutex);
m_ReadCV.notify_all();
m_WriteCV.notify_all();
}
m_ReadThread.join();
m_WriteThread.join();
} }
void BufferedStream::ReadThreadProc(void) void BufferedStream::ReadThreadProc(void)
{ {
char buffer[512]; char buffer[512];
try { try {
for (;;) { for (;;) {
size_t rc = m_InnerStream->Read(buffer, sizeof(buffer)); size_t rc = m_InnerStream->Read(buffer, sizeof(buffer));
if (rc == 0) if (rc == 0)
break; break;
boost::mutex::scoped_lock lock(m_Mutex); boost::mutex::scoped_lock lock(m_Mutex);
m_RecvQ->Write(buffer, rc); m_RecvQ->Write(buffer, rc);
m_ReadCV.notify_all(); m_ReadCV.notify_all();
if (m_Stopped)
break;
} }
} catch (const std::exception& ex) { } catch (const std::exception& ex) {
{ {
@ -68,19 +90,22 @@ void BufferedStream::WriteThreadProc(void)
{ {
char buffer[512]; char buffer[512];
try { try {
for (;;) { for (;;) {
size_t rc; size_t rc;
{ {
boost::mutex::scoped_lock lock(m_Mutex); boost::mutex::scoped_lock lock(m_Mutex);
while (m_SendQ->GetAvailableBytes() == 0) while (m_SendQ->GetAvailableBytes() == 0 && !m_Stopped)
m_WriteCV.wait(lock); m_WriteCV.wait(lock);
if (m_Stopped)
break;
rc = m_SendQ->Read(buffer, sizeof(buffer)); rc = m_SendQ->Read(buffer, sizeof(buffer));
} }
m_InnerStream->Write(buffer, rc); m_InnerStream->Write(buffer, rc);
} }
} catch (const std::exception& ex) { } catch (const std::exception& ex) {
@ -136,7 +161,7 @@ void BufferedStream::Write(const void *buffer, size_t count)
boost::rethrow_exception(m_Exception); boost::rethrow_exception(m_Exception);
m_SendQ->Write(buffer, count); m_SendQ->Write(buffer, count);
m_WriteCV.notify_all(); m_WriteCV.notify_all();
} }
void BufferedStream::WaitReadable(size_t count) void BufferedStream::WaitReadable(size_t count)
@ -152,7 +177,7 @@ void BufferedStream::InternalWaitReadable(size_t count, boost::mutex::scoped_loc
m_ReadCV.wait(lock); m_ReadCV.wait(lock);
} }
void BufferedStream::WaitWritable(size_t count) void BufferedStream::WaitWritable(size_t)
{ /* Nothing to do here. */ } { /* Nothing to do here. */ }
void BufferedStream::MakeNonBlocking(void) void BufferedStream::MakeNonBlocking(void)
@ -161,4 +186,3 @@ void BufferedStream::MakeNonBlocking(void)
m_Blocking = false; m_Blocking = false;
} }

View File

@ -39,6 +39,7 @@ public:
typedef weak_ptr<BufferedStream> WeakPtr; typedef weak_ptr<BufferedStream> WeakPtr;
BufferedStream(const Stream::Ptr& innerStream); BufferedStream(const Stream::Ptr& innerStream);
~BufferedStream(void);
virtual size_t Read(void *buffer, size_t count); virtual size_t Read(void *buffer, size_t count);
virtual void Write(const void *buffer, size_t count); virtual void Write(const void *buffer, size_t count);
@ -52,21 +53,26 @@ public:
private: private:
Stream::Ptr m_InnerStream; Stream::Ptr m_InnerStream;
bool m_Stopped;
FIFO::Ptr m_RecvQ; FIFO::Ptr m_RecvQ;
FIFO::Ptr m_SendQ; FIFO::Ptr m_SendQ;
bool m_Blocking; bool m_Blocking;
boost::exception_ptr m_Exception; boost::exception_ptr m_Exception;
boost::mutex m_Mutex; boost::mutex m_Mutex;
boost::condition_variable m_ReadCV; boost::condition_variable m_ReadCV;
boost::condition_variable m_WriteCV; boost::condition_variable m_WriteCV;
void ReadThreadProc(void); void ReadThreadProc(void);
void WriteThreadProc(void); void WriteThreadProc(void);
boost::thread m_ReadThread;
boost::thread m_WriteThread;
void InternalWaitReadable(size_t count, boost::mutex::scoped_lock& lock); void InternalWaitReadable(size_t count, boost::mutex::scoped_lock& lock);
}; };

View File

@ -27,10 +27,18 @@ BOOST_AUTO_TEST_SUITE(base_match)
BOOST_AUTO_TEST_CASE(tolong) BOOST_AUTO_TEST_CASE(tolong)
{ {
BOOST_CHECK(Utility::Match("*", "hello")); BOOST_CHECK(Utility::Match("*", "hello"));
BOOST_CHECK(!Utility::Match("\\**", "hello"));
BOOST_CHECK(Utility::Match("\\**", "*ello"));
BOOST_CHECK(Utility::Match("?e*l?", "hello"));
BOOST_CHECK(Utility::Match("?e*l?", "helo"));
BOOST_CHECK(!Utility::Match("world", "hello")); BOOST_CHECK(!Utility::Match("world", "hello"));
BOOST_CHECK(!Utility::Match("hee*", "hello")); BOOST_CHECK(!Utility::Match("hee*", "hello"));
BOOST_CHECK(Utility::Match("he??o", "hello")); BOOST_CHECK(Utility::Match("he??o", "hello"));
BOOST_CHECK(Utility::Match("he?", "hel")); BOOST_CHECK(Utility::Match("he?", "hel"));
BOOST_CHECK(Utility::Match("he*", "hello"));
BOOST_CHECK(Utility::Match("he*o", "heo"));
BOOST_CHECK(Utility::Match("he**o", "heo"));
BOOST_CHECK(Utility::Match("he**o", "hello"));
} }
BOOST_AUTO_TEST_SUITE_END() BOOST_AUTO_TEST_SUITE_END()

View File

@ -45,6 +45,7 @@
* And last but not least, '\?' and '\*' in `new_mask' now become one character. * And last but not least, '\?' and '\*' in `new_mask' now become one character.
*/ */
#if 0
int mmatch(const char *old_mask, const char *new_mask) int mmatch(const char *old_mask, const char *new_mask)
{ {
const char *m = old_mask; const char *m = old_mask;
@ -145,6 +146,7 @@ int mmatch(const char *old_mask, const char *new_mask)
} }
} }
} }
#endif
/* /*
* Compare if a given string (name) matches the given * Compare if a given string (name) matches the given
@ -261,6 +263,7 @@ break_while:
* Note that this new optimized alghoritm can *only* work in place. * Note that this new optimized alghoritm can *only* work in place.
*/ */
#if 0
char *collapse(char *pattern) char *collapse(char *pattern)
{ {
int star = 0; int star = 0;
@ -303,4 +306,4 @@ char *collapse(char *pattern)
}; };
return pattern; return pattern;
} }
#endif