cluster: Another fix for authority checks.

This commit is contained in:
Gunnar Beutner 2013-09-13 07:49:12 +02:00
parent cc1dbbca0b
commit 68ec21f13f
6 changed files with 69 additions and 54 deletions

View File

@ -33,8 +33,9 @@ void CheckerComponent::Start(void)
{ {
DynamicObject::Start(); DynamicObject::Start();
DynamicObject::OnStarted.connect(bind(&CheckerComponent::ObjectStartedHandler, this, _1)); DynamicObject::OnStarted.connect(bind(&CheckerComponent::ObjectHandler, this, _1));
DynamicObject::OnStopped.connect(bind(&CheckerComponent::ObjectStoppedHandler, this, _1)); DynamicObject::OnStopped.connect(bind(&CheckerComponent::ObjectHandler, this, _1));
DynamicObject::OnAuthorityChanged.connect(bind(&CheckerComponent::ObjectHandler, this, _1));
Service::OnNextCheckChanged.connect(bind(&CheckerComponent::NextCheckChangedHandler, this, _1)); Service::OnNextCheckChanged.connect(bind(&CheckerComponent::NextCheckChangedHandler, this, _1));
@ -48,8 +49,7 @@ void CheckerComponent::Start(void)
m_ResultTimer->Start(); m_ResultTimer->Start();
BOOST_FOREACH(const Service::Ptr& service, DynamicType::GetObjects<Service>()) { BOOST_FOREACH(const Service::Ptr& service, DynamicType::GetObjects<Service>()) {
if (service->IsActive()) ObjectHandler(service);
ObjectStartedHandler(service);
} }
} }
@ -83,17 +83,6 @@ void CheckerComponent::CheckThreadProc(void)
CheckTimeView::iterator it = idx.begin(); CheckTimeView::iterator it = idx.begin();
Service::Ptr service = *it; Service::Ptr service = *it;
if (!service->HasAuthority("checker")) {
idx.erase(it);
idx.insert(service);
continue;
}
if (!service->IsActive()) {
idx.erase(it);
continue;
}
double wait = service->GetNextCheck() - Utility::GetTime(); double wait = service->GetNextCheck() - Utility::GetTime();
if (wait > 0) { if (wait > 0) {
@ -177,7 +166,10 @@ void CheckerComponent::ExecuteCheckHelper(const Service::Ptr& service)
it = m_PendingServices.find(service); it = m_PendingServices.find(service);
if (it != m_PendingServices.end()) { if (it != m_PendingServices.end()) {
m_PendingServices.erase(it); m_PendingServices.erase(it);
if (service->IsActive() && service->HasAuthority("checker"))
m_IdleServices.insert(service); m_IdleServices.insert(service);
m_CV.notify_all(); m_CV.notify_all();
} }
} }
@ -200,7 +192,7 @@ void CheckerComponent::ResultTimerHandler(void)
Log(LogInformation, "checker", msgbuf.str()); Log(LogInformation, "checker", msgbuf.str());
} }
void CheckerComponent::ObjectStartedHandler(const DynamicObject::Ptr& object) void CheckerComponent::ObjectHandler(const DynamicObject::Ptr& object)
{ {
if (object->GetType() != DynamicType::GetByName("Service")) if (object->GetType() != DynamicType::GetByName("Service"))
return; return;
@ -210,26 +202,16 @@ void CheckerComponent::ObjectStartedHandler(const DynamicObject::Ptr& object)
{ {
boost::mutex::scoped_lock lock(m_Mutex); boost::mutex::scoped_lock lock(m_Mutex);
if (object->IsActive() && object->HasAuthority("checker")) {
if (m_PendingServices.find(service) != m_PendingServices.end()) if (m_PendingServices.find(service) != m_PendingServices.end())
return; return;
m_IdleServices.insert(service); m_IdleServices.insert(service);
m_CV.notify_all(); } else {
}
}
void CheckerComponent::ObjectStoppedHandler(const DynamicObject::Ptr& object)
{
if (object->GetType() != DynamicType::GetByName("Service"))
return;
Service::Ptr service = static_pointer_cast<Service>(object);
{
boost::mutex::scoped_lock lock(m_Mutex);
m_IdleServices.erase(service); m_IdleServices.erase(service);
m_PendingServices.erase(service); m_PendingServices.erase(service);
}
m_CV.notify_all(); m_CV.notify_all();
} }
} }

View File

@ -97,8 +97,7 @@ private:
void AdjustCheckTimer(void); void AdjustCheckTimer(void);
void ObjectStartedHandler(const DynamicObject::Ptr& object); void ObjectHandler(const DynamicObject::Ptr& object);
void ObjectStoppedHandler(const DynamicObject::Ptr& object);
void NextCheckChangedHandler(const Service::Ptr& service); void NextCheckChangedHandler(const Service::Ptr& service);
void RescheduleCheckTimer(void); void RescheduleCheckTimer(void);

View File

@ -85,8 +85,6 @@ void ClusterComponent::Start(void)
Service::OnAcknowledgementSet.connect(boost::bind(&ClusterComponent::AcknowledgementSetHandler, this, _1, _2, _3, _4, _5, _6)); Service::OnAcknowledgementSet.connect(boost::bind(&ClusterComponent::AcknowledgementSetHandler, this, _1, _2, _3, _4, _5, _6));
Service::OnAcknowledgementCleared.connect(boost::bind(&ClusterComponent::AcknowledgementClearedHandler, this, _1, _2)); Service::OnAcknowledgementCleared.connect(boost::bind(&ClusterComponent::AcknowledgementClearedHandler, this, _1, _2));
DynamicObject::OnCheckAuthority.connect(boost::bind(&ClusterComponent::CheckAuthorityHandler, this, _1, _2, _3));
Endpoint::OnMessageReceived.connect(boost::bind(&ClusterComponent::MessageHandler, this, _1, _2)); Endpoint::OnMessageReceived.connect(boost::bind(&ClusterComponent::MessageHandler, this, _1, _2));
} }
@ -552,6 +550,8 @@ void ClusterComponent::ClusterTimerHandler(void)
(void) unlink(path.CStr()); (void) unlink(path.CStr());
} }
} }
UpdateAuthority();
} }
void ClusterComponent::CheckResultHandler(const Service::Ptr& service, const Dictionary::Ptr& cr, const String& authority) void ClusterComponent::CheckResultHandler(const Service::Ptr& service, const Dictionary::Ptr& cr, const String& authority)
@ -1128,7 +1128,7 @@ void ClusterComponent::MessageHandler(const Endpoint::Ptr& sender, const Diction
} }
} }
void ClusterComponent::CheckAuthorityHandler(const DynamicObject::Ptr& object, const String& type, bool& result) bool ClusterComponent::IsAuthority(const DynamicObject::Ptr& object, const String& type)
{ {
Array::Ptr authorities = object->GetAuthorities(); Array::Ptr authorities = object->GetAuthorities();
std::vector<String> endpoints; std::vector<String> endpoints;
@ -1156,11 +1156,8 @@ void ClusterComponent::CheckAuthorityHandler(const DynamicObject::Ptr& object, c
endpoints.push_back(endpoint->GetName()); endpoints.push_back(endpoint->GetName());
} }
if (endpoints.empty()) { if (endpoints.empty())
result = false; return false;
return;
}
std::sort(endpoints.begin(), endpoints.end()); std::sort(endpoints.begin(), endpoints.end());
@ -1170,7 +1167,17 @@ void ClusterComponent::CheckAuthorityHandler(const DynamicObject::Ptr& object, c
Log(LogDebug, "cluster", "Authority for object '" + object->GetName() + "' of type '" + object->GetType()->GetName() + "' is '" + endpoints[index] + "'."); Log(LogDebug, "cluster", "Authority for object '" + object->GetName() + "' of type '" + object->GetType()->GetName() + "' is '" + endpoints[index] + "'.");
result = (endpoints[index] == GetIdentity()); return (endpoints[index] == GetIdentity());
}
void ClusterComponent::UpdateAuthority(void)
{
BOOST_FOREACH(const DynamicType::Ptr& type, DynamicType::GetTypes()) {
BOOST_FOREACH(const DynamicObject::Ptr& object, type->GetObjects()) {
object->SetAuthority("checker", IsAuthority(object, "checker"));
object->SetAuthority("notifications", IsAuthority(object, "notifications"));
}
}
} }
bool ClusterComponent::SupportsChecks(void) bool ClusterComponent::SupportsChecks(void)

View File

@ -111,7 +111,9 @@ private:
void AcknowledgementSetHandler(const Service::Ptr& service, const String& author, const String& comment, AcknowledgementType type, double expiry, const String& authority); void AcknowledgementSetHandler(const Service::Ptr& service, const String& author, const String& comment, AcknowledgementType type, double expiry, const String& authority);
void AcknowledgementClearedHandler(const Service::Ptr& service, const String& authority); void AcknowledgementClearedHandler(const Service::Ptr& service, const String& authority);
void MessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message); void MessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message);
void CheckAuthorityHandler(const DynamicObject::Ptr& object, const String& type, bool& result);
bool IsAuthority(const DynamicObject::Ptr& object, const String& type);
void UpdateAuthority(void);
static bool SupportsChecks(void); static bool SupportsChecks(void);
static bool SupportsNotifications(void); static bool SupportsNotifications(void);

View File

@ -41,7 +41,7 @@ using namespace icinga;
boost::signals2::signal<void (const DynamicObject::Ptr&)> DynamicObject::OnStarted; boost::signals2::signal<void (const DynamicObject::Ptr&)> DynamicObject::OnStarted;
boost::signals2::signal<void (const DynamicObject::Ptr&)> DynamicObject::OnStopped; boost::signals2::signal<void (const DynamicObject::Ptr&)> DynamicObject::OnStopped;
boost::signals2::signal<void (const DynamicObject::Ptr&)> DynamicObject::OnStateChanged; boost::signals2::signal<void (const DynamicObject::Ptr&)> DynamicObject::OnStateChanged;
boost::signals2::signal<void (const DynamicObject::Ptr&, const String&, bool&)> DynamicObject::OnCheckAuthority; boost::signals2::signal<void (const DynamicObject::Ptr&, const String&, bool)> DynamicObject::OnAuthorityChanged;
DynamicObject::DynamicObject(void) DynamicObject::DynamicObject(void)
: m_Active(false) : m_Active(false)
@ -126,11 +126,33 @@ Array::Ptr DynamicObject::GetAuthorities(void) const
return m_Authorities; return m_Authorities;
} }
bool DynamicObject::HasAuthority(const String& type) void DynamicObject::SetAuthority(const String& type, bool value)
{ {
bool result = true; ASSERT(!OwnsLock());
OnCheckAuthority(GetSelf(), type, result);
return result; {
ObjectLock olock(this);
if (!m_Authority)
m_Authority = boost::make_shared<Dictionary>();
bool old_value = HasAuthority(type);
if (old_value == value)
return;
m_Authority->Set(type, value);
}
OnAuthorityChanged(GetSelf(), type, value);
}
bool DynamicObject::HasAuthority(const String& type) const
{
if (!m_Authority)
return true;
return m_Authority->Get(type);
} }
void DynamicObject::SetExtension(const String& key, const Object::Ptr& object) void DynamicObject::SetExtension(const String& key, const Object::Ptr& object)

View File

@ -63,7 +63,7 @@ public:
static boost::signals2::signal<void (const DynamicObject::Ptr&)> OnStarted; static boost::signals2::signal<void (const DynamicObject::Ptr&)> OnStarted;
static boost::signals2::signal<void (const DynamicObject::Ptr&)> OnStopped; static boost::signals2::signal<void (const DynamicObject::Ptr&)> OnStopped;
static boost::signals2::signal<void (const DynamicObject::Ptr&)> OnStateChanged; static boost::signals2::signal<void (const DynamicObject::Ptr&)> OnStateChanged;
static boost::signals2::signal<void (const DynamicObject::Ptr&, const String&, bool&)> OnCheckAuthority; static boost::signals2::signal<void (const DynamicObject::Ptr&, const String&, bool)> OnAuthorityChanged;
Value InvokeMethod(const String& method, const std::vector<Value>& arguments); Value InvokeMethod(const String& method, const std::vector<Value>& arguments);
@ -73,7 +73,9 @@ public:
bool IsActive(void) const; bool IsActive(void) const;
Array::Ptr GetAuthorities(void) const; Array::Ptr GetAuthorities(void) const;
bool HasAuthority(const String& type);
void SetAuthority(const String& type, bool value);
bool HasAuthority(const String& type) const;
void SetExtension(const String& key, const Object::Ptr& object); void SetExtension(const String& key, const Object::Ptr& object);
Object::Ptr GetExtension(const String& key); Object::Ptr GetExtension(const String& key);
@ -119,6 +121,7 @@ private:
Array::Ptr m_Authorities; Array::Ptr m_Authorities;
bool m_Active; bool m_Active;
Dictionary::Ptr m_Authority;
static DynamicObject::Ptr GetObject(const String& type, const String& name); static DynamicObject::Ptr GetObject(const String& type, const String& name);
}; };