diff --git a/components/cluster/clusterlistener.cpp b/components/cluster/clusterlistener.cpp index 634ee0141..dacfda585 100644 --- a/components/cluster/clusterlistener.cpp +++ b/components/cluster/clusterlistener.cpp @@ -73,6 +73,8 @@ void ClusterListener::Start(void) m_ClusterTimer->SetInterval(5); m_ClusterTimer->Start(); + m_MessageQueue.SetExceptionCallback(&ClusterListener::MessageExceptionHandler); + Service::OnNewCheckResult.connect(boost::bind(&ClusterListener::CheckResultHandler, this, _1, _2, _3)); Service::OnNextCheckChanged.connect(boost::bind(&ClusterListener::NextCheckChangedHandler, this, _1, _2, _3)); Notification::OnNextNotificationChanged.connect(boost::bind(&ClusterListener::NextNotificationChangedHandler, this, _1, _2, _3)); @@ -670,7 +672,7 @@ void ClusterListener::CheckResultHandler(const Service::Ptr& service, const Chec Dictionary::Ptr params = make_shared(); params->Set("service", service->GetName()); - params->Set("check_result", cr); + params->Set("check_result", Serialize(cr)); Dictionary::Ptr message = make_shared(); message->Set("jsonrpc", "2.0"); @@ -841,7 +843,7 @@ void ClusterListener::CommentAddedHandler(const Service::Ptr& service, const Com Dictionary::Ptr params = make_shared(); params->Set("service", service->GetName()); - params->Set("comment", comment); + params->Set("comment", Serialize(comment)); Dictionary::Ptr message = make_shared(); message->Set("jsonrpc", "2.0"); @@ -879,7 +881,7 @@ void ClusterListener::DowntimeAddedHandler(const Service::Ptr& service, const Do Dictionary::Ptr params = make_shared(); params->Set("service", service->GetName()); - params->Set("downtime", downtime); + params->Set("downtime", Serialize(downtime)); Dictionary::Ptr message = make_shared(); message->Set("jsonrpc", "2.0"); @@ -955,6 +957,11 @@ void ClusterListener::AsyncMessageHandler(const Endpoint::Ptr& sender, const Dic m_MessageQueue.Enqueue(boost::bind(&ClusterListener::MessageHandler, this, sender, message)); } +void ClusterListener::MessageExceptionHandler(boost::exception_ptr exp) +{ + Log(LogCritical, "cluster", "Exception while processing cluster message: " + boost::diagnostic_information(exp)); +} + void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message) { sender->SetSeen(Utility::GetTime()); @@ -1015,7 +1022,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona return; } - CheckResult::Ptr cr = params->Get("check_result"); + CheckResult::Ptr cr = Deserialize(params->Get("check_result")); if (!cr) return; @@ -1209,7 +1216,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona return; } - Comment::Ptr comment = params->Get("comment"); + Comment::Ptr comment = Deserialize(params->Get("comment")); service->AddComment(comment->GetEntryType(), comment->GetAuthor(), comment->GetText(), comment->GetExpireTime(), comment->GetId(), sender->GetName()); @@ -1252,7 +1259,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona return; } - Downtime::Ptr downtime = params->Get("downtime"); + Downtime::Ptr downtime = Deserialize(params->Get("downtime")); service->AddDowntime(downtime->GetAuthor(), downtime->GetComment(), downtime->GetStartTime(), downtime->GetEndTime(), diff --git a/components/cluster/clusterlistener.h b/components/cluster/clusterlistener.h index e3fac529c..f7b36ef0a 100644 --- a/components/cluster/clusterlistener.h +++ b/components/cluster/clusterlistener.h @@ -111,6 +111,8 @@ private: void SetSecurityInfo(const Dictionary::Ptr& message, const DynamicObject::Ptr& object, int privs); void PersistMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message); + + static void MessageExceptionHandler(boost::exception_ptr exp); }; }