Add recursive callback to manage auth and db select

This commit is contained in:
Jean Flach 2018-10-25 17:48:32 +02:00 committed by Michael Friedrich
parent b59189b8d3
commit f89e649871
3 changed files with 98 additions and 38 deletions

View File

@ -30,7 +30,7 @@
using namespace icinga; using namespace icinga;
RedisConnection::RedisConnection(const String host, const int port, const String path, const String password, const int db) : RedisConnection::RedisConnection(const String host, const int port, const String path, const String password, const int db) :
m_Host(host), m_Port(port), m_Path(path), m_Password(password), m_DbIndex(db), m_Context(NULL) m_Host(host), m_Port(port), m_Path(path), m_Password(password), m_DbIndex(db), m_Context(NULL), m_Connected(false)
{ {
m_RedisConnectionWorkQueue.SetName("RedisConnection"); m_RedisConnectionWorkQueue.SetName("RedisConnection");
} }
@ -70,45 +70,84 @@ void RedisConnection::HandleRW()
} }
} }
void RedisConnection::RedisInitialCallback(redisAsyncContext *c, void *r, void *p)
{
auto *state = (ConnectionState *) p;
if (r != nullptr) {
redisReply *rep = (redisReply *) r;
if (rep->type == REDIS_REPLY_ERROR) {
Log(LogCritical, "RedisConnection")
<< "Failed to connect to Redis: " << rep->str;
state->conn->m_Connected = false;
return;
}
}
if (state->state != Starting && (!r || c->err)) {
Log(LogCritical, "RedisConnection") << c->errstr;
state->conn->m_Connected = false;
return;
}
if (state->state == Starting) {
state->state = Auth;
if (!state->conn->m_Password.IsEmpty()) {
boost::mutex::scoped_lock lock(state->conn->m_CMutex);
redisAsyncCommand(c, &RedisInitialCallback, p, "AUTH %s", state->conn->m_Password.CStr());
return;
}
}
if (state->state == Auth)
{
state->state = DBSelect;
if (state->conn->m_DbIndex != 0) {
boost::mutex::scoped_lock lock(state->conn->m_CMutex);
redisAsyncCommand(c, &RedisInitialCallback, p, "SELECT %d", state->conn->m_DbIndex);
return;
}
}
if (state->state == DBSelect)
state->conn->m_Connected = true;
}
bool RedisConnection::IsConnected() {
return m_Connected;
}
void RedisConnection::Connect() void RedisConnection::Connect()
{ {
if (m_Context) if (m_Context)
return; return;
Log(LogInformation, "RedisWriter", "Trying to connect to redis server Async"); Log(LogInformation, "RedisWriter", "Trying to connect to redis server Async");
boost::mutex::scoped_lock lock(m_CMutex); {
boost::mutex::scoped_lock lock(m_CMutex);
if (m_Path.IsEmpty()) if (m_Path.IsEmpty())
m_Context = redisAsyncConnect(m_Host.CStr(), m_Port); m_Context = redisAsyncConnect(m_Host.CStr(), m_Port);
else else
m_Context = redisAsyncConnectUnix(m_Path.CStr()); m_Context = redisAsyncConnectUnix(m_Path.CStr());
if (!m_Context || m_Context->err) { if (!m_Context || m_Context->err) {
if (!m_Context) { if (!m_Context) {
Log(LogWarning, "RedisWriter", "Cannot allocate redis context."); Log(LogWarning, "RedisWriter", "Cannot allocate redis context.");
} else { } else {
Log(LogWarning, "RedisWriter", "Connection error: ") Log(LogWarning, "RedisWriter", "Connection error: ")
<< m_Context->errstr; << m_Context->errstr;
}
if (m_Context) {
redisAsyncFree(m_Context);
m_Context = NULL;
}
} }
if (m_Context) { redisAsyncSetDisconnectCallback(m_Context, &DisconnectCallback);
redisAsyncFree(m_Context);
m_Context = NULL;
}
} }
redisAsyncSetDisconnectCallback(m_Context, &DisconnectCallback); m_State = ConnectionState{Starting, this};
RedisInitialCallback(m_Context, nullptr, (void*)&m_State);
/* TODO: This currently does not work properly:
* In case of error the connection is broken, yet the Context is not set to faulty. May be a bug with hiredis.
* Error case: Password does not match, or even: "Client sent AUTH, but no password is set" which also results in an error.
*/
if (!m_Password.IsEmpty()) {
ExecuteQuery({"AUTH", m_Password});
}
if (m_DbIndex != 0)
ExecuteQuery({"SELECT", Convert::ToString(m_DbIndex)});
} }
void RedisConnection::Disconnect() void RedisConnection::Disconnect()
@ -129,11 +168,6 @@ void RedisConnection::DisconnectCallback(const redisAsyncContext *c, int status)
} }
bool RedisConnection::IsConnected()
{
return (REDIS_CONNECTED & m_Context->c.flags) == REDIS_CONNECTED;
}
void RedisConnection::ExecuteQuery(const std::vector<String>& query, redisCallbackFn *fn, void *privdata) void RedisConnection::ExecuteQuery(const std::vector<String>& query, redisCallbackFn *fn, void *privdata)
{ {
m_RedisConnectionWorkQueue.Enqueue(std::bind(&RedisConnection::SendMessageInternal, this, query, fn, privdata)); m_RedisConnectionWorkQueue.Enqueue(std::bind(&RedisConnection::SendMessageInternal, this, query, fn, privdata));

View File

@ -31,6 +31,20 @@ namespace icinga
* *
* @ingroup redis * @ingroup redis
*/ */
enum conn_state{
Starting,
Auth,
DBSelect,
Done,
};
class RedisConnection;
struct ConnectionState {
conn_state state;
RedisConnection *conn;
};
class RedisConnection final : public Object class RedisConnection final : public Object
{ {
public: public:
@ -62,6 +76,9 @@ namespace icinga
static void DisconnectCallback(const redisAsyncContext *c, int status); static void DisconnectCallback(const redisAsyncContext *c, int status);
static void RedisInitialCallback(redisAsyncContext *c, void *r, void *p);
WorkQueue m_RedisConnectionWorkQueue{100000}; WorkQueue m_RedisConnectionWorkQueue{100000};
Timer::Ptr m_EventLoop; Timer::Ptr m_EventLoop;
@ -72,14 +89,18 @@ namespace icinga
int m_Port; int m_Port;
String m_Password; String m_Password;
int m_DbIndex; int m_DbIndex;
bool m_Connected;
boost::mutex m_CMutex; boost::mutex m_CMutex;
ConnectionState m_State;
}; };
struct redis_error : virtual std::exception, virtual boost::exception { }; struct redis_error : virtual std::exception, virtual boost::exception { };
struct errinfo_redis_query_; struct errinfo_redis_query_;
typedef boost::error_info<struct errinfo_redis_query_, std::string> errinfo_redis_query; typedef boost::error_info<struct errinfo_redis_query_, std::string> errinfo_redis_query;
} }
#endif //REDISCONNECTION_H #endif //REDISCONNECTION_H

View File

@ -104,11 +104,13 @@ void RedisWriter::TryToReconnect()
{ {
AssertOnWorkQueue(); AssertOnWorkQueue();
if (m_ConfigDumpDone && m_Rcon->IsConnected()) if (m_ConfigDumpDone)
return; return;
else if (!m_Rcon->IsConnected()) else
m_Rcon->Start(); m_Rcon->Start();
if (!m_Rcon->IsConnected())
return;
UpdateSubscriptions(); UpdateSubscriptions();
if (m_ConfigDumpInProgress || m_ConfigDumpDone) if (m_ConfigDumpInProgress || m_ConfigDumpDone)
@ -135,8 +137,11 @@ void RedisWriter::UpdateSubscriptions()
Log(LogInformation, "RedisWriter", "Updating Redis subscriptions"); Log(LogInformation, "RedisWriter", "Updating Redis subscriptions");
m_Subscriptions.clear(); if (!m_Rcon->IsConnected()) {
Log(LogCritical, "DEBUG, Redis")
<< "NO CONNECT CHIEF";
return;
}
long long cursor = 0; long long cursor = 0;
String keyPrefix = "icinga:subscription:"; String keyPrefix = "icinga:subscription:";
@ -349,7 +354,7 @@ redisReply* RedisWriter::RedisGet(const std::vector<String>& query) {
boost::mutex::scoped_lock lock(wait->mtx); boost::mutex::scoped_lock lock(wait->mtx);
while (!wait->ready) { while (!wait->ready) {
wait->cv.timed_wait(lock, boost::posix_time::milliseconds(long(15 * 1000))); wait->cv.wait(lock);
if (!wait->ready) if (!wait->ready)
wait->ready = true; wait->ready = true;
} }