Updated the checker and delegation components to use replication for the "checker" property.

This commit is contained in:
Gunnar Beutner 2012-08-03 23:03:58 +02:00
parent 5641e521b0
commit 22dabfc60d
12 changed files with 116 additions and 238 deletions

View File

@ -82,6 +82,8 @@ Application::Ptr Application::GetInstance(void)
*/
void Application::RunEventLoop(void)
{
double nextProfile = 0;
while (!m_ShuttingDown) {
Object::ClearHeldObjects();
@ -96,11 +98,15 @@ void Application::RunEventLoop(void)
DynamicObject::BeginTx();
#ifdef _DEBUG
if (nextProfile < Utility::GetTime()) {
stringstream msgbuf;
msgbuf << "Active objects: " << Object::GetAliveObjects();
Logger::Write(LogInformation, "base", msgbuf.str());
Object::PrintMemoryProfile();
nextProfile = Utility::GetTime() + 15.0;
}
#endif /* _DEBUG */
}
}

View File

@ -25,7 +25,6 @@ map<pair<String, String>, Dictionary::Ptr> DynamicObject::m_PersistentUpdates;
double DynamicObject::m_CurrentTx = 0;
set<DynamicObject::Ptr> DynamicObject::m_ModifiedObjects;
boost::signal<void (const DynamicObject::Ptr&, const String& name)> DynamicObject::OnAttributeChanged;
boost::signal<void (const DynamicObject::Ptr&)> DynamicObject::OnRegistered;
boost::signal<void (const DynamicObject::Ptr&)> DynamicObject::OnUnregistered;
boost::signal<void (const set<DynamicObject::Ptr>&)> DynamicObject::OnTransactionClosing;
@ -43,18 +42,10 @@ DynamicObject::DynamicObject(const Dictionary::Ptr& serializedObject)
if (!serializedObject->Contains("configTx"))
throw invalid_argument("Serialized object must contain a config snapshot.");
/* apply state from the config item/remote update */
ApplyUpdate(serializedObject, true);
/* restore the object's persistent state */
map<pair<String, String>, Dictionary::Ptr>::iterator it;
it = m_PersistentUpdates.find(make_pair(GetType(), GetName()));
if (it != m_PersistentUpdates.end()) {
Logger::Write(LogDebug, "base", "Restoring persistent state "
"for object " + GetType() + ":" + GetName());
ApplyUpdate(it->second, true);
m_PersistentUpdates.erase(it);
}
/* apply config state from the config item/remote update;
* The DynamicObject::Create function takes care of restoring
* non-config state after the object has been fully constructed */
InternalApplyUpdate(serializedObject, Attribute_Config, true);
}
Dictionary::Ptr DynamicObject::BuildUpdate(double sinceTx, int attributeTypes) const
@ -95,10 +86,15 @@ Dictionary::Ptr DynamicObject::BuildUpdate(double sinceTx, int attributeTypes) c
return update;
}
void DynamicObject::ApplyUpdate(const Dictionary::Ptr& serializedUpdate, bool suppressEvents)
void DynamicObject::ApplyUpdate(const Dictionary::Ptr& serializedUpdate, int allowedTypes)
{
InternalApplyUpdate(serializedUpdate, allowedTypes, false);
}
void DynamicObject::InternalApplyUpdate(const Dictionary::Ptr& serializedUpdate, int allowedTypes, bool suppressEvents)
{
double configTx = 0;
if (serializedUpdate->Contains("configTx")) {
if ((allowedTypes & Attribute_Config) != 0 && serializedUpdate->Contains("configTx")) {
configTx = serializedUpdate->Get("configTx");
if (configTx > m_ConfigTx)
@ -114,8 +110,12 @@ void DynamicObject::ApplyUpdate(const Dictionary::Ptr& serializedUpdate, bool su
Dictionary::Ptr attr = it->second;
Value data = attr->Get("data");
int type = attr->Get("type");
if ((type & ~allowedTypes) != 0)
continue;
Value data = attr->Get("data");
double tx = attr->Get("tx");
if (type & Attribute_Config)
@ -128,33 +128,6 @@ void DynamicObject::ApplyUpdate(const Dictionary::Ptr& serializedUpdate, bool su
}
}
void DynamicObject::SanitizeUpdate(const Dictionary::Ptr& serializedUpdate, int allowedTypes)
{
if ((allowedTypes & Attribute_Config) == 0)
serializedUpdate->Remove("configTx");
Dictionary::Ptr attrs = serializedUpdate->Get("attrs");
Dictionary::Iterator prev, it;
for (it = attrs->Begin(); it != attrs->End(); ) {
if (!it->second.IsObjectType<Dictionary>())
continue;
Dictionary::Ptr attr = it->second;
int type = attr->Get("type");
if (type == 0 || type & ~allowedTypes) {
prev = it;
it++;
attrs->Remove(prev);
continue;
}
it++;
}
}
void DynamicObject::RegisterAttribute(const String& name, DynamicAttributeType type)
{
DynamicAttribute attr;
@ -188,7 +161,10 @@ void DynamicObject::InternalSetAttribute(const String& name, const Value& data,
pair<DynamicObject::AttributeIterator, bool> tt;
tt = m_Attributes.insert(make_pair(name, attr));
Value oldValue;
if (!tt.second && tx >= tt.first->second.Tx) {
oldValue = tt.first->second.Data;
tt.first->second.Data = data;
tt.first->second.Tx = tx;
}
@ -198,7 +174,7 @@ void DynamicObject::InternalSetAttribute(const String& name, const Value& data,
if (!suppressEvent) {
m_ModifiedObjects.insert(GetSelf());
DynamicObject::OnAttributeChanged(GetSelf(), name);
OnAttributeChanged(name, oldValue);
}
}
@ -494,13 +470,31 @@ void DynamicObject::RegisterClass(const String& type, DynamicObject::Factory fac
DynamicObject::Ptr DynamicObject::Create(const String& type, const Dictionary::Ptr& properties)
{
DynamicObject::ClassMap::iterator it;
it = GetClasses().find(type);
DynamicObject::ClassMap::iterator ct;
ct = GetClasses().find(type);
if (it != GetClasses().end())
return it->second(properties);
else
return boost::make_shared<DynamicObject>(properties);
DynamicObject::Ptr obj;
if (ct != GetClasses().end()) {
obj = ct->second(properties);
} else {
obj = boost::make_shared<DynamicObject>(properties);
Logger::Write(LogCritical, "base", "Creating generic DynamicObject for type '" + type + "'");
}
/* restore the object's persistent non-config attributes */
map<pair<String, String>, Dictionary::Ptr>::iterator st;
st = m_PersistentUpdates.find(make_pair(obj->GetType(), obj->GetName()));
if (st != m_PersistentUpdates.end()) {
Logger::Write(LogDebug, "base", "Restoring persistent state "
"for object " + obj->GetType() + ":" + obj->GetName());
obj->ApplyUpdate(st->second, Attribute_All & ~Attribute_Config);
/* we're done with this update, remove it */
m_PersistentUpdates.erase(st);
}
return obj;
}
double DynamicObject::GetCurrentTx(void)
@ -522,3 +516,6 @@ void DynamicObject::FinishTx(void)
m_CurrentTx = 0;
}
void DynamicObject::OnAttributeChanged(const String& name, const Value& oldValue)
{ }

View File

@ -38,6 +38,9 @@ enum DynamicAttributeType
/* Attributes read from the config file are implicitly marked
* as config attributes. */
Attribute_Config = 8,
/* Combination of all attribute types */
Attribute_All = Attribute_Transient | Attribute_Local | Attribute_Replicated | Attribute_Config
};
struct DynamicAttribute
@ -71,8 +74,7 @@ public:
DynamicObject(const Dictionary::Ptr& serializedObject);
Dictionary::Ptr BuildUpdate(double sinceTx, int attributeTypes) const;
void ApplyUpdate(const Dictionary::Ptr& serializedUpdate, bool suppressEvents = false);
static void SanitizeUpdate(const Dictionary::Ptr& serializedUpdate, int allowedTypes);
void ApplyUpdate(const Dictionary::Ptr& serializedUpdate, int allowedTypes);
void RegisterAttribute(const String& name, DynamicAttributeType type);
@ -86,7 +88,6 @@ public:
AttributeConstIterator AttributeBegin(void) const;
AttributeConstIterator AttributeEnd(void) const;
static boost::signal<void (const DynamicObject::Ptr&, const String& name)> OnAttributeChanged;
static boost::signal<void (const DynamicObject::Ptr&)> OnRegistered;
static boost::signal<void (const DynamicObject::Ptr&)> OnUnregistered;
static boost::signal<void (const set<DynamicObject::Ptr>&)> OnTransactionClosing;
@ -123,6 +124,9 @@ public:
static void BeginTx(void);
static void FinishTx(void);
protected:
virtual void OnAttributeChanged(const String& name, const Value& oldValue);
private:
void InternalSetAttribute(const String& name, const Value& data, double tx, bool suppressEvent = false);
Value InternalGetAttribute(const String& name) const;
@ -137,6 +141,8 @@ private:
static double m_CurrentTx;
static set<DynamicObject::Ptr> m_ModifiedObjects;
void InternalApplyUpdate(const Dictionary::Ptr& serializedUpdate, int allowedTypes, bool suppressEvents);
};
class RegisterClassHelper

View File

@ -95,6 +95,7 @@ using std::map;
using std::list;
using std::set;
using std::multimap;
using std::multiset;
using std::pair;
using std::deque;
using std::make_pair;

View File

@ -24,6 +24,7 @@ using namespace icinga;
REGISTER_CLASS(Service);
boost::signal<void (const Service::Ptr&, const CheckResultMessage&)> Service::OnCheckResultReceived;
boost::signal<void (const Service::Ptr&, const String&)> Service::OnCheckerChanged;
Service::Service(const Dictionary::Ptr& serializedObject)
: DynamicObject(serializedObject)
@ -474,3 +475,9 @@ Dictionary::Ptr Service::ResolveDependencies(const Host::Ptr& host, const Dictio
return result;
}
void Service::OnAttributeChanged(const String& name, const Value& oldValue)
{
if (name == "checker")
OnCheckerChanged(GetSelf(), oldValue);
}

View File

@ -107,6 +107,10 @@ public:
static Dictionary::Ptr ResolveDependencies(const Host::Ptr& host, const Dictionary::Ptr& dependencies);
static boost::signal<void (const Service::Ptr& service, const CheckResultMessage&)> OnCheckResultReceived;
static boost::signal<void (const Service::Ptr&, const String&)> OnCheckerChanged;
protected:
virtual void OnAttributeChanged(const String& name, const Value& oldValue);
};
}

View File

@ -24,13 +24,12 @@ using namespace icinga;
void CheckerComponent::Start(void)
{
m_Endpoint = boost::make_shared<VirtualEndpoint>();
m_Endpoint->RegisterTopicHandler("checker::AssignService",
boost::bind(&CheckerComponent::AssignServiceRequestHandler, this, _2, _3));
m_Endpoint->RegisterTopicHandler("checker::ClearServices",
boost::bind(&CheckerComponent::ClearServicesRequestHandler, this, _2, _3));
m_Endpoint->RegisterPublication("checker::ServiceStateChange");
EndpointManager::GetInstance()->RegisterEndpoint(m_Endpoint);
Service::OnCheckerChanged.connect(bind(&CheckerComponent::CheckerChangedHandler, this, _1));
DynamicObject::OnUnregistered.connect(bind(&CheckerComponent::ServiceRemovedHandler, this, _1));
m_CheckTimer = boost::make_shared<Timer>();
m_CheckTimer->SetInterval(1);
m_CheckTimer->OnTimerExpired.connect(boost::bind(&CheckerComponent::CheckTimerHandler, this));
@ -61,12 +60,13 @@ void CheckerComponent::CheckTimerHandler(void)
long tasks = 0;
while (!m_Services.empty()) {
Service::Ptr service = m_Services.top();
CheckerComponent::ServiceMultiSet::iterator it = m_Services.begin();
Service::Ptr service = *it;
if (service->GetNextCheck() > now)
break;
m_Services.pop();
m_Services.erase(it);
Logger::Write(LogDebug, "checker", "Executing service check for '" + service->GetName() + "'");
@ -130,11 +130,11 @@ void CheckerComponent::CheckCompletedHandler(const Service::Ptr& service, const
/* remove the service from the list of pending services; if it's not in the
* list this was a manual (i.e. forced) check and we must not re-add the
* service to the services list because it's already there. */
set<Service::Ptr>::iterator it;
CheckerComponent::ServiceMultiSet::iterator it;
it = m_PendingServices.find(service);
if (it != m_PendingServices.end()) {
m_PendingServices.erase(it);
m_Services.push(service);
m_Services.insert(service);
}
Logger::Write(LogDebug, "checker", "Check finished for service '" + service->GetName() + "'");
@ -149,35 +149,31 @@ void CheckerComponent::ResultTimerHandler(void)
Logger::Write(LogInformation, "checker", msgbuf.str());
}
void CheckerComponent::AssignServiceRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request)
void CheckerComponent::CheckerChangedHandler(const Service::Ptr& service)
{
MessagePart params;
if (!request.GetParams(&params))
String checker = service->GetChecker();
if (checker == EndpointManager::GetInstance()->GetIdentity() || checker == m_Endpoint->GetIdentity()) {
if (m_PendingServices.find(service) != m_PendingServices.end())
return;
String service;
if (!params.Get("service", &service))
return;
if (!Service::Exists(service)) {
Logger::Write(LogWarning, "checker", "Ignoring delegation request for unknown service '" + service + "'.");
return;
m_Services.insert(service);
} else {
m_Services.erase(service);
m_PendingServices.erase(service);
}
Service::Ptr object = Service::GetByName(service);
m_Services.push(object);
Logger::Write(LogDebug, "checker", "Accepted delegation for service '" + service + "'");
}
void CheckerComponent::ClearServicesRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request)
void CheckerComponent::ServiceRemovedHandler(const DynamicObject::Ptr& object)
{
Logger::Write(LogInformation, "checker", "Clearing service delegations.");
Service::Ptr service = dynamic_pointer_cast<Service>(object);
/* clear the services lists */
m_Services = ServiceQueue();
m_PendingServices.clear();
/* ignore it if the removed object is not a service */
if (!service)
return;
m_Services.erase(service);
m_PendingServices.erase(service);
}
EXPORT_COMPONENT(checker, CheckerComponent);

View File

@ -41,7 +41,7 @@ public:
typedef shared_ptr<CheckerComponent> Ptr;
typedef weak_ptr<CheckerComponent> WeakPtr;
typedef priority_queue<Service::Ptr, vector<Service::Ptr>, ServiceNextCheckLessComparer> ServiceQueue;
typedef multiset<Service::Ptr, ServiceNextCheckLessComparer> ServiceMultiSet;
virtual void Start(void);
virtual void Stop(void);
@ -49,8 +49,8 @@ public:
private:
VirtualEndpoint::Ptr m_Endpoint;
ServiceQueue m_Services;
set<Service::Ptr> m_PendingServices;
ServiceMultiSet m_Services;
ServiceMultiSet m_PendingServices;
Timer::Ptr m_CheckTimer;
@ -63,8 +63,11 @@ private:
void AdjustCheckTimer(void);
void AssignServiceRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request);
void ClearServicesRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request);
void CheckerChangedHandler(const Service::Ptr& service);
void ServiceRemovedHandler(const DynamicObject::Ptr& object);
//void AssignServiceRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request);
//void ClearServicesRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request);
};
}

View File

@ -233,7 +233,9 @@ void CIBSyncComponent::RemoteObjectUpdateHandler(const Endpoint::Ptr& sender, co
if (object->GetSource().IsEmpty())
object->SetSource(sender->GetIdentity());
object->ApplyUpdate(update, true);
// TODO: disallow config updates depending on endpoint config
object->ApplyUpdate(update, Attribute_All);
}
}

View File

@ -24,103 +24,16 @@ using namespace icinga;
void DelegationComponent::Start(void)
{
DynamicObject::OnRegistered.connect(boost::bind(&DelegationComponent::ServiceCommittedHandler, this, _1));
DynamicObject::OnUnregistered.connect(boost::bind(&DelegationComponent::ServiceRemovedHandler, this, _1));
m_DelegationTimer = boost::make_shared<Timer>();
m_DelegationTimer->SetInterval(30);
m_DelegationTimer->OnTimerExpired.connect(boost::bind(&DelegationComponent::DelegationTimerHandler, this));
m_DelegationTimer->Start();
m_DelegationTimer->Reschedule(0);
m_Endpoint = boost::make_shared<VirtualEndpoint>();
m_Endpoint->RegisterPublication("checker::AssignService");
m_Endpoint->RegisterPublication("checker::ClearServices");
m_Endpoint->RegisterPublication("delegation::ServiceStatus");
EndpointManager::GetInstance()->RegisterEndpoint(m_Endpoint);
EndpointManager::GetInstance()->OnNewEndpoint.connect(bind(&DelegationComponent::NewEndpointHandler, this, _2));
}
void DelegationComponent::Stop(void)
{
EndpointManager::Ptr mgr = EndpointManager::GetInstance();
if (mgr)
mgr->UnregisterEndpoint(m_Endpoint);
}
void DelegationComponent::ServiceCommittedHandler(const DynamicObject::Ptr& object)
{
Service::Ptr service = dynamic_pointer_cast<Service>(object);
if (!service)
return;
String checker = service->GetChecker();
if (!checker.IsEmpty()) {
/* object was updated, clear its checker to make sure it's re-delegated by the delegation timer */
service->SetChecker("");
/* TODO: figure out a better way to clear individual services */
Endpoint::Ptr endpoint = EndpointManager::GetInstance()->GetEndpointByIdentity(checker);
if (endpoint)
ClearServices(endpoint);
}
}
void DelegationComponent::ServiceRemovedHandler(const DynamicObject::Ptr& object)
{
Service::Ptr service = dynamic_pointer_cast<Service>(object);
if (!service)
return;
String checker = service->GetChecker();
if (!checker.IsEmpty()) {
/* TODO: figure out a better way to clear individual services */
Endpoint::Ptr endpoint = EndpointManager::GetInstance()->GetEndpointByIdentity(checker);
if (endpoint)
ClearServices(endpoint);
}
}
void DelegationComponent::AssignService(const Endpoint::Ptr& checker, const Service::Ptr& service)
{
RequestMessage request;
request.SetMethod("checker::AssignService");
MessagePart params;
params.Set("service", service->GetName());
request.SetParams(params);
Logger::Write(LogDebug, "delegation", "Trying to delegate service '" + service->GetName() + "'");
EndpointManager::GetInstance()->SendUnicastMessage(m_Endpoint, checker, request);
}
void DelegationComponent::ClearServices(const Endpoint::Ptr& checker)
{
stringstream msgbuf;
msgbuf << "Clearing assigned services for endpoint '" << checker->GetIdentity() << "'";
Logger::Write(LogInformation, "delegation", msgbuf.str());
RequestMessage request;
request.SetMethod("checker::ClearServices");
MessagePart params;
request.SetParams(params);
EndpointManager::GetInstance()->SendUnicastMessage(m_Endpoint, checker, request);
}
bool DelegationComponent::IsEndpointChecker(const Endpoint::Ptr& endpoint)
{
return (endpoint->HasSubscription("checker::AssignService"));
return (endpoint->HasPublication("checker::ServiceStateChange"));
}
vector<Endpoint::Ptr> DelegationComponent::GetCheckerCandidates(const Service::Ptr& service) const
@ -149,32 +62,6 @@ vector<Endpoint::Ptr> DelegationComponent::GetCheckerCandidates(const Service::P
return candidates;
}
void DelegationComponent::NewEndpointHandler(const Endpoint::Ptr& endpoint)
{
endpoint->OnSessionEstablished.connect(bind(&DelegationComponent::SessionEstablishedHandler, this, _1));
}
void DelegationComponent::SessionEstablishedHandler(const Endpoint::Ptr& endpoint)
{
/* ignore this endpoint if it's not a checker */
if (!IsEndpointChecker(endpoint))
return;
/* locally clear checker for all services that previously belonged to this endpoint */
DynamicObject::Ptr object;
BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Service")) {
Service::Ptr service = dynamic_pointer_cast<Service>(object);
if (!service)
continue;
if (service->GetChecker() == endpoint->GetIdentity())
service->SetChecker("");
}
/* remotely clear services for this endpoint */
ClearServices(endpoint);
}
void DelegationComponent::DelegationTimerHandler(void)
{
map<Endpoint::Ptr, int> histogram;
@ -280,25 +167,6 @@ void DelegationComponent::DelegationTimerHandler(void)
Logger::Write(LogInformation, "delegation", msgbuf.str());
}
if (delegated > 0) {
if (need_clear) {
Endpoint::Ptr endpoint;
BOOST_FOREACH(tie(endpoint, tuples::ignore), histogram) {
ClearServices(endpoint);
}
}
BOOST_FOREACH(const Service::Ptr& service, services) {
String checker = service->GetChecker();
Endpoint::Ptr endpoint = EndpointManager::GetInstance()->GetEndpointByIdentity(checker);
if (!endpoint)
continue;
AssignService(endpoint, service);
}
}
stringstream msgbuf;
msgbuf << "Updated delegations for " << delegated << " services";
Logger::Write(LogInformation, "delegation", msgbuf.str());

View File

@ -30,27 +30,15 @@ class DelegationComponent : public IComponent
{
public:
virtual void Start(void);
virtual void Stop(void);
private:
VirtualEndpoint::Ptr m_Endpoint;
Timer::Ptr m_DelegationTimer;
void NewEndpointHandler(const Endpoint::Ptr& endpoint);
void SessionEstablishedHandler(const Endpoint::Ptr& endpoint);
void ServiceCommittedHandler(const DynamicObject::Ptr& object);
void ServiceRemovedHandler(const DynamicObject::Ptr& object);
void DelegationTimerHandler(void);
vector<Endpoint::Ptr> GetCheckerCandidates(const Service::Ptr& service) const;
void AssignService(const Endpoint::Ptr& checker, const Service::Ptr& service);
void ClearServices(const Endpoint::Ptr& checker);
static bool IsEndpointChecker(const Endpoint::Ptr& endpoint);
void CheckResultRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request);
};
}

View File

@ -106,7 +106,7 @@ DynamicObject::Ptr ConfigItem::Commit(void)
if (!dobj)
dobj = DynamicObject::Create(GetType(), update);
else
dobj->ApplyUpdate(update);
dobj->ApplyUpdate(update, Attribute_Config);
m_DynamicObject = dobj;