From 8dcb4efa5ee21551840d111149a221b19d49b933 Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Wed, 24 Jun 2015 09:44:59 +0200 Subject: [PATCH] Fix stability issues with the TlsStream/Stream classes fixes #9481 --- lib/base/netstring.cpp | 4 ++-- lib/base/netstring.hpp | 2 +- lib/base/stdiostream.cpp | 5 +++++ lib/base/stdiostream.hpp | 1 + lib/base/stream.cpp | 12 +++++++----- lib/base/stream.hpp | 9 ++++----- lib/remote/apiclient.cpp | 6 +++++- lib/remote/apiclient.hpp | 1 + 8 files changed, 26 insertions(+), 14 deletions(-) diff --git a/lib/base/netstring.cpp b/lib/base/netstring.cpp index 1d97836af..e692f09d5 100644 --- a/lib/base/netstring.cpp +++ b/lib/base/netstring.cpp @@ -32,13 +32,13 @@ using namespace icinga; * @exception invalid_argument The input stream is invalid. * @see https://github.com/PeterScott/netstring-c/blob/master/netstring.c */ -StreamReadStatus NetString::ReadStringFromStream(const Stream::Ptr& stream, String *str, StreamReadContext& context) +StreamReadStatus NetString::ReadStringFromStream(const Stream::Ptr& stream, String *str, StreamReadContext& context, bool may_wait) { if (context.Eof) return StatusEof; if (context.MustRead) { - if (!context.FillFromStream(stream)) { + if (!context.FillFromStream(stream, may_wait)) { context.Eof = true; return StatusEof; } diff --git a/lib/base/netstring.hpp b/lib/base/netstring.hpp index 85fbe7e49..089a211e2 100644 --- a/lib/base/netstring.hpp +++ b/lib/base/netstring.hpp @@ -38,7 +38,7 @@ class String; class I2_BASE_API NetString { public: - static StreamReadStatus ReadStringFromStream(const Stream::Ptr& stream, String *message, StreamReadContext& context); + static StreamReadStatus ReadStringFromStream(const Stream::Ptr& stream, String *message, StreamReadContext& context, bool may_wait = false); static void WriteStringToStream(const Stream::Ptr& stream, const String& message); private: diff --git a/lib/base/stdiostream.cpp b/lib/base/stdiostream.cpp index 140117b98..356e64361 100644 --- a/lib/base/stdiostream.cpp +++ b/lib/base/stdiostream.cpp @@ -61,6 +61,11 @@ void StdioStream::Close(void) } } +bool StdioStream::IsDataAvailable(void) const +{ + return !IsEof(); +} + bool StdioStream::IsEof(void) const { return !m_InnerStream->good(); diff --git a/lib/base/stdiostream.hpp b/lib/base/stdiostream.hpp index 371d6994c..44e342419 100644 --- a/lib/base/stdiostream.hpp +++ b/lib/base/stdiostream.hpp @@ -39,6 +39,7 @@ public: virtual void Close(void); + virtual bool IsDataAvailable(void) const; virtual bool IsEof(void) const; private: diff --git a/lib/base/stream.cpp b/lib/base/stream.cpp index 84ba19887..5b465cba3 100644 --- a/lib/base/stream.cpp +++ b/lib/base/stream.cpp @@ -61,13 +61,13 @@ void Stream::WaitForData(void) m_CV.wait(lock); } -StreamReadStatus Stream::ReadLine(String *line, StreamReadContext& context) +StreamReadStatus Stream::ReadLine(String *line, StreamReadContext& context, bool may_wait) { if (context.Eof) return StatusEof; if (context.MustRead) { - if (!context.FillFromStream(this)) { + if (!context.FillFromStream(this, may_wait)) { context.Eof = true; *line = String(context.Buffer, &(context.Buffer[context.Size])); @@ -86,6 +86,8 @@ StreamReadStatus Stream::ReadLine(String *line, StreamReadContext& context) if (count == 1) first_newline = i; + else if (count > 1) + break; } } @@ -103,9 +105,9 @@ StreamReadStatus Stream::ReadLine(String *line, StreamReadContext& context) return StatusNeedData; } -bool StreamReadContext::FillFromStream(const Stream::Ptr& stream) +bool StreamReadContext::FillFromStream(const Stream::Ptr& stream, bool may_wait) { - if (Wait && stream->SupportsWaiting()) + if (may_wait && stream->SupportsWaiting()) stream->WaitForData(); size_t count = 0; @@ -120,7 +122,7 @@ bool StreamReadContext::FillFromStream(const Stream::Ptr& stream) Size += rc; count += rc; - } while (stream->IsDataAvailable()); + } while (count < 64 * 1024 && stream->IsDataAvailable()); if (count == 0 && stream->IsEof()) return false; diff --git a/lib/base/stream.hpp b/lib/base/stream.hpp index fcd14cc95..9754d3b36 100644 --- a/lib/base/stream.hpp +++ b/lib/base/stream.hpp @@ -38,8 +38,8 @@ enum ConnectionRole struct StreamReadContext { - StreamReadContext(bool wait = true) - : Buffer(NULL), Size(0), MustRead(true), Eof(false), Wait(wait) + StreamReadContext(void) + : Buffer(NULL), Size(0), MustRead(true), Eof(false) { } ~StreamReadContext(void) @@ -47,14 +47,13 @@ struct StreamReadContext free(Buffer); } - bool FillFromStream(const intrusive_ptr& stream); + bool FillFromStream(const intrusive_ptr& stream, bool may_wait); void DropData(size_t count); char *Buffer; size_t Size; bool MustRead; bool Eof; - bool Wait; }; enum StreamReadStatus @@ -117,7 +116,7 @@ public: void RegisterDataHandler(const boost::function& handler); - StreamReadStatus ReadLine(String *line, StreamReadContext& context); + StreamReadStatus ReadLine(String *line, StreamReadContext& context, bool may_wait = false); protected: void SignalDataAvailable(void); diff --git a/lib/remote/apiclient.cpp b/lib/remote/apiclient.cpp index b5662433e..d4fc9f92a 100644 --- a/lib/remote/apiclient.cpp +++ b/lib/remote/apiclient.cpp @@ -40,7 +40,7 @@ static Timer::Ptr l_ApiClientTimeoutTimer; ApiClient::ApiClient(const String& identity, bool authenticated, const TlsStream::Ptr& stream, ConnectionRole role) : m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream), m_Role(role), m_Seen(Utility::GetTime()), - m_NextHeartbeat(0), m_HeartbeatTimeout(0), m_Context(false) + m_NextHeartbeat(0), m_HeartbeatTimeout(0) { boost::call_once(l_ApiClientOnceFlag, &ApiClient::StaticInitialize); @@ -59,6 +59,8 @@ void ApiClient::StaticInitialize(void) void ApiClient::Start(void) { m_Stream->RegisterDataHandler(boost::bind(&ApiClient::DataAvailableHandler, this)); + if (m_Stream->IsDataAvailable()) + DataAvailableHandler(); } String ApiClient::GetIdentity(void) const @@ -195,6 +197,8 @@ bool ApiClient::ProcessMessage(void) void ApiClient::DataAvailableHandler(void) { + boost::mutex::scoped_lock lock(m_DataHandlerMutex); + try { while (ProcessMessage()) ; /* empty loop body */ diff --git a/lib/remote/apiclient.hpp b/lib/remote/apiclient.hpp index 80199f1c4..3adf6161a 100644 --- a/lib/remote/apiclient.hpp +++ b/lib/remote/apiclient.hpp @@ -74,6 +74,7 @@ private: double m_NextHeartbeat; double m_HeartbeatTimeout; Timer::Ptr m_TimeoutTimer; + boost::mutex m_DataHandlerMutex; StreamReadContext m_Context;