From 0634e27d6de1384b03f2e3103a9bec0fd3a93e4c Mon Sep 17 00:00:00 2001 From: Jean Flach Date: Fri, 26 Oct 2018 14:07:07 +0200 Subject: [PATCH] Add new connection handling --- lib/redis/redisconnection.cpp | 63 ++++++++++++++++++------------- lib/redis/redisconnection.hpp | 1 + lib/redis/rediswriter-objects.cpp | 2 - lib/redis/rediswriter.cpp | 7 ++++ 4 files changed, 45 insertions(+), 28 deletions(-) diff --git a/lib/redis/redisconnection.cpp b/lib/redis/redisconnection.cpp index 7171fc638..4391a68f7 100644 --- a/lib/redis/redisconnection.cpp +++ b/lib/redis/redisconnection.cpp @@ -60,6 +60,8 @@ void RedisConnection::HandleRW() try { { boost::mutex::scoped_lock lock(m_CMutex); + if (!m_Connected) + return; redisAsyncHandleWrite(m_Context); redisAsyncHandleRead(m_Context); } @@ -74,7 +76,11 @@ void RedisConnection::HandleRW() void RedisConnection::RedisInitialCallback(redisAsyncContext *c, void *r, void *p) { auto *state = (ConnectionState *) p; - if (r != nullptr) { + if (state->state != Starting && !r) { + Log(LogCritical, "RedisConnection") + << "No answer from Redis during initial connection, is the Redis server running?"; + return; + } else if (r != nullptr) { redisReply *rep = (redisReply *) r; if (rep->type == REDIS_REPLY_ERROR) { Log(LogCritical, "RedisConnection") @@ -84,12 +90,6 @@ void RedisConnection::RedisInitialCallback(redisAsyncContext *c, void *r, void * } } - if (state->state != Starting && (!r || c->err)) { - Log(LogCritical, "RedisConnection") << c->errstr; - state->conn->m_Connected = false; - return; - } - if (state->state == Starting) { state->state = Auth; if (!state->conn->m_Password.IsEmpty()) { @@ -117,7 +117,7 @@ bool RedisConnection::IsConnected() { void RedisConnection::Connect() { - if (m_Context) + if (m_Connected) return; Log(LogInformation, "RedisWriter", "Trying to connect to redis server Async"); @@ -129,20 +129,9 @@ void RedisConnection::Connect() else m_Context = redisAsyncConnectUnix(m_Path.CStr()); - if (!m_Context || m_Context->err) { - if (!m_Context) { - Log(LogWarning, "RedisWriter", "Cannot allocate redis context."); - } else { - Log(LogWarning, "RedisWriter", "Connection error: ") - << m_Context->errstr; - } - - if (m_Context) { - redisAsyncFree(m_Context); - m_Context = NULL; - } - } + m_Context->data = (void*) this; + redisAsyncSetConnectCallback(m_Context, &ConnectCallback); redisAsyncSetDisconnectCallback(m_Context, &DisconnectCallback); } @@ -155,17 +144,39 @@ void RedisConnection::Disconnect() redisAsyncDisconnect(m_Context); } +void RedisConnection::ConnectCallback(const redisAsyncContext *c, int status) +{ + auto *rc = (RedisConnection* ) const_cast(c)->data; + if (status != REDIS_OK) { + if (c->err != 0) { + Log(LogCritical, "RedisConnection") + << "Redis connection failure: " << c->errstr; + } else { + Log(LogCritical, "RedisConnection") + << "Redis connection failure"; + } + rc->m_Connected = false; + } else { + Log(LogInformation, "RedisConnection") + << "Redis Connection: O N L I N E"; + } +} + +// It's unfortunate we can not pass any user data here. All we get to do is log a message and hope for the best void RedisConnection::DisconnectCallback(const redisAsyncContext *c, int status) { + auto *rc = (RedisConnection* ) const_cast(c)->data; + boost::mutex::scoped_lock lock(rc->m_CMutex); if (status == REDIS_OK) - Log(LogInformation, "RedisWriter") << "Redis disconnected by us"; + Log(LogInformation, "RedisConnection") << "Redis disconnected by us"; else { if (c->err != 0) - Log(LogCritical, "RedisWriter") << "Redis disconnected by server. Reason: " << c->errstr; + Log(LogCritical, "RedisConnection") << "Redis disconnected by server. Reason: " << c->errstr; else - Log(LogCritical, "RedisWriter") << "Redis disconnected by server"; + Log(LogCritical, "RedisConnection") << "Redis disconnected by server"; } + rc->m_Connected = false; } void RedisConnection::ExecuteQuery(const std::vector& query, redisCallbackFn *fn, void *privdata) @@ -187,9 +198,9 @@ void RedisConnection::SendMessageInternal(const std::vector& query, redi boost::mutex::scoped_lock lock(m_CMutex); - if (!m_Context) { + if (!m_Context || !m_Connected) { Log(LogCritical, "RedisWriter") - << "Connection lost"; + << "Not connected to Redis"; return; } diff --git a/lib/redis/redisconnection.hpp b/lib/redis/redisconnection.hpp index 6a4afe285..3dee85e93 100644 --- a/lib/redis/redisconnection.hpp +++ b/lib/redis/redisconnection.hpp @@ -75,6 +75,7 @@ namespace icinga void HandleRW(); static void DisconnectCallback(const redisAsyncContext *c, int status); + static void ConnectCallback(const redisAsyncContext *c, int status); static void RedisInitialCallback(redisAsyncContext *c, void *r, void *p); diff --git a/lib/redis/rediswriter-objects.cpp b/lib/redis/rediswriter-objects.cpp index 9edc4cc36..07c772642 100644 --- a/lib/redis/rediswriter-objects.cpp +++ b/lib/redis/rediswriter-objects.cpp @@ -62,8 +62,6 @@ void RedisWriter::UpdateAllConfigObjects() { double startTime = Utility::GetTime(); - m_Rcon->ExecuteQuery({"flushall"}); - // Use a Workqueue to pack objects in parallel WorkQueue upq(25000, Configuration::Concurrency); upq.SetName("RedisWriter:ConfigDump"); diff --git a/lib/redis/rediswriter.cpp b/lib/redis/rediswriter.cpp index ca8099315..5760d91d7 100644 --- a/lib/redis/rediswriter.cpp +++ b/lib/redis/rediswriter.cpp @@ -111,6 +111,7 @@ void RedisWriter::TryToReconnect() if (!m_Rcon->IsConnected()) return; + UpdateSubscriptions(); if (m_ConfigDumpInProgress || m_ConfigDumpDone) @@ -214,6 +215,9 @@ void RedisWriter::PublishStats() { AssertOnWorkQueue(); + if (!m_Rcon->IsConnected()) + return; + Dictionary::Ptr status = GetStats(); String jsonStats = JsonEncode(status); @@ -294,6 +298,9 @@ void RedisWriter::SendEvent(const Dictionary::Ptr& event) { AssertOnWorkQueue(); + if (!m_Rcon->IsConnected()) + return; + String body = JsonEncode(event); // Log(LogInformation, "RedisWriter")