Don't use separate threads for each ApiClient object

fixes #6109
This commit is contained in:
Gunnar Beutner 2015-02-14 16:34:36 +01:00
parent e0bbfb175c
commit f097e48889
25 changed files with 271 additions and 152 deletions

View File

@ -314,7 +314,8 @@ void DynamicObject::RestoreObjects(const String& filename, int attributeTypes)
WorkQueue upq(25000, Application::GetConcurrency()); WorkQueue upq(25000, Application::GetConcurrency());
String message; String message;
while (NetString::ReadStringFromStream(sfp, &message)) { StreamReadContext src;
while (NetString::ReadStringFromStream(sfp, &message, src) == StatusNewItem) {
upq.Enqueue(boost::bind(&DynamicObject::RestoreObject, message, attributeTypes)); upq.Enqueue(boost::bind(&DynamicObject::RestoreObject, message, attributeTypes));
restored++; restored++;
} }

View File

@ -92,8 +92,10 @@ size_t FIFO::Peek(void *buffer, size_t count)
/** /**
* Implements IOQueue::Read. * Implements IOQueue::Read.
*/ */
size_t FIFO::Read(void *buffer, size_t count) size_t FIFO::Read(void *buffer, size_t count, bool allow_partial)
{ {
ASSERT(allow_partial);
if (count > m_DataSize) if (count > m_DataSize)
count = m_DataSize; count = m_DataSize;
@ -116,6 +118,8 @@ void FIFO::Write(const void *buffer, size_t count)
ResizeBuffer(m_Offset + m_DataSize + count, false); ResizeBuffer(m_Offset + m_DataSize + count, false);
std::memcpy(m_Buffer + m_Offset + m_DataSize, buffer, count); std::memcpy(m_Buffer + m_Offset + m_DataSize, buffer, count);
m_DataSize += count; m_DataSize += count;
SignalDataAvailable();
} }
void FIFO::Close(void) void FIFO::Close(void)
@ -130,3 +134,13 @@ size_t FIFO::GetAvailableBytes(void) const
{ {
return m_DataSize; return m_DataSize;
} }
bool FIFO::SupportsWaiting(void) const
{
return true;
}
bool FIFO::IsDataAvailable(void) const
{
return m_DataSize > 0;
}

View File

@ -42,10 +42,12 @@ public:
~FIFO(void); ~FIFO(void);
size_t Peek(void *buffer, size_t count); size_t Peek(void *buffer, size_t count);
virtual size_t Read(void *buffer, size_t count); virtual size_t Read(void *buffer, size_t count, bool allow_partial = false);
virtual void Write(const void *buffer, size_t count); virtual void Write(const void *buffer, size_t count);
virtual void Close(void); virtual void Close(void);
virtual bool IsEof(void) const; virtual bool IsEof(void) const;
virtual bool SupportsWaiting(void) const;
virtual bool IsDataAvailable(void) const;
size_t GetAvailableBytes(void) const; size_t GetAvailableBytes(void) const;

View File

@ -32,86 +32,66 @@ using namespace icinga;
* @exception invalid_argument The input stream is invalid. * @exception invalid_argument The input stream is invalid.
* @see https://github.com/PeterScott/netstring-c/blob/master/netstring.c * @see https://github.com/PeterScott/netstring-c/blob/master/netstring.c
*/ */
bool NetString::ReadStringFromStream(const Stream::Ptr& stream, String *str) StreamReadStatus NetString::ReadStringFromStream(const Stream::Ptr& stream, String *str, StreamReadContext& context)
{ {
/* 16 bytes are enough for the header */ if (stream->IsEof())
const size_t header_length = 16; return StatusEof;
size_t read_length;
char *header = static_cast<char *>(malloc(header_length));
if (header == NULL) if (context.MustRead && context.FillFromStream(stream) == 0)
BOOST_THROW_EXCEPTION(std::bad_alloc()); return StatusEof;
read_length = 0; size_t header_length = 0;
while (read_length < header_length) { for (size_t i = 0; i < context.Size; i++) {
/* Read one byte. */ if (context.Buffer[i] == ':') {
int rc = stream->Read(header + read_length, 1); header_length = i;
if (rc == 0) {
if (read_length == 0) {
free(header);
return false;
}
BOOST_THROW_EXCEPTION(std::runtime_error("Read() failed."));
}
ASSERT(rc == 1);
read_length++;
if (header[read_length - 1] == ':') {
break; break;
} else if (header_length == read_length) { } else if (i > 16)
free(header);
BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid NetString (missing :)")); BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid NetString (missing :)"));
}
} }
if (header_length == 0) {
context.MustRead = true;
return StatusNeedData;
}
/* make sure there's a header */
if (header_length == 0)
BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid NetString (no length specifier)"));
/* no leading zeros allowed */ /* no leading zeros allowed */
if (header[0] == '0' && isdigit(header[1])) { if (context.Buffer[0] == '0' && isdigit(context.Buffer[1]))
free(header);
BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid NetString (leading zero)")); BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid NetString (leading zero)"));
}
size_t len, i; size_t len, i;
len = 0; len = 0;
for (i = 0; i < read_length && isdigit(header[i]); i++) { for (i = 0; i < header_length && isdigit(context.Buffer[i]); i++) {
/* length specifier must have at most 9 characters */ /* length specifier must have at most 9 characters */
if (i >= 9) { if (i >= 9)
free(header);
BOOST_THROW_EXCEPTION(std::invalid_argument("Length specifier must not exceed 9 characters")); BOOST_THROW_EXCEPTION(std::invalid_argument("Length specifier must not exceed 9 characters"));
}
len = len * 10 + (header[i] - '0'); len = len * 10 + (context.Buffer[i] - '0');
} }
free(header);
/* read the whole message */ /* read the whole message */
size_t data_length = len + 1; size_t data_length = len + 1;
char *data = static_cast<char *>(malloc(data_length)); char *data = context.Buffer + header_length + 1;
if (data == NULL) { if (context.Size < header_length + 1 + len + 1) {
BOOST_THROW_EXCEPTION(std::bad_alloc()); context.MustRead = true;
return StatusNeedData;
} }
size_t rc = stream->Read(data, data_length);
if (rc != data_length)
BOOST_THROW_EXCEPTION(std::runtime_error("Read() failed."));
if (data[len] != ',') if (data[len] != ',')
BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid NetString (missing ,)")); BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid NetString (missing ,)"));
*str = String(&data[0], &data[len]); *str = String(&data[0], &data[len]);
free(data); context.DropData(header_length + 1 + len + 1);
return true; return StatusNewItem;
} }
/** /**

View File

@ -38,7 +38,7 @@ class String;
class I2_BASE_API NetString class I2_BASE_API NetString
{ {
public: public:
static bool ReadStringFromStream(const Stream::Ptr& stream, String *message); static StreamReadStatus ReadStringFromStream(const Stream::Ptr& stream, String *message, StreamReadContext& context);
static void WriteStringToStream(const Stream::Ptr& stream, const String& message); static void WriteStringToStream(const Stream::Ptr& stream, const String& message);
private: private:

View File

@ -38,10 +38,12 @@ void NetworkStream::Close(void)
* @param count The number of bytes to read from the queue. * @param count The number of bytes to read from the queue.
* @returns The number of bytes actually read. * @returns The number of bytes actually read.
*/ */
size_t NetworkStream::Read(void *buffer, size_t count) size_t NetworkStream::Read(void *buffer, size_t count, bool allow_partial)
{ {
size_t rc; size_t rc;
ASSERT(allow_partial);
if (m_Eof) if (m_Eof)
BOOST_THROW_EXCEPTION(std::invalid_argument("Tried to read from closed socket.")); BOOST_THROW_EXCEPTION(std::invalid_argument("Tried to read from closed socket."));

View File

@ -39,7 +39,7 @@ public:
NetworkStream(const Socket::Ptr& socket); NetworkStream(const Socket::Ptr& socket);
virtual size_t Read(void *buffer, size_t count); virtual size_t Read(void *buffer, size_t count, bool allow_partial = false);
virtual void Write(const void *buffer, size_t count); virtual void Write(const void *buffer, size_t count);
virtual void Close(void); virtual void Close(void);

View File

@ -38,7 +38,7 @@ StdioStream::~StdioStream(void)
Close(); Close();
} }
size_t StdioStream::Read(void *buffer, size_t size) size_t StdioStream::Read(void *buffer, size_t size, bool allow_partial)
{ {
ObjectLock olock(this); ObjectLock olock(this);

View File

@ -34,7 +34,7 @@ public:
StdioStream(std::iostream *innerStream, bool ownsStream); StdioStream(std::iostream *innerStream, bool ownsStream);
~StdioStream(void); ~StdioStream(void);
virtual size_t Read(void *buffer, size_t size); virtual size_t Read(void *buffer, size_t size, bool allow_partial = false);
virtual void Write(const void *buffer, size_t size); virtual void Write(const void *buffer, size_t size);
virtual void Close(void); virtual void Close(void);

View File

@ -22,54 +22,109 @@
using namespace icinga; using namespace icinga;
bool Stream::ReadLine(String *line, ReadLineContext& context) void Stream::RegisterDataHandler(const boost::function<void(void)>& handler)
{ {
if (context.Eof) if (SupportsWaiting())
return false; OnDataAvailable.connect(handler);
else
BOOST_THROW_EXCEPTION(std::runtime_error("Stream does not support waiting."));
}
for (;;) { bool Stream::SupportsWaiting(void) const
if (context.MustRead) { {
context.Buffer = (char *)realloc(context.Buffer, context.Size + 4096); return false;
}
if (!context.Buffer) bool Stream::IsDataAvailable(void) const
throw std::bad_alloc(); {
return false;
}
size_t rc = Read(context.Buffer + context.Size, 4096); void Stream::SignalDataAvailable(void)
{
OnDataAvailable();
if (rc == 0) { {
*line = String(context.Buffer, &(context.Buffer[context.Size])); boost::mutex::scoped_lock lock(m_Mutex);
boost::algorithm::trim_right(*line); m_CV.notify_all();
context.Eof = true;
return true;
}
context.Size += rc;
}
int count = 0;
size_t first_newline;
for (size_t i = 0; i < context.Size; i++) {
if (context.Buffer[i] == '\n') {
count++;
if (count == 1)
first_newline = i;
}
}
context.MustRead = (count <= 1);
if (count > 0) {
*line = String(context.Buffer, &(context.Buffer[first_newline]));
boost::algorithm::trim_right(*line);
memmove(context.Buffer, context.Buffer + first_newline + 1, context.Size - first_newline - 1);
context.Size -= first_newline + 1;
return true;
}
} }
} }
void Stream::WaitForData(void)
{
if (!SupportsWaiting())
BOOST_THROW_EXCEPTION(std::runtime_error("Stream does not support waiting."));
boost::mutex::scoped_lock lock(m_Mutex);
while (!IsDataAvailable())
m_CV.wait(lock);
}
StreamReadStatus Stream::ReadLine(String *line, StreamReadContext& context)
{
if (IsEof())
return StatusEof;
if (context.MustRead) {
if (!context.FillFromStream(this)) {
*line = String(context.Buffer, &(context.Buffer[context.Size]));
boost::algorithm::trim_right(*line);
return StatusNewItem;
}
}
int count = 0;
size_t first_newline;
for (size_t i = 0; i < context.Size; i++) {
if (context.Buffer[i] == '\n') {
count++;
if (count == 1)
first_newline = i;
}
}
context.MustRead = (count <= 1);
if (count > 0) {
*line = String(context.Buffer, &(context.Buffer[first_newline]));
boost::algorithm::trim_right(*line);
context.DropData(first_newline + 1);
return StatusNewItem;
}
return StatusNeedData;
}
bool StreamReadContext::FillFromStream(const Stream::Ptr& stream)
{
if (Wait && stream->SupportsWaiting())
stream->WaitForData();
do {
Buffer = (char *)realloc(Buffer, Size + 4096);
if (!Buffer)
throw std::bad_alloc();
size_t rc = stream->Read(Buffer + Size, 4096, true);
if (rc == 0 && stream->IsEof())
return false;
Size += rc;
} while (stream->IsDataAvailable());
return true;
}
void StreamReadContext::DropData(size_t count)
{
memmove(Buffer, Buffer + count, Size - count);
Size -= count;
}

View File

@ -22,11 +22,13 @@
#include "base/i2-base.hpp" #include "base/i2-base.hpp"
#include "base/object.hpp" #include "base/object.hpp"
#include <boost/signals2.hpp>
namespace icinga namespace icinga
{ {
class String; class String;
class Stream;
enum ConnectionRole enum ConnectionRole
{ {
@ -34,20 +36,31 @@ enum ConnectionRole
RoleServer RoleServer
}; };
struct ReadLineContext struct StreamReadContext
{ {
ReadLineContext(void) : Buffer(NULL), Size(0), Eof(false), MustRead(true) StreamReadContext(bool wait = true)
: Buffer(NULL), Size(0), MustRead(true), Wait(wait)
{ } { }
~ReadLineContext(void) ~StreamReadContext(void)
{ {
free(Buffer); free(Buffer);
} }
bool FillFromStream(const intrusive_ptr<Stream>& stream);
void DropData(size_t count);
char *Buffer; char *Buffer;
size_t Size; size_t Size;
bool Eof;
bool MustRead; bool MustRead;
bool Wait;
};
enum StreamReadStatus
{
StatusNewItem,
StatusNeedData,
StatusEof
}; };
/** /**
@ -66,9 +79,10 @@ public:
* @param buffer The buffer where data should be stored. May be NULL if you're * @param buffer The buffer where data should be stored. May be NULL if you're
* not actually interested in the data. * not actually interested in the data.
* @param count The number of bytes to read from the queue. * @param count The number of bytes to read from the queue.
* @param allow_partial Whether to allow partial reads.
* @returns The number of bytes actually read. * @returns The number of bytes actually read.
*/ */
virtual size_t Read(void *buffer, size_t count) = 0; virtual size_t Read(void *buffer, size_t count, bool allow_partial = false) = 0;
/** /**
* Writes data to the stream. * Writes data to the stream.
@ -91,7 +105,27 @@ public:
*/ */
virtual bool IsEof(void) const = 0; virtual bool IsEof(void) const = 0;
bool ReadLine(String *line, ReadLineContext& context); /**
* Waits until data can be read from the stream.
*/
void WaitForData(void);
virtual bool SupportsWaiting(void) const;
virtual bool IsDataAvailable(void) const;
void RegisterDataHandler(const boost::function<void(void)>& handler);
StreamReadStatus ReadLine(String *line, StreamReadContext& context);
protected:
void SignalDataAvailable(void);
private:
boost::signals2::signal<void(void)> OnDataAvailable;
boost::mutex m_Mutex;
boost::condition_variable m_CV;
}; };
} }

View File

@ -147,7 +147,7 @@ void TlsStream::OnEvent(int revents)
rc = SSL_write(m_SSL.get(), buffer, count); rc = SSL_write(m_SSL.get(), buffer, count);
if (rc > 0) if (rc > 0)
m_SendQ->Read(NULL, rc); m_SendQ->Read(NULL, rc, true);
break; break;
case TlsActionHandshake: case TlsActionHandshake:
@ -181,13 +181,17 @@ void TlsStream::OnEvent(int revents)
ChangeEvents(POLLIN); ChangeEvents(POLLIN);
} }
lock.unlock();
if (m_RecvQ->IsDataAvailable())
SignalDataAvailable();
return; return;
} }
err = SSL_get_error(m_SSL.get(), rc); err = SSL_get_error(m_SSL.get(), rc);
std::ostringstream msgbuf; std::ostringstream msgbuf;
char errbuf[120];
switch (err) { switch (err) {
case SSL_ERROR_WANT_READ: case SSL_ERROR_WANT_READ:
@ -251,16 +255,17 @@ void TlsStream::Handshake(void)
/** /**
* Processes data for the stream. * Processes data for the stream.
*/ */
size_t TlsStream::Read(void *buffer, size_t count) size_t TlsStream::Read(void *buffer, size_t count, bool allow_partial)
{ {
boost::mutex::scoped_lock lock(m_Mutex); boost::mutex::scoped_lock lock(m_Mutex);
while (m_RecvQ->GetAvailableBytes() < count && !m_ErrorOccurred && !m_Eof) if (!allow_partial)
m_CV.wait(lock); while (m_RecvQ->GetAvailableBytes() < count && !m_ErrorOccurred && !m_Eof)
m_CV.wait(lock);
HandleError(); HandleError();
return m_RecvQ->Read(buffer, count); return m_RecvQ->Read(buffer, count, true);
} }
void TlsStream::Write(const void *buffer, size_t count) void TlsStream::Write(const void *buffer, size_t count)
@ -291,3 +296,15 @@ bool TlsStream::IsEof(void) const
{ {
return m_Eof; return m_Eof;
} }
bool TlsStream::SupportsWaiting(void) const
{
return true;
}
bool TlsStream::IsDataAvailable(void) const
{
boost::mutex::scoped_lock lock(m_Mutex);
return m_RecvQ->GetAvailableBytes() > 0;
}

View File

@ -58,11 +58,14 @@ public:
virtual void Close(void); virtual void Close(void);
virtual size_t Read(void *buffer, size_t count); virtual size_t Read(void *buffer, size_t count, bool allow_partial = false);
virtual void Write(const void *buffer, size_t count); virtual void Write(const void *buffer, size_t count);
virtual bool IsEof(void) const; virtual bool IsEof(void) const;
virtual bool SupportsWaiting(void) const;
virtual bool IsDataAvailable(void) const;
bool IsVerifyOK(void) const; bool IsVerifyOK(void) const;
private: private:

View File

@ -84,6 +84,7 @@ int ObjectListCommand::Run(const boost::program_options::variables_map& vm, cons
std::map<String, int> type_count; std::map<String, int> type_count;
String message; String message;
StreamReadContext src;
String name_filter, type_filter; String name_filter, type_filter;
if (vm.count("name")) if (vm.count("name"))
@ -93,7 +94,7 @@ int ObjectListCommand::Run(const boost::program_options::variables_map& vm, cons
bool first = true; bool first = true;
while (NetString::ReadStringFromStream(sfp, &message)) { while (NetString::ReadStringFromStream(sfp, &message, src) == StatusNewItem) {
PrintObject(std::cout, first, message, type_count, name_filter, type_filter); PrintObject(std::cout, first, message, type_count, name_filter, type_filter);
objects_count++; objects_count++;
} }

View File

@ -262,9 +262,16 @@ int PkiUtility::RequestCertificate(const String& host, const String& port, const
JsonRpc::SendMessage(stream, request); JsonRpc::SendMessage(stream, request);
Dictionary::Ptr response; Dictionary::Ptr response;
StreamReadContext src;
for (;;) { for (;;) {
response = JsonRpc::ReadMessage(stream); StreamReadStatus srs = JsonRpc::ReadMessage(stream, &response, src);
if (srs == StatusEof)
break;
if (srs != StatusNewItem)
continue;
if (response && response->Contains("error")) { if (response && response->Contains("error")) {
Log(LogCritical, "cli", "Could not fetch valid response. Please check the master log (notice or debug)."); Log(LogCritical, "cli", "Could not fetch valid response. Please check the master log (notice or debug).");

View File

@ -39,8 +39,9 @@ Value VariableUtility::GetVariable(const String& name)
StdioStream::Ptr sfp = new StdioStream(&fp, false); StdioStream::Ptr sfp = new StdioStream(&fp, false);
String message; String message;
StreamReadContext src;
while (NetString::ReadStringFromStream(sfp, &message)) { while (NetString::ReadStringFromStream(sfp, &message, src) == StatusNewItem) {
Dictionary::Ptr variable = JsonDecode(message); Dictionary::Ptr variable = JsonDecode(message);
if (variable->Get("name") == name) { if (variable->Get("name") == name) {
@ -62,8 +63,9 @@ void VariableUtility::PrintVariables(std::ostream& outfp)
unsigned long variables_count = 0; unsigned long variables_count = 0;
String message; String message;
StreamReadContext src;
while (NetString::ReadStringFromStream(sfp, &message)) { while (NetString::ReadStringFromStream(sfp, &message, src) == StatusNewItem) {
Dictionary::Ptr variable = JsonDecode(message); Dictionary::Ptr variable = JsonDecode(message);
outfp << variable->Get("name") << " = " << variable->Get("value") << "\n"; outfp << variable->Get("name") << " = " << variable->Get("value") << "\n";
variables_count++; variables_count++;

View File

@ -30,7 +30,7 @@ namespace icinga
/** /**
* @ingroup icinga * @ingroup icinga
*/ */
class I2_BASE_API ObjectUtils class I2_ICINGA_API ObjectUtils
{ {
public: public:
static Service::Ptr GetService(const String& host, const String& name); static Service::Ptr GetService(const String& host, const String& name);

View File

@ -179,11 +179,19 @@ void LivestatusListener::ClientHandler(const Socket::Ptr& client)
for (;;) { for (;;) {
String line; String line;
ReadLineContext context; StreamReadContext context;
std::vector<String> lines; std::vector<String> lines;
while (stream->ReadLine(&line, context)) { for (;;) {
StreamReadStatus srs = stream->ReadLine(&line, context);
if (srs == StatusEof)
break;
if (srs != StatusNewItem)
continue;
if (line.GetLength() > 0) if (line.GetLength() > 0)
lines.push_back(line); lines.push_back(line);
else else

View File

@ -36,7 +36,7 @@ REGISTER_APIFUNCTION(RequestCertificate, pki, &RequestCertificateHandler);
ApiClient::ApiClient(const String& identity, bool authenticated, const TlsStream::Ptr& stream, ConnectionRole role) ApiClient::ApiClient(const String& identity, bool authenticated, const TlsStream::Ptr& stream, ConnectionRole role)
: m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream), m_Role(role), m_Seen(Utility::GetTime()), : m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream), m_Role(role), m_Seen(Utility::GetTime()),
m_NextHeartbeat(0) m_NextHeartbeat(0), m_Context(false)
{ {
if (authenticated) if (authenticated)
m_Endpoint = Endpoint::GetByName(identity); m_Endpoint = Endpoint::GetByName(identity);
@ -44,8 +44,7 @@ ApiClient::ApiClient(const String& identity, bool authenticated, const TlsStream
void ApiClient::Start(void) void ApiClient::Start(void)
{ {
boost::thread thread(boost::bind(&ApiClient::MessageThreadProc, ApiClient::Ptr(this))); m_Stream->RegisterDataHandler(boost::bind(&ApiClient::DataAvailableHandler, this));
thread.detach();
} }
String ApiClient::GetIdentity(void) const String ApiClient::GetIdentity(void) const
@ -134,21 +133,9 @@ bool ApiClient::ProcessMessage(void)
{ {
Dictionary::Ptr message; Dictionary::Ptr message;
if (m_Stream->IsEof()) StreamReadStatus srs = JsonRpc::ReadMessage(m_Stream, &message, m_Context);
return false;
try { if (srs != StatusNewItem)
message = JsonRpc::ReadMessage(m_Stream);
} catch (const openssl_error& ex) {
const unsigned long *pe = boost::get_error_info<errinfo_openssl_error>(ex);
if (pe && *pe == 0)
return false; /* Connection was closed cleanly */
throw;
}
if (!message)
return false; return false;
if (message->Get("method") != "log::SetLogPosition") if (message->Get("method") != "log::SetLogPosition")
@ -208,19 +195,17 @@ bool ApiClient::ProcessMessage(void)
return true; return true;
} }
void ApiClient::MessageThreadProc(void) void ApiClient::DataAvailableHandler(void)
{ {
Utility::SetThreadName("API Client");
try { try {
while (ProcessMessage()) while (ProcessMessage())
; /* empty loop body */ ; /* empty loop body */
} catch (const std::exception& ex) { } catch (const std::exception& ex) {
Log(LogWarning, "ApiClient") Log(LogWarning, "ApiClient")
<< "Error while reading JSON-RPC message for identity '" << m_Identity << "': " << DiagnosticInformation(ex); << "Error while reading JSON-RPC message for identity '" << m_Identity << "': " << DiagnosticInformation(ex);
}
Disconnect(); Disconnect();
}
} }
Value SetLogPositionHandler(const MessageOrigin& origin, const Dictionary::Ptr& params) Value SetLogPositionHandler(const MessageOrigin& origin, const Dictionary::Ptr& params)

View File

@ -74,10 +74,12 @@ private:
double m_Seen; double m_Seen;
double m_NextHeartbeat; double m_NextHeartbeat;
StreamReadContext m_Context;
WorkQueue m_WriteQueue; WorkQueue m_WriteQueue;
bool ProcessMessage(void); bool ProcessMessage(void);
void MessageThreadProc(void); void DataAvailableHandler(void);
void SendMessageSync(const Dictionary::Ptr& request); void SendMessageSync(const Dictionary::Ptr& request);
}; };

View File

@ -686,11 +686,12 @@ void ApiListener::ReplayLog(const ApiClient::Ptr& client)
StdioStream::Ptr logStream = new StdioStream(fp, true); StdioStream::Ptr logStream = new StdioStream(fp, true);
String message; String message;
StreamReadContext src;
while (true) { while (true) {
Dictionary::Ptr pmessage; Dictionary::Ptr pmessage;
try { try {
if (!NetString::ReadStringFromStream(logStream, &message)) if (NetString::ReadStringFromStream(logStream, &message, src) != StatusNewItem)
break; break;
pmessage = JsonDecode(message); pmessage = JsonDecode(message);

View File

@ -36,11 +36,13 @@ void JsonRpc::SendMessage(const Stream::Ptr& stream, const Dictionary::Ptr& mess
NetString::WriteStringToStream(stream, json); NetString::WriteStringToStream(stream, json);
} }
Dictionary::Ptr JsonRpc::ReadMessage(const Stream::Ptr& stream) StreamReadStatus JsonRpc::ReadMessage(const Stream::Ptr& stream, Dictionary::Ptr *message, StreamReadContext& src)
{ {
String jsonString; String jsonString;
if (!NetString::ReadStringFromStream(stream, &jsonString)) StreamReadStatus srs = NetString::ReadStringFromStream(stream, &jsonString, src);
return Dictionary::Ptr();
if (srs != StatusNewItem)
return srs;
//std::cerr << "<< " << jsonString << std::endl; //std::cerr << "<< " << jsonString << std::endl;
Value value = JsonDecode(jsonString); Value value = JsonDecode(jsonString);
@ -50,5 +52,7 @@ Dictionary::Ptr JsonRpc::ReadMessage(const Stream::Ptr& stream)
" message must be a dictionary.")); " message must be a dictionary."));
} }
return value; *message = value;
return StatusNewItem;
} }

View File

@ -36,7 +36,7 @@ class I2_REMOTE_API JsonRpc
{ {
public: public:
static void SendMessage(const Stream::Ptr& stream, const Dictionary::Ptr& message); static void SendMessage(const Stream::Ptr& stream, const Dictionary::Ptr& message);
static Dictionary::Ptr ReadMessage(const Stream::Ptr& stream); static StreamReadStatus ReadMessage(const Stream::Ptr& stream, Dictionary::Ptr *message, StreamReadContext& src);
private: private:
JsonRpc(void); JsonRpc(void);

View File

@ -32,7 +32,8 @@ BOOST_AUTO_TEST_CASE(netstring)
NetString::WriteStringToStream(fifo, "hello"); NetString::WriteStringToStream(fifo, "hello");
String s; String s;
BOOST_CHECK(NetString::ReadStringFromStream(fifo, &s)); StreamReadContext src;
BOOST_CHECK(NetString::ReadStringFromStream(fifo, &s, src));
BOOST_CHECK(s == "hello"); BOOST_CHECK(s == "hello");
fifo->Close(); fifo->Close();

View File

@ -34,7 +34,7 @@ BOOST_AUTO_TEST_CASE(readline_stdio)
StdioStream::Ptr stdstream = new StdioStream(&msgbuf, false); StdioStream::Ptr stdstream = new StdioStream(&msgbuf, false);
ReadLineContext rlc; StreamReadContext rlc;
String line; String line;
BOOST_CHECK(stdstream->ReadLine(&line, rlc)); BOOST_CHECK(stdstream->ReadLine(&line, rlc));