From fe4fd67405c2520b163adad7801b70263d71fd8f Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Fri, 19 Apr 2013 14:47:41 +0200 Subject: [PATCH] Bugfixes for the BufferedStream class. --- lib/base/bufferedstream.cpp | 64 +++++++++++++++++++++++++------------ lib/base/bufferedstream.h | 14 +++++--- test/base-match.cpp | 8 +++++ third-party/mmatch/mmatch.c | 5 ++- 4 files changed, 66 insertions(+), 25 deletions(-) diff --git a/lib/base/bufferedstream.cpp b/lib/base/bufferedstream.cpp index 4f9bc7bcd..b9346279e 100644 --- a/lib/base/bufferedstream.cpp +++ b/lib/base/bufferedstream.cpp @@ -27,30 +27,52 @@ using namespace icinga; BufferedStream::BufferedStream(const Stream::Ptr& innerStream) - : m_InnerStream(innerStream), m_RecvQ(boost::make_shared()), m_SendQ(boost::make_shared()), - m_Exception(), m_Blocking(true) + : m_InnerStream(innerStream), m_Stopped(false), + m_RecvQ(boost::make_shared()), m_SendQ(boost::make_shared()), + m_Blocking(true), m_Exception() { - boost::thread readThread(boost::bind(&BufferedStream::ReadThreadProc, this)); - readThread.detach(); - - boost::thread writeThread(boost::bind(&BufferedStream::WriteThreadProc, this)); - writeThread.detach(); + m_ReadThread = boost::thread(boost::bind(&BufferedStream::ReadThreadProc, this)); + m_WriteThread = boost::thread(boost::bind(&BufferedStream::WriteThreadProc, this)); +} + +BufferedStream::~BufferedStream(void) +{ + { + boost::mutex::scoped_lock lock(m_Mutex); + + m_Stopped = true; + } + + m_InnerStream->Close(); + + { + boost::mutex::scoped_lock lock(m_Mutex); + + m_ReadCV.notify_all(); + m_WriteCV.notify_all(); + } + + m_ReadThread.join(); + m_WriteThread.join(); } void BufferedStream::ReadThreadProc(void) { char buffer[512]; - + try { for (;;) { size_t rc = m_InnerStream->Read(buffer, sizeof(buffer)); - + if (rc == 0) break; - + boost::mutex::scoped_lock lock(m_Mutex); m_RecvQ->Write(buffer, rc); m_ReadCV.notify_all(); + + if (m_Stopped) + break; } } catch (const std::exception& ex) { { @@ -68,19 +90,22 @@ void BufferedStream::WriteThreadProc(void) { char buffer[512]; - try { + try { for (;;) { size_t rc; - + { boost::mutex::scoped_lock lock(m_Mutex); - - while (m_SendQ->GetAvailableBytes() == 0) + + while (m_SendQ->GetAvailableBytes() == 0 && !m_Stopped) m_WriteCV.wait(lock); - + + if (m_Stopped) + break; + rc = m_SendQ->Read(buffer, sizeof(buffer)); - } - + } + m_InnerStream->Write(buffer, rc); } } catch (const std::exception& ex) { @@ -136,7 +161,7 @@ void BufferedStream::Write(const void *buffer, size_t count) boost::rethrow_exception(m_Exception); m_SendQ->Write(buffer, count); - m_WriteCV.notify_all(); + m_WriteCV.notify_all(); } void BufferedStream::WaitReadable(size_t count) @@ -152,7 +177,7 @@ void BufferedStream::InternalWaitReadable(size_t count, boost::mutex::scoped_loc m_ReadCV.wait(lock); } -void BufferedStream::WaitWritable(size_t count) +void BufferedStream::WaitWritable(size_t) { /* Nothing to do here. */ } void BufferedStream::MakeNonBlocking(void) @@ -161,4 +186,3 @@ void BufferedStream::MakeNonBlocking(void) m_Blocking = false; } - diff --git a/lib/base/bufferedstream.h b/lib/base/bufferedstream.h index 08efb94c3..4176a7754 100644 --- a/lib/base/bufferedstream.h +++ b/lib/base/bufferedstream.h @@ -39,6 +39,7 @@ public: typedef weak_ptr WeakPtr; BufferedStream(const Stream::Ptr& innerStream); + ~BufferedStream(void); virtual size_t Read(void *buffer, size_t count); virtual void Write(const void *buffer, size_t count); @@ -52,21 +53,26 @@ public: private: Stream::Ptr m_InnerStream; - + + bool m_Stopped; + FIFO::Ptr m_RecvQ; FIFO::Ptr m_SendQ; bool m_Blocking; - + boost::exception_ptr m_Exception; - + boost::mutex m_Mutex; boost::condition_variable m_ReadCV; boost::condition_variable m_WriteCV; - + void ReadThreadProc(void); void WriteThreadProc(void); + boost::thread m_ReadThread; + boost::thread m_WriteThread; + void InternalWaitReadable(size_t count, boost::mutex::scoped_lock& lock); }; diff --git a/test/base-match.cpp b/test/base-match.cpp index 84ae92e97..b3cf5a11e 100644 --- a/test/base-match.cpp +++ b/test/base-match.cpp @@ -27,10 +27,18 @@ BOOST_AUTO_TEST_SUITE(base_match) BOOST_AUTO_TEST_CASE(tolong) { BOOST_CHECK(Utility::Match("*", "hello")); + BOOST_CHECK(!Utility::Match("\\**", "hello")); + BOOST_CHECK(Utility::Match("\\**", "*ello")); + BOOST_CHECK(Utility::Match("?e*l?", "hello")); + BOOST_CHECK(Utility::Match("?e*l?", "helo")); BOOST_CHECK(!Utility::Match("world", "hello")); BOOST_CHECK(!Utility::Match("hee*", "hello")); BOOST_CHECK(Utility::Match("he??o", "hello")); BOOST_CHECK(Utility::Match("he?", "hel")); + BOOST_CHECK(Utility::Match("he*", "hello")); + BOOST_CHECK(Utility::Match("he*o", "heo")); + BOOST_CHECK(Utility::Match("he**o", "heo")); + BOOST_CHECK(Utility::Match("he**o", "hello")); } BOOST_AUTO_TEST_SUITE_END() diff --git a/third-party/mmatch/mmatch.c b/third-party/mmatch/mmatch.c index e6fb1093f..b68f1bf5a 100644 --- a/third-party/mmatch/mmatch.c +++ b/third-party/mmatch/mmatch.c @@ -45,6 +45,7 @@ * And last but not least, '\?' and '\*' in `new_mask' now become one character. */ +#if 0 int mmatch(const char *old_mask, const char *new_mask) { const char *m = old_mask; @@ -145,6 +146,7 @@ int mmatch(const char *old_mask, const char *new_mask) } } } +#endif /* * Compare if a given string (name) matches the given @@ -261,6 +263,7 @@ break_while: * Note that this new optimized alghoritm can *only* work in place. */ +#if 0 char *collapse(char *pattern) { int star = 0; @@ -303,4 +306,4 @@ char *collapse(char *pattern) }; return pattern; } - +#endif