Count incoming messages per type and endpoint

This commit is contained in:
Alexander A. Klimov 2025-04-01 15:38:38 +02:00
parent c566f6dc31
commit 3dd7b15808
3 changed files with 25 additions and 0 deletions

View File

@ -2,6 +2,7 @@
#include "remote/endpoint.hpp" #include "remote/endpoint.hpp"
#include "remote/endpoint-ti.cpp" #include "remote/endpoint-ti.cpp"
#include "remote/apifunction.hpp"
#include "remote/apilistener.hpp" #include "remote/apilistener.hpp"
#include "remote/jsonrpcconnection.hpp" #include "remote/jsonrpcconnection.hpp"
#include "remote/zone.hpp" #include "remote/zone.hpp"
@ -35,6 +36,13 @@ void Endpoint::SetCachedZone(const Zone::Ptr& zone)
m_Zone = zone; m_Zone = zone;
} }
Endpoint::Endpoint()
{
for (auto& [name, afunc] : ApiFunctionRegistry::GetInstance()->GetItems()) {
m_MessageCounters.emplace(afunc, 0);
}
}
void Endpoint::AddClient(const JsonRpcConnection::Ptr& client) void Endpoint::AddClient(const JsonRpcConnection::Ptr& client)
{ {
bool was_master = ApiListener::GetInstance()->IsMaster(); bool was_master = ApiListener::GetInstance()->IsMaster();
@ -117,6 +125,11 @@ void Endpoint::AddMessageReceived(int bytes)
SetLastMessageReceived(time); SetLastMessageReceived(time);
} }
void Endpoint::AddMessageReceived(const intrusive_ptr<ApiFunction>& method)
{
m_MessageCounters.at(method).fetch_add(1, std::memory_order_relaxed);
}
double Endpoint::GetMessagesSentPerSecond() const double Endpoint::GetMessagesSentPerSecond() const
{ {
return m_MessagesSent.CalculateRate(Utility::GetTime(), 60); return m_MessagesSent.CalculateRate(Utility::GetTime(), 60);

View File

@ -5,12 +5,16 @@
#include "remote/i2-remote.hpp" #include "remote/i2-remote.hpp"
#include "remote/endpoint-ti.hpp" #include "remote/endpoint-ti.hpp"
#include "base/atomic.hpp"
#include "base/ringbuffer.hpp" #include "base/ringbuffer.hpp"
#include <cstdint>
#include <set> #include <set>
#include <unordered_map>
namespace icinga namespace icinga
{ {
class ApiFunction;
class JsonRpcConnection; class JsonRpcConnection;
class Zone; class Zone;
@ -28,6 +32,8 @@ public:
static boost::signals2::signal<void(const Endpoint::Ptr&, const intrusive_ptr<JsonRpcConnection>&)> OnConnected; static boost::signals2::signal<void(const Endpoint::Ptr&, const intrusive_ptr<JsonRpcConnection>&)> OnConnected;
static boost::signals2::signal<void(const Endpoint::Ptr&, const intrusive_ptr<JsonRpcConnection>&)> OnDisconnected; static boost::signals2::signal<void(const Endpoint::Ptr&, const intrusive_ptr<JsonRpcConnection>&)> OnDisconnected;
Endpoint();
void AddClient(const intrusive_ptr<JsonRpcConnection>& client); void AddClient(const intrusive_ptr<JsonRpcConnection>& client);
void RemoveClient(const intrusive_ptr<JsonRpcConnection>& client); void RemoveClient(const intrusive_ptr<JsonRpcConnection>& client);
std::set<intrusive_ptr<JsonRpcConnection> > GetClients() const; std::set<intrusive_ptr<JsonRpcConnection> > GetClients() const;
@ -42,6 +48,7 @@ public:
void AddMessageSent(int bytes); void AddMessageSent(int bytes);
void AddMessageReceived(int bytes); void AddMessageReceived(int bytes);
void AddMessageReceived(const intrusive_ptr<ApiFunction>& method);
double GetMessagesSentPerSecond() const override; double GetMessagesSentPerSecond() const override;
double GetMessagesReceivedPerSecond() const override; double GetMessagesReceivedPerSecond() const override;
@ -56,6 +63,7 @@ private:
mutable std::mutex m_ClientsLock; mutable std::mutex m_ClientsLock;
std::set<intrusive_ptr<JsonRpcConnection> > m_Clients; std::set<intrusive_ptr<JsonRpcConnection> > m_Clients;
intrusive_ptr<Zone> m_Zone; intrusive_ptr<Zone> m_Zone;
std::unordered_map<intrusive_ptr<ApiFunction>, Atomic<uint_fast64_t>> m_MessageCounters;
mutable RingBuffer m_MessagesSent{60}; mutable RingBuffer m_MessagesSent{60};
mutable RingBuffer m_MessagesReceived{60}; mutable RingBuffer m_MessagesReceived{60};

View File

@ -351,6 +351,10 @@ void JsonRpcConnection::MessageHandler(const Dictionary::Ptr& message)
Log(LogNotice, "JsonRpcConnection") Log(LogNotice, "JsonRpcConnection")
<< "Call to non-existent function '" << method << "' from endpoint '" << m_Identity << "'."; << "Call to non-existent function '" << method << "' from endpoint '" << m_Identity << "'.";
} else { } else {
if (m_Endpoint) {
m_Endpoint->AddMessageReceived(afunc);
}
Dictionary::Ptr params = message->Get("params"); Dictionary::Ptr params = message->Get("params");
if (params) if (params)
resultMessage->Set("result", afunc->Invoke(origin, params)); resultMessage->Set("result", afunc->Invoke(origin, params));