Fix stability issues with the TlsStream/Stream classes

fixes #9481
This commit is contained in:
Gunnar Beutner 2015-06-24 09:44:59 +02:00
parent cc1bd53a96
commit 8dcb4efa5e
8 changed files with 26 additions and 14 deletions

View File

@ -32,13 +32,13 @@ using namespace icinga;
* @exception invalid_argument The input stream is invalid.
* @see https://github.com/PeterScott/netstring-c/blob/master/netstring.c
*/
StreamReadStatus NetString::ReadStringFromStream(const Stream::Ptr& stream, String *str, StreamReadContext& context)
StreamReadStatus NetString::ReadStringFromStream(const Stream::Ptr& stream, String *str, StreamReadContext& context, bool may_wait)
{
if (context.Eof)
return StatusEof;
if (context.MustRead) {
if (!context.FillFromStream(stream)) {
if (!context.FillFromStream(stream, may_wait)) {
context.Eof = true;
return StatusEof;
}

View File

@ -38,7 +38,7 @@ class String;
class I2_BASE_API NetString
{
public:
static StreamReadStatus ReadStringFromStream(const Stream::Ptr& stream, String *message, StreamReadContext& context);
static StreamReadStatus ReadStringFromStream(const Stream::Ptr& stream, String *message, StreamReadContext& context, bool may_wait = false);
static void WriteStringToStream(const Stream::Ptr& stream, const String& message);
private:

View File

@ -61,6 +61,11 @@ void StdioStream::Close(void)
}
}
bool StdioStream::IsDataAvailable(void) const
{
return !IsEof();
}
bool StdioStream::IsEof(void) const
{
return !m_InnerStream->good();

View File

@ -39,6 +39,7 @@ public:
virtual void Close(void);
virtual bool IsDataAvailable(void) const;
virtual bool IsEof(void) const;
private:

View File

@ -61,13 +61,13 @@ void Stream::WaitForData(void)
m_CV.wait(lock);
}
StreamReadStatus Stream::ReadLine(String *line, StreamReadContext& context)
StreamReadStatus Stream::ReadLine(String *line, StreamReadContext& context, bool may_wait)
{
if (context.Eof)
return StatusEof;
if (context.MustRead) {
if (!context.FillFromStream(this)) {
if (!context.FillFromStream(this, may_wait)) {
context.Eof = true;
*line = String(context.Buffer, &(context.Buffer[context.Size]));
@ -86,6 +86,8 @@ StreamReadStatus Stream::ReadLine(String *line, StreamReadContext& context)
if (count == 1)
first_newline = i;
else if (count > 1)
break;
}
}
@ -103,9 +105,9 @@ StreamReadStatus Stream::ReadLine(String *line, StreamReadContext& context)
return StatusNeedData;
}
bool StreamReadContext::FillFromStream(const Stream::Ptr& stream)
bool StreamReadContext::FillFromStream(const Stream::Ptr& stream, bool may_wait)
{
if (Wait && stream->SupportsWaiting())
if (may_wait && stream->SupportsWaiting())
stream->WaitForData();
size_t count = 0;
@ -120,7 +122,7 @@ bool StreamReadContext::FillFromStream(const Stream::Ptr& stream)
Size += rc;
count += rc;
} while (stream->IsDataAvailable());
} while (count < 64 * 1024 && stream->IsDataAvailable());
if (count == 0 && stream->IsEof())
return false;

View File

@ -38,8 +38,8 @@ enum ConnectionRole
struct StreamReadContext
{
StreamReadContext(bool wait = true)
: Buffer(NULL), Size(0), MustRead(true), Eof(false), Wait(wait)
StreamReadContext(void)
: Buffer(NULL), Size(0), MustRead(true), Eof(false)
{ }
~StreamReadContext(void)
@ -47,14 +47,13 @@ struct StreamReadContext
free(Buffer);
}
bool FillFromStream(const intrusive_ptr<Stream>& stream);
bool FillFromStream(const intrusive_ptr<Stream>& stream, bool may_wait);
void DropData(size_t count);
char *Buffer;
size_t Size;
bool MustRead;
bool Eof;
bool Wait;
};
enum StreamReadStatus
@ -117,7 +116,7 @@ public:
void RegisterDataHandler(const boost::function<void(void)>& handler);
StreamReadStatus ReadLine(String *line, StreamReadContext& context);
StreamReadStatus ReadLine(String *line, StreamReadContext& context, bool may_wait = false);
protected:
void SignalDataAvailable(void);

View File

@ -40,7 +40,7 @@ static Timer::Ptr l_ApiClientTimeoutTimer;
ApiClient::ApiClient(const String& identity, bool authenticated, const TlsStream::Ptr& stream, ConnectionRole role)
: m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream), m_Role(role), m_Seen(Utility::GetTime()),
m_NextHeartbeat(0), m_HeartbeatTimeout(0), m_Context(false)
m_NextHeartbeat(0), m_HeartbeatTimeout(0)
{
boost::call_once(l_ApiClientOnceFlag, &ApiClient::StaticInitialize);
@ -59,6 +59,8 @@ void ApiClient::StaticInitialize(void)
void ApiClient::Start(void)
{
m_Stream->RegisterDataHandler(boost::bind(&ApiClient::DataAvailableHandler, this));
if (m_Stream->IsDataAvailable())
DataAvailableHandler();
}
String ApiClient::GetIdentity(void) const
@ -195,6 +197,8 @@ bool ApiClient::ProcessMessage(void)
void ApiClient::DataAvailableHandler(void)
{
boost::mutex::scoped_lock lock(m_DataHandlerMutex);
try {
while (ProcessMessage())
; /* empty loop body */

View File

@ -74,6 +74,7 @@ private:
double m_NextHeartbeat;
double m_HeartbeatTimeout;
Timer::Ptr m_TimeoutTimer;
boost::mutex m_DataHandlerMutex;
StreamReadContext m_Context;