mirror of https://github.com/Icinga/icinga2.git
Merge pull request #5753 from Icinga/fix/ringbuffer-does-not-get-updated-if-nothing-is-written-5750
Fix that RingBuffer does not get updated and add metrics about communication between endpoints
This commit is contained in:
commit
75def4b074
|
@ -102,18 +102,21 @@ StreamReadStatus NetString::ReadStringFromStream(const Stream::Ptr& stream, Stri
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Writes data into a stream using the netstring format.
|
* Writes data into a stream using the netstring format and returns bytes written.
|
||||||
*
|
*
|
||||||
* @param stream The stream.
|
* @param stream The stream.
|
||||||
* @param str The String that is to be written.
|
* @param str The String that is to be written.
|
||||||
|
*
|
||||||
|
* @return The amount of bytes written.
|
||||||
*/
|
*/
|
||||||
void NetString::WriteStringToStream(const Stream::Ptr& stream, const String& str)
|
size_t NetString::WriteStringToStream(const Stream::Ptr& stream, const String& str)
|
||||||
{
|
{
|
||||||
std::ostringstream msgbuf;
|
std::ostringstream msgbuf;
|
||||||
WriteStringToStream(msgbuf, str);
|
WriteStringToStream(msgbuf, str);
|
||||||
|
|
||||||
String msg = msgbuf.str();
|
String msg = msgbuf.str();
|
||||||
stream->Write(msg.CStr(), msg.GetLength());
|
stream->Write(msg.CStr(), msg.GetLength());
|
||||||
|
return msg.GetLength();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -39,7 +39,7 @@ class I2_BASE_API NetString
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
static StreamReadStatus ReadStringFromStream(const Stream::Ptr& stream, String *message, StreamReadContext& context, bool may_wait = false);
|
static StreamReadStatus ReadStringFromStream(const Stream::Ptr& stream, String *message, StreamReadContext& context, bool may_wait = false);
|
||||||
static void WriteStringToStream(const Stream::Ptr& stream, const String& message);
|
static size_t WriteStringToStream(const Stream::Ptr& stream, const String& message);
|
||||||
static void WriteStringToStream(std::ostream& stream, const String& message);
|
static void WriteStringToStream(std::ostream& stream, const String& message);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
|
@ -19,11 +19,13 @@
|
||||||
|
|
||||||
#include "base/ringbuffer.hpp"
|
#include "base/ringbuffer.hpp"
|
||||||
#include "base/objectlock.hpp"
|
#include "base/objectlock.hpp"
|
||||||
|
#include "base/utility.hpp"
|
||||||
|
#include <algorithm>
|
||||||
|
|
||||||
using namespace icinga;
|
using namespace icinga;
|
||||||
|
|
||||||
RingBuffer::RingBuffer(RingBuffer::SizeType slots)
|
RingBuffer::RingBuffer(RingBuffer::SizeType slots)
|
||||||
: Object(), m_Slots(slots, 0), m_TimeValue(0)
|
: Object(), m_Slots(slots, 0), m_TimeValue(0), m_InsertedValues(0)
|
||||||
{ }
|
{ }
|
||||||
|
|
||||||
RingBuffer::SizeType RingBuffer::GetLength(void) const
|
RingBuffer::SizeType RingBuffer::GetLength(void) const
|
||||||
|
@ -39,6 +41,9 @@ void RingBuffer::InsertValue(RingBuffer::SizeType tv, int num)
|
||||||
|
|
||||||
RingBuffer::SizeType offsetTarget = tv % m_Slots.size();
|
RingBuffer::SizeType offsetTarget = tv % m_Slots.size();
|
||||||
|
|
||||||
|
if (m_TimeValue == 0)
|
||||||
|
m_InsertedValues = 1;
|
||||||
|
|
||||||
if (tv > m_TimeValue) {
|
if (tv > m_TimeValue) {
|
||||||
RingBuffer::SizeType offset = m_TimeValue % m_Slots.size();
|
RingBuffer::SizeType offset = m_TimeValue % m_Slots.size();
|
||||||
|
|
||||||
|
@ -50,6 +55,9 @@ void RingBuffer::InsertValue(RingBuffer::SizeType tv, int num)
|
||||||
offset = 0;
|
offset = 0;
|
||||||
|
|
||||||
m_Slots[offset] = 0;
|
m_Slots[offset] = 0;
|
||||||
|
|
||||||
|
if (m_TimeValue != 0 && m_InsertedValues < m_Slots.size())
|
||||||
|
m_InsertedValues++;
|
||||||
}
|
}
|
||||||
|
|
||||||
m_TimeValue = tv;
|
m_TimeValue = tv;
|
||||||
|
@ -58,14 +66,16 @@ void RingBuffer::InsertValue(RingBuffer::SizeType tv, int num)
|
||||||
m_Slots[offsetTarget] += num;
|
m_Slots[offsetTarget] += num;
|
||||||
}
|
}
|
||||||
|
|
||||||
int RingBuffer::GetValues(RingBuffer::SizeType span) const
|
int RingBuffer::UpdateAndGetValues(RingBuffer::SizeType tv, RingBuffer::SizeType span)
|
||||||
{
|
{
|
||||||
ObjectLock olock(this);
|
ObjectLock olock(this);
|
||||||
|
|
||||||
|
InsertValue(tv, 0);
|
||||||
|
|
||||||
if (span > m_Slots.size())
|
if (span > m_Slots.size())
|
||||||
span = m_Slots.size();
|
span = m_Slots.size();
|
||||||
|
|
||||||
int off = m_TimeValue % m_Slots.size();;
|
int off = m_TimeValue % m_Slots.size();
|
||||||
int sum = 0;
|
int sum = 0;
|
||||||
while (span > 0) {
|
while (span > 0) {
|
||||||
sum += m_Slots[off];
|
sum += m_Slots[off];
|
||||||
|
@ -79,3 +89,10 @@ int RingBuffer::GetValues(RingBuffer::SizeType span) const
|
||||||
|
|
||||||
return sum;
|
return sum;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
double RingBuffer::CalculateRate(RingBuffer::SizeType tv, RingBuffer::SizeType span)
|
||||||
|
{
|
||||||
|
ObjectLock olock(this);
|
||||||
|
int sum = UpdateAndGetValues(tv, span);
|
||||||
|
return sum / static_cast<double>(std::min(span, m_InsertedValues));
|
||||||
|
}
|
||||||
|
|
|
@ -43,11 +43,13 @@ public:
|
||||||
|
|
||||||
SizeType GetLength(void) const;
|
SizeType GetLength(void) const;
|
||||||
void InsertValue(SizeType tv, int num);
|
void InsertValue(SizeType tv, int num);
|
||||||
int GetValues(SizeType span) const;
|
int UpdateAndGetValues(SizeType tv, SizeType span);
|
||||||
|
double CalculateRate(SizeType tv, SizeType span);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::vector<int> m_Slots;
|
std::vector<int> m_Slots;
|
||||||
SizeType m_TimeValue;
|
SizeType m_TimeValue;
|
||||||
|
SizeType m_InsertedValues;
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -295,8 +295,8 @@ void WorkQueue::IncreaseTaskCount(void)
|
||||||
m_TaskStats.InsertValue(now, 1);
|
m_TaskStats.InsertValue(now, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
int WorkQueue::GetTaskCount(RingBuffer::SizeType span) const
|
int WorkQueue::GetTaskCount(RingBuffer::SizeType span)
|
||||||
{
|
{
|
||||||
boost::mutex::scoped_lock lock(m_StatsMutex);
|
boost::mutex::scoped_lock lock(m_StatsMutex);
|
||||||
return m_TaskStats.GetValues(span);
|
return m_TaskStats.UpdateAndGetValues(Utility::GetTime(), span);
|
||||||
}
|
}
|
||||||
|
|
|
@ -93,7 +93,7 @@ public:
|
||||||
bool IsWorkerThread(void) const;
|
bool IsWorkerThread(void) const;
|
||||||
|
|
||||||
size_t GetLength(void) const;
|
size_t GetLength(void) const;
|
||||||
int GetTaskCount(RingBuffer::SizeType span) const;
|
int GetTaskCount(RingBuffer::SizeType span);
|
||||||
|
|
||||||
void SetExceptionCallback(const ExceptionCallback& callback);
|
void SetExceptionCallback(const ExceptionCallback& callback);
|
||||||
|
|
||||||
|
|
|
@ -473,10 +473,10 @@ void DbConnection::IncreaseQueryCount(void)
|
||||||
m_QueryStats.InsertValue(now, 1);
|
m_QueryStats.InsertValue(now, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
int DbConnection::GetQueryCount(RingBuffer::SizeType span) const
|
int DbConnection::GetQueryCount(RingBuffer::SizeType span)
|
||||||
{
|
{
|
||||||
boost::mutex::scoped_lock lock(m_StatsMutex);
|
boost::mutex::scoped_lock lock(m_StatsMutex);
|
||||||
return m_QueryStats.GetValues(span);
|
return m_QueryStats.UpdateAndGetValues(Utility::GetTime(), span);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool DbConnection::IsIDCacheValid(void) const
|
bool DbConnection::IsIDCacheValid(void) const
|
||||||
|
|
|
@ -73,7 +73,7 @@ public:
|
||||||
void SetStatusUpdate(const DbObject::Ptr& dbobj, bool hasupdate);
|
void SetStatusUpdate(const DbObject::Ptr& dbobj, bool hasupdate);
|
||||||
bool GetStatusUpdate(const DbObject::Ptr& dbobj) const;
|
bool GetStatusUpdate(const DbObject::Ptr& dbobj) const;
|
||||||
|
|
||||||
int GetQueryCount(RingBuffer::SizeType span) const;
|
int GetQueryCount(RingBuffer::SizeType span);
|
||||||
virtual int GetPendingQueryCount(void) const = 0;
|
virtual int GetPendingQueryCount(void) const = 0;
|
||||||
|
|
||||||
virtual void ValidateFailoverTimeout(double value, const ValidationUtils& utils) override;
|
virtual void ValidateFailoverTimeout(double value, const ValidationUtils& utils) override;
|
||||||
|
|
|
@ -45,12 +45,12 @@ void CIB::UpdateActiveServiceChecksStatistics(long tv, int num)
|
||||||
|
|
||||||
int CIB::GetActiveHostChecksStatistics(long timespan)
|
int CIB::GetActiveHostChecksStatistics(long timespan)
|
||||||
{
|
{
|
||||||
return m_ActiveHostChecksStatistics.GetValues(timespan);
|
return m_ActiveHostChecksStatistics.UpdateAndGetValues(Utility::GetTime(), timespan);
|
||||||
}
|
}
|
||||||
|
|
||||||
int CIB::GetActiveServiceChecksStatistics(long timespan)
|
int CIB::GetActiveServiceChecksStatistics(long timespan)
|
||||||
{
|
{
|
||||||
return m_ActiveServiceChecksStatistics.GetValues(timespan);
|
return m_ActiveServiceChecksStatistics.UpdateAndGetValues(Utility::GetTime(), timespan);
|
||||||
}
|
}
|
||||||
|
|
||||||
void CIB::UpdatePassiveHostChecksStatistics(long tv, int num)
|
void CIB::UpdatePassiveHostChecksStatistics(long tv, int num)
|
||||||
|
@ -65,12 +65,12 @@ void CIB::UpdatePassiveServiceChecksStatistics(long tv, int num)
|
||||||
|
|
||||||
int CIB::GetPassiveHostChecksStatistics(long timespan)
|
int CIB::GetPassiveHostChecksStatistics(long timespan)
|
||||||
{
|
{
|
||||||
return m_PassiveHostChecksStatistics.GetValues(timespan);
|
return m_PassiveHostChecksStatistics.UpdateAndGetValues(Utility::GetTime(), timespan);
|
||||||
}
|
}
|
||||||
|
|
||||||
int CIB::GetPassiveServiceChecksStatistics(long timespan)
|
int CIB::GetPassiveServiceChecksStatistics(long timespan)
|
||||||
{
|
{
|
||||||
return m_PassiveServiceChecksStatistics.GetValues(timespan);
|
return m_PassiveServiceChecksStatistics.UpdateAndGetValues(Utility::GetTime(), timespan);
|
||||||
}
|
}
|
||||||
|
|
||||||
CheckableCheckStatistics CIB::CalculateHostCheckStats(void)
|
CheckableCheckStatistics CIB::CalculateHostCheckStats(void)
|
||||||
|
|
|
@ -91,6 +91,13 @@ void ClusterZoneCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const Che
|
||||||
bool connected = false;
|
bool connected = false;
|
||||||
double zoneLag = 0;
|
double zoneLag = 0;
|
||||||
|
|
||||||
|
double lastMessageSent = 0;
|
||||||
|
double lastMessageReceived = 0;
|
||||||
|
double messagesSentPerSecond = 0;
|
||||||
|
double messagesReceivedPerSecond = 0;
|
||||||
|
double bytesSentPerSecond = 0;
|
||||||
|
double bytesReceivedPerSecond = 0;
|
||||||
|
|
||||||
for (const Endpoint::Ptr& endpoint : zone->GetEndpoints()) {
|
for (const Endpoint::Ptr& endpoint : zone->GetEndpoints()) {
|
||||||
if (endpoint->GetConnected())
|
if (endpoint->GetConnected())
|
||||||
connected = true;
|
connected = true;
|
||||||
|
@ -99,6 +106,17 @@ void ClusterZoneCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const Che
|
||||||
|
|
||||||
if (eplag > 0 && eplag > zoneLag)
|
if (eplag > 0 && eplag > zoneLag)
|
||||||
zoneLag = eplag;
|
zoneLag = eplag;
|
||||||
|
|
||||||
|
if (endpoint->GetLastMessageSent() > lastMessageSent)
|
||||||
|
lastMessageSent = endpoint->GetLastMessageSent();
|
||||||
|
|
||||||
|
if (endpoint->GetLastMessageReceived() > lastMessageReceived)
|
||||||
|
lastMessageReceived = endpoint->GetLastMessageReceived();
|
||||||
|
|
||||||
|
messagesSentPerSecond += endpoint->GetMessagesSentPerSecond();
|
||||||
|
messagesReceivedPerSecond += endpoint->GetMessagesReceivedPerSecond();
|
||||||
|
bytesSentPerSecond += endpoint->GetBytesSentPerSecond();
|
||||||
|
bytesReceivedPerSecond += endpoint->GetBytesReceivedPerSecond();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!connected) {
|
if (!connected) {
|
||||||
|
@ -122,6 +140,12 @@ void ClusterZoneCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const Che
|
||||||
|
|
||||||
Array::Ptr perfdata = new Array();
|
Array::Ptr perfdata = new Array();
|
||||||
perfdata->Add(new PerfdataValue("slave_lag", zoneLag, false, "s", lagWarning, lagCritical));
|
perfdata->Add(new PerfdataValue("slave_lag", zoneLag, false, "s", lagWarning, lagCritical));
|
||||||
|
perfdata->Add(new PerfdataValue("last_messages_sent", lastMessageSent));
|
||||||
|
perfdata->Add(new PerfdataValue("last_messages_received", lastMessageReceived));
|
||||||
|
perfdata->Add(new PerfdataValue("sum_messages_sent_per_second", messagesSentPerSecond));
|
||||||
|
perfdata->Add(new PerfdataValue("sum_messages_received_per_second", messagesReceivedPerSecond));
|
||||||
|
perfdata->Add(new PerfdataValue("sum_bytes_sent_per_second", bytesSentPerSecond));
|
||||||
|
perfdata->Add(new PerfdataValue("sum_bytes_received_per_second", bytesReceivedPerSecond));
|
||||||
cr->SetPerformanceData(perfdata);
|
cr->SetPerformanceData(perfdata);
|
||||||
|
|
||||||
checkable->ProcessCheckResult(cr);
|
checkable->ProcessCheckResult(cr);
|
||||||
|
|
|
@ -100,6 +100,36 @@ void IcingaCheckTask::ScriptFunc(const Checkable::Ptr& service, const CheckResul
|
||||||
perfdata->Add(new PerfdataValue("num_hosts_in_downtime", hs.hosts_in_downtime));
|
perfdata->Add(new PerfdataValue("num_hosts_in_downtime", hs.hosts_in_downtime));
|
||||||
perfdata->Add(new PerfdataValue("num_hosts_acknowledged", hs.hosts_acknowledged));
|
perfdata->Add(new PerfdataValue("num_hosts_acknowledged", hs.hosts_acknowledged));
|
||||||
|
|
||||||
|
std::vector<Endpoint::Ptr> endpoints = ConfigType::GetObjectsByType<Endpoint>();
|
||||||
|
|
||||||
|
double lastMessageSent = 0;
|
||||||
|
double lastMessageReceived = 0;
|
||||||
|
double messagesSentPerSecond = 0;
|
||||||
|
double messagesReceivedPerSecond = 0;
|
||||||
|
double bytesSentPerSecond = 0;
|
||||||
|
double bytesReceivedPerSecond = 0;
|
||||||
|
|
||||||
|
for (Endpoint::Ptr endpoint : endpoints)
|
||||||
|
{
|
||||||
|
if (endpoint->GetLastMessageSent() > lastMessageSent)
|
||||||
|
lastMessageSent = endpoint->GetLastMessageSent();
|
||||||
|
|
||||||
|
if (endpoint->GetLastMessageReceived() > lastMessageReceived)
|
||||||
|
lastMessageReceived = endpoint->GetLastMessageReceived();
|
||||||
|
|
||||||
|
messagesSentPerSecond += endpoint->GetMessagesSentPerSecond();
|
||||||
|
messagesReceivedPerSecond += endpoint->GetMessagesReceivedPerSecond();
|
||||||
|
bytesSentPerSecond += endpoint->GetBytesSentPerSecond();
|
||||||
|
bytesReceivedPerSecond += endpoint->GetBytesReceivedPerSecond();
|
||||||
|
}
|
||||||
|
|
||||||
|
perfdata->Add(new PerfdataValue("last_messages_sent", lastMessageSent));
|
||||||
|
perfdata->Add(new PerfdataValue("last_messages_received", lastMessageReceived));
|
||||||
|
perfdata->Add(new PerfdataValue("sum_messages_sent_per_second", messagesSentPerSecond));
|
||||||
|
perfdata->Add(new PerfdataValue("sum_messages_received_per_second", messagesReceivedPerSecond));
|
||||||
|
perfdata->Add(new PerfdataValue("sum_bytes_sent_per_second", bytesSentPerSecond));
|
||||||
|
perfdata->Add(new PerfdataValue("sum_bytes_received_per_second", bytesReceivedPerSecond));
|
||||||
|
|
||||||
cr->SetOutput("Icinga 2 has been running for " + Utility::FormatDuration(uptime) +
|
cr->SetOutput("Icinga 2 has been running for " + Utility::FormatDuration(uptime) +
|
||||||
". Version: " + Application::GetAppVersion());
|
". Version: " + Application::GetAppVersion());
|
||||||
cr->SetPerformanceData(perfdata);
|
cr->SetPerformanceData(perfdata);
|
||||||
|
|
|
@ -1144,7 +1144,8 @@ void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client)
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
NetString::WriteStringToStream(client->GetStream(), pmessage->Get("message"));
|
size_t bytesSent = NetString::WriteStringToStream(client->GetStream(), pmessage->Get("message"));
|
||||||
|
endpoint->AddMessageSent(bytesSent);
|
||||||
count++;
|
count++;
|
||||||
} catch (const std::exception& ex) {
|
} catch (const std::exception& ex) {
|
||||||
Log(LogWarning, "ApiListener")
|
Log(LogWarning, "ApiListener")
|
||||||
|
@ -1169,7 +1170,8 @@ void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client)
|
||||||
lmessage->Set("method", "log::SetLogPosition");
|
lmessage->Set("method", "log::SetLogPosition");
|
||||||
lmessage->Set("params", lparams);
|
lmessage->Set("params", lparams);
|
||||||
|
|
||||||
JsonRpc::SendMessage(client->GetStream(), lmessage);
|
size_t bytesSent = JsonRpc::SendMessage(client->GetStream(), lmessage);
|
||||||
|
endpoint->AddMessageSent(bytesSent);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -34,6 +34,10 @@ REGISTER_TYPE(Endpoint);
|
||||||
boost::signals2::signal<void(const Endpoint::Ptr&, const JsonRpcConnection::Ptr&)> Endpoint::OnConnected;
|
boost::signals2::signal<void(const Endpoint::Ptr&, const JsonRpcConnection::Ptr&)> Endpoint::OnConnected;
|
||||||
boost::signals2::signal<void(const Endpoint::Ptr&, const JsonRpcConnection::Ptr&)> Endpoint::OnDisconnected;
|
boost::signals2::signal<void(const Endpoint::Ptr&, const JsonRpcConnection::Ptr&)> Endpoint::OnDisconnected;
|
||||||
|
|
||||||
|
Endpoint::Endpoint(void)
|
||||||
|
: m_MessagesSent(60), m_BytesSent(60), m_MessagesReceived(60), m_BytesReceived(60)
|
||||||
|
{ }
|
||||||
|
|
||||||
void Endpoint::OnAllConfigLoaded(void)
|
void Endpoint::OnAllConfigLoaded(void)
|
||||||
{
|
{
|
||||||
ObjectImpl<Endpoint>::OnAllConfigLoaded();
|
ObjectImpl<Endpoint>::OnAllConfigLoaded();
|
||||||
|
@ -117,3 +121,39 @@ Endpoint::Ptr Endpoint::GetLocalEndpoint(void)
|
||||||
|
|
||||||
return listener->GetLocalEndpoint();
|
return listener->GetLocalEndpoint();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Endpoint::AddMessageSent(int bytes)
|
||||||
|
{
|
||||||
|
double time = Utility::GetTime();
|
||||||
|
m_MessagesSent.InsertValue(time, 1);
|
||||||
|
m_BytesSent.InsertValue(time, bytes);
|
||||||
|
SetLastMessageSent(time);
|
||||||
|
}
|
||||||
|
|
||||||
|
void Endpoint::AddMessageReceived(int bytes)
|
||||||
|
{
|
||||||
|
double time = Utility::GetTime();
|
||||||
|
m_MessagesReceived.InsertValue(time, 1);
|
||||||
|
m_BytesReceived.InsertValue(time, bytes);
|
||||||
|
SetLastMessageReceived(time);
|
||||||
|
}
|
||||||
|
|
||||||
|
double Endpoint::GetMessagesSentPerSecond(void) const
|
||||||
|
{
|
||||||
|
return m_MessagesSent.CalculateRate(Utility::GetTime(), 60);
|
||||||
|
}
|
||||||
|
|
||||||
|
double Endpoint::GetMessagesReceivedPerSecond(void) const
|
||||||
|
{
|
||||||
|
return m_MessagesReceived.CalculateRate(Utility::GetTime(), 60);
|
||||||
|
}
|
||||||
|
|
||||||
|
double Endpoint::GetBytesSentPerSecond(void) const
|
||||||
|
{
|
||||||
|
return m_BytesSent.CalculateRate(Utility::GetTime(), 60);
|
||||||
|
}
|
||||||
|
|
||||||
|
double Endpoint::GetBytesReceivedPerSecond(void) const
|
||||||
|
{
|
||||||
|
return m_BytesReceived.CalculateRate(Utility::GetTime(), 60);
|
||||||
|
}
|
||||||
|
|
|
@ -22,6 +22,7 @@
|
||||||
|
|
||||||
#include "remote/i2-remote.hpp"
|
#include "remote/i2-remote.hpp"
|
||||||
#include "remote/endpoint.thpp"
|
#include "remote/endpoint.thpp"
|
||||||
|
#include "base/ringbuffer.hpp"
|
||||||
#include <set>
|
#include <set>
|
||||||
|
|
||||||
namespace icinga
|
namespace icinga
|
||||||
|
@ -41,6 +42,8 @@ public:
|
||||||
DECLARE_OBJECT(Endpoint);
|
DECLARE_OBJECT(Endpoint);
|
||||||
DECLARE_OBJECTNAME(Endpoint);
|
DECLARE_OBJECTNAME(Endpoint);
|
||||||
|
|
||||||
|
Endpoint(void);
|
||||||
|
|
||||||
static boost::signals2::signal<void(const Endpoint::Ptr&, const intrusive_ptr<JsonRpcConnection>&)> OnConnected;
|
static boost::signals2::signal<void(const Endpoint::Ptr&, const intrusive_ptr<JsonRpcConnection>&)> OnConnected;
|
||||||
static boost::signals2::signal<void(const Endpoint::Ptr&, const intrusive_ptr<JsonRpcConnection>&)> OnDisconnected;
|
static boost::signals2::signal<void(const Endpoint::Ptr&, const intrusive_ptr<JsonRpcConnection>&)> OnDisconnected;
|
||||||
|
|
||||||
|
@ -56,6 +59,15 @@ public:
|
||||||
|
|
||||||
void SetCachedZone(const intrusive_ptr<Zone>& zone);
|
void SetCachedZone(const intrusive_ptr<Zone>& zone);
|
||||||
|
|
||||||
|
void AddMessageSent(int bytes);
|
||||||
|
void AddMessageReceived(int bytes);
|
||||||
|
|
||||||
|
double GetMessagesSentPerSecond(void) const override;
|
||||||
|
double GetMessagesReceivedPerSecond(void) const override;
|
||||||
|
|
||||||
|
double GetBytesSentPerSecond(void) const override;
|
||||||
|
double GetBytesReceivedPerSecond(void) const override;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
virtual void OnAllConfigLoaded(void) override;
|
virtual void OnAllConfigLoaded(void) override;
|
||||||
|
|
||||||
|
@ -63,6 +75,11 @@ private:
|
||||||
mutable boost::mutex m_ClientsLock;
|
mutable boost::mutex m_ClientsLock;
|
||||||
std::set<intrusive_ptr<JsonRpcConnection> > m_Clients;
|
std::set<intrusive_ptr<JsonRpcConnection> > m_Clients;
|
||||||
intrusive_ptr<Zone> m_Zone;
|
intrusive_ptr<Zone> m_Zone;
|
||||||
|
|
||||||
|
mutable RingBuffer m_MessagesSent;
|
||||||
|
mutable RingBuffer m_MessagesReceived;
|
||||||
|
mutable RingBuffer m_BytesSent;
|
||||||
|
mutable RingBuffer m_BytesReceived;
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,6 +45,25 @@ class Endpoint : ConfigObject
|
||||||
[no_user_modify, no_storage] bool connected {
|
[no_user_modify, no_storage] bool connected {
|
||||||
get;
|
get;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
Timestamp last_message_sent;
|
||||||
|
Timestamp last_message_received;
|
||||||
|
|
||||||
|
[no_user_modify, no_storage] double messages_sent_per_second {
|
||||||
|
get;
|
||||||
|
};
|
||||||
|
|
||||||
|
[no_user_modify, no_storage] double messages_received_per_second {
|
||||||
|
get;
|
||||||
|
};
|
||||||
|
|
||||||
|
[no_user_modify, no_storage] double bytes_sent_per_second {
|
||||||
|
get;
|
||||||
|
};
|
||||||
|
|
||||||
|
[no_user_modify, no_storage] double bytes_received_per_second {
|
||||||
|
get;
|
||||||
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -54,11 +54,13 @@ static bool GetDebugJsonRpcCached(void)
|
||||||
#endif /* I2_DEBUG */
|
#endif /* I2_DEBUG */
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sends a message to the connected peer.
|
* Sends a message to the connected peer and returns the bytes sent.
|
||||||
*
|
*
|
||||||
* @param message The message.
|
* @param message The message.
|
||||||
|
*
|
||||||
|
* @return The amount of bytes sent.
|
||||||
*/
|
*/
|
||||||
void JsonRpc::SendMessage(const Stream::Ptr& stream, const Dictionary::Ptr& message)
|
size_t JsonRpc::SendMessage(const Stream::Ptr& stream, const Dictionary::Ptr& message)
|
||||||
{
|
{
|
||||||
String json = JsonEncode(message);
|
String json = JsonEncode(message);
|
||||||
|
|
||||||
|
@ -67,7 +69,7 @@ void JsonRpc::SendMessage(const Stream::Ptr& stream, const Dictionary::Ptr& mess
|
||||||
std::cerr << ConsoleColorTag(Console_ForegroundBlue) << ">> " << json << ConsoleColorTag(Console_Normal) << "\n";
|
std::cerr << ConsoleColorTag(Console_ForegroundBlue) << ">> " << json << ConsoleColorTag(Console_Normal) << "\n";
|
||||||
#endif /* I2_DEBUG */
|
#endif /* I2_DEBUG */
|
||||||
|
|
||||||
NetString::WriteStringToStream(stream, json);
|
return NetString::WriteStringToStream(stream, json);
|
||||||
}
|
}
|
||||||
|
|
||||||
StreamReadStatus JsonRpc::ReadMessage(const Stream::Ptr& stream, String *message, StreamReadContext& src, bool may_wait)
|
StreamReadStatus JsonRpc::ReadMessage(const Stream::Ptr& stream, String *message, StreamReadContext& src, bool may_wait)
|
||||||
|
|
|
@ -35,7 +35,7 @@ namespace icinga
|
||||||
class I2_REMOTE_API JsonRpc
|
class I2_REMOTE_API JsonRpc
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
static void SendMessage(const Stream::Ptr& stream, const Dictionary::Ptr& message);
|
static size_t SendMessage(const Stream::Ptr& stream, const Dictionary::Ptr& message);
|
||||||
static StreamReadStatus ReadMessage(const Stream::Ptr& stream, String *message, StreamReadContext& src, bool may_wait = false);
|
static StreamReadStatus ReadMessage(const Stream::Ptr& stream, String *message, StreamReadContext& src, bool may_wait = false);
|
||||||
static Dictionary::Ptr DecodeMessage(const String& message);
|
static Dictionary::Ptr DecodeMessage(const String& message);
|
||||||
|
|
||||||
|
|
|
@ -117,7 +117,8 @@ void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message)
|
||||||
ObjectLock olock(m_Stream);
|
ObjectLock olock(m_Stream);
|
||||||
if (m_Stream->IsEof())
|
if (m_Stream->IsEof())
|
||||||
return;
|
return;
|
||||||
JsonRpc::SendMessage(m_Stream, message);
|
size_t bytesSent = JsonRpc::SendMessage(m_Stream, message);
|
||||||
|
m_Endpoint->AddMessageSent(bytesSent);
|
||||||
} catch (const std::exception& ex) {
|
} catch (const std::exception& ex) {
|
||||||
std::ostringstream info;
|
std::ostringstream info;
|
||||||
info << "Error while sending JSON-RPC message for identity '" << m_Identity << "'";
|
info << "Error while sending JSON-RPC message for identity '" << m_Identity << "'";
|
||||||
|
@ -188,6 +189,8 @@ void JsonRpcConnection::MessageHandler(const String& jsonString)
|
||||||
origin->FromZone = m_Endpoint->GetZone();
|
origin->FromZone = m_Endpoint->GetZone();
|
||||||
else
|
else
|
||||||
origin->FromZone = Zone::GetByName(message->Get("originZone"));
|
origin->FromZone = Zone::GetByName(message->Get("originZone"));
|
||||||
|
|
||||||
|
m_Endpoint->AddMessageReceived(jsonString.GetLength());
|
||||||
}
|
}
|
||||||
|
|
||||||
Value vmethod;
|
Value vmethod;
|
||||||
|
|
Loading…
Reference in New Issue