Use multiple WorkQueues to process cluster messages

refs #11014
This commit is contained in:
Gunnar Beutner 2016-01-27 15:45:58 +01:00
parent 864cdee8d9
commit d1b705613d
2 changed files with 23 additions and 10 deletions

View File

@ -37,10 +37,13 @@ REGISTER_APIFUNCTION(RequestCertificate, pki, &RequestCertificateHandler);
static boost::once_flag l_JsonRpcConnectionOnceFlag = BOOST_ONCE_INIT; static boost::once_flag l_JsonRpcConnectionOnceFlag = BOOST_ONCE_INIT;
static Timer::Ptr l_JsonRpcConnectionTimeoutTimer; static Timer::Ptr l_JsonRpcConnectionTimeoutTimer;
static WorkQueue *l_JsonRpcConnectionWorkQueues;
static size_t l_JsonRpcConnectionWorkQueueCount;
static int l_JsonRpcConnectionNextID;
JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated, JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated,
const TlsStream::Ptr& stream, ConnectionRole role) const TlsStream::Ptr& stream, ConnectionRole role)
: m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream), : m_ID(l_JsonRpcConnectionNextID++), m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream),
m_Role(role), m_Timestamp(Utility::GetTime()), m_Seen(Utility::GetTime()), m_Role(role), m_Timestamp(Utility::GetTime()), m_Seen(Utility::GetTime()),
m_NextHeartbeat(0), m_HeartbeatTimeout(0) m_NextHeartbeat(0), m_HeartbeatTimeout(0)
{ {
@ -56,6 +59,9 @@ void JsonRpcConnection::StaticInitialize(void)
l_JsonRpcConnectionTimeoutTimer->OnTimerExpired.connect(boost::bind(&JsonRpcConnection::TimeoutTimerHandler)); l_JsonRpcConnectionTimeoutTimer->OnTimerExpired.connect(boost::bind(&JsonRpcConnection::TimeoutTimerHandler));
l_JsonRpcConnectionTimeoutTimer->SetInterval(15); l_JsonRpcConnectionTimeoutTimer->SetInterval(15);
l_JsonRpcConnectionTimeoutTimer->Start(); l_JsonRpcConnectionTimeoutTimer->Start();
l_JsonRpcConnectionWorkQueueCount = Application::GetConcurrency();
l_JsonRpcConnectionWorkQueues = new WorkQueue[l_JsonRpcConnectionWorkQueueCount];
} }
void JsonRpcConnection::Start(void) void JsonRpcConnection::Start(void)
@ -128,15 +134,8 @@ void JsonRpcConnection::Disconnect(void)
} }
} }
bool JsonRpcConnection::ProcessMessage(void) void JsonRpcConnection::MessageHandler(const Dictionary::Ptr& message)
{ {
Dictionary::Ptr message;
StreamReadStatus srs = JsonRpc::ReadMessage(m_Stream, &message, m_Context, false);
if (srs != StatusNewItem)
return false;
m_Seen = Utility::GetTime(); m_Seen = Utility::GetTime();
if (m_HeartbeatTimeout != 0) if (m_HeartbeatTimeout != 0)
@ -147,7 +146,7 @@ bool JsonRpcConnection::ProcessMessage(void)
/* ignore old messages */ /* ignore old messages */
if (ts < m_Endpoint->GetRemoteLogPosition()) if (ts < m_Endpoint->GetRemoteLogPosition())
return true; return;
m_Endpoint->SetRemoteLogPosition(ts); m_Endpoint->SetRemoteLogPosition(ts);
} }
@ -190,6 +189,18 @@ bool JsonRpcConnection::ProcessMessage(void)
resultMessage->Set("id", message->Get("id")); resultMessage->Set("id", message->Get("id"));
JsonRpc::SendMessage(m_Stream, resultMessage); JsonRpc::SendMessage(m_Stream, resultMessage);
} }
}
bool JsonRpcConnection::ProcessMessage(void)
{
Dictionary::Ptr message;
StreamReadStatus srs = JsonRpc::ReadMessage(m_Stream, &message, m_Context, false);
if (srs != StatusNewItem)
return false;
l_JsonRpcConnectionWorkQueues[m_ID % l_JsonRpcConnectionWorkQueueCount].Enqueue(boost::bind(&JsonRpcConnection::MessageHandler, JsonRpcConnection::Ptr(this), message));
return true; return true;
} }

View File

@ -72,6 +72,7 @@ public:
static Value HeartbeatAPIHandler(const intrusive_ptr<MessageOrigin>& origin, const Dictionary::Ptr& params); static Value HeartbeatAPIHandler(const intrusive_ptr<MessageOrigin>& origin, const Dictionary::Ptr& params);
private: private:
int m_ID;
String m_Identity; String m_Identity;
bool m_Authenticated; bool m_Authenticated;
Endpoint::Ptr m_Endpoint; Endpoint::Ptr m_Endpoint;
@ -86,6 +87,7 @@ private:
StreamReadContext m_Context; StreamReadContext m_Context;
bool ProcessMessage(void); bool ProcessMessage(void);
void MessageHandler(const Dictionary::Ptr& message);
void DataAvailableHandler(void); void DataAvailableHandler(void);
static void StaticInitialize(void); static void StaticInitialize(void);