From 229598674ada8d4e9e6546f224ce8ce3a6f50d0f Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Thu, 9 Apr 2020 12:02:44 +0200 Subject: [PATCH] Introduce ReplayLog refs #7752 --- lib/remote/CMakeLists.txt | 1 + lib/remote/endpoint.hpp | 12 ++++++ lib/remote/replay-log.cpp | 77 ++++++++++++++++++++++++++++++++++ lib/remote/replay-log.hpp | 88 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 178 insertions(+) create mode 100644 lib/remote/replay-log.cpp create mode 100644 lib/remote/replay-log.hpp diff --git a/lib/remote/CMakeLists.txt b/lib/remote/CMakeLists.txt index 740b112b4..2b6c5721a 100644 --- a/lib/remote/CMakeLists.txt +++ b/lib/remote/CMakeLists.txt @@ -36,6 +36,7 @@ set(remote_SOURCES modifyobjecthandler.cpp modifyobjecthandler.hpp objectqueryhandler.cpp objectqueryhandler.hpp pkiutility.cpp pkiutility.hpp + replay-log.cpp replay-log.hpp statushandler.cpp statushandler.hpp templatequeryhandler.cpp templatequeryhandler.hpp typequeryhandler.cpp typequeryhandler.hpp diff --git a/lib/remote/endpoint.hpp b/lib/remote/endpoint.hpp index d641c2c6b..b3ee07468 100644 --- a/lib/remote/endpoint.hpp +++ b/lib/remote/endpoint.hpp @@ -5,6 +5,8 @@ #include "remote/i2-remote.hpp" #include "remote/endpoint-ti.hpp" +#include "remote/replay-log.hpp" +#include "base/lazy-init.hpp" #include "base/ringbuffer.hpp" #include @@ -28,6 +30,10 @@ public: static boost::signals2::signal&)> OnConnected; static boost::signals2::signal&)> OnDisconnected; + inline Endpoint() : m_ReplayLog([this]() { return ReplayLog(GetName()); }) + { + } + void AddClient(const intrusive_ptr& client); void RemoveClient(const intrusive_ptr& client); std::set > GetClients() const; @@ -38,6 +44,11 @@ public: static Endpoint::Ptr GetLocalEndpoint(); + inline ReplayLog& GetReplayLog() + { + return m_ReplayLog.Get(); + } + void SetCachedZone(const intrusive_ptr& zone); void AddMessageSent(int bytes); @@ -56,6 +67,7 @@ private: mutable std::mutex m_ClientsLock; std::set > m_Clients; intrusive_ptr m_Zone; + LazyInit m_ReplayLog; mutable RingBuffer m_MessagesSent{60}; mutable RingBuffer m_MessagesReceived{60}; diff --git a/lib/remote/replay-log.cpp b/lib/remote/replay-log.cpp new file mode 100644 index 000000000..c5ea600a7 --- /dev/null +++ b/lib/remote/replay-log.cpp @@ -0,0 +1,77 @@ +/* Icinga 2 | (c) 2020 Icinga GmbH | GPLv2+ */ + +#include "base/string.hpp" +#include "remote/apilistener.hpp" +#include "remote/replay-log.hpp" +#include +#include +#include + +using namespace icinga; + +ReplayLogIterator::ReplayLogIterator(SQLite::Database& db) + : m_Stmt(Shared::Make(db, "SELECT ts, content FROM message ORDER BY ts")) +{ + Next(); +} + +void ReplayLogIterator::Next() +{ + if (m_Stmt) { + if (m_Stmt->executeStep()) { + m_Message.first = m_Stmt->getColumn(0).getDouble(); + + auto content (m_Stmt->getColumn(1)); + auto begin ((const char*)content.getBlob()); + + m_Message.second = String(begin, begin + content.getBytes()); + } else { + m_Stmt = nullptr; + } + } +} + +static inline +String EndpointToFile(const String& endpoint) +{ + auto file (ApiListener::GetApiDir() + "log/"); + + for (auto c : endpoint) { + char buf[3] = { 0 }; + sprintf(buf, "%02x", (unsigned int)(((int)c + 256) % 256)); + + file += (char*)buf; + } + + file += ".sqlite3"; + + return std::move(file); +} + +ReplayLog::ReplayLog(const String& endpoint) : ReplayLog(endpoint, EndpointToFile(endpoint)) +{ +} + +void ReplayLog::Log(double ts, const String& message) +{ + SQLite::Statement stmt (m_DB, "INSERT INTO message (ts, content) VALUES (?, ?)"); + stmt.bind(1, ts); + stmt.bindNoCopy(2, message.GetData()); + + (void)stmt.exec(); +} + +void ReplayLog::Cleanup(double upTo) +{ + SQLite::Statement stmt (m_DB, "DELETE FROM message WHERE ts < ?"); + stmt.bind(1, upTo); + + (void)stmt.exec(); +} + +ReplayLog::ReplayLog(const String& endpoint, const String& file) + : m_DB(file.CStr(), SQLite::OPEN_READWRITE | SQLite::OPEN_CREATE) +{ + m_DB.exec("CREATE TABLE IF NOT EXISTS message ( ts REAL, content BLOB )"); + m_DB.exec("CREATE INDEX IF NOT EXISTS ix_message_ts ON message ( ts )"); +} diff --git a/lib/remote/replay-log.hpp b/lib/remote/replay-log.hpp new file mode 100644 index 000000000..c13b6deaa --- /dev/null +++ b/lib/remote/replay-log.hpp @@ -0,0 +1,88 @@ +/* Icinga 2 | (c) 2020 Icinga GmbH | GPLv2+ */ + +#ifndef REPLAY_LOG_H +#define REPLAY_LOG_H + +#include "base/shared.hpp" +#include "base/string.hpp" +#include +#include + +namespace icinga +{ + +/** + * An endpoint's cluster messages iterator. + * + * @ingroup remote + */ +class ReplayLogIterator +{ +public: + ReplayLogIterator() = default; + ReplayLogIterator(SQLite::Database& db); + + ReplayLogIterator(const ReplayLogIterator&) = delete; + ReplayLogIterator(ReplayLogIterator&&) = default; + ReplayLogIterator& operator=(const ReplayLogIterator&) = delete; + ReplayLogIterator& operator=(ReplayLogIterator&&) = default; + + inline bool operator!=(const ReplayLogIterator& rhs) + { + return m_Stmt != rhs.m_Stmt; + } + + inline ReplayLogIterator& operator++() + { + Next(); + return *this; + } + + inline const std::pair& operator*() + { + return m_Message; + } + +private: + void Next(); + + Shared::Ptr m_Stmt; + std::pair m_Message; +}; + +/** + * An endpoint's cluster messages log. + * + * @ingroup remote + */ +class ReplayLog +{ +public: + ReplayLog(const String& endpoint); + ReplayLog(const ReplayLog&) = delete; + ReplayLog(ReplayLog&&) = default; + ReplayLog& operator=(const ReplayLog&) = delete; + ReplayLog& operator=(ReplayLog&&) = default; + + inline ReplayLogIterator begin() + { + return ReplayLogIterator(m_DB); + } + + inline ReplayLogIterator end() + { + return ReplayLogIterator(); + } + + void Log(double ts, const String& message); + void Cleanup(double upTo); + +private: + ReplayLog(const String& endpoint, const String& file); + + SQLite::Database m_DB; +}; + +} + +#endif /* REPLAY_LOG_H */