Decode cluster messages in the WorkQueue threads

refs #11014
This commit is contained in:
Gunnar Beutner 2016-01-27 16:43:23 +01:00
parent d1b705613d
commit 85c962a587
5 changed files with 41 additions and 12 deletions

View File

@ -232,11 +232,12 @@ int PkiUtility::RequestCertificate(const String& host, const String& port, const
JsonRpc::SendMessage(stream, request);
String jsonString;
Dictionary::Ptr response;
StreamReadContext src;
for (;;) {
StreamReadStatus srs = JsonRpc::ReadMessage(stream, &response, src);
StreamReadStatus srs = JsonRpc::ReadMessage(stream, &jsonString, src);
if (srs == StatusEof)
break;
@ -244,6 +245,8 @@ int PkiUtility::RequestCertificate(const String& host, const String& port, const
if (srs != StatusNewItem)
continue;
response = JsonRpc::DecodeMessage(jsonString);
if (response && response->Contains("error")) {
Log(LogCritical, "cli", "Could not fetch valid response. Please check the master log (notice or debug).");
#ifdef I2_DEBUG

View File

@ -34,7 +34,7 @@ void JsonRpc::SendMessage(const Stream::Ptr& stream, const Dictionary::Ptr& mess
NetString::WriteStringToStream(stream, json);
}
StreamReadStatus JsonRpc::ReadMessage(const Stream::Ptr& stream, Dictionary::Ptr *message, StreamReadContext& src, bool may_wait)
StreamReadStatus JsonRpc::ReadMessage(const Stream::Ptr& stream, String *message, StreamReadContext& src, bool may_wait)
{
String jsonString;
StreamReadStatus srs = NetString::ReadStringFromStream(stream, &jsonString, src, may_wait);
@ -42,15 +42,19 @@ StreamReadStatus JsonRpc::ReadMessage(const Stream::Ptr& stream, Dictionary::Ptr
if (srs != StatusNewItem)
return srs;
Value value = JsonDecode(jsonString);
*message = jsonString;
return StatusNewItem;
}
Dictionary::Ptr JsonRpc::DecodeMessage(const String& message)
{
Value value = JsonDecode(message);
if (!value.IsObjectType<Dictionary>()) {
BOOST_THROW_EXCEPTION(std::invalid_argument("JSON-RPC"
" message must be a dictionary."));
}
*message = value;
return StatusNewItem;
return value;
}

View File

@ -36,7 +36,8 @@ class I2_REMOTE_API JsonRpc
{
public:
static void SendMessage(const Stream::Ptr& stream, const Dictionary::Ptr& message);
static StreamReadStatus ReadMessage(const Stream::Ptr& stream, Dictionary::Ptr *message, StreamReadContext& src, bool may_wait = false);
static StreamReadStatus ReadMessage(const Stream::Ptr& stream, String *message, StreamReadContext& src, bool may_wait = false);
static Dictionary::Ptr DecodeMessage(const String& message);
private:
JsonRpc(void);

View File

@ -134,8 +134,28 @@ void JsonRpcConnection::Disconnect(void)
}
}
void JsonRpcConnection::MessageHandler(const Dictionary::Ptr& message)
void JsonRpcConnection::MessageHandlerWrapper(const String& jsonString)
{
if (m_Stream->IsEof())
return;
try {
MessageHandler(jsonString);
} catch (const std::exception& ex) {
Log(LogWarning, "JsonRpcConnection")
<< "Error while reading JSON-RPC message for identity '" << m_Identity
<< "': " << DiagnosticInformation(ex);
Disconnect();
return;
}
}
void JsonRpcConnection::MessageHandler(const String& jsonString)
{
Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString);
m_Seen = Utility::GetTime();
if (m_HeartbeatTimeout != 0)
@ -193,14 +213,14 @@ void JsonRpcConnection::MessageHandler(const Dictionary::Ptr& message)
bool JsonRpcConnection::ProcessMessage(void)
{
Dictionary::Ptr message;
String message;
StreamReadStatus srs = JsonRpc::ReadMessage(m_Stream, &message, m_Context, false);
if (srs != StatusNewItem)
return false;
l_JsonRpcConnectionWorkQueues[m_ID % l_JsonRpcConnectionWorkQueueCount].Enqueue(boost::bind(&JsonRpcConnection::MessageHandler, JsonRpcConnection::Ptr(this), message));
l_JsonRpcConnectionWorkQueues[m_ID % l_JsonRpcConnectionWorkQueueCount].Enqueue(boost::bind(&JsonRpcConnection::MessageHandlerWrapper, JsonRpcConnection::Ptr(this), message));
return true;
}

View File

@ -87,7 +87,8 @@ private:
StreamReadContext m_Context;
bool ProcessMessage(void);
void MessageHandler(const Dictionary::Ptr& message);
void MessageHandlerWrapper(const String& jsonString);
void MessageHandler(const String& jsonString);
void DataAvailableHandler(void);
static void StaticInitialize(void);