Improve reconnect behavior for the RedisWriter class

refs #4991
This commit is contained in:
Gunnar Beutner 2017-02-13 10:37:24 +01:00
parent 8ad567ce99
commit 6d620e75ed
2 changed files with 62 additions and 12 deletions

View File

@ -33,21 +33,60 @@ void RedisWriter::Start(bool runtimeCreated)
{
ObjectImpl<RedisWriter>::Start(runtimeCreated);
boost::thread thread(boost::bind(&RedisWriter::HandleEvents, this));
boost::thread thread(boost::bind(&RedisWriter::ConnectionThreadProc, this));
thread.detach();
}
void RedisWriter::ConnectionThreadProc(void)
{
String path = GetPath();
String host = GetHost();
Log(LogInformation, "RedisWriter", "Trying to connecto redis server");
if (path.IsEmpty())
m_Context = redisConnect(host.CStr(), GetPort());
else
m_Context = redisConnectUnix(path.CStr());
String password = GetPassword();
if (!m_Context || m_Context->err) {
if (!m_Context) {
Log(LogWarning, "RedisWriter", "Cannot allocate redis context.");
} else {
Log(LogWarning, "RedisWriter", "Connection error: ")
<< m_Context->errstr;
}
}
void *reply = redisCommand(m_Context, "AUTH %s", password.CStr());
freeReplyObject(reply);
for (;;) {
String password = GetPassword();
if (!password.IsEmpty()) {
redisReply *reply = reinterpret_cast<redisReply *>(redisCommand(m_Context, "AUTH %s", password.CStr()));
if (reply->type == REDIS_REPLY_STATUS || reply->type == REDIS_REPLY_ERROR) {
Log(LogInformation, "RedisWriter")
<< "AUTH: " << reply->str;
}
freeReplyObject(reply);
}
HandleEvents();
for (;;) {
Log(LogInformation, "RedisWriter", "Trying to reconnect to redis server");
if (redisReconnect(m_Context) == REDIS_OK) {
Log(LogInformation, "RedisWriter", "Connection to redis server was reestablished");
break;
}
Log(LogInformation, "RedisWriter", "Unable to reconnect to redis server: Waiting for next attempt");
Utility::Sleep(15);
}
}
}
void RedisWriter::HandleEvents(void)
@ -81,14 +120,24 @@ void RedisWriter::HandleEvents(void)
String body = JsonEncode(result);
//TODO: Reconnect handling
try {
void *reply = redisCommand(m_Context, "LPUSH icinga:events %s", body.CStr());
freeReplyObject(reply);
} catch (const std::exception&) {
queue->RemoveClient(this);
EventQueue::UnregisterIfUnused(queueName, queue);
throw;
redisReply *reply = reinterpret_cast<redisReply *>(redisCommand(m_Context, "LPUSH icinga:events %s", body.CStr()));
if (!reply)
break;
if (reply->type == REDIS_REPLY_STATUS || reply->type == REDIS_REPLY_ERROR) {
Log(LogInformation, "RedisWriter")
<< "LPUSH icinga:events: " << reply->str;
}
if (reply->type == REDIS_REPLY_ERROR) {
freeReplyObject(reply);
break;
}
freeReplyObject(reply);
}
queue->RemoveClient(this);
EventQueue::UnregisterIfUnused(queueName, queue);
}

View File

@ -40,6 +40,7 @@ public:
virtual void Start(bool runtimeCreated) override;
private:
void ConnectionThreadProc(void);
void HandleEvents(void);
redisContext *m_Context;