diff --git a/lib/redis/redisconnection.cpp b/lib/redis/redisconnection.cpp index db5fb2742..7171fc638 100644 --- a/lib/redis/redisconnection.cpp +++ b/lib/redis/redisconnection.cpp @@ -30,7 +30,7 @@ using namespace icinga; RedisConnection::RedisConnection(const String host, const int port, const String path, const String password, const int db) : - m_Host(host), m_Port(port), m_Path(path), m_Password(password), m_DbIndex(db), m_Context(NULL) + m_Host(host), m_Port(port), m_Path(path), m_Password(password), m_DbIndex(db), m_Context(NULL), m_Connected(false) { m_RedisConnectionWorkQueue.SetName("RedisConnection"); } @@ -70,45 +70,84 @@ void RedisConnection::HandleRW() } } + +void RedisConnection::RedisInitialCallback(redisAsyncContext *c, void *r, void *p) +{ + auto *state = (ConnectionState *) p; + if (r != nullptr) { + redisReply *rep = (redisReply *) r; + if (rep->type == REDIS_REPLY_ERROR) { + Log(LogCritical, "RedisConnection") + << "Failed to connect to Redis: " << rep->str; + state->conn->m_Connected = false; + return; + } + } + + if (state->state != Starting && (!r || c->err)) { + Log(LogCritical, "RedisConnection") << c->errstr; + state->conn->m_Connected = false; + return; + } + + if (state->state == Starting) { + state->state = Auth; + if (!state->conn->m_Password.IsEmpty()) { + boost::mutex::scoped_lock lock(state->conn->m_CMutex); + redisAsyncCommand(c, &RedisInitialCallback, p, "AUTH %s", state->conn->m_Password.CStr()); + return; + } + } + if (state->state == Auth) + { + state->state = DBSelect; + if (state->conn->m_DbIndex != 0) { + boost::mutex::scoped_lock lock(state->conn->m_CMutex); + redisAsyncCommand(c, &RedisInitialCallback, p, "SELECT %d", state->conn->m_DbIndex); + return; + } + } + if (state->state == DBSelect) + state->conn->m_Connected = true; +} +bool RedisConnection::IsConnected() { + return m_Connected; +} + + void RedisConnection::Connect() { if (m_Context) return; Log(LogInformation, "RedisWriter", "Trying to connect to redis server Async"); - boost::mutex::scoped_lock lock(m_CMutex); + { + boost::mutex::scoped_lock lock(m_CMutex); - if (m_Path.IsEmpty()) - m_Context = redisAsyncConnect(m_Host.CStr(), m_Port); - else - m_Context = redisAsyncConnectUnix(m_Path.CStr()); + if (m_Path.IsEmpty()) + m_Context = redisAsyncConnect(m_Host.CStr(), m_Port); + else + m_Context = redisAsyncConnectUnix(m_Path.CStr()); - if (!m_Context || m_Context->err) { - if (!m_Context) { - Log(LogWarning, "RedisWriter", "Cannot allocate redis context."); - } else { - Log(LogWarning, "RedisWriter", "Connection error: ") - << m_Context->errstr; + if (!m_Context || m_Context->err) { + if (!m_Context) { + Log(LogWarning, "RedisWriter", "Cannot allocate redis context."); + } else { + Log(LogWarning, "RedisWriter", "Connection error: ") + << m_Context->errstr; + } + + if (m_Context) { + redisAsyncFree(m_Context); + m_Context = NULL; + } } - if (m_Context) { - redisAsyncFree(m_Context); - m_Context = NULL; - } + redisAsyncSetDisconnectCallback(m_Context, &DisconnectCallback); } - redisAsyncSetDisconnectCallback(m_Context, &DisconnectCallback); - - /* TODO: This currently does not work properly: - * In case of error the connection is broken, yet the Context is not set to faulty. May be a bug with hiredis. - * Error case: Password does not match, or even: "Client sent AUTH, but no password is set" which also results in an error. - */ - if (!m_Password.IsEmpty()) { - ExecuteQuery({"AUTH", m_Password}); - } - - if (m_DbIndex != 0) - ExecuteQuery({"SELECT", Convert::ToString(m_DbIndex)}); + m_State = ConnectionState{Starting, this}; + RedisInitialCallback(m_Context, nullptr, (void*)&m_State); } void RedisConnection::Disconnect() @@ -129,11 +168,6 @@ void RedisConnection::DisconnectCallback(const redisAsyncContext *c, int status) } -bool RedisConnection::IsConnected() -{ - return (REDIS_CONNECTED & m_Context->c.flags) == REDIS_CONNECTED; -} - void RedisConnection::ExecuteQuery(const std::vector& query, redisCallbackFn *fn, void *privdata) { m_RedisConnectionWorkQueue.Enqueue(std::bind(&RedisConnection::SendMessageInternal, this, query, fn, privdata)); diff --git a/lib/redis/redisconnection.hpp b/lib/redis/redisconnection.hpp index c968c15af..6a4afe285 100644 --- a/lib/redis/redisconnection.hpp +++ b/lib/redis/redisconnection.hpp @@ -31,6 +31,20 @@ namespace icinga * * @ingroup redis */ + + enum conn_state{ + Starting, + Auth, + DBSelect, + Done, + }; + + class RedisConnection; + struct ConnectionState { + conn_state state; + RedisConnection *conn; + }; + class RedisConnection final : public Object { public: @@ -62,6 +76,9 @@ namespace icinga static void DisconnectCallback(const redisAsyncContext *c, int status); + static void RedisInitialCallback(redisAsyncContext *c, void *r, void *p); + + WorkQueue m_RedisConnectionWorkQueue{100000}; Timer::Ptr m_EventLoop; @@ -72,14 +89,18 @@ namespace icinga int m_Port; String m_Password; int m_DbIndex; + bool m_Connected; boost::mutex m_CMutex; + ConnectionState m_State; + }; struct redis_error : virtual std::exception, virtual boost::exception { }; struct errinfo_redis_query_; typedef boost::error_info errinfo_redis_query; + } #endif //REDISCONNECTION_H diff --git a/lib/redis/rediswriter.cpp b/lib/redis/rediswriter.cpp index d93b602a7..ca8099315 100644 --- a/lib/redis/rediswriter.cpp +++ b/lib/redis/rediswriter.cpp @@ -104,11 +104,13 @@ void RedisWriter::TryToReconnect() { AssertOnWorkQueue(); - if (m_ConfigDumpDone && m_Rcon->IsConnected()) + if (m_ConfigDumpDone) return; - else if (!m_Rcon->IsConnected()) + else m_Rcon->Start(); + if (!m_Rcon->IsConnected()) + return; UpdateSubscriptions(); if (m_ConfigDumpInProgress || m_ConfigDumpDone) @@ -135,8 +137,11 @@ void RedisWriter::UpdateSubscriptions() Log(LogInformation, "RedisWriter", "Updating Redis subscriptions"); - m_Subscriptions.clear(); - + if (!m_Rcon->IsConnected()) { + Log(LogCritical, "DEBUG, Redis") + << "NO CONNECT CHIEF"; + return; + } long long cursor = 0; String keyPrefix = "icinga:subscription:"; @@ -349,7 +354,7 @@ redisReply* RedisWriter::RedisGet(const std::vector& query) { boost::mutex::scoped_lock lock(wait->mtx); while (!wait->ready) { - wait->cv.timed_wait(lock, boost::posix_time::milliseconds(long(15 * 1000))); + wait->cv.wait(lock); if (!wait->ready) wait->ready = true; }