Apply code style

This commit is contained in:
Michael Friedrich 2018-05-15 14:43:01 +02:00
parent 5eab856673
commit 61edfcb68d
3 changed files with 66 additions and 63 deletions

View File

@ -46,14 +46,14 @@ value: JsonEncode(Serialize(object, FAState))
INITIALIZE_ONCE(&RedisWriter::ConfigStaticInitialize); INITIALIZE_ONCE(&RedisWriter::ConfigStaticInitialize);
void RedisWriter::ConfigStaticInitialize(void) void RedisWriter::ConfigStaticInitialize()
{ {
/* triggered in ProcessCheckResult(), requires UpdateNextCheck() to be called before */ /* triggered in ProcessCheckResult(), requires UpdateNextCheck() to be called before */
ConfigObject::OnStateChanged.connect(boost::bind(&RedisWriter::StateChangedHandler, _1)); ConfigObject::OnStateChanged.connect(std::bind(&RedisWriter::StateChangedHandler, _1));
/* triggered on create, update and delete objects */ /* triggered on create, update and delete objects */
ConfigObject::OnActiveChanged.connect(boost::bind(&RedisWriter::VersionChangedHandler, _1)); ConfigObject::OnActiveChanged.connect(std::bind(&RedisWriter::VersionChangedHandler, _1));
ConfigObject::OnVersionChanged.connect(boost::bind(&RedisWriter::VersionChangedHandler, _1)); ConfigObject::OnVersionChanged.connect(std::bind(&RedisWriter::VersionChangedHandler, _1));
} }
void RedisWriter::UpdateAllConfigObjects(void) void RedisWriter::UpdateAllConfigObjects(void)
@ -68,7 +68,7 @@ void RedisWriter::UpdateAllConfigObjects(void)
const String keyPrefix = "icinga:config:"; const String keyPrefix = "icinga:config:";
do { do {
boost::shared_ptr<redisReply> reply = ExecuteQuery({ "SCAN", Convert::ToString(cursor), "MATCH", keyPrefix + "*", "COUNT", "1000" }); std::shared_ptr<redisReply> reply = ExecuteQuery({ "SCAN", Convert::ToString(cursor), "MATCH", keyPrefix + "*", "COUNT", "1000" });
VERIFY(reply->type == REDIS_REPLY_ARRAY); VERIFY(reply->type == REDIS_REPLY_ARRAY);
VERIFY(reply->elements % 2 == 0); VERIFY(reply->elements % 2 == 0);
@ -116,6 +116,7 @@ void RedisWriter::UpdateAllConfigObjects(void)
for (const Type::Ptr& type : Type::GetAllTypes()) { for (const Type::Ptr& type : Type::GetAllTypes()) {
ConfigType *ctype = dynamic_cast<ConfigType *>(type.get()); ConfigType *ctype = dynamic_cast<ConfigType *>(type.get());
if (!ctype) if (!ctype)
continue; continue;
@ -135,7 +136,7 @@ void RedisWriter::UpdateAllConfigObjects(void)
} }
Log(LogInformation, "RedisWriter") Log(LogInformation, "RedisWriter")
<< "Initial config/status dump finished in " << Utility::GetTime() - startTime << " seconds."; << "Initial config/status dump finished in " << Utility::GetTime() - startTime << " seconds.";
} }
void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, bool useTransaction, bool runtimeUpdate) void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, bool useTransaction, bool runtimeUpdate)
@ -324,7 +325,7 @@ void RedisWriter::StateChangedHandler(const ConfigObject::Ptr& object)
Type::Ptr type = object->GetReflectionType(); Type::Ptr type = object->GetReflectionType();
for (const RedisWriter::Ptr& rw : ConfigType::GetObjectsByType<RedisWriter>()) { for (const RedisWriter::Ptr& rw : ConfigType::GetObjectsByType<RedisWriter>()) {
rw->m_WorkQueue.Enqueue(boost::bind(&RedisWriter::SendStatusUpdate, rw, object, true)); rw->m_WorkQueue.Enqueue(std::bind(&RedisWriter::SendStatusUpdate, rw, object, true));
} }
} }
@ -335,12 +336,12 @@ void RedisWriter::VersionChangedHandler(const ConfigObject::Ptr& object)
if (object->IsActive()) { if (object->IsActive()) {
/* Create or update the object config */ /* Create or update the object config */
for (const RedisWriter::Ptr& rw : ConfigType::GetObjectsByType<RedisWriter>()) { for (const RedisWriter::Ptr& rw : ConfigType::GetObjectsByType<RedisWriter>()) {
rw->m_WorkQueue.Enqueue(boost::bind(&RedisWriter::SendConfigUpdate, rw.get(), object, true, true)); rw->m_WorkQueue.Enqueue(std::bind(&RedisWriter::SendConfigUpdate, rw.get(), object, true, true));
} }
} else if (!object->IsActive() && object->GetExtension("ConfigObjectDeleted")) { /* same as in apilistener-configsync.cpp */ } else if (!object->IsActive() && object->GetExtension("ConfigObjectDeleted")) { /* same as in apilistener-configsync.cpp */
/* Delete object config */ /* Delete object config */
for (const RedisWriter::Ptr& rw : ConfigType::GetObjectsByType<RedisWriter>()) { for (const RedisWriter::Ptr& rw : ConfigType::GetObjectsByType<RedisWriter>()) {
rw->m_WorkQueue.Enqueue(boost::bind(&RedisWriter::SendConfigDelete, rw.get(), object)); rw->m_WorkQueue.Enqueue(std::bind(&RedisWriter::SendConfigDelete, rw.get(), object));
} }
} }
} }

View File

@ -31,8 +31,8 @@ using namespace icinga;
REGISTER_TYPE(RedisWriter); REGISTER_TYPE(RedisWriter);
RedisWriter::RedisWriter(void) RedisWriter::RedisWriter()
: m_Context(NULL) : m_Context(NULL)
{ {
m_WorkQueue.SetName("RedisWriter"); m_WorkQueue.SetName("RedisWriter");
} }
@ -45,31 +45,31 @@ void RedisWriter::Start(bool runtimeCreated)
ObjectImpl<RedisWriter>::Start(runtimeCreated); ObjectImpl<RedisWriter>::Start(runtimeCreated);
Log(LogInformation, "RedisWriter") Log(LogInformation, "RedisWriter")
<< "'" << GetName() << "' started."; << "'" << GetName() << "' started.";
m_ConfigDumpInProgress = false; m_ConfigDumpInProgress = false;
m_WorkQueue.SetExceptionCallback(boost::bind(&RedisWriter::ExceptionHandler, this, _1)); m_WorkQueue.SetExceptionCallback(std::bind(&RedisWriter::ExceptionHandler, this, _1));
m_ReconnectTimer = new Timer(); m_ReconnectTimer = new Timer();
m_ReconnectTimer->SetInterval(15); m_ReconnectTimer->SetInterval(15);
m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&RedisWriter::ReconnectTimerHandler, this)); m_ReconnectTimer->OnTimerExpired.connect(std::bind(&RedisWriter::ReconnectTimerHandler, this));
m_ReconnectTimer->Start(); m_ReconnectTimer->Start();
m_ReconnectTimer->Reschedule(0); m_ReconnectTimer->Reschedule(0);
m_SubscriptionTimer = new Timer(); m_SubscriptionTimer = new Timer();
m_SubscriptionTimer->SetInterval(15); m_SubscriptionTimer->SetInterval(15);
m_SubscriptionTimer->OnTimerExpired.connect(boost::bind(&RedisWriter::UpdateSubscriptionsTimerHandler, this)); m_SubscriptionTimer->OnTimerExpired.connect(std::bind(&RedisWriter::UpdateSubscriptionsTimerHandler, this));
m_SubscriptionTimer->Start(); m_SubscriptionTimer->Start();
m_StatsTimer = new Timer(); m_StatsTimer = new Timer();
m_StatsTimer->SetInterval(10); m_StatsTimer->SetInterval(10);
m_StatsTimer->OnTimerExpired.connect(boost::bind(&RedisWriter::PublishStatsTimerHandler, this)); m_StatsTimer->OnTimerExpired.connect(std::bind(&RedisWriter::PublishStatsTimerHandler, this));
m_StatsTimer->Start(); m_StatsTimer->Start();
m_WorkQueue.SetName("RedisWriter"); m_WorkQueue.SetName("RedisWriter");
boost::thread thread(boost::bind(&RedisWriter::HandleEvents, this)); boost::thread thread(std::bind(&RedisWriter::HandleEvents, this));
thread.detach(); thread.detach();
} }
@ -78,7 +78,7 @@ void RedisWriter::ExceptionHandler(boost::exception_ptr exp)
Log(LogCritical, "RedisWriter", "Exception during redis query. Verify that Redis is operational."); Log(LogCritical, "RedisWriter", "Exception during redis query. Verify that Redis is operational.");
Log(LogDebug, "RedisWriter") Log(LogDebug, "RedisWriter")
<< "Exception during redis operation: " << DiagnosticInformation(exp); << "Exception during redis operation: " << DiagnosticInformation(exp);
if (m_Context) { if (m_Context) {
redisFree(m_Context); redisFree(m_Context);
@ -86,12 +86,12 @@ void RedisWriter::ExceptionHandler(boost::exception_ptr exp)
} }
} }
void RedisWriter::ReconnectTimerHandler(void) void RedisWriter::ReconnectTimerHandler()
{ {
m_WorkQueue.Enqueue(boost::bind(&RedisWriter::TryToReconnect, this)); m_WorkQueue.Enqueue(std::bind(&RedisWriter::TryToReconnect, this));
} }
void RedisWriter::TryToReconnect(void) void RedisWriter::TryToReconnect()
{ {
AssertOnWorkQueue(); AssertOnWorkQueue();
@ -113,7 +113,7 @@ void RedisWriter::TryToReconnect(void)
Log(LogWarning, "RedisWriter", "Cannot allocate redis context."); Log(LogWarning, "RedisWriter", "Cannot allocate redis context.");
} else { } else {
Log(LogWarning, "RedisWriter", "Connection error: ") Log(LogWarning, "RedisWriter", "Connection error: ")
<< m_Context->errstr; << m_Context->errstr;
} }
if (m_Context) { if (m_Context) {
@ -148,12 +148,12 @@ void RedisWriter::TryToReconnect(void)
} }
/* /*
void RedisWriter::UpdateSubscriptionsTimerHandler(void) void RedisWriter::UpdateSubscriptionsTimerHandler()
{ {
m_WorkQueue.Enqueue(boost::bind(&RedisWriter::UpdateSubscriptions, this)); m_WorkQueue.Enqueue(std::bind(&RedisWriter::UpdateSubscriptions, this));
} }
void RedisWriter::UpdateSubscriptions(void) void RedisWriter::UpdateSubscriptions()
{ {
AssertOnWorkQueue(); AssertOnWorkQueue();
@ -169,7 +169,7 @@ void RedisWriter::UpdateSubscriptions(void)
String keyPrefix = "icinga:subscription:"; String keyPrefix = "icinga:subscription:";
do { do {
boost::shared_ptr<redisReply> reply = ExecuteQuery({ "SCAN", Convert::ToString(cursor), "MATCH", keyPrefix + "*", "COUNT", "1000" }); std::shared_ptr<redisReply> reply = ExecuteQuery({ "SCAN", Convert::ToString(cursor), "MATCH", keyPrefix + "*", "COUNT", "1000" });
VERIFY(reply->type == REDIS_REPLY_ARRAY); VERIFY(reply->type == REDIS_REPLY_ARRAY);
VERIFY(reply->elements % 2 == 0); VERIFY(reply->elements % 2 == 0);
@ -190,21 +190,20 @@ void RedisWriter::UpdateSubscriptions(void)
if (!RedisWriter::GetSubscriptionTypes(key, rsi)) if (!RedisWriter::GetSubscriptionTypes(key, rsi))
Log(LogInformation, "RedisWriter") Log(LogInformation, "RedisWriter")
<< "Subscription \"" << key<< "\" has no types listed."; << "Subscription \"" << key<< "\" has no types listed.";
else else
m_Subscriptions[key.SubStr(keyPrefix.GetLength())] = rsi; m_Subscriptions[key.SubStr(keyPrefix.GetLength())] = rsi;
} }
} while (cursor != 0); } while (cursor != 0);
Log(LogInformation, "RedisWriter") Log(LogInformation, "RedisWriter")
<< "Current Redis event subscriptions: " << m_Subscriptions.size(); << "Current Redis event subscriptions: " << m_Subscriptions.size();
} }
int RedisWriter::GetSubscriptionTypes(String key, RedisSubscriptionInfo& rsi) int RedisWriter::GetSubscriptionTypes(String key, RedisSubscriptionInfo& rsi)
{ {
try { try {
boost::shared_ptr<redisReply> redisReply = ExecuteQuery({ "SMEMBERS", key }); std::shared_ptr<redisReply> redisReply = ExecuteQuery({ "SMEMBERS", key });
VERIFY(redisReply->type == REDIS_REPLY_ARRAY); VERIFY(redisReply->type == REDIS_REPLY_ARRAY);
for (size_t j = 0; j < redisReply->elements; j++) { for (size_t j = 0; j < redisReply->elements; j++) {
@ -223,10 +222,10 @@ int RedisWriter::GetSubscriptionTypes(String key, RedisSubscriptionInfo& rsi)
void RedisWriter::PublishStatsTimerHandler(void) void RedisWriter::PublishStatsTimerHandler(void)
{ {
m_WorkQueue.Enqueue(boost::bind(&RedisWriter::PublishStats, this)); m_WorkQueue.Enqueue(std::bind(&RedisWriter::PublishStats, this));
} }
void RedisWriter::PublishStats(void) void RedisWriter::PublishStats()
{ {
AssertOnWorkQueue(); AssertOnWorkQueue();
@ -243,7 +242,7 @@ void RedisWriter::PublishStats(void)
ExecuteQuery({ "PUBLISH", "icinga:stats", jsonStats }); ExecuteQuery({ "PUBLISH", "icinga:stats", jsonStats });
} }
void RedisWriter::HandleEvents(void) void RedisWriter::HandleEvents()
{ {
String queueName = Utility::NewUniqueID(); String queueName = Utility::NewUniqueID();
EventQueue::Ptr queue = new EventQueue(queueName); EventQueue::Ptr queue = new EventQueue(queueName);
@ -272,7 +271,7 @@ void RedisWriter::HandleEvents(void)
if (!event) if (!event)
continue; continue;
m_WorkQueue.Enqueue(boost::bind(&RedisWriter::SendEvent, this, event)); m_WorkQueue.Enqueue(std::bind(&RedisWriter::SendEvent, this, event));
} }
queue->RemoveClient(this); queue->RemoveClient(this);
@ -296,13 +295,15 @@ void RedisWriter::HandleEvent(const Dictionary::Ptr& event)
String body = JsonEncode(event); String body = JsonEncode(event);
boost::shared_ptr<redisReply> maxExists = ExecuteQuery({ "EXISTS", "icinga:subscription:" + name + ":limit" }); std::shared_ptr<redisReply> maxExists = ExecuteQuery({ "EXISTS", "icinga:subscription:" + name + ":limit" });
long maxEvents = MAX_EVENTS_DEFAULT; long maxEvents = MAX_EVENTS_DEFAULT;
if (maxExists->integer) { if (maxExists->integer) {
boost::shared_ptr<redisReply> redisReply = ExecuteQuery({ "GET", "icinga:subscription:" + name + ":limit"}); std::shared_ptr<redisReply> redisReply = ExecuteQuery({ "GET", "icinga:subscription:" + name + ":limit"});
VERIFY(redisReply->type == REDIS_REPLY_STRING); VERIFY(redisReply->type == REDIS_REPLY_STRING);
Log(LogInformation, "RedisWriter") Log(LogInformation, "RedisWriter")
<< "Got limit " << redisReply->str << " for " << name; << "Got limit " << redisReply->str << " for " << name;
maxEvents = Convert::ToLong(redisReply->str); maxEvents = Convert::ToLong(redisReply->str);
} }
@ -324,7 +325,8 @@ void RedisWriter::HandleEvent(const Dictionary::Ptr& event)
String body = JsonEncode(event); String body = JsonEncode(event);
Log(LogInformation, "RedisWriter") Log(LogInformation, "RedisWriter")
<< "Sending event \"" << body << "\""; << "Sending event \"" << body << "\"";
ExecuteQuery({ "PUBLISH", "icinga:event:all", body }); ExecuteQuery({ "PUBLISH", "icinga:event:all", body });
ExecuteQuery({ "PUBLISH", "icinga:event:" + event->Get("type"), body }); ExecuteQuery({ "PUBLISH", "icinga:event:" + event->Get("type"), body });
} }
@ -332,17 +334,17 @@ void RedisWriter::HandleEvent(const Dictionary::Ptr& event)
void RedisWriter::Stop(bool runtimeRemoved) void RedisWriter::Stop(bool runtimeRemoved)
{ {
Log(LogInformation, "RedisWriter") Log(LogInformation, "RedisWriter")
<< "'" << GetName() << "' stopped."; << "'" << GetName() << "' stopped.";
ObjectImpl<RedisWriter>::Stop(runtimeRemoved); ObjectImpl<RedisWriter>::Stop(runtimeRemoved);
} }
void RedisWriter::AssertOnWorkQueue(void) void RedisWriter::AssertOnWorkQueue()
{ {
ASSERT(m_WorkQueue.IsWorkerThread()); ASSERT(m_WorkQueue.IsWorkerThread());
} }
boost::shared_ptr<redisReply> RedisWriter::ExecuteQuery(const std::vector<String>& query) std::shared_ptr<redisReply> RedisWriter::ExecuteQuery(const std::vector<String>& query)
{ {
const char **argv; const char **argv;
size_t *argvlen; size_t *argvlen;
@ -362,23 +364,23 @@ boost::shared_ptr<redisReply> RedisWriter::ExecuteQuery(const std::vector<String
if (reply->type == REDIS_REPLY_ERROR) { if (reply->type == REDIS_REPLY_ERROR) {
Log(LogCritical, "RedisWriter") Log(LogCritical, "RedisWriter")
<< "Redis query failed: " << reply->str; << "Redis query failed: " << reply->str;
String msg = reply->str; String msg = reply->str;
freeReplyObject(reply); freeReplyObject(reply);
BOOST_THROW_EXCEPTION( BOOST_THROW_EXCEPTION(
redis_error() redis_error()
<< errinfo_message(msg) << errinfo_message(msg)
<< errinfo_redis_query(Utility::Join(Array::FromVector(query), ' ', false)) << errinfo_redis_query(Utility::Join(Array::FromVector(query), ' ', false))
); );
} }
return boost::shared_ptr<redisReply>(reply, freeReplyObject); return std::shared_ptr<redisReply>(reply, freeReplyObject);
} }
std::vector<boost::shared_ptr<redisReply> > RedisWriter::ExecuteQueries(const std::vector<std::vector<String> >& queries) std::vector<std::shared_ptr<redisReply> > RedisWriter::ExecuteQueries(const std::vector<std::vector<String> >& queries)
{ {
const char **argv; const char **argv;
size_t *argvlen; size_t *argvlen;
@ -398,7 +400,7 @@ std::vector<boost::shared_ptr<redisReply> > RedisWriter::ExecuteQueries(const st
delete [] argvlen; delete [] argvlen;
} }
std::vector<boost::shared_ptr<redisReply> > replies; std::vector<std::shared_ptr<redisReply> > replies;
for (size_t i = 0; i < queries.size(); i++) { for (size_t i = 0; i < queries.size(); i++) {
redisReply *rawReply; redisReply *rawReply;
@ -420,14 +422,14 @@ std::vector<boost::shared_ptr<redisReply> > RedisWriter::ExecuteQueries(const st
if (reply->type == REDIS_REPLY_ERROR) { if (reply->type == REDIS_REPLY_ERROR) {
Log(LogCritical, "RedisWriter") Log(LogCritical, "RedisWriter")
<< "Redis query failed: " << reply->str; << "Redis query failed: " << reply->str;
String msg = reply->str; String msg = reply->str;
BOOST_THROW_EXCEPTION( BOOST_THROW_EXCEPTION(
redis_error() redis_error()
<< errinfo_message(msg) << errinfo_message(msg)
<< errinfo_redis_query(Utility::Join(Array::FromVector(query), ' ', false)) << errinfo_redis_query(Utility::Join(Array::FromVector(query), ' ', false))
); );
} }
} }

View File

@ -44,28 +44,28 @@ public:
DECLARE_OBJECT(RedisWriter); DECLARE_OBJECT(RedisWriter);
DECLARE_OBJECTNAME(RedisWriter); DECLARE_OBJECTNAME(RedisWriter);
RedisWriter(void); RedisWriter();
static void ConfigStaticInitialize(void); static void ConfigStaticInitialize();
virtual void Start(bool runtimeCreated) override; virtual void Start(bool runtimeCreated) override;
virtual void Stop(bool runtimeRemoved) override; virtual void Stop(bool runtimeRemoved) override;
private: private:
void ReconnectTimerHandler(void); void ReconnectTimerHandler();
void TryToReconnect(void); void TryToReconnect();
void HandleEvents(void); void HandleEvents();
void HandleEvent(const Dictionary::Ptr& event); void HandleEvent(const Dictionary::Ptr& event);
void SendEvent(const Dictionary::Ptr& event); void SendEvent(const Dictionary::Ptr& event);
void UpdateSubscriptionsTimerHandler(void); void UpdateSubscriptionsTimerHandler();
void UpdateSubscriptions(void); void UpdateSubscriptions();
int GetSubscriptionTypes(String key, RedisSubscriptionInfo& rsi); int GetSubscriptionTypes(String key, RedisSubscriptionInfo& rsi);
void PublishStatsTimerHandler(void); void PublishStatsTimerHandler();
void PublishStats(void); void PublishStats();
/* config & status dump */ /* config & status dump */
void UpdateAllConfigObjects(void); void UpdateAllConfigObjects();
void SendConfigUpdate(const ConfigObject::Ptr& object, bool useTransaction, bool runtimeUpdate = false); void SendConfigUpdate(const ConfigObject::Ptr& object, bool useTransaction, bool runtimeUpdate = false);
void SendConfigDelete(const ConfigObject::Ptr& object); void SendConfigDelete(const ConfigObject::Ptr& object);
void SendStatusUpdate(const ConfigObject::Ptr& object, bool useTransaction); void SendStatusUpdate(const ConfigObject::Ptr& object, bool useTransaction);
@ -84,12 +84,12 @@ private:
static void StateChangedHandler(const ConfigObject::Ptr& object); static void StateChangedHandler(const ConfigObject::Ptr& object);
static void VersionChangedHandler(const ConfigObject::Ptr& object); static void VersionChangedHandler(const ConfigObject::Ptr& object);
void AssertOnWorkQueue(void); void AssertOnWorkQueue();
void ExceptionHandler(boost::exception_ptr exp); void ExceptionHandler(boost::exception_ptr exp);
boost::shared_ptr<redisReply> ExecuteQuery(const std::vector<String>& query); boost::shared_ptr<redisReply> ExecuteQuery(const std::vector<String>& query);
std::vector<boost::shared_ptr<redisReply> > ExecuteQueries(const std::vector<std::vector<String> >& queries); std::vector<std::shared_ptr<redisReply> > ExecuteQueries(const std::vector<std::vector<String> >& queries);
Timer::Ptr m_StatsTimer; Timer::Ptr m_StatsTimer;
Timer::Ptr m_ReconnectTimer; Timer::Ptr m_ReconnectTimer;