diff --git a/lib/redis/redisconnection.cpp b/lib/redis/redisconnection.cpp index 32b4272ea..080b64c4c 100644 --- a/lib/redis/redisconnection.cpp +++ b/lib/redis/redisconnection.cpp @@ -17,244 +17,283 @@ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * ******************************************************************************/ -#include "base/object.hpp" #include "redis/redisconnection.hpp" -#include "base/workqueue.hpp" -#include "base/logger.hpp" +#include "base/array.hpp" #include "base/convert.hpp" -#include "base/utility.hpp" -#include "redis/rediswriter.hpp" -#include "hiredis/hiredis.h" +#include "base/defer.hpp" +#include "base/io-engine.hpp" +#include "base/logger.hpp" +#include "base/objectlock.hpp" +#include "base/string.hpp" +#include "base/tcpsocket.hpp" +#include +#include +#include +#include +#include +#include #include #include using namespace icinga; +namespace asio = boost::asio; RedisConnection::RedisConnection(const String host, const int port, const String path, const String password, const int db) : - m_Host(host), m_Port(port), m_Path(path), m_Password(password), m_DbIndex(db), m_Context(NULL), m_Connected(false) + RedisConnection(IoEngine::Get().GetIoService(), host, port, path, password, db) { - m_RedisConnectionWorkQueue.SetName("RedisConnection"); } -void RedisConnection::StaticInitialize() +RedisConnection::RedisConnection(boost::asio::io_context& io, String host, int port, String path, String password, int db) + : m_Host(std::move(host)), m_Port(port), m_Path(std::move(path)), m_Password(std::move(password)), m_DbIndex(db), + m_Connecting(false), m_Connected(false), m_Strand(io), m_QueuedWrites(io), m_QueuedReads(io) { } void RedisConnection::Start() { - RedisConnection::Connect(); + if (!m_Connecting.exchange(true)) { + Ptr keepAlive (this); - std::thread thread(&RedisConnection::HandleRW, this); - thread.detach(); -} - -void RedisConnection::AssertOnWorkQueue() -{ - ASSERT(m_RedisConnectionWorkQueue.IsWorkerThread()); -} - -void RedisConnection::HandleRW() -{ - Utility::SetThreadName("RedisConnection Handler"); - - for (;;) { - try { - { - boost::mutex::scoped_lock lock(m_CMutex); - if (!m_Connected) - return; - redisAsyncHandleWrite(m_Context); - redisAsyncHandleRead(m_Context); - } - Utility::Sleep(0.1); - } catch (const std::exception&) { - Log(LogCritical, "RedisWriter", "Internal Redis Error"); - } + asio::spawn(m_Strand, [this, keepAlive](asio::yield_context yc) { Connect(yc); }); } } - -void RedisConnection::RedisInitialCallback(redisAsyncContext *c, void *r, void *p) -{ - auto state = (ConnectionState *) p; - if (state->state != Starting && !r) { - Log(LogCritical, "RedisConnection") - << "No answer from Redis during initial connection, is the Redis server running?"; - return; - } else if (r != nullptr) { - redisReply *rep = (redisReply *) r; - if (rep->type == REDIS_REPLY_ERROR) { - Log(LogCritical, "RedisConnection") - << "Failed to connect to Redis: " << rep->str; - state->conn->m_Connected = false; - return; - } - } - - if (state->state == Starting) { - state->state = Auth; - if (!state->conn->m_Password.IsEmpty()) { - boost::mutex::scoped_lock lock(state->conn->m_CMutex); - redisAsyncCommand(c, &RedisInitialCallback, p, "AUTH %s", state->conn->m_Password.CStr()); - return; - } - } - if (state->state == Auth) - { - state->state = DBSelect; - if (state->conn->m_DbIndex != 0) { - boost::mutex::scoped_lock lock(state->conn->m_CMutex); - redisAsyncCommand(c, &RedisInitialCallback, p, "SELECT %d", state->conn->m_DbIndex); - return; - } - } - if (state->state == DBSelect) - state->conn->m_Connected = true; -} bool RedisConnection::IsConnected() { - return m_Connected; + return m_Connected.load(); } - -void RedisConnection::Connect() +void RedisConnection::FireAndForgetQuery(RedisConnection::Query query) { - if (m_Connected) - return; + auto item (std::make_shared(std::move(query))); - Log(LogInformation, "RedisWriter", "Trying to connect to redis server Async"); - { - boost::mutex::scoped_lock lock(m_CMutex); - - if (m_Path.IsEmpty()) - m_Context = redisAsyncConnect(m_Host.CStr(), m_Port); - else - m_Context = redisAsyncConnectUnix(m_Path.CStr()); - - m_Context->data = (void*) this; - - redisAsyncSetConnectCallback(m_Context, &ConnectCallback); - redisAsyncSetDisconnectCallback(m_Context, &DisconnectCallback); - } - - m_State = ConnectionState{Starting, this}; - RedisInitialCallback(m_Context, nullptr, (void*)&m_State); + asio::post(m_Strand, [this, item]() { + m_Queues.FireAndForgetQuery.emplace(std::move(*item)); + m_QueuedWrites.Set(); + }); } -void RedisConnection::Disconnect() +void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries) { - redisAsyncDisconnect(m_Context); + auto item (std::make_shared(std::move(queries))); + + asio::post(m_Strand, [this, item]() { + m_Queues.FireAndForgetQueries.emplace(std::move(*item)); + m_QueuedWrites.Set(); + }); } -void RedisConnection::ConnectCallback(const redisAsyncContext *c, int status) +RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query query) { - auto *rc = (RedisConnection* ) const_cast(c)->data; - if (status != REDIS_OK) { - if (c->err != 0) { - Log(LogCritical, "RedisConnection") - << "Redis connection failure: " << c->errstr; + std::promise promise; + auto future (promise.get_future()); + auto item (std::make_shared(std::move(query), std::move(promise))); + + asio::post(m_Strand, [this, item]() { + m_Queues.GetResultOfQuery.emplace(std::move(*item)); + m_QueuedWrites.Set(); + }); + + item = nullptr; + future.wait(); + return future.get(); +} + +RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Queries queries) +{ + std::promise promise; + auto future (promise.get_future()); + auto item (std::make_shared(std::move(queries), std::move(promise))); + + asio::post(m_Strand, [this, item]() { + m_Queues.GetResultsOfQueries.emplace(std::move(*item)); + m_QueuedWrites.Set(); + }); + + item = nullptr; + future.wait(); + return future.get(); +} + +void RedisConnection::Connect(asio::yield_context& yc) +{ + Defer notConnecting ([this]() { + if (!m_Connected.load()) { + m_Connecting.store(false); + } + }); + + Log(LogInformation, "RedisWriter", "Trying to connect to Redis server (async)"); + + try { + if (m_Path.IsEmpty()) { + m_TcpConn = decltype(m_TcpConn)(new TcpConn(m_Strand.context())); + icinga::Connect(m_TcpConn->next_layer(), m_Host, Convert::ToString(m_Port), yc); } else { - Log(LogCritical, "RedisConnection") - << "Redis connection failure"; + m_UnixConn = decltype(m_UnixConn)(new UnixConn(m_Strand.context())); + m_UnixConn->next_layer().async_connect(Unix::endpoint(m_Path.CStr()), yc); } - rc->m_Connected = false; - } else { - Log(LogInformation, "RedisConnection") - << "Redis Connection: O N L I N E"; - } -} -// It's unfortunate we can not pass any user data here. All we get to do is log a message and hope for the best -void RedisConnection::DisconnectCallback(const redisAsyncContext *c, int status) -{ - auto *rc = (RedisConnection* ) const_cast(c)->data; - boost::mutex::scoped_lock lock(rc->m_CMutex); - if (status == REDIS_OK) - Log(LogInformation, "RedisConnection") << "Redis disconnected by us"; - else { - if (c->err != 0) - Log(LogCritical, "RedisConnection") << "Redis disconnected by server. Reason: " << c->errstr; - else - Log(LogCritical, "RedisConnection") << "Redis disconnected by server"; - } + { + Ptr keepAlive (this); - rc->m_Connected = false; -} - -void RedisConnection::ExecuteQuery(std::vector query, redisCallbackFn *fn, void *privdata) -{ - auto queryPtr (std::make_shared(std::move(query))); - - m_RedisConnectionWorkQueue.Enqueue([this, queryPtr, fn, privdata]() { - SendMessageInternal(*queryPtr, fn, privdata); - }); -} - -void -RedisConnection::ExecuteQueries(std::vector> queries, redisCallbackFn *fn, void *privdata) -{ - auto queriesPtr (std::make_shared(std::move(queries))); - - m_RedisConnectionWorkQueue.Enqueue([this, queriesPtr, fn, privdata]() { - SendMessagesInternal(*queriesPtr, fn, privdata); - }); -} - -void RedisConnection::SendMessageInternal(const std::vector& query, redisCallbackFn *fn, void *privdata) -{ - AssertOnWorkQueue(); - - { - boost::mutex::scoped_lock lock(m_CMutex); - - if (!m_Context || !m_Connected) { - Log(LogCritical, "RedisWriter") - << "Not connected to Redis"; - return; + asio::spawn(m_Strand, [this, keepAlive](asio::yield_context yc) { ReadLoop(yc); }); + asio::spawn(m_Strand, [this, keepAlive](asio::yield_context yc) { WriteLoop(yc); }); } - } - const char **argv; - size_t *argvlen; - - argv = new const char *[query.size()]; - argvlen = new size_t[query.size()]; - String debugstr; - - for (std::vector::size_type i = 0; i < query.size(); i++) { - argv[i] = query[i].CStr(); - argvlen[i] = query[i].GetLength(); - debugstr += argv[i]; - debugstr += " "; - } - - Log(LogDebug, "RedisWriter, Connection") - << "Sending Command: " << debugstr; - - int r; - - { - boost::mutex::scoped_lock lock(m_CMutex); - - r = redisAsyncCommandArgv(m_Context, fn, privdata, query.size(), argv, argvlen); - } - - delete[] argv; - delete[] argvlen; - - if (r == REDIS_REPLY_ERROR) { + m_Connected.store(true); + } catch (const boost::coroutines::detail::forced_unwind&) { + throw; + } catch (const std::exception& ex) { Log(LogCritical, "RedisWriter") - << "Redis Async query failed"; - - BOOST_THROW_EXCEPTION( - redis_error() - << errinfo_redis_query(Utility::Join(Array::FromVector(query), ' ', false)) - ); + << "Cannot connect to " << m_Host << ":" << m_Port << ": " << ex.what(); } } -void RedisConnection::SendMessagesInternal(const std::vector>& queries, redisCallbackFn *fn, void *privdata) +void RedisConnection::ReadLoop(asio::yield_context& yc) { - for (const auto& query : queries) { - SendMessageInternal(query, fn, privdata); + for (;;) { + m_QueuedReads.Wait(yc); + + do { + auto item (std::move(m_Queues.FutureResponseActions.front())); + m_Queues.FutureResponseActions.pop(); + + switch (item.Action) { + case ResponseAction::Ignore: + for (auto i (item.Amount); i; --i) { + ReadOne(yc); + } + break; + case ResponseAction::Deliver: + for (auto i (item.Amount); i; --i) { + auto promise (std::move(m_Queues.ReplyPromises.front())); + m_Queues.ReplyPromises.pop(); + + promise.set_value(ReadOne(yc)); + } + break; + case ResponseAction::DeliverBulk: + { + auto promise (std::move(m_Queues.RepliesPromises.front())); + m_Queues.RepliesPromises.pop(); + + Replies replies; + replies.reserve(item.Amount); + + for (auto i (item.Amount); i; --i) { + replies.emplace_back(ReadOne(yc)); + } + + promise.set_value(std::move(replies)); + } + } + } while (!m_Queues.FutureResponseActions.empty()); + + m_QueuedReads.Clear(); + } +} + +void RedisConnection::WriteLoop(asio::yield_context& yc) +{ + for (;;) { + m_QueuedWrites.Wait(yc); + + bool writtenAll = true; + + do { + writtenAll = true; + + if (!m_Queues.FireAndForgetQuery.empty()) { + auto item (std::move(m_Queues.FireAndForgetQuery.front())); + m_Queues.FireAndForgetQuery.pop(); + + if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) { + m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Ignore}); + } else { + ++m_Queues.FutureResponseActions.back().Amount; + } + + m_QueuedReads.Set(); + writtenAll = false; + + WriteOne(item, yc); + } + + if (!m_Queues.FireAndForgetQueries.empty()) { + auto item (std::move(m_Queues.FireAndForgetQueries.front())); + m_Queues.FireAndForgetQueries.pop(); + + if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) { + m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.size(), ResponseAction::Ignore}); + } else { + m_Queues.FutureResponseActions.back().Amount += item.size(); + } + + m_QueuedReads.Set(); + writtenAll = false; + + for (auto& query : item) { + WriteOne(query, yc); + } + } + + if (!m_Queues.GetResultOfQuery.empty()) { + auto item (std::move(m_Queues.GetResultOfQuery.front())); + m_Queues.GetResultOfQuery.pop(); + m_Queues.ReplyPromises.emplace(std::move(item.second)); + + if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Deliver) { + m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Deliver}); + } else { + ++m_Queues.FutureResponseActions.back().Amount; + } + + m_QueuedReads.Set(); + writtenAll = false; + + WriteOne(item.first, yc); + } + + if (!m_Queues.GetResultsOfQueries.empty()) { + auto item (std::move(m_Queues.GetResultsOfQueries.front())); + m_Queues.GetResultsOfQueries.pop(); + m_Queues.RepliesPromises.emplace(std::move(item.second)); + m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.first.size(), ResponseAction::DeliverBulk}); + + m_QueuedReads.Set(); + writtenAll = false; + + for (auto& query : item.first) { + WriteOne(query, yc); + } + } + } while (!writtenAll); + + m_QueuedWrites.Clear(); + + if (m_Path.IsEmpty()) { + m_TcpConn->async_flush(yc); + } else { + m_UnixConn->async_flush(yc); + } + + } +} + +RedisConnection::Reply RedisConnection::ReadOne(boost::asio::yield_context& yc) +{ + if (m_Path.IsEmpty()) { + return ReadRESP(*m_TcpConn, yc); + } else { + return ReadRESP(*m_UnixConn, yc); + } +} + +void RedisConnection::WriteOne(RedisConnection::Query& query, asio::yield_context& yc) +{ + if (m_Path.IsEmpty()) { + WriteRESP(*m_TcpConn, query, yc); + } else { + WriteRESP(*m_UnixConn, query, yc); } } diff --git a/lib/redis/redisconnection.hpp b/lib/redis/redisconnection.hpp index 7b1e21404..b6b5c005d 100644 --- a/lib/redis/redisconnection.hpp +++ b/lib/redis/redisconnection.hpp @@ -20,9 +20,35 @@ #ifndef REDISCONNECTION_H #define REDISCONNECTION_H -#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "base/array.hpp" +#include "base/atomic.hpp" +#include "base/io-engine.hpp" #include "base/object.hpp" -#include "base/workqueue.hpp" +#include "base/string.hpp" +#include "base/value.hpp" namespace icinga { @@ -31,77 +57,288 @@ namespace icinga * * @ingroup redis */ - - enum conn_state{ - Starting, - Auth, - DBSelect, - Done, - }; - - class RedisConnection; - struct ConnectionState { - conn_state state; - RedisConnection *conn; - }; - class RedisConnection final : public Object { public: DECLARE_PTR_TYPEDEFS(RedisConnection); + typedef std::vector Query; + typedef std::vector Queries; + typedef Value Reply; + typedef std::vector Replies; + RedisConnection(const String host, const int port, const String path, const String password = "", const int db = 0); void Start(); - void Connect(); - - void Disconnect(); - bool IsConnected(); - void ExecuteQuery(std::vector query, redisCallbackFn *fn = NULL, void *privdata = NULL); + void FireAndForgetQuery(Query query); + void FireAndForgetQueries(Queries queries); - void ExecuteQueries(std::vector> queries, redisCallbackFn *fn = NULL, - void *privdata = NULL); + Reply GetResultOfQuery(Query query); + Replies GetResultsOfQueries(Queries queries); private: - static void StaticInitialize(); + enum class ResponseAction : unsigned char + { + Ignore, Deliver, DeliverBulk + }; - void SendMessageInternal(const std::vector& query, redisCallbackFn *fn, void *privdata); - void SendMessagesInternal(const std::vector>& queries, redisCallbackFn *fn, void *privdata); + struct FutureResponseAction + { + size_t Amount; + ResponseAction Action; + }; - void AssertOnWorkQueue(); + typedef boost::asio::ip::tcp Tcp; + typedef boost::asio::local::stream_protocol Unix; - void HandleRW(); + typedef boost::asio::buffered_stream TcpConn; + typedef boost::asio::buffered_stream UnixConn; - static void DisconnectCallback(const redisAsyncContext *c, int status); - static void ConnectCallback(const redisAsyncContext *c, int status); + template + static Value ReadRESP(AsyncReadStream& stream, boost::asio::yield_context& yc); - static void RedisInitialCallback(redisAsyncContext *c, void *r, void *p); + template + static std::vector ReadLine(AsyncReadStream& stream, boost::asio::yield_context& yc, size_t hint = 0); + template + static void WriteRESP(AsyncWriteStream& stream, const Query& query, boost::asio::yield_context& yc); - WorkQueue m_RedisConnectionWorkQueue{100000}; - Timer::Ptr m_EventLoop; + template + static void WriteInt(AsyncWriteStream& stream, intmax_t i, boost::asio::yield_context& yc); - redisAsyncContext *m_Context; + RedisConnection(boost::asio::io_context& io, String host, int port, String path, String password, int db); + + void Connect(boost::asio::yield_context& yc); + void ReadLoop(boost::asio::yield_context& yc); + void WriteLoop(boost::asio::yield_context& yc); + Reply ReadOne(boost::asio::yield_context& yc); + void WriteOne(Query& query, boost::asio::yield_context& yc); String m_Path; String m_Host; int m_Port; String m_Password; int m_DbIndex; - bool m_Connected; - boost::mutex m_CMutex; - ConnectionState m_State; + boost::asio::io_context::strand m_Strand; + std::unique_ptr m_TcpConn; + std::unique_ptr m_UnixConn; + Atomic m_Connecting, m_Connected; + struct { + std::queue FireAndForgetQuery; + std::queue FireAndForgetQueries; + std::queue>> GetResultOfQuery; + std::queue>> GetResultsOfQueries; + std::queue> ReplyPromises; + std::queue> RepliesPromises; + std::queue FutureResponseActions; + } m_Queues; + + AsioConditionVariable m_QueuedWrites, m_QueuedReads; }; - struct redis_error : virtual std::exception, virtual boost::exception { }; +class RedisError final : public Object +{ +public: + DECLARE_PTR_TYPEDEFS(RedisError); - struct errinfo_redis_query_; - typedef boost::error_info errinfo_redis_query; + inline RedisError(String message) : m_Message(std::move(message)) + { + } + + inline const String& GetMessage() + { + return m_Message; + } + +private: + String m_Message; +}; + +class RedisProtocolError : public std::runtime_error +{ +protected: + inline RedisProtocolError() : runtime_error("") + { + } +}; + +class BadRedisType : public RedisProtocolError +{ +public: + inline BadRedisType(char type) : m_What{type, 0} + { + } + + virtual const char * what() const noexcept override + { + return m_What; + } + +private: + char m_What[2]; +}; + +class BadRedisInt : public RedisProtocolError +{ +public: + inline BadRedisInt(std::vector intStr) : m_What(std::move(intStr)) + { + m_What.emplace_back(0); + } + + virtual const char * what() const noexcept override + { + return m_What.data(); + } + +private: + std::vector m_What; +}; + +template +Value RedisConnection::ReadRESP(AsyncReadStream& stream, boost::asio::yield_context& yc) +{ + namespace asio = boost::asio; + + char type = 0; + asio::async_read(stream, asio::mutable_buffer(&type, 1), yc); + + switch (type) { + case '+': + { + auto buf (ReadLine(stream, yc)); + return String(buf.begin(), buf.end()); + } + case '-': + { + auto buf (ReadLine(stream, yc)); + return new RedisError(String(buf.begin(), buf.end())); + } + case ':': + { + auto buf (ReadLine(stream, yc, 21)); + intmax_t i = 0; + + try { + i = boost::lexical_cast(boost::string_view(buf.data(), buf.size())); + } catch (...) { + throw BadRedisInt(std::move(buf)); + } + + return (double)i; + } + case '$': + { + auto buf (ReadLine(stream, yc, 21)); + intmax_t i = 0; + + try { + i = boost::lexical_cast(boost::string_view(buf.data(), buf.size())); + } catch (...) { + throw BadRedisInt(std::move(buf)); + } + + if (i < 0) { + return Value(); + } + + buf.clear(); + buf.insert(buf.end(), i, 0); + asio::async_read(stream, asio::mutable_buffer(buf.data(), buf.size()), yc); + + { + char crlf[2]; + asio::async_read(stream, asio::mutable_buffer(crlf, 2), yc); + } + + return String(buf.begin(), buf.end()); + } + case '*': + { + auto buf (ReadLine(stream, yc, 21)); + intmax_t i = 0; + + try { + i = boost::lexical_cast(boost::string_view(buf.data(), buf.size())); + } catch (...) { + throw BadRedisInt(std::move(buf)); + } + + Array::Ptr arr = new Array(); + + if (i < 0) { + i = 0; + } + + arr->Reserve(i); + + for (; i; --i) { + arr->Add(ReadRESP(stream, yc)); + } + + return arr; + } + default: + throw BadRedisType(type); + } +} + +template +std::vector RedisConnection::ReadLine(AsyncReadStream& stream, boost::asio::yield_context& yc, size_t hint) +{ + namespace asio = boost::asio; + + std::vector line; + line.reserve(hint); + + char next = 0; + asio::mutable_buffer buf (&next, 1); + + for (;;) { + asio::async_read(stream, buf, yc); + + if (next == '\r') { + asio::async_read(stream, buf, yc); + return std::move(line); + } + + line.emplace_back(next); + } +} + +template +void RedisConnection::WriteRESP(AsyncWriteStream& stream, const Query& query, boost::asio::yield_context& yc) +{ + namespace asio = boost::asio; + + asio::async_write(stream, asio::const_buffer("*", 1), yc); + WriteInt(stream, query.size(), yc); + asio::async_write(stream, asio::const_buffer("\r\n", 2), yc); + + for (auto& arg : query) { + asio::async_write(stream, asio::const_buffer("$", 1), yc); + WriteInt(stream, arg.GetLength(), yc); + asio::async_write(stream, asio::const_buffer("\r\n", 2), yc); + asio::async_write(stream, asio::const_buffer(arg.CStr(), arg.GetLength()), yc); + asio::async_write(stream, asio::const_buffer("\r\n", 2), yc); + } +} + +template +void RedisConnection::WriteInt(AsyncWriteStream& stream, intmax_t i, boost::asio::yield_context& yc) +{ + namespace asio = boost::asio; + + char buf[21] = {}; + sprintf(buf, "%jd", i); + + asio::async_write(stream, asio::const_buffer(buf, strlen(buf)), yc); +} } diff --git a/lib/redis/rediswriter-objects.cpp b/lib/redis/rediswriter-objects.cpp index 2c930e380..e02d33bc6 100644 --- a/lib/redis/rediswriter-objects.cpp +++ b/lib/redis/rediswriter-objects.cpp @@ -166,7 +166,7 @@ void RedisWriter::UpdateAllConfigObjects() if (transaction.size() > 1) { transaction.push_back({"EXEC"}); - m_Rcon->ExecuteQueries(std::move(transaction)); + m_Rcon->FireAndForgetQueries(std::move(transaction)); transaction = {{"MULTI"}}; } } @@ -184,10 +184,10 @@ void RedisWriter::UpdateAllConfigObjects() if (transaction.size() > 1) { transaction.push_back({"EXEC"}); - m_Rcon->ExecuteQueries(std::move(transaction)); + m_Rcon->FireAndForgetQueries(std::move(transaction)); } - m_Rcon->ExecuteQuery({"PUBLISH", "icinga:config:dump", lcType}); + m_Rcon->FireAndForgetQuery({"PUBLISH", "icinga:config:dump", lcType}); Log(LogNotice, "RedisWriter") << "Dumped " << bulkCounter << " objects of type " << type.second; @@ -249,7 +249,7 @@ void RedisWriter::DeleteKeys(const std::vector& keys) { query.emplace_back(key); } - m_Rcon->ExecuteQuery(std::move(query)); + m_Rcon->FireAndForgetQuery(std::move(query)); } std::vector RedisWriter::GetTypeObjectKeys(const String& type) @@ -649,7 +649,7 @@ void RedisWriter::UpdateState(const Checkable::Ptr& checkable) { Dictionary::Ptr stateAttrs = SerializeState(checkable); - m_Rcon->ExecuteQuery({"HSET", m_PrefixStateObject + GetLowerCaseTypeNameDB(checkable), GetObjectIdentifier(checkable), JsonEncode(stateAttrs)}); + m_Rcon->FireAndForgetQuery({"HSET", m_PrefixStateObject + GetLowerCaseTypeNameDB(checkable), GetObjectIdentifier(checkable), JsonEncode(stateAttrs)}); } // Used to update a single object, used for runtime updates @@ -666,7 +666,7 @@ void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtime CreateConfigUpdate(object, typeName, statements, runtimeUpdate); Checkable::Ptr checkable = dynamic_pointer_cast(object); if (checkable) { - m_Rcon->ExecuteQuery({"HSET", m_PrefixStateObject + typeName, + m_Rcon->FireAndForgetQuery({"HSET", m_PrefixStateObject + typeName, GetObjectIdentifier(checkable), JsonEncode(SerializeState(checkable))}); } @@ -681,7 +681,7 @@ void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtime if (transaction.size() > 1) { transaction.push_back({"EXEC"}); - m_Rcon->ExecuteQueries(std::move(transaction)); + m_Rcon->FireAndForgetQueries(std::move(transaction)); } } @@ -965,7 +965,7 @@ RedisWriter::CreateConfigUpdate(const ConfigObject::Ptr& object, const String ty /* Send an update event to subscribers. */ if (runtimeUpdate) { - m_Rcon->ExecuteQuery({"PUBLISH", "icinga:config:update", typeName + ":" + objectKey}); + m_Rcon->FireAndForgetQuery({"PUBLISH", "icinga:config:update", typeName + ":" + objectKey}); } } @@ -974,7 +974,7 @@ void RedisWriter::SendConfigDelete(const ConfigObject::Ptr& object) String typeName = object->GetReflectionType()->GetName().ToLower(); String objectKey = GetObjectIdentifier(object); - m_Rcon->ExecuteQueries({ + m_Rcon->FireAndForgetQueries({ {"HDEL", m_PrefixConfigObject + typeName, objectKey}, {"DEL", m_PrefixStateObject + typeName + ":" + objectKey}, {"PUBLISH", "icinga:config:delete", typeName + ":" + objectKey} @@ -1010,7 +1010,7 @@ void RedisWriter::SendStatusUpdate(const ConfigObject::Ptr& object) streamadd.emplace_back(kv.second); } - m_Rcon->ExecuteQuery(std::move(streamadd)); + m_Rcon->FireAndForgetQuery(std::move(streamadd)); } Dictionary::Ptr RedisWriter::SerializeState(const Checkable::Ptr& checkable) @@ -1147,7 +1147,7 @@ RedisWriter::UpdateObjectAttrs(const ConfigObject::Ptr& object, int fieldType, typeName = typeNameOverride.ToLower(); return {GetObjectIdentifier(object), JsonEncode(attrs)}; - //m_Rcon->ExecuteQuery({"HSET", keyPrefix + typeName, GetObjectIdentifier(object), JsonEncode(attrs)}); + //m_Rcon->FireAndForgetQuery({"HSET", keyPrefix + typeName, GetObjectIdentifier(object), JsonEncode(attrs)}); } void RedisWriter::StateChangeHandler(const ConfigObject::Ptr &object) diff --git a/lib/redis/rediswriter.cpp b/lib/redis/rediswriter.cpp index 1ec7a9823..1b9043176 100644 --- a/lib/redis/rediswriter.cpp +++ b/lib/redis/rediswriter.cpp @@ -150,29 +150,23 @@ void RedisWriter::UpdateSubscriptions() if (!m_Rcon || !m_Rcon->IsConnected()) return; - long long cursor = 0; - + String cursor = "0"; String keyPrefix = "icinga:subscription:"; do { - auto reply = RedisGet({ "SCAN", Convert::ToString(cursor), "MATCH", keyPrefix + "*", "COUNT", "1000" }); + Array::Ptr reply = m_Rcon->GetResultOfQuery({ "SCAN", cursor, "MATCH", keyPrefix + "*", "COUNT", "1000" }); + VERIFY(reply->GetLength() % 2u == 0u); - VERIFY(reply->type == REDIS_REPLY_ARRAY); - VERIFY(reply->elements % 2 == 0); + cursor = reply->Get(0); - redisReply *cursorReply = reply->element[0]; - cursor = Convert::ToLong(cursorReply->str); + Array::Ptr keys = reply->Get(1); + ObjectLock oLock (keys); - redisReply *keysReply = reply->element[1]; - - for (size_t i = 0; i < keysReply->elements; i++) { - if (boost::algorithm::ends_with(keysReply->element[i]->str, ":limit")) + for (String key : keys) { + if (boost::algorithm::ends_with(key, ":limit")) continue; - redisReply *keyReply = keysReply->element[i]; - VERIFY(keyReply->type == REDIS_REPLY_STRING); RedisSubscriptionInfo rsi; - String key = keysReply->element[i]->str; if (!RedisWriter::GetSubscriptionTypes(key, rsi)) { Log(LogInformation, "RedisWriter") @@ -181,7 +175,7 @@ void RedisWriter::UpdateSubscriptions() m_Subscriptions[key.SubStr(keyPrefix.GetLength())] = rsi; } } - } while (cursor != 0); + } while (cursor != "0"); Log(LogInformation, "RedisWriter") << "Current Redis event subscriptions: " << m_Subscriptions.size(); @@ -190,14 +184,17 @@ void RedisWriter::UpdateSubscriptions() bool RedisWriter::GetSubscriptionTypes(String key, RedisSubscriptionInfo& rsi) { try { - auto redisReply = RedisGet({ "SMEMBERS", key }); - VERIFY(redisReply->type == REDIS_REPLY_ARRAY); + Array::Ptr redisReply = m_Rcon->GetResultOfQuery({ "SMEMBERS", key }); - if (redisReply->elements == 0) + if (redisReply->GetLength() == 0) return false; - for (size_t j = 0; j < redisReply->elements; j++) { - rsi.EventTypes.insert(redisReply->element[j]->str); + { + ObjectLock oLock (redisReply); + + for (String member : redisReply) { + rsi.EventTypes.insert(member); + } } Log(LogInformation, "RedisWriter") @@ -229,7 +226,7 @@ void RedisWriter::PublishStats() status->Set("config_dump_in_progress", m_ConfigDumpInProgress); String jsonStats = JsonEncode(status); - m_Rcon->ExecuteQuery({ "PUBLISH", "icinga:stats", jsonStats }); + m_Rcon->FireAndForgetQuery({ "PUBLISH", "icinga:stats", jsonStats }); } void RedisWriter::HandleEvents() @@ -281,20 +278,19 @@ void RedisWriter::HandleEvent(const Dictionary::Ptr& event) String body = JsonEncode(event); - auto maxExists = RedisGet({ "EXISTS", "icinga:subscription:" + name + ":limit" }); + double maxExists = m_Rcon->GetResultOfQuery({ "EXISTS", "icinga:subscription:" + name + ":limit" }); long maxEvents = MAX_EVENTS_DEFAULT; - if (maxExists->integer) { - auto redisReply = RedisGet({ "GET", "icinga:subscription:" + name + ":limit"}); - VERIFY(redisReply->type == REDIS_REPLY_STRING); + if (maxExists != 0) { + String redisReply = m_Rcon->GetResultOfQuery({ "GET", "icinga:subscription:" + name + ":limit"}); Log(LogInformation, "RedisWriter") - << "Got limit " << redisReply->str << " for " << name; + << "Got limit " << redisReply << " for " << name; - maxEvents = Convert::ToLong(redisReply->str); + maxEvents = Convert::ToLong(redisReply); } - m_Rcon->ExecuteQueries({ + m_Rcon->FireAndForgetQueries({ { "MULTI" }, { "LPUSH", "icinga:event:" + name, body }, { "LTRIM", "icinga:event:" + name, "0", String(maxEvents - 1)}, @@ -354,7 +350,7 @@ void RedisWriter::SendEvent(const Dictionary::Ptr& event) // Log(LogInformation, "RedisWriter") // << "Sending event \"" << body << "\""; - m_Rcon->ExecuteQueries({ + m_Rcon->FireAndForgetQueries({ { "PUBLISH", "icinga:event:all", body }, { "PUBLISH", "icinga:event:" + event->Get("type"), body }}); } @@ -385,21 +381,6 @@ struct synchronousWait { redisReply* reply; }; -void RedisWriter::RedisQueryCallback(redisAsyncContext *c, void *r, void *p) { - auto wait = (struct synchronousWait*) p; - auto rp = reinterpret_cast(r); - - - if (r == NULL) - wait->reply = nullptr; - else - wait->reply = RedisWriter::dupReplyObject(rp); - - boost::mutex::scoped_lock lock(wait->mtx); - wait->ready = true; - wait->cv.notify_all(); -} - struct RedisReplyDeleter { inline void operator() (redisReply *reply) @@ -407,19 +388,3 @@ struct RedisReplyDeleter freeReplyObject(reply); } }; - -std::shared_ptr RedisWriter::RedisGet(std::vector query) { - auto *wait = new synchronousWait; - wait->ready = false; - - m_Rcon->ExecuteQuery(std::move(query), RedisQueryCallback, wait); - - boost::mutex::scoped_lock lock(wait->mtx); - while (!wait->ready) { - wait->cv.wait(lock); - if (!wait->ready) - wait->ready = true; - } - - return std::shared_ptr(wait->reply, RedisReplyDeleter()); -} diff --git a/lib/redis/rediswriter.hpp b/lib/redis/rediswriter.hpp index 17fbc10a1..b62a82cb4 100644 --- a/lib/redis/rediswriter.hpp +++ b/lib/redis/rediswriter.hpp @@ -115,8 +115,6 @@ private: void ExceptionHandler(boost::exception_ptr exp); //Used to get a reply from the asyncronous connection - std::shared_ptr RedisGet(std::vector query); - static void RedisQueryCallback(redisAsyncContext *c, void *r, void *p); static redisReply* dupReplyObject(redisReply* reply);