Fix stability issues with the TlsStream/Stream classes

fixes #9481
This commit is contained in:
Gunnar Beutner 2015-06-24 09:44:59 +02:00 committed by Michael Friedrich
parent c913143034
commit 0a3614723f
8 changed files with 26 additions and 14 deletions

View File

@ -32,13 +32,13 @@ using namespace icinga;
* @exception invalid_argument The input stream is invalid. * @exception invalid_argument The input stream is invalid.
* @see https://github.com/PeterScott/netstring-c/blob/master/netstring.c * @see https://github.com/PeterScott/netstring-c/blob/master/netstring.c
*/ */
StreamReadStatus NetString::ReadStringFromStream(const Stream::Ptr& stream, String *str, StreamReadContext& context) StreamReadStatus NetString::ReadStringFromStream(const Stream::Ptr& stream, String *str, StreamReadContext& context, bool may_wait)
{ {
if (context.Eof) if (context.Eof)
return StatusEof; return StatusEof;
if (context.MustRead) { if (context.MustRead) {
if (!context.FillFromStream(stream)) { if (!context.FillFromStream(stream, may_wait)) {
context.Eof = true; context.Eof = true;
return StatusEof; return StatusEof;
} }

View File

@ -38,7 +38,7 @@ class String;
class I2_BASE_API NetString class I2_BASE_API NetString
{ {
public: public:
static StreamReadStatus ReadStringFromStream(const Stream::Ptr& stream, String *message, StreamReadContext& context); static StreamReadStatus ReadStringFromStream(const Stream::Ptr& stream, String *message, StreamReadContext& context, bool may_wait = false);
static void WriteStringToStream(const Stream::Ptr& stream, const String& message); static void WriteStringToStream(const Stream::Ptr& stream, const String& message);
private: private:

View File

@ -61,6 +61,11 @@ void StdioStream::Close(void)
} }
} }
bool StdioStream::IsDataAvailable(void) const
{
return !IsEof();
}
bool StdioStream::IsEof(void) const bool StdioStream::IsEof(void) const
{ {
return !m_InnerStream->good(); return !m_InnerStream->good();

View File

@ -39,6 +39,7 @@ public:
virtual void Close(void); virtual void Close(void);
virtual bool IsDataAvailable(void) const;
virtual bool IsEof(void) const; virtual bool IsEof(void) const;
private: private:

View File

@ -61,13 +61,13 @@ void Stream::WaitForData(void)
m_CV.wait(lock); m_CV.wait(lock);
} }
StreamReadStatus Stream::ReadLine(String *line, StreamReadContext& context) StreamReadStatus Stream::ReadLine(String *line, StreamReadContext& context, bool may_wait)
{ {
if (context.Eof) if (context.Eof)
return StatusEof; return StatusEof;
if (context.MustRead) { if (context.MustRead) {
if (!context.FillFromStream(this)) { if (!context.FillFromStream(this, may_wait)) {
context.Eof = true; context.Eof = true;
*line = String(context.Buffer, &(context.Buffer[context.Size])); *line = String(context.Buffer, &(context.Buffer[context.Size]));
@ -86,6 +86,8 @@ StreamReadStatus Stream::ReadLine(String *line, StreamReadContext& context)
if (count == 1) if (count == 1)
first_newline = i; first_newline = i;
else if (count > 1)
break;
} }
} }
@ -103,9 +105,9 @@ StreamReadStatus Stream::ReadLine(String *line, StreamReadContext& context)
return StatusNeedData; return StatusNeedData;
} }
bool StreamReadContext::FillFromStream(const Stream::Ptr& stream) bool StreamReadContext::FillFromStream(const Stream::Ptr& stream, bool may_wait)
{ {
if (Wait && stream->SupportsWaiting()) if (may_wait && stream->SupportsWaiting())
stream->WaitForData(); stream->WaitForData();
size_t count = 0; size_t count = 0;
@ -120,7 +122,7 @@ bool StreamReadContext::FillFromStream(const Stream::Ptr& stream)
Size += rc; Size += rc;
count += rc; count += rc;
} while (stream->IsDataAvailable()); } while (count < 64 * 1024 && stream->IsDataAvailable());
if (count == 0 && stream->IsEof()) if (count == 0 && stream->IsEof())
return false; return false;

View File

@ -38,8 +38,8 @@ enum ConnectionRole
struct StreamReadContext struct StreamReadContext
{ {
StreamReadContext(bool wait = true) StreamReadContext(void)
: Buffer(NULL), Size(0), MustRead(true), Eof(false), Wait(wait) : Buffer(NULL), Size(0), MustRead(true), Eof(false)
{ } { }
~StreamReadContext(void) ~StreamReadContext(void)
@ -47,14 +47,13 @@ struct StreamReadContext
free(Buffer); free(Buffer);
} }
bool FillFromStream(const intrusive_ptr<Stream>& stream); bool FillFromStream(const intrusive_ptr<Stream>& stream, bool may_wait);
void DropData(size_t count); void DropData(size_t count);
char *Buffer; char *Buffer;
size_t Size; size_t Size;
bool MustRead; bool MustRead;
bool Eof; bool Eof;
bool Wait;
}; };
enum StreamReadStatus enum StreamReadStatus
@ -117,7 +116,7 @@ public:
void RegisterDataHandler(const boost::function<void(void)>& handler); void RegisterDataHandler(const boost::function<void(void)>& handler);
StreamReadStatus ReadLine(String *line, StreamReadContext& context); StreamReadStatus ReadLine(String *line, StreamReadContext& context, bool may_wait = false);
protected: protected:
void SignalDataAvailable(void); void SignalDataAvailable(void);

View File

@ -40,7 +40,7 @@ static Timer::Ptr l_ApiClientTimeoutTimer;
ApiClient::ApiClient(const String& identity, bool authenticated, const TlsStream::Ptr& stream, ConnectionRole role) ApiClient::ApiClient(const String& identity, bool authenticated, const TlsStream::Ptr& stream, ConnectionRole role)
: m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream), m_Role(role), m_Seen(Utility::GetTime()), : m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream), m_Role(role), m_Seen(Utility::GetTime()),
m_NextHeartbeat(0), m_HeartbeatTimeout(0), m_Context(false) m_NextHeartbeat(0), m_HeartbeatTimeout(0)
{ {
boost::call_once(l_ApiClientOnceFlag, &ApiClient::StaticInitialize); boost::call_once(l_ApiClientOnceFlag, &ApiClient::StaticInitialize);
@ -59,6 +59,8 @@ void ApiClient::StaticInitialize(void)
void ApiClient::Start(void) void ApiClient::Start(void)
{ {
m_Stream->RegisterDataHandler(boost::bind(&ApiClient::DataAvailableHandler, this)); m_Stream->RegisterDataHandler(boost::bind(&ApiClient::DataAvailableHandler, this));
if (m_Stream->IsDataAvailable())
DataAvailableHandler();
} }
String ApiClient::GetIdentity(void) const String ApiClient::GetIdentity(void) const
@ -195,6 +197,8 @@ bool ApiClient::ProcessMessage(void)
void ApiClient::DataAvailableHandler(void) void ApiClient::DataAvailableHandler(void)
{ {
boost::mutex::scoped_lock lock(m_DataHandlerMutex);
try { try {
while (ProcessMessage()) while (ProcessMessage())
; /* empty loop body */ ; /* empty loop body */

View File

@ -74,6 +74,7 @@ private:
double m_NextHeartbeat; double m_NextHeartbeat;
double m_HeartbeatTimeout; double m_HeartbeatTimeout;
Timer::Ptr m_TimeoutTimer; Timer::Ptr m_TimeoutTimer;
boost::mutex m_DataHandlerMutex;
StreamReadContext m_Context; StreamReadContext m_Context;