Count still queued messages per JsonRpcConnection

This commit is contained in:
Alexander A. Klimov 2024-12-11 16:57:50 +01:00
parent c253e7eb6e
commit 2495a9ed12
2 changed files with 9 additions and 0 deletions

View File

@ -154,6 +154,7 @@ void JsonRpcConnection::WriteOutgoingMessages(boost::asio::yield_context yc)
} }
size_t bytesSent = JsonRpc::SendRawMessage(m_Stream, message, yc); size_t bytesSent = JsonRpc::SendRawMessage(m_Stream, message, yc);
m_PendingOutgoingMessages.fetch_sub(1, std::memory_order_relaxed);
if (m_Endpoint) { if (m_Endpoint) {
m_Endpoint->AddMessageSent(bytesSent); m_Endpoint->AddMessageSent(bytesSent);
@ -230,6 +231,7 @@ void JsonRpcConnection::SendRawMessage(const String& message)
m_OutgoingMessagesQueue.emplace_back(message); m_OutgoingMessagesQueue.emplace_back(message);
m_OutgoingMessagesQueued.Set(); m_OutgoingMessagesQueued.Set();
m_PendingOutgoingMessages.fetch_add(1, std::memory_order_relaxed);
}); });
} }
@ -241,6 +243,7 @@ void JsonRpcConnection::SendMessageInternal(const Dictionary::Ptr& message)
m_OutgoingMessagesQueue.emplace_back(JsonEncode(message)); m_OutgoingMessagesQueue.emplace_back(JsonEncode(message));
m_OutgoingMessagesQueued.Set(); m_OutgoingMessagesQueued.Set();
m_PendingOutgoingMessages.fetch_add(1, std::memory_order_relaxed);
} }
void JsonRpcConnection::Disconnect() void JsonRpcConnection::Disconnect()

View File

@ -54,6 +54,11 @@ public:
Shared<AsioTlsStream>::Ptr GetStream() const; Shared<AsioTlsStream>::Ptr GetStream() const;
ConnectionRole GetRole() const; ConnectionRole GetRole() const;
auto GetPendingOutgoingMessages() const noexcept
{
return m_PendingOutgoingMessages.load(std::memory_order_relaxed);
}
void Disconnect(); void Disconnect();
void SendMessage(const Dictionary::Ptr& request); void SendMessage(const Dictionary::Ptr& request);
@ -76,6 +81,7 @@ private:
boost::asio::io_context::strand m_IoStrand; boost::asio::io_context::strand m_IoStrand;
std::vector<String> m_OutgoingMessagesQueue; std::vector<String> m_OutgoingMessagesQueue;
AsioEvent m_OutgoingMessagesQueued; AsioEvent m_OutgoingMessagesQueued;
Atomic<decltype(m_OutgoingMessagesQueue)::size_type> m_PendingOutgoingMessages {0};
AsioEvent m_WriterDone; AsioEvent m_WriterDone;
Atomic<bool> m_ShuttingDown; Atomic<bool> m_ShuttingDown;
boost::asio::deadline_timer m_CheckLivenessTimer, m_HeartbeatTimer; boost::asio::deadline_timer m_CheckLivenessTimer, m_HeartbeatTimer;