From c50b0fe442c10bc1824de28731a073602184e782 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Tue, 10 Dec 2024 15:59:10 +0100 Subject: [PATCH] Benchmark message reading/waiting/processing time per endpoint --- lib/remote/endpoint.hpp | 11 +++++++++++ lib/remote/jsonrpcconnection.cpp | 24 ++++++++++++++++++++++++ 2 files changed, 35 insertions(+) diff --git a/lib/remote/endpoint.hpp b/lib/remote/endpoint.hpp index 09ad8a4fd..7a03e1fd9 100644 --- a/lib/remote/endpoint.hpp +++ b/lib/remote/endpoint.hpp @@ -50,6 +50,13 @@ public: void AddMessageReceived(int bytes); void AddMessageReceived(const intrusive_ptr& method); + void AddInputTimes(const AtomicDuration::Clock::duration& readTime, const AtomicDuration::Clock::duration& semaphoreTime, const AtomicDuration::Clock::duration& processTime) + { + m_InputReadTime += readTime; + m_InputSemaphoreTime += semaphoreTime; + m_InputProcessTime += processTime; + } + double GetMessagesSentPerSecond() const override; double GetMessagesReceivedPerSecond() const override; @@ -71,6 +78,10 @@ private: mutable RingBuffer m_MessagesReceived{60}; mutable RingBuffer m_BytesSent{60}; mutable RingBuffer m_BytesReceived{60}; + + AtomicDuration m_InputReadTime; + AtomicDuration m_InputSemaphoreTime; + AtomicDuration m_InputProcessTime; }; } diff --git a/lib/remote/jsonrpcconnection.cpp b/lib/remote/jsonrpcconnection.cpp index b85944867..4dde5ee12 100644 --- a/lib/remote/jsonrpcconnection.cpp +++ b/lib/remote/jsonrpcconnection.cpp @@ -66,12 +66,24 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc) return ch::duration_cast(d).count(); }); + AtomicDuration::Clock::time_point readStart, readEnd, processingStart; + m_Stream->next_layer().SetSeen(&m_Seen); while (!m_ShuttingDown) { String jsonString; try { + if (m_Endpoint) { + // Only once we receive at least one byte, we know there must be a message to read. + if (!m_Stream->in_avail()) { + m_Stream->async_fill(yc); + } + + // Only then we can start measuring the time it takes to read it. + readStart = AtomicDuration::Clock::now(); + } + jsonString = JsonRpc::ReadMessage(m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024); } catch (const std::exception& ex) { Log(m_ShuttingDown ? LogDebug : LogNotice, "JsonRpcConnection") @@ -81,6 +93,10 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc) break; } + if (m_Endpoint) { + readEnd = AtomicDuration::Clock::now(); + } + m_Seen = Utility::GetTime(); if (m_Endpoint) { m_Endpoint->AddMessageReceived(jsonString.GetLength()); @@ -96,6 +112,10 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc) // Cache the elapsed time to acquire a CPU semaphore used to detect extremely heavy workloads. cpuBoundDuration = ch::steady_clock::now() - start; + if (m_Endpoint) { + processingStart = AtomicDuration::Clock::now(); + } + Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString); if (String method = message->Get("method"); !method.IsEmpty()) { rpcMethod = std::move(method); @@ -103,6 +123,10 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc) MessageHandler(message); + if (m_Endpoint) { + m_Endpoint->AddInputTimes(readEnd - readStart, cpuBoundDuration, AtomicDuration::Clock::now() - processingStart); + } + l_TaskStats.InsertValue(Utility::GetTime(), 1); auto total = ch::steady_clock::now() - start;