mirror of
https://github.com/Icinga/icinga2.git
synced 2025-07-21 12:44:58 +02:00
Merge pull request #10387 from Icinga/cnt-msg
Introduce Endpoint#messages_received_per_type
This commit is contained in:
commit
827f85c327
@ -5,8 +5,8 @@
|
|||||||
|
|
||||||
using namespace icinga;
|
using namespace icinga;
|
||||||
|
|
||||||
ApiFunction::ApiFunction(Callback function)
|
ApiFunction::ApiFunction(const char* name, Callback function)
|
||||||
: m_Callback(std::move(function))
|
: m_Name(name), m_Callback(std::move(function))
|
||||||
{ }
|
{ }
|
||||||
|
|
||||||
Value ApiFunction::Invoke(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& arguments)
|
Value ApiFunction::Invoke(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& arguments)
|
||||||
|
@ -25,7 +25,12 @@ public:
|
|||||||
|
|
||||||
typedef std::function<Value(const MessageOrigin::Ptr& origin, const Dictionary::Ptr&)> Callback;
|
typedef std::function<Value(const MessageOrigin::Ptr& origin, const Dictionary::Ptr&)> Callback;
|
||||||
|
|
||||||
ApiFunction(Callback function);
|
ApiFunction(const char* name, Callback function);
|
||||||
|
|
||||||
|
const char* GetName() const noexcept
|
||||||
|
{
|
||||||
|
return m_Name;
|
||||||
|
}
|
||||||
|
|
||||||
Value Invoke(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& arguments);
|
Value Invoke(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& arguments);
|
||||||
|
|
||||||
@ -33,6 +38,7 @@ public:
|
|||||||
static void Register(const String& name, const ApiFunction::Ptr& function);
|
static void Register(const String& name, const ApiFunction::Ptr& function);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
const char* m_Name;
|
||||||
Callback m_Callback;
|
Callback m_Callback;
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -49,7 +55,7 @@ public:
|
|||||||
|
|
||||||
#define REGISTER_APIFUNCTION(name, ns, callback) \
|
#define REGISTER_APIFUNCTION(name, ns, callback) \
|
||||||
INITIALIZE_ONCE([]() { \
|
INITIALIZE_ONCE([]() { \
|
||||||
ApiFunction::Ptr func = new ApiFunction(callback); \
|
ApiFunction::Ptr func = new ApiFunction(#ns "::" #name, callback); \
|
||||||
ApiFunctionRegistry::GetInstance()->Register(#ns "::" #name, func); \
|
ApiFunctionRegistry::GetInstance()->Register(#ns "::" #name, func); \
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
#include "remote/endpoint.hpp"
|
#include "remote/endpoint.hpp"
|
||||||
#include "remote/endpoint-ti.cpp"
|
#include "remote/endpoint-ti.cpp"
|
||||||
|
#include "remote/apifunction.hpp"
|
||||||
#include "remote/apilistener.hpp"
|
#include "remote/apilistener.hpp"
|
||||||
#include "remote/jsonrpcconnection.hpp"
|
#include "remote/jsonrpcconnection.hpp"
|
||||||
#include "remote/zone.hpp"
|
#include "remote/zone.hpp"
|
||||||
@ -35,6 +36,13 @@ void Endpoint::SetCachedZone(const Zone::Ptr& zone)
|
|||||||
m_Zone = zone;
|
m_Zone = zone;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Endpoint::Endpoint()
|
||||||
|
{
|
||||||
|
for (auto& [name, afunc] : ApiFunctionRegistry::GetInstance()->GetItems()) {
|
||||||
|
m_MessageCounters.emplace(afunc, 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void Endpoint::AddClient(const JsonRpcConnection::Ptr& client)
|
void Endpoint::AddClient(const JsonRpcConnection::Ptr& client)
|
||||||
{
|
{
|
||||||
bool was_master = ApiListener::GetInstance()->IsMaster();
|
bool was_master = ApiListener::GetInstance()->IsMaster();
|
||||||
@ -117,6 +125,11 @@ void Endpoint::AddMessageReceived(int bytes)
|
|||||||
SetLastMessageReceived(time);
|
SetLastMessageReceived(time);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Endpoint::AddMessageReceived(const intrusive_ptr<ApiFunction>& method)
|
||||||
|
{
|
||||||
|
m_MessageCounters.at(method).fetch_add(1, std::memory_order_relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
double Endpoint::GetMessagesSentPerSecond() const
|
double Endpoint::GetMessagesSentPerSecond() const
|
||||||
{
|
{
|
||||||
return m_MessagesSent.CalculateRate(Utility::GetTime(), 60);
|
return m_MessagesSent.CalculateRate(Utility::GetTime(), 60);
|
||||||
@ -136,3 +149,16 @@ double Endpoint::GetBytesReceivedPerSecond() const
|
|||||||
{
|
{
|
||||||
return m_BytesReceived.CalculateRate(Utility::GetTime(), 60);
|
return m_BytesReceived.CalculateRate(Utility::GetTime(), 60);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Dictionary::Ptr Endpoint::GetMessagesReceivedPerType() const
|
||||||
|
{
|
||||||
|
DictionaryData result;
|
||||||
|
|
||||||
|
for (auto& [afunc, cnt] : m_MessageCounters) {
|
||||||
|
if (auto v (cnt.load(std::memory_order_relaxed)); v) {
|
||||||
|
result.emplace_back(afunc->GetName(), v);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return new Dictionary(std::move(result));
|
||||||
|
}
|
||||||
|
@ -5,12 +5,16 @@
|
|||||||
|
|
||||||
#include "remote/i2-remote.hpp"
|
#include "remote/i2-remote.hpp"
|
||||||
#include "remote/endpoint-ti.hpp"
|
#include "remote/endpoint-ti.hpp"
|
||||||
|
#include "base/atomic.hpp"
|
||||||
#include "base/ringbuffer.hpp"
|
#include "base/ringbuffer.hpp"
|
||||||
|
#include <cstdint>
|
||||||
#include <set>
|
#include <set>
|
||||||
|
#include <unordered_map>
|
||||||
|
|
||||||
namespace icinga
|
namespace icinga
|
||||||
{
|
{
|
||||||
|
|
||||||
|
class ApiFunction;
|
||||||
class JsonRpcConnection;
|
class JsonRpcConnection;
|
||||||
class Zone;
|
class Zone;
|
||||||
|
|
||||||
@ -28,6 +32,8 @@ public:
|
|||||||
static boost::signals2::signal<void(const Endpoint::Ptr&, const intrusive_ptr<JsonRpcConnection>&)> OnConnected;
|
static boost::signals2::signal<void(const Endpoint::Ptr&, const intrusive_ptr<JsonRpcConnection>&)> OnConnected;
|
||||||
static boost::signals2::signal<void(const Endpoint::Ptr&, const intrusive_ptr<JsonRpcConnection>&)> OnDisconnected;
|
static boost::signals2::signal<void(const Endpoint::Ptr&, const intrusive_ptr<JsonRpcConnection>&)> OnDisconnected;
|
||||||
|
|
||||||
|
Endpoint();
|
||||||
|
|
||||||
void AddClient(const intrusive_ptr<JsonRpcConnection>& client);
|
void AddClient(const intrusive_ptr<JsonRpcConnection>& client);
|
||||||
void RemoveClient(const intrusive_ptr<JsonRpcConnection>& client);
|
void RemoveClient(const intrusive_ptr<JsonRpcConnection>& client);
|
||||||
std::set<intrusive_ptr<JsonRpcConnection> > GetClients() const;
|
std::set<intrusive_ptr<JsonRpcConnection> > GetClients() const;
|
||||||
@ -42,6 +48,7 @@ public:
|
|||||||
|
|
||||||
void AddMessageSent(int bytes);
|
void AddMessageSent(int bytes);
|
||||||
void AddMessageReceived(int bytes);
|
void AddMessageReceived(int bytes);
|
||||||
|
void AddMessageReceived(const intrusive_ptr<ApiFunction>& method);
|
||||||
|
|
||||||
double GetMessagesSentPerSecond() const override;
|
double GetMessagesSentPerSecond() const override;
|
||||||
double GetMessagesReceivedPerSecond() const override;
|
double GetMessagesReceivedPerSecond() const override;
|
||||||
@ -49,6 +56,8 @@ public:
|
|||||||
double GetBytesSentPerSecond() const override;
|
double GetBytesSentPerSecond() const override;
|
||||||
double GetBytesReceivedPerSecond() const override;
|
double GetBytesReceivedPerSecond() const override;
|
||||||
|
|
||||||
|
Dictionary::Ptr GetMessagesReceivedPerType() const override;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
void OnAllConfigLoaded() override;
|
void OnAllConfigLoaded() override;
|
||||||
|
|
||||||
@ -56,6 +65,7 @@ private:
|
|||||||
mutable std::mutex m_ClientsLock;
|
mutable std::mutex m_ClientsLock;
|
||||||
std::set<intrusive_ptr<JsonRpcConnection> > m_Clients;
|
std::set<intrusive_ptr<JsonRpcConnection> > m_Clients;
|
||||||
intrusive_ptr<Zone> m_Zone;
|
intrusive_ptr<Zone> m_Zone;
|
||||||
|
std::unordered_map<intrusive_ptr<ApiFunction>, Atomic<uint_fast64_t>> m_MessageCounters;
|
||||||
|
|
||||||
mutable RingBuffer m_MessagesSent{60};
|
mutable RingBuffer m_MessagesSent{60};
|
||||||
mutable RingBuffer m_MessagesReceived{60};
|
mutable RingBuffer m_MessagesReceived{60};
|
||||||
|
@ -54,6 +54,10 @@ class Endpoint : ConfigObject
|
|||||||
[no_user_modify, no_storage] double bytes_received_per_second {
|
[no_user_modify, no_storage] double bytes_received_per_second {
|
||||||
get;
|
get;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
[no_user_modify, no_storage] Dictionary::Ptr messages_received_per_type {
|
||||||
|
get;
|
||||||
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -356,6 +356,10 @@ void JsonRpcConnection::MessageHandler(const Dictionary::Ptr& message)
|
|||||||
Log(LogNotice, "JsonRpcConnection")
|
Log(LogNotice, "JsonRpcConnection")
|
||||||
<< "Call to non-existent function '" << method << "' from endpoint '" << m_Identity << "'.";
|
<< "Call to non-existent function '" << method << "' from endpoint '" << m_Identity << "'.";
|
||||||
} else {
|
} else {
|
||||||
|
if (m_Endpoint) {
|
||||||
|
m_Endpoint->AddMessageReceived(afunc);
|
||||||
|
}
|
||||||
|
|
||||||
Dictionary::Ptr params = message->Get("params");
|
Dictionary::Ptr params = message->Get("params");
|
||||||
if (params)
|
if (params)
|
||||||
resultMessage->Set("result", afunc->Invoke(origin, params));
|
resultMessage->Set("result", afunc->Invoke(origin, params));
|
||||||
|
Loading…
x
Reference in New Issue
Block a user