Add new connection handling

This commit is contained in:
Jean Flach 2018-10-26 14:07:07 +02:00 committed by Michael Friedrich
parent f89e649871
commit 0634e27d6d
4 changed files with 45 additions and 28 deletions

View File

@ -60,6 +60,8 @@ void RedisConnection::HandleRW()
try {
{
boost::mutex::scoped_lock lock(m_CMutex);
if (!m_Connected)
return;
redisAsyncHandleWrite(m_Context);
redisAsyncHandleRead(m_Context);
}
@ -74,7 +76,11 @@ void RedisConnection::HandleRW()
void RedisConnection::RedisInitialCallback(redisAsyncContext *c, void *r, void *p)
{
auto *state = (ConnectionState *) p;
if (r != nullptr) {
if (state->state != Starting && !r) {
Log(LogCritical, "RedisConnection")
<< "No answer from Redis during initial connection, is the Redis server running?";
return;
} else if (r != nullptr) {
redisReply *rep = (redisReply *) r;
if (rep->type == REDIS_REPLY_ERROR) {
Log(LogCritical, "RedisConnection")
@ -84,12 +90,6 @@ void RedisConnection::RedisInitialCallback(redisAsyncContext *c, void *r, void *
}
}
if (state->state != Starting && (!r || c->err)) {
Log(LogCritical, "RedisConnection") << c->errstr;
state->conn->m_Connected = false;
return;
}
if (state->state == Starting) {
state->state = Auth;
if (!state->conn->m_Password.IsEmpty()) {
@ -117,7 +117,7 @@ bool RedisConnection::IsConnected() {
void RedisConnection::Connect()
{
if (m_Context)
if (m_Connected)
return;
Log(LogInformation, "RedisWriter", "Trying to connect to redis server Async");
@ -129,20 +129,9 @@ void RedisConnection::Connect()
else
m_Context = redisAsyncConnectUnix(m_Path.CStr());
if (!m_Context || m_Context->err) {
if (!m_Context) {
Log(LogWarning, "RedisWriter", "Cannot allocate redis context.");
} else {
Log(LogWarning, "RedisWriter", "Connection error: ")
<< m_Context->errstr;
}
if (m_Context) {
redisAsyncFree(m_Context);
m_Context = NULL;
}
}
m_Context->data = (void*) this;
redisAsyncSetConnectCallback(m_Context, &ConnectCallback);
redisAsyncSetDisconnectCallback(m_Context, &DisconnectCallback);
}
@ -155,17 +144,39 @@ void RedisConnection::Disconnect()
redisAsyncDisconnect(m_Context);
}
void RedisConnection::ConnectCallback(const redisAsyncContext *c, int status)
{
auto *rc = (RedisConnection* ) const_cast<redisAsyncContext*>(c)->data;
if (status != REDIS_OK) {
if (c->err != 0) {
Log(LogCritical, "RedisConnection")
<< "Redis connection failure: " << c->errstr;
} else {
Log(LogCritical, "RedisConnection")
<< "Redis connection failure";
}
rc->m_Connected = false;
} else {
Log(LogInformation, "RedisConnection")
<< "Redis Connection: O N L I N E";
}
}
// It's unfortunate we can not pass any user data here. All we get to do is log a message and hope for the best
void RedisConnection::DisconnectCallback(const redisAsyncContext *c, int status)
{
auto *rc = (RedisConnection* ) const_cast<redisAsyncContext*>(c)->data;
boost::mutex::scoped_lock lock(rc->m_CMutex);
if (status == REDIS_OK)
Log(LogInformation, "RedisWriter") << "Redis disconnected by us";
Log(LogInformation, "RedisConnection") << "Redis disconnected by us";
else {
if (c->err != 0)
Log(LogCritical, "RedisWriter") << "Redis disconnected by server. Reason: " << c->errstr;
Log(LogCritical, "RedisConnection") << "Redis disconnected by server. Reason: " << c->errstr;
else
Log(LogCritical, "RedisWriter") << "Redis disconnected by server";
Log(LogCritical, "RedisConnection") << "Redis disconnected by server";
}
rc->m_Connected = false;
}
void RedisConnection::ExecuteQuery(const std::vector<String>& query, redisCallbackFn *fn, void *privdata)
@ -187,9 +198,9 @@ void RedisConnection::SendMessageInternal(const std::vector<String>& query, redi
boost::mutex::scoped_lock lock(m_CMutex);
if (!m_Context) {
if (!m_Context || !m_Connected) {
Log(LogCritical, "RedisWriter")
<< "Connection lost";
<< "Not connected to Redis";
return;
}

View File

@ -75,6 +75,7 @@ namespace icinga
void HandleRW();
static void DisconnectCallback(const redisAsyncContext *c, int status);
static void ConnectCallback(const redisAsyncContext *c, int status);
static void RedisInitialCallback(redisAsyncContext *c, void *r, void *p);

View File

@ -62,8 +62,6 @@ void RedisWriter::UpdateAllConfigObjects()
{
double startTime = Utility::GetTime();
m_Rcon->ExecuteQuery({"flushall"});
// Use a Workqueue to pack objects in parallel
WorkQueue upq(25000, Configuration::Concurrency);
upq.SetName("RedisWriter:ConfigDump");

View File

@ -111,6 +111,7 @@ void RedisWriter::TryToReconnect()
if (!m_Rcon->IsConnected())
return;
UpdateSubscriptions();
if (m_ConfigDumpInProgress || m_ConfigDumpDone)
@ -214,6 +215,9 @@ void RedisWriter::PublishStats()
{
AssertOnWorkQueue();
if (!m_Rcon->IsConnected())
return;
Dictionary::Ptr status = GetStats();
String jsonStats = JsonEncode(status);
@ -294,6 +298,9 @@ void RedisWriter::SendEvent(const Dictionary::Ptr& event)
{
AssertOnWorkQueue();
if (!m_Rcon->IsConnected())
return;
String body = JsonEncode(event);
// Log(LogInformation, "RedisWriter")