Introduce ReplayLog

refs #7752
This commit is contained in:
Alexander A. Klimov 2020-04-09 12:02:44 +02:00
parent fbc2184a37
commit 229598674a
4 changed files with 178 additions and 0 deletions

View File

@ -36,6 +36,7 @@ set(remote_SOURCES
modifyobjecthandler.cpp modifyobjecthandler.hpp
objectqueryhandler.cpp objectqueryhandler.hpp
pkiutility.cpp pkiutility.hpp
replay-log.cpp replay-log.hpp
statushandler.cpp statushandler.hpp
templatequeryhandler.cpp templatequeryhandler.hpp
typequeryhandler.cpp typequeryhandler.hpp

View File

@ -5,6 +5,8 @@
#include "remote/i2-remote.hpp"
#include "remote/endpoint-ti.hpp"
#include "remote/replay-log.hpp"
#include "base/lazy-init.hpp"
#include "base/ringbuffer.hpp"
#include <set>
@ -28,6 +30,10 @@ public:
static boost::signals2::signal<void(const Endpoint::Ptr&, const intrusive_ptr<JsonRpcConnection>&)> OnConnected;
static boost::signals2::signal<void(const Endpoint::Ptr&, const intrusive_ptr<JsonRpcConnection>&)> OnDisconnected;
inline Endpoint() : m_ReplayLog([this]() { return ReplayLog(GetName()); })
{
}
void AddClient(const intrusive_ptr<JsonRpcConnection>& client);
void RemoveClient(const intrusive_ptr<JsonRpcConnection>& client);
std::set<intrusive_ptr<JsonRpcConnection> > GetClients() const;
@ -38,6 +44,11 @@ public:
static Endpoint::Ptr GetLocalEndpoint();
inline ReplayLog& GetReplayLog()
{
return m_ReplayLog.Get();
}
void SetCachedZone(const intrusive_ptr<Zone>& zone);
void AddMessageSent(int bytes);
@ -56,6 +67,7 @@ private:
mutable std::mutex m_ClientsLock;
std::set<intrusive_ptr<JsonRpcConnection> > m_Clients;
intrusive_ptr<Zone> m_Zone;
LazyInit<ReplayLog> m_ReplayLog;
mutable RingBuffer m_MessagesSent{60};
mutable RingBuffer m_MessagesReceived{60};

77
lib/remote/replay-log.cpp Normal file
View File

@ -0,0 +1,77 @@
/* Icinga 2 | (c) 2020 Icinga GmbH | GPLv2+ */
#include "base/string.hpp"
#include "remote/apilistener.hpp"
#include "remote/replay-log.hpp"
#include <cstdio>
#include <SQLiteCpp/Database.h>
#include <utility>
using namespace icinga;
ReplayLogIterator::ReplayLogIterator(SQLite::Database& db)
: m_Stmt(Shared<SQLite::Statement>::Make(db, "SELECT ts, content FROM message ORDER BY ts"))
{
Next();
}
void ReplayLogIterator::Next()
{
if (m_Stmt) {
if (m_Stmt->executeStep()) {
m_Message.first = m_Stmt->getColumn(0).getDouble();
auto content (m_Stmt->getColumn(1));
auto begin ((const char*)content.getBlob());
m_Message.second = String(begin, begin + content.getBytes());
} else {
m_Stmt = nullptr;
}
}
}
static inline
String EndpointToFile(const String& endpoint)
{
auto file (ApiListener::GetApiDir() + "log/");
for (auto c : endpoint) {
char buf[3] = { 0 };
sprintf(buf, "%02x", (unsigned int)(((int)c + 256) % 256));
file += (char*)buf;
}
file += ".sqlite3";
return std::move(file);
}
ReplayLog::ReplayLog(const String& endpoint) : ReplayLog(endpoint, EndpointToFile(endpoint))
{
}
void ReplayLog::Log(double ts, const String& message)
{
SQLite::Statement stmt (m_DB, "INSERT INTO message (ts, content) VALUES (?, ?)");
stmt.bind(1, ts);
stmt.bindNoCopy(2, message.GetData());
(void)stmt.exec();
}
void ReplayLog::Cleanup(double upTo)
{
SQLite::Statement stmt (m_DB, "DELETE FROM message WHERE ts < ?");
stmt.bind(1, upTo);
(void)stmt.exec();
}
ReplayLog::ReplayLog(const String& endpoint, const String& file)
: m_DB(file.CStr(), SQLite::OPEN_READWRITE | SQLite::OPEN_CREATE)
{
m_DB.exec("CREATE TABLE IF NOT EXISTS message ( ts REAL, content BLOB )");
m_DB.exec("CREATE INDEX IF NOT EXISTS ix_message_ts ON message ( ts )");
}

88
lib/remote/replay-log.hpp Normal file
View File

@ -0,0 +1,88 @@
/* Icinga 2 | (c) 2020 Icinga GmbH | GPLv2+ */
#ifndef REPLAY_LOG_H
#define REPLAY_LOG_H
#include "base/shared.hpp"
#include "base/string.hpp"
#include <SQLiteCpp/Database.h>
#include <SQLiteCpp/Statement.h>
namespace icinga
{
/**
* An endpoint's cluster messages iterator.
*
* @ingroup remote
*/
class ReplayLogIterator
{
public:
ReplayLogIterator() = default;
ReplayLogIterator(SQLite::Database& db);
ReplayLogIterator(const ReplayLogIterator&) = delete;
ReplayLogIterator(ReplayLogIterator&&) = default;
ReplayLogIterator& operator=(const ReplayLogIterator&) = delete;
ReplayLogIterator& operator=(ReplayLogIterator&&) = default;
inline bool operator!=(const ReplayLogIterator& rhs)
{
return m_Stmt != rhs.m_Stmt;
}
inline ReplayLogIterator& operator++()
{
Next();
return *this;
}
inline const std::pair<double, String>& operator*()
{
return m_Message;
}
private:
void Next();
Shared<SQLite::Statement>::Ptr m_Stmt;
std::pair<double, String> m_Message;
};
/**
* An endpoint's cluster messages log.
*
* @ingroup remote
*/
class ReplayLog
{
public:
ReplayLog(const String& endpoint);
ReplayLog(const ReplayLog&) = delete;
ReplayLog(ReplayLog&&) = default;
ReplayLog& operator=(const ReplayLog&) = delete;
ReplayLog& operator=(ReplayLog&&) = default;
inline ReplayLogIterator begin()
{
return ReplayLogIterator(m_DB);
}
inline ReplayLogIterator end()
{
return ReplayLogIterator();
}
void Log(double ts, const String& message);
void Cleanup(double upTo);
private:
ReplayLog(const String& endpoint, const String& file);
SQLite::Database m_DB;
};
}
#endif /* REPLAY_LOG_H */