Benchmark message reading/waiting/processing time per endpoint

This commit is contained in:
Alexander A. Klimov 2024-12-10 15:59:10 +01:00 committed by Julian Brost
parent 002d422738
commit 2b1e1a5a08
2 changed files with 36 additions and 0 deletions

View File

@ -5,6 +5,7 @@
#include "remote/i2-remote.hpp"
#include "remote/endpoint-ti.hpp"
#include "base/atomic.hpp"
#include "base/ringbuffer.hpp"
#include <set>
@ -43,6 +44,13 @@ public:
void AddMessageSent(int bytes);
void AddMessageReceived(int bytes);
void AddInputTimes(const AtomicDuration::Clock::duration& readTime, const AtomicDuration::Clock::duration& semaphoreTime, const AtomicDuration::Clock::duration& processTime)
{
m_InputReadTime += readTime;
m_InputSemaphoreTime += semaphoreTime;
m_InputProcessTime += processTime;
}
double GetMessagesSentPerSecond() const override;
double GetMessagesReceivedPerSecond() const override;
@ -61,6 +69,10 @@ private:
mutable RingBuffer m_MessagesReceived{60};
mutable RingBuffer m_BytesSent{60};
mutable RingBuffer m_BytesReceived{60};
AtomicDuration m_InputReadTime;
AtomicDuration m_InputSemaphoreTime;
AtomicDuration m_InputProcessTime;
};
}

View File

@ -66,12 +66,24 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
return ch::duration_cast<ch::milliseconds>(d).count();
});
AtomicDuration::Clock::time_point readStart, readEnd, processingStart;
m_Stream->next_layer().SetSeen(&m_Seen);
while (!m_ShuttingDown) {
String jsonString;
try {
if (m_Endpoint) {
// Only once we receive at least one byte, we know there must be a message to read.
if (!m_Stream->in_avail()) {
m_Stream->async_fill(yc);
}
// Only then we can start measuring the time it takes to read it.
readStart = AtomicDuration::Clock::now();
}
jsonString = JsonRpc::ReadMessage(m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024);
} catch (const std::exception& ex) {
Log(m_ShuttingDown ? LogDebug : LogNotice, "JsonRpcConnection")
@ -81,6 +93,10 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
break;
}
if (m_Endpoint) {
readEnd = AtomicDuration::Clock::now();
}
m_Seen = Utility::GetTime();
if (m_Endpoint) {
m_Endpoint->AddMessageReceived(jsonString.GetLength());
@ -96,6 +112,10 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
// Cache the elapsed time to acquire a CPU semaphore used to detect extremely heavy workloads.
cpuBoundDuration = ch::steady_clock::now() - start;
if (m_Endpoint) {
processingStart = AtomicDuration::Clock::now();
}
Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString);
if (String method = message->Get("method"); !method.IsEmpty()) {
rpcMethod = std::move(method);
@ -103,6 +123,10 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
MessageHandler(message);
if (m_Endpoint) {
m_Endpoint->AddInputTimes(readEnd - readStart, cpuBoundDuration, AtomicDuration::Clock::now() - processingStart);
}
l_TaskStats.InsertValue(Utility::GetTime(), 1);
auto total = ch::steady_clock::now() - start;