mirror of
https://github.com/Icinga/icinga2.git
synced 2025-09-26 19:18:48 +02:00
JsonRpcConnection: Log message processing time stats
This commit is contained in:
parent
f64595b9b1
commit
2bd789e529
@ -60,13 +60,15 @@ void JsonRpcConnection::Start()
|
||||
|
||||
void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
|
||||
{
|
||||
namespace ch = std::chrono;
|
||||
|
||||
m_Stream->next_layer().SetSeen(&m_Seen);
|
||||
|
||||
for (;;) {
|
||||
String message;
|
||||
String jsonString;
|
||||
|
||||
try {
|
||||
message = JsonRpc::ReadMessage(m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024);
|
||||
jsonString = JsonRpc::ReadMessage(m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024);
|
||||
} catch (const std::exception& ex) {
|
||||
Log(m_ShuttingDown ? LogDebug : LogNotice, "JsonRpcConnection")
|
||||
<< "Error while reading JSON-RPC message for identity '" << m_Identity
|
||||
@ -76,18 +78,56 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
|
||||
}
|
||||
|
||||
m_Seen = Utility::GetTime();
|
||||
if (m_Endpoint) {
|
||||
m_Endpoint->AddMessageReceived(jsonString.GetLength());
|
||||
}
|
||||
|
||||
String rpcMethod("UNKNOWN");
|
||||
String diagnosticInfo; // Contains the diagnostic/debug information in case of an error.
|
||||
|
||||
ch::steady_clock::duration cpuBoundDuration, totalDuration;
|
||||
auto start (ch::steady_clock::now());
|
||||
|
||||
try {
|
||||
Defer extractTotalDuration ([&start, &totalDuration]() {
|
||||
totalDuration = ch::steady_clock::now() - start;
|
||||
});
|
||||
|
||||
CpuBoundWork handleMessage (yc);
|
||||
|
||||
// Cache the elapsed time to acquire a CPU semaphore used to detect extremely heavy workloads.
|
||||
cpuBoundDuration = ch::steady_clock::now() - start;
|
||||
|
||||
Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString);
|
||||
if (auto method = message->Get("method"); !method.IsEmpty()) {
|
||||
rpcMethod = method;
|
||||
}
|
||||
|
||||
MessageHandler(message);
|
||||
|
||||
l_TaskStats.InsertValue(Utility::GetTime(), 1);
|
||||
} catch (const std::exception& ex) {
|
||||
Log(m_ShuttingDown ? LogDebug : LogWarning, "JsonRpcConnection")
|
||||
<< "Error while processing JSON-RPC message for identity '" << m_Identity
|
||||
<< "': " << DiagnosticInformation(ex);
|
||||
diagnosticInfo = DiagnosticInformation(ex);
|
||||
}
|
||||
|
||||
auto severity = LogDebug;
|
||||
if (totalDuration >= ch::seconds(5) || (!m_ShuttingDown && !diagnosticInfo.IsEmpty())) {
|
||||
// Either this RPC message took an unexpectedly long time to process, or a fatal error
|
||||
// has occurred, so promote the log entry from debug to warning.
|
||||
severity = LogWarning;
|
||||
}
|
||||
|
||||
Log statsLog(severity, "JsonRpcConnection");
|
||||
statsLog << "Processing JSON-RPC '" << rpcMethod << "' message for identity '" << m_Identity << "' ";
|
||||
|
||||
if (cpuBoundDuration >= ch::seconds(1)) {
|
||||
statsLog << "waited " << ch::duration_cast<ch::milliseconds>(cpuBoundDuration).count() << "ms on semaphore and ";
|
||||
}
|
||||
|
||||
statsLog << "took total " << ch::duration_cast<ch::milliseconds>(totalDuration).count()
|
||||
<< "ms" << (diagnosticInfo.IsEmpty() ? "." : ": Error: "+diagnosticInfo);
|
||||
|
||||
if (!diagnosticInfo.IsEmpty()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -259,10 +299,8 @@ void JsonRpcConnection::Disconnect()
|
||||
}
|
||||
}
|
||||
|
||||
void JsonRpcConnection::MessageHandler(const String& jsonString)
|
||||
void JsonRpcConnection::MessageHandler(const Dictionary::Ptr& message)
|
||||
{
|
||||
Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString);
|
||||
|
||||
if (m_Endpoint && message->Contains("ts")) {
|
||||
double ts = message->Get("ts");
|
||||
|
||||
@ -281,8 +319,6 @@ void JsonRpcConnection::MessageHandler(const String& jsonString)
|
||||
origin->FromZone = m_Endpoint->GetZone();
|
||||
else
|
||||
origin->FromZone = Zone::GetByName(message->Get("originZone"));
|
||||
|
||||
m_Endpoint->AddMessageReceived(jsonString.GetLength());
|
||||
}
|
||||
|
||||
Value vmethod;
|
||||
|
@ -89,7 +89,19 @@ private:
|
||||
void CheckLiveness(boost::asio::yield_context yc);
|
||||
|
||||
bool ProcessMessage();
|
||||
void MessageHandler(const String& jsonString);
|
||||
|
||||
/**
|
||||
* MessageHandler routes the provided message to its corresponding handler (if any).
|
||||
*
|
||||
* This will first verify the timestamp of that RPC message (if any) and subsequently, rejects any message whose
|
||||
* timestamp is less than the remote log position of the client Endpoint; otherwise, the endpoint's remote log
|
||||
* position is updated to that timestamp. It is not expected to happen, but any message lacking an RPC method or
|
||||
* referring to a non-existent one is also discarded. Afterwards, the RPC handler is then called for that message
|
||||
* and sends it's result back to the sender if the message contains an ID.
|
||||
*
|
||||
* @param message Dictionary::Ptr The RPC message you want to process.
|
||||
*/
|
||||
void MessageHandler(const Dictionary::Ptr& message);
|
||||
|
||||
void CertificateRequestResponseHandler(const Dictionary::Ptr& message);
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user