diff --git a/lib/remote/jsonrpcconnection.cpp b/lib/remote/jsonrpcconnection.cpp index 889d4452c..cd98d6ad6 100644 --- a/lib/remote/jsonrpcconnection.cpp +++ b/lib/remote/jsonrpcconnection.cpp @@ -154,6 +154,7 @@ void JsonRpcConnection::WriteOutgoingMessages(boost::asio::yield_context yc) } size_t bytesSent = JsonRpc::SendRawMessage(m_Stream, message, yc); + m_PendingOutgoingMessages.fetch_sub(1, std::memory_order_relaxed); if (m_Endpoint) { m_Endpoint->AddMessageSent(bytesSent); @@ -230,6 +231,7 @@ void JsonRpcConnection::SendRawMessage(const String& message) m_OutgoingMessagesQueue.emplace_back(message); m_OutgoingMessagesQueued.Set(); + m_PendingOutgoingMessages.fetch_add(1, std::memory_order_relaxed); }); } @@ -241,6 +243,7 @@ void JsonRpcConnection::SendMessageInternal(const Dictionary::Ptr& message) m_OutgoingMessagesQueue.emplace_back(JsonEncode(message)); m_OutgoingMessagesQueued.Set(); + m_PendingOutgoingMessages.fetch_add(1, std::memory_order_relaxed); } void JsonRpcConnection::Disconnect() diff --git a/lib/remote/jsonrpcconnection.hpp b/lib/remote/jsonrpcconnection.hpp index 826d3b46a..e63f1f11f 100644 --- a/lib/remote/jsonrpcconnection.hpp +++ b/lib/remote/jsonrpcconnection.hpp @@ -54,6 +54,11 @@ public: Shared::Ptr GetStream() const; ConnectionRole GetRole() const; + auto GetPendingOutgoingMessages() const noexcept + { + return m_PendingOutgoingMessages.load(std::memory_order_relaxed); + } + void Disconnect(); void SendMessage(const Dictionary::Ptr& request); @@ -76,6 +81,7 @@ private: boost::asio::io_context::strand m_IoStrand; std::vector m_OutgoingMessagesQueue; AsioEvent m_OutgoingMessagesQueued; + Atomic m_PendingOutgoingMessages {0}; AsioEvent m_WriterDone; Atomic m_ShuttingDown; boost::asio::deadline_timer m_CheckLivenessTimer, m_HeartbeatTimer;