Fine-grained locks (WIP, Part 6).

This commit is contained in:
Gunnar Beutner 2013-02-19 23:02:08 +01:00
parent ed78962427
commit 997ca3a77a
29 changed files with 419 additions and 236 deletions

View File

@ -33,10 +33,10 @@ void CheckerComponent::Start(void)
Service::OnCheckerChanged.connect(bind(&CheckerComponent::CheckerChangedHandler, this, _1));
Service::OnNextCheckChanged.connect(bind(&CheckerComponent::NextCheckChangedHandler, this, _1));
DynamicObject::OnUnregistered.connect(bind(&CheckerComponent::ObjectRemovedHandler, this, _1));
boost::thread thread(boost::bind(&CheckerComponent::CheckThreadProc, this));
thread.detach();
m_Stopped = false;
m_Thread = thread(boost::bind(&CheckerComponent::CheckThreadProc, this));
m_ResultTimer = boost::make_shared<Timer>();
m_ResultTimer->SetInterval(5);
@ -47,6 +47,14 @@ void CheckerComponent::Start(void)
void CheckerComponent::Stop(void)
{
m_Endpoint->Unregister();
{
boost::mutex::scoped_lock lock(m_Mutex);
m_Stopped = true;
m_CV.notify_all();
}
m_Thread.join();
}
void CheckerComponent::CheckThreadProc(void)
@ -60,9 +68,12 @@ void CheckerComponent::CheckThreadProc(void)
typedef nth_index<ServiceSet, 1>::type CheckTimeView;
CheckTimeView& idx = boost::get<1>(m_IdleServices);
while (idx.begin() == idx.end())
while (idx.begin() == idx.end() && !m_Stopped)
m_CV.wait(lock);
if (m_Stopped)
break;
CheckTimeView::iterator it = idx.begin();
service = it->lock();
@ -85,7 +96,8 @@ void CheckerComponent::CheckThreadProc(void)
/* Wait for the next check. */
boost::mutex::scoped_lock lock(m_Mutex);
m_CV.timed_wait(lock, boost::posix_time::milliseconds(wait * 1000));
if (!m_Stopped)
m_CV.timed_wait(lock, boost::posix_time::milliseconds(wait * 1000));
continue;
}
@ -216,19 +228,3 @@ void CheckerComponent::NextCheckChangedHandler(const Service::Ptr& service)
}
}
void CheckerComponent::ObjectRemovedHandler(const DynamicObject::Ptr& object)
{
Service::Ptr service = dynamic_pointer_cast<Service>(object);
/* ignore it if the removed object is not a service */
if (!service)
return;
{
boost::mutex::scoped_lock lock(m_Mutex);
m_IdleServices.erase(service);
m_PendingServices.erase(service);
m_CV.notify_all();
}
}

View File

@ -69,6 +69,8 @@ private:
boost::mutex m_Mutex;
boost::condition_variable m_CV;
bool m_Stopped;
thread m_Thread;
ServiceSet m_IdleServices;
ServiceSet m_PendingServices;
@ -84,7 +86,6 @@ private:
void CheckerChangedHandler(const Service::Ptr& service);
void NextCheckChangedHandler(const Service::Ptr& service);
void ObjectRemovedHandler(const DynamicObject::Ptr& object);
void RescheduleCheckTimer(void);
};

View File

@ -257,31 +257,40 @@ void CompatComponent::DumpDowntimes(ofstream& fp, const Service::Ptr& owner, Com
void CompatComponent::DumpHostStatus(ofstream& fp, const Host::Ptr& host)
{
ObjectLock olock(host);
Service::Ptr hc;
{
ObjectLock olock(host);
hc = host->GetHostCheckService();
fp << "hoststatus {" << "\n"
<< "\t" << "host_name=" << host->GetName() << "\n";
}
ServiceState hcState = StateOK;
if (hc) {
ObjectLock olock(hc);
hcState = hc->GetState();
}
int state;
if (!host->IsReachable())
if (!Host::IsReachable(host))
state = 2; /* unreachable */
else if (!host->IsUp())
else if (hcState != StateOK)
state = 1; /* down */
else
state = 0; /* up */
fp << "hoststatus {" << "\n"
<< "\t" << "host_name=" << host->GetName() << "\n";
Service::Ptr hostcheck = host->GetHostCheckService();
if (hostcheck) {
DumpServiceStatusAttrs(fp, hostcheck, CompatTypeHost);
}
if (hc)
DumpServiceStatusAttrs(fp, hc, CompatTypeHost);
fp << "\t" << "}" << "\n"
<< "\n";
if (hostcheck) {
DumpDowntimes(fp, hostcheck, CompatTypeHost);
DumpComments(fp, hostcheck, CompatTypeHost);
if (hc) {
DumpDowntimes(fp, hc, CompatTypeHost);
DumpComments(fp, hc, CompatTypeHost);
}
}
@ -312,14 +321,23 @@ void CompatComponent::DumpHostObject(ofstream& fp, const Host::Ptr& host)
void CompatComponent::DumpServiceStatusAttrs(ofstream& fp, const Service::Ptr& service, CompatObjectType type)
{
ObjectLock olock(service);
String output;
String perfdata;
double schedule_start = -1, schedule_end = -1;
double execution_start = -1, execution_end = -1;
Dictionary::Ptr cr = service->GetLastCheckResult();
Dictionary::Ptr cr;
int state;
Host::Ptr host;
{
ObjectLock olock(service);
cr = service->GetLastCheckResult();
state = service->GetState();
host = service->GetHost();
}
if (cr) {
output = cr->Get("output");
schedule_start = cr->Get("schedule_start");
@ -332,8 +350,6 @@ void CompatComponent::DumpServiceStatusAttrs(ofstream& fp, const Service::Ptr& s
double execution_time = (execution_end - execution_start);
double latency = (schedule_end - schedule_start) - execution_time;
int state = service->GetState();
if (state > StateUnknown)
state = StateUnknown;
@ -343,44 +359,60 @@ void CompatComponent::DumpServiceStatusAttrs(ofstream& fp, const Service::Ptr& s
else
state = 1;
if (!service->GetHost()->IsReachable())
if (Host::IsReachable(host))
state = 2;
}
fp << "\t" << "check_interval=" << service->GetCheckInterval() / 60.0 << "\n"
<< "\t" << "retry_interval=" << service->GetRetryInterval() / 60.0 << "\n"
<< "\t" << "has_been_checked=" << (service->GetLastCheckResult() ? 1 : 0) << "\n"
<< "\t" << "should_be_scheduled=1" << "\n"
<< "\t" << "check_execution_time=" << execution_time << "\n"
<< "\t" << "check_latency=" << latency << "\n"
<< "\t" << "current_state=" << state << "\n"
<< "\t" << "state_type=" << service->GetStateType() << "\n"
<< "\t" << "plugin_output=" << output << "\n"
<< "\t" << "performance_data=" << perfdata << "\n"
<< "\t" << "last_check=" << schedule_end << "\n"
<< "\t" << "next_check=" << service->GetNextCheck() << "\n"
<< "\t" << "current_attempt=" << service->GetCurrentCheckAttempt() << "\n"
<< "\t" << "max_attempts=" << service->GetMaxCheckAttempts() << "\n"
<< "\t" << "last_state_change=" << service->GetLastStateChange() << "\n"
<< "\t" << "last_hard_state_change=" << service->GetLastHardStateChange() << "\n"
<< "\t" << "last_update=" << time(NULL) << "\n"
<< "\t" << "active_checks_enabled=" << (service->GetEnableActiveChecks() ? 1 : 0) <<"\n"
<< "\t" << "passive_checks_enabled=" << (service->GetEnablePassiveChecks() ? 1 : 0) << "\n"
<< "\t" << "problem_has_been_acknowledged=" << (service->GetAcknowledgement() != AcknowledgementNone ? 1 : 0) << "\n"
<< "\t" << "acknowledgement_type=" << static_cast<int>(service->GetAcknowledgement()) << "\n"
<< "\t" << "acknowledgement_end_time=" << service->GetAcknowledgementExpiry() << "\n"
<< "\t" << "scheduled_downtime_depth=" << (service->IsInDowntime() ? 1 : 0) << "\n"
<< "\t" << "last_notification=" << service->GetLastNotification() << "\n"
<< "\t" << "next_notification=" << service->GetNextNotification() << "\n";
{
ObjectLock olock(service);
fp << "\t" << "check_interval=" << service->GetCheckInterval() / 60.0 << "\n"
<< "\t" << "retry_interval=" << service->GetRetryInterval() / 60.0 << "\n"
<< "\t" << "has_been_checked=" << (service->GetLastCheckResult() ? 1 : 0) << "\n"
<< "\t" << "should_be_scheduled=1" << "\n"
<< "\t" << "check_execution_time=" << execution_time << "\n"
<< "\t" << "check_latency=" << latency << "\n"
<< "\t" << "current_state=" << state << "\n"
<< "\t" << "state_type=" << service->GetStateType() << "\n"
<< "\t" << "plugin_output=" << output << "\n"
<< "\t" << "performance_data=" << perfdata << "\n"
<< "\t" << "last_check=" << schedule_end << "\n"
<< "\t" << "next_check=" << service->GetNextCheck() << "\n"
<< "\t" << "current_attempt=" << service->GetCurrentCheckAttempt() << "\n"
<< "\t" << "max_attempts=" << service->GetMaxCheckAttempts() << "\n"
<< "\t" << "last_state_change=" << service->GetLastStateChange() << "\n"
<< "\t" << "last_hard_state_change=" << service->GetLastHardStateChange() << "\n"
<< "\t" << "last_update=" << time(NULL) << "\n"
<< "\t" << "active_checks_enabled=" << (service->GetEnableActiveChecks() ? 1 : 0) <<"\n"
<< "\t" << "passive_checks_enabled=" << (service->GetEnablePassiveChecks() ? 1 : 0) << "\n"
<< "\t" << "problem_has_been_acknowledged=" << (service->GetAcknowledgement() != AcknowledgementNone ? 1 : 0) << "\n"
<< "\t" << "acknowledgement_type=" << static_cast<int>(service->GetAcknowledgement()) << "\n"
<< "\t" << "acknowledgement_end_time=" << service->GetAcknowledgementExpiry() << "\n"
<< "\t" << "scheduled_downtime_depth=" << (service->IsInDowntime() ? 1 : 0) << "\n"
<< "\t" << "last_notification=" << service->GetLastNotification() << "\n"
<< "\t" << "next_notification=" << service->GetNextNotification() << "\n";
}
}
void CompatComponent::DumpServiceStatus(ofstream& fp, const Service::Ptr& service)
{
ObjectLock olock(service);
String host_name, short_name;
Host::Ptr host;
{
ObjectLock olock(service);
short_name = service->GetShortName();
host = service->GetHost();
}
{
ObjectLock olock(host);
host_name = host->GetName();
}
fp << "servicestatus {" << "\n"
<< "\t" << "host_name=" << service->GetHost()->GetName() << "\n"
<< "\t" << "service_description=" << service->GetShortName() << "\n";
<< "\t" << "host_name=" << host_name << "\n"
<< "\t" << "service_description=" << short_name << "\n";
DumpServiceStatusAttrs(fp, service, CompatTypeService);
@ -393,29 +425,47 @@ void CompatComponent::DumpServiceStatus(ofstream& fp, const Service::Ptr& servic
void CompatComponent::DumpServiceObject(ofstream& fp, const Service::Ptr& service)
{
ObjectLock olock(service);
set<Service::Ptr> parentServices;
Host::Ptr host;
String host_name, short_name;
fp << "define service {" << "\n"
<< "\t" << "host_name" << "\t" << service->GetHost()->GetName() << "\n"
<< "\t" << "service_description" << "\t" << service->GetShortName() << "\n"
<< "\t" << "display_name" << "\t" << service->GetDisplayName() << "\n"
<< "\t" << "check_command" << "\t" << "check_i2" << "\n"
<< "\t" << "check_interval" << "\t" << service->GetCheckInterval() / 60.0 << "\n"
<< "\t" << "retry_interval" << "\t" << service->GetRetryInterval() / 60.0 << "\n"
<< "\t" << "max_check_attempts" << "\t" << 1 << "\n"
<< "\t" << "active_checks_enabled" << "\t" << (service->GetEnableActiveChecks() ? 1 : 0) << "\n"
<< "\t" << "passive_checks_enabled" << "\t" << (service->GetEnablePassiveChecks() ? 1 : 0) << "\n"
<< "\t" << "}" << "\n"
<< "\n";
{
ObjectLock olock(service);
parentServices = service->GetParentServices();
host = service->GetHost();
short_name = service->GetShortName();
}
BOOST_FOREACH(const Service::Ptr& parent, service->GetParentServices()) {
{
ObjectLock olock(host);
host_name = host->GetName();
}
{
ObjectLock olock(service);
fp << "define service {" << "\n"
<< "\t" << "host_name" << "\t" << host_name << "\n"
<< "\t" << "service_description" << "\t" << short_name << "\n"
<< "\t" << "display_name" << "\t" << service->GetDisplayName() << "\n"
<< "\t" << "check_command" << "\t" << "check_i2" << "\n"
<< "\t" << "check_interval" << "\t" << service->GetCheckInterval() / 60.0 << "\n"
<< "\t" << "retry_interval" << "\t" << service->GetRetryInterval() / 60.0 << "\n"
<< "\t" << "max_check_attempts" << "\t" << 1 << "\n"
<< "\t" << "active_checks_enabled" << "\t" << (service->GetEnableActiveChecks() ? 1 : 0) << "\n"
<< "\t" << "passive_checks_enabled" << "\t" << (service->GetEnablePassiveChecks() ? 1 : 0) << "\n"
<< "\t" << "}" << "\n"
<< "\n";
}
BOOST_FOREACH(const Service::Ptr& parent, parentServices) {
ObjectLock plock(parent);
fp << "define servicedependency {" << "\n"
<< "\t" << "dependent_host_name" << "\t" << service->GetHost()->GetName() << "\n"
<< "\t" << "dependent_host_name" << "\t" << host_name << "\n"
<< "\t" << "dependent_service_description" << "\t" << service->GetShortName() << "\n"
<< "\t" << "host_name" << "\t" << parent->GetHost()->GetName() << "\n"
<< "\t" << "service_description" << "\t" << parent->GetShortName() << "\n"
<< "\t" << "service_description" << "\t" << short_name << "\n"
<< "\t" << "execution_failure_criteria" << "\t" << "n" << "\n"
<< "\t" << "notification_failure_criteria" << "\t" << "w,u,c" << "\n"
<< "\t" << "}" << "\n"
@ -494,16 +544,22 @@ void CompatComponent::StatusTimerHandler(void)
}
BOOST_FOREACH(const DynamicObject::Ptr& object, DynamicType::GetObjects("HostGroup")) {
HostGroup::Ptr hg = static_pointer_cast<HostGroup>(object);
ObjectLock olock(hg);
set<Host::Ptr> members;
objectfp << "define hostgroup {" << "\n"
<< "\t" << "hostgroup_name" << "\t" << hg->GetName() << "\n"
<< "\t" << "notes_url" << "\t" << hg->GetNotesUrl() << "\n"
<< "\t" << "action_url" << "\t" << hg->GetActionUrl() << "\n";
{
HostGroup::Ptr hg = static_pointer_cast<HostGroup>(object);
ObjectLock olock(hg);
objectfp << "define hostgroup {" << "\n"
<< "\t" << "hostgroup_name" << "\t" << hg->GetName() << "\n"
<< "\t" << "notes_url" << "\t" << hg->GetNotesUrl() << "\n"
<< "\t" << "action_url" << "\t" << hg->GetActionUrl() << "\n";
members = hg->GetMembers();
}
objectfp << "\t" << "members" << "\t";
DumpNameList(objectfp, hg->GetMembers());
DumpNameList(objectfp, members);
objectfp << "\n"
<< "}" << "\n";
}
@ -516,25 +572,40 @@ void CompatComponent::StatusTimerHandler(void)
}
BOOST_FOREACH(const DynamicObject::Ptr& object, DynamicType::GetObjects("ServiceGroup")) {
ServiceGroup::Ptr sg = static_pointer_cast<ServiceGroup>(object);
ObjectLock olock(sg);
set<Service::Ptr> members;
objectfp << "define servicegroup {" << "\n"
<< "\t" << "servicegroup_name" << "\t" << sg->GetName() << "\n"
<< "\t" << "notes_url" << "\t" << sg->GetNotesUrl() << "\n"
<< "\t" << "action_url" << "\t" << sg->GetActionUrl() << "\n";
{
ServiceGroup::Ptr sg = static_pointer_cast<ServiceGroup>(object);
ObjectLock olock(sg);
objectfp << "define servicegroup {" << "\n"
<< "\t" << "servicegroup_name" << "\t" << sg->GetName() << "\n"
<< "\t" << "notes_url" << "\t" << sg->GetNotesUrl() << "\n"
<< "\t" << "action_url" << "\t" << sg->GetActionUrl() << "\n";
members = sg->GetMembers();
}
objectfp << "\t" << "members" << "\t";
vector<String> sglist;
BOOST_FOREACH(const Service::Ptr& service, sg->GetMembers()) {
ObjectLock slock(service);
Host::Ptr host = service->GetHost();
BOOST_FOREACH(const Service::Ptr& service, members) {
Host::Ptr host;
String host_name, short_name;
ObjectLock hlock(host);
sglist.push_back(host->GetName());
{
ObjectLock olock(service);
host = service->GetHost();
short_name = service->GetShortName();
}
sglist.push_back(service->GetShortName());
{
ObjectLock olock(host);
host_name = host->GetName();
}
sglist.push_back(host_name);
sglist.push_back(short_name);
}
DumpStringList(objectfp, sglist);

View File

@ -438,9 +438,9 @@ void CompatIdoComponent::DumpHostStatus(const Host::Ptr& host)
Logger::Write(LogDebug, "compatido", log.str());
int state;
if (!host->IsReachable())
if (!Host::IsReachable(host))
state = 2; /* unreachable */
else if (!host->IsUp())
else if (host->GetHostCheckService()->GetState() != StateOK)
state = 1; /* down */
else
state = 0; /* up */

View File

@ -33,6 +33,7 @@ void ReplicationComponent::Start(void)
DynamicObject::OnRegistered.connect(boost::bind(&ReplicationComponent::LocalObjectRegisteredHandler, this, _1));
DynamicObject::OnUnregistered.connect(boost::bind(&ReplicationComponent::LocalObjectUnregisteredHandler, this, _1));
DynamicObject::OnTransactionClosing.connect(boost::bind(&ReplicationComponent::TransactionClosingHandler, this, _2));
DynamicObject::OnFlushObject.connect(boost::bind(&ReplicationComponent::FlushObjectHandler, this, _1));
Endpoint::OnConnected.connect(boost::bind(&ReplicationComponent::EndpointConnectedHandler, this, _1));
@ -175,11 +176,21 @@ void ReplicationComponent::TransactionClosingHandler(const set<DynamicObject::We
if (!object)
continue;
if (!ShouldReplicateObject(object))
continue;
FlushObjectHandler(object);
}
}
RequestMessage request = MakeObjectMessage(object, "config::ObjectUpdate", DynamicObject::GetCurrentTx(), true);
EndpointManager::GetInstance()->SendMulticastMessage(m_Endpoint, request);
void ReplicationComponent::FlushObjectHandler(const DynamicObject::Ptr& object)
{
if (!ShouldReplicateObject(object))
return;
RequestMessage request = MakeObjectMessage(object, "config::ObjectUpdate", DynamicObject::GetCurrentTx(), true);
EndpointManager::Ptr em = EndpointManager::GetInstance();
{
ObjectLock olock(em);
em->SendMulticastMessage(m_Endpoint, request);
}
}

View File

@ -42,6 +42,7 @@ private:
void LocalObjectRegisteredHandler(const DynamicObject::Ptr& object);
void LocalObjectUnregisteredHandler(const DynamicObject::Ptr& object);
void TransactionClosingHandler(const set<DynamicObject::WeakPtr>& modifiedObjects);
void FlushObjectHandler(const DynamicObject::Ptr& object);
void RemoteObjectUpdateHandler(const RequestMessage& request);
void RemoteObjectRemovedHandler(const RequestMessage& request);

View File

@ -232,15 +232,11 @@ int main(int argc, char **argv)
return EXIT_FAILURE;
}
DynamicObject::NewTx();
bool validateOnly = g_AppParams.count("validate");
if (!LoadConfigFiles(validateOnly))
return EXIT_FAILURE;
DynamicObject::NewTx();
if (validateOnly) {
Logger::Write(LogInformation, "icinga-app", "Terminating as requested by --validate.");
return EXIT_SUCCESS;

View File

@ -149,7 +149,11 @@ void Application::RunEventLoop(void) const
flushTxTimer->Start();
#endif /* _DEBUG */
Timer::Initialize();
GetEQ().Join();
Timer::Uninitialize();
}
/**
@ -419,11 +423,8 @@ int Application::Run(void)
SetConsoleCtrlHandler(&Application::CtrlHandler, TRUE);
#endif /* _WIN32 */
DynamicObject::NewTx();
result = Main();
DynamicObject::NewTx();
DynamicObject::DeactivateObjects();
return result;

View File

@ -86,7 +86,6 @@ public:
*/
TResult GetResult(void)
{
boost::mutex::scoped_lock lock(m_Mutex);
if (!m_Finished)
BOOST_THROW_EXCEPTION(runtime_error("GetResult called on an unfinished AsyncTask"));
@ -110,7 +109,6 @@ public:
*/
void FinishException(const boost::exception_ptr& ex)
{
boost::mutex::scoped_lock lock(m_Mutex);
m_Exception = ex;
FinishInternal();
}
@ -122,7 +120,6 @@ public:
*/
void FinishResult(const TResult& result)
{
boost::mutex::scoped_lock lock(m_Mutex);
m_Result = result;
FinishInternal();
}
@ -149,24 +146,24 @@ private:
/**
* Finishes the task and causes the completion callback to be invoked. This
* function must be called before the object is destroyed.
*
* @threadsafety Caller must hold m_Mutex.
*/
void FinishInternal(void)
{
assert(!m_Finished);
CompletionCallback callback;
m_Finished = true;
{
boost::mutex::scoped_lock lock(m_Mutex);
assert(!m_Finished);
m_CV.notify_all();
m_Finished = true;
if (!m_CompletionCallback.empty()) {
Utility::QueueAsyncCallback(boost::bind(m_CompletionCallback, GetSelf()));
m_CV.notify_all();
/* Clear callback because the bound function might hold a
* reference to this task. */
m_CompletionCallback = CompletionCallback();
m_CompletionCallback.swap(callback);
}
if (!callback.empty())
Utility::QueueAsyncCallback(boost::bind(callback, GetSelf()));
}
mutable boost::mutex m_Mutex;

View File

@ -30,6 +30,7 @@ Timer::Ptr DynamicObject::m_TransactionTimer;
signals2::signal<void (const DynamicObject::Ptr&)> DynamicObject::OnRegistered;
signals2::signal<void (const DynamicObject::Ptr&)> DynamicObject::OnUnregistered;
signals2::signal<void (double, const set<DynamicObject::WeakPtr>&)> DynamicObject::OnTransactionClosing;
signals2::signal<void (const DynamicObject::Ptr&)> DynamicObject::OnFlushObject;
DynamicObject::DynamicObject(const Dictionary::Ptr& serializedObject)
: m_Events(false), m_ConfigTx(0)
@ -527,11 +528,20 @@ double DynamicObject::GetCurrentTx(void)
{
boost::mutex::scoped_lock lock(m_TransactionMutex);
assert(m_CurrentTx != 0);
if (m_CurrentTx == 0) {
/* Set the initial transaction ID. */
m_CurrentTx = Utility::GetTime();
}
return m_CurrentTx;
}
void DynamicObject::Flush(void)
{
SendLocalUpdateEvents();
OnFlushObject(GetSelf());
}
/*
* @threadsafety Always. Caller must not hold any Object locks.
*/

View File

@ -99,6 +99,7 @@ public:
static signals2::signal<void (const DynamicObject::Ptr&)> OnRegistered;
static signals2::signal<void (const DynamicObject::Ptr&)> OnUnregistered;
static signals2::signal<void (double, const set<DynamicObject::WeakPtr>&)> OnTransactionClosing;
static signals2::signal<void (const DynamicObject::Ptr&)> OnFlushObject;
ScriptTask::Ptr MakeMethodTask(const String& method,
const vector<Value>& arguments);
@ -115,6 +116,8 @@ public:
void SetTx(double tx);
double GetTx(void) const;
void Flush(void);
void Register(void);
void Unregister(void);
@ -132,7 +135,6 @@ public:
static void DeactivateObjects(void);
static double GetCurrentTx(void);
static void NewTx(void);
protected:
virtual void OnInitCompleted(void);
@ -151,6 +153,8 @@ private:
static double m_CurrentTx;
static void NewTx(void);
/* This has to be a set of raw pointers because the DynamicObject
* constructor has to be able to insert objects into this list. */
static set<DynamicObject::WeakPtr> m_ModifiedObjects;

View File

@ -83,11 +83,20 @@ void DynamicType::RegisterObject(const DynamicObject::Ptr& object)
ObjectLock olock(object);
object->SetEvents(true);
if (m_ObjectMap.find(object->GetName()) != m_ObjectMap.end())
ObjectMap::iterator it = m_ObjectMap.find(object->GetName());
if (it != m_ObjectMap.end()) {
if (it->second == object)
return;
BOOST_THROW_EXCEPTION(runtime_error("RegisterObject() found existing object with the same name: " + object->GetName()));
}
m_ObjectMap[object->GetName()] = object;
m_ObjectSet.insert(object);
/* notify the object that it's been fully initialized */
object->OnInitCompleted();
}
void DynamicType::UnregisterObject(const DynamicObject::Ptr& object)
@ -128,22 +137,19 @@ void DynamicType::RegisterType(const DynamicType::Ptr& type)
DynamicObject::Ptr DynamicType::CreateObject(const Dictionary::Ptr& serializedUpdate) const
{
DynamicObject::Ptr obj = m_ObjectFactory(serializedUpdate);
ObjectLock olock(obj);
DynamicObject::Ptr object = m_ObjectFactory(serializedUpdate);
ObjectLock olock(object);
/* register attributes */
String name;
DynamicAttributeType type;
BOOST_FOREACH(tuples::tie(name, type), m_Attributes)
obj->RegisterAttribute(name, type);
object->RegisterAttribute(name, type);
/* apply the object's non-config attributes */
obj->ApplyUpdate(serializedUpdate, Attribute_All & ~Attribute_Config);
object->ApplyUpdate(serializedUpdate, Attribute_All & ~Attribute_Config);
/* notify the object that it's been fully initialized */
obj->OnInitCompleted();
return obj;
return object;
}
/**

View File

@ -32,7 +32,7 @@ EventQueue::EventQueue(void)
if (thread_count < 4)
thread_count = 4;
thread_count *= 8;
thread_count *= 4;
for (int i = 0; i < thread_count; i++)
m_Threads.create_thread(boost::bind(&EventQueue::QueueThreadProc, this));
@ -74,7 +74,7 @@ void EventQueue::Join(void)
*/
void EventQueue::QueueThreadProc(void)
{
while (!m_Stopped) {
for (;;) {
vector<Callback> events;
{
@ -83,6 +83,9 @@ void EventQueue::QueueThreadProc(void)
while (m_Events.empty() && !m_Stopped)
m_CV.wait(lock);
if (m_Stopped)
break;
events.swap(m_Events);
}
@ -112,5 +115,5 @@ void EventQueue::Post(const EventQueue::Callback& callback)
{
boost::mutex::scoped_lock lock(m_Mutex);
m_Events.push_back(callback);
m_CV.notify_all();
m_CV.notify_one();
}

View File

@ -41,7 +41,6 @@ Object::~Object(void)
*/
Object::SharedPtrHolder Object::GetSelf(void)
{
ObjectLock olock(this);
return Object::SharedPtrHolder(shared_from_this());
}

View File

@ -133,7 +133,8 @@ void Process::WorkerThreadProc(int taskFd)
if (fd >= 0)
tasks[fd] = task;
} catch (...) {
Application::GetEQ().Post(boost::bind(&Process::FinishException, task, boost::current_exception()));
ObjectLock olock(task);
task->FinishException(boost::current_exception());
}
}
@ -148,7 +149,8 @@ void Process::WorkerThreadProc(int taskFd)
prev = it;
tasks.erase(prev);
Application::GetEQ().Post(boost::bind(&Process::FinishResult, task, task->m_Result));
ObjectLock olock(task);
task->FinishResult(task->m_Result);
}
}
}

View File

@ -28,7 +28,10 @@ deque<Process::Ptr> Process::m_Tasks;
Process::Process(const vector<String>& arguments, const Dictionary::Ptr& extraEnvironment)
: AsyncTask<Process, ProcessResult>(), m_Arguments(arguments), m_ExtraEnvironment(extraEnvironment)
{
boost::call_once(&Process::Initialize, m_ThreadOnce);
{
boost::mutex::scoped_lock lock(m_Mutex);
boost::call_once(&Process::Initialize, m_ThreadOnce);
}
#ifndef _WIN32
m_FD = -1;

View File

@ -83,6 +83,8 @@ void StreamLogger::ProcessLogEntry(ostream& stream, bool tty, const LogEntry& en
strftime(timestamp, sizeof(timestamp), "%Y/%m/%d %H:%M:%S %z", &tmnow);
boost::mutex::scoped_lock lock(m_Mutex);
if (tty) {
switch (entry.Severity) {
case LogWarning:
@ -96,7 +98,6 @@ void StreamLogger::ProcessLogEntry(ostream& stream, bool tty, const LogEntry& en
}
}
boost::mutex::scoped_lock lock(m_Mutex);
stream << "[" << timestamp << "] "
<< Logger::SeverityToString(entry.Severity) << "/" << entry.Facility << ": "
<< entry.Message;

View File

@ -22,9 +22,10 @@
using namespace icinga;
Timer::TimerSet Timer::m_Timers;
thread Timer::m_Thread;
boost::mutex Timer::m_Mutex;
boost::condition_variable Timer::m_CV;
boost::once_flag Timer::m_ThreadOnce = BOOST_ONCE_INIT;
bool Timer::m_StopThread;
/**
* Extracts the next timestamp from a Timer.
@ -59,8 +60,25 @@ Timer::Timer(void)
*/
void Timer::Initialize(void)
{
thread worker(boost::bind(&Timer::TimerThreadProc));
worker.detach();
boost::mutex::scoped_lock lock(m_Mutex);
m_StopThread = false;
m_Thread = thread(boost::bind(&Timer::TimerThreadProc));
}
/**
* Disables the timer sub-system.
*
* @threadsafety Always.
*/
void Timer::Uninitialize(void)
{
{
boost::mutex::scoped_lock lock(m_Mutex);
m_StopThread = true;
m_CV.notify_all();
}
m_Thread.join();
}
/**
@ -70,18 +88,8 @@ void Timer::Initialize(void)
*/
void Timer::Call(void)
{
double st = Utility::GetTime();
OnTimerExpired(GetSelf());
double et = Utility::GetTime();
if (et - st > 1.0) {
stringstream msgbuf;
msgbuf << "Timer call took " << et - st << " seconds.";
Logger::Write(LogWarning, "base", msgbuf.str());
}
/* Re-enable the timer so it can be called again. */
m_Started = true;
Reschedule();
@ -118,8 +126,6 @@ double Timer::GetInterval(void) const
*/
void Timer::Start(void)
{
boost::call_once(&Timer::Initialize, m_ThreadOnce);
m_Started = true;
Reschedule();
@ -232,9 +238,12 @@ void Timer::TimerThreadProc(void)
NextTimerView& idx = boost::get<1>(m_Timers);
/* Wait until there is at least one timer. */
while (idx.empty())
while (idx.empty() && !m_StopThread)
m_CV.wait(lock);
if (m_StopThread)
break;
NextTimerView::iterator it = idx.begin();
Timer::Ptr timer = it->lock();

View File

@ -64,6 +64,9 @@ public:
signals2::signal<void(const Timer::Ptr&)> OnTimerExpired;
static void Initialize(void);
static void Uninitialize(void);
private:
double m_Interval; /**< The interval of the timer. */
double m_Next; /**< When the next event should happen. */
@ -79,13 +82,12 @@ private:
static boost::mutex m_Mutex;
static boost::condition_variable m_CV;
static thread m_Thread;
static bool m_StopThread;
static TimerSet m_Timers;
void Call(void);
static boost::once_flag m_ThreadOnce;
static void Initialize(void);
static void TimerThreadProc(void);
friend struct TimerNextExtractor;

View File

@ -383,18 +383,14 @@ void ConfigItem::UnloadUnit(const String& unit)
vector<ConfigItem::Ptr> obsoleteItems;
{
boost::mutex::scoped_lock lock(m_Mutex);
ConfigItem::Ptr item;
BOOST_FOREACH(tie(tuples::ignore, item), m_Items) {
ObjectLock olock(item);
ConfigItem::Ptr item;
BOOST_FOREACH(tie(tuples::ignore, item), m_Items) {
ObjectLock olock(item);
if (item->GetUnit() != unit)
continue;
if (item->GetUnit() != unit)
continue;
obsoleteItems.push_back(item);
}
obsoleteItems.push_back(item);
}
BOOST_FOREACH(const ConfigItem::Ptr& item, obsoleteItems) {

View File

@ -349,12 +349,12 @@ void ExternalCommandProcessor::AcknowledgeHostProblem(double, const vector<Strin
Host::Ptr host = Host::GetByName(arguments[0]);
if (host->IsUp())
BOOST_THROW_EXCEPTION(invalid_argument("The host '" + arguments[0] + "' is OK."));
Logger::Write(LogInformation, "icinga", "Setting acknowledgement for host '" + host->GetName() + "'");
Service::Ptr service = host->GetHostCheckService();
if (service) {
if (service->GetState() == StateOK)
BOOST_THROW_EXCEPTION(invalid_argument("The host '" + arguments[0] + "' is OK."));
service->SetAcknowledgement(sticky ? AcknowledgementSticky : AcknowledgementNormal);
service->SetAcknowledgementExpiry(0);
}
@ -370,12 +370,12 @@ void ExternalCommandProcessor::AcknowledgeHostProblemExpire(double, const vector
Host::Ptr host = Host::GetByName(arguments[0]);
if (host->IsUp())
BOOST_THROW_EXCEPTION(invalid_argument("The host '" + arguments[0] + "' is OK."));
Logger::Write(LogInformation, "icinga", "Setting timed acknowledgement for host '" + host->GetName() + "'");
Service::Ptr service = host->GetHostCheckService();
if (service) {
if (service->GetState() == StateOK)
BOOST_THROW_EXCEPTION(invalid_argument("The host '" + arguments[0] + "' is OK."));
service->SetAcknowledgement(sticky ? AcknowledgementSticky : AcknowledgementNormal);
service->SetAcknowledgementExpiry(timestamp);
}

View File

@ -34,7 +34,14 @@ REGISTER_TYPE(Host, hostAttributes);
Host::Host(const Dictionary::Ptr& properties)
: DynamicObject(properties)
{ }
{
HostGroup::InvalidateMembersCache();
}
void Host::OnInitCompleted(void)
{
UpdateSlaveServices();
}
Host::~Host(void)
{
@ -105,9 +112,18 @@ String Host::GetHostCheck(void) const
return Get("hostcheck");
}
bool Host::IsReachable(void)
bool Host::IsReachable(const Host::Ptr& self)
{
BOOST_FOREACH(const Service::Ptr& service, GetParentServices()) {
set<Service::Ptr> parentServices;
{
ObjectLock olock(self);
parentServices = self->GetParentServices();
}
BOOST_FOREACH(const Service::Ptr& service, parentServices) {
ObjectLock olock(service);
/* ignore pending services */
if (!service->GetLastCheckResult())
continue;
@ -124,9 +140,23 @@ bool Host::IsReachable(void)
return false;
}
BOOST_FOREACH(const Host::Ptr& host, GetParentHosts()) {
set<Host::Ptr> parentHosts;
{
ObjectLock olock(self);
parentHosts = self->GetParentHosts();
}
BOOST_FOREACH(const Host::Ptr& host, parentHosts) {
Service::Ptr hc;
{
ObjectLock olock(host);
hc = host->GetHostCheckService();
}
/* ignore hosts that are up */
if (host->IsUp())
if (hc && hc->GetState() == StateOK)
continue;
return false;
@ -135,28 +165,6 @@ bool Host::IsReachable(void)
return true;
}
bool Host::IsInDowntime(void) const
{
Service::Ptr service = GetHostCheckService();
if (!service)
return false;
ObjectLock olock(service);
return (service || service->IsInDowntime());
}
bool Host::IsUp(void) const
{
Service::Ptr service = GetHostCheckService();
if (!service)
return true;
ObjectLock olock(service);
return (service->GetState() == StateOK || service->GetState() == StateWarning);
}
template<bool copyServiceAttrs, typename TDict>
static void CopyServiceAttributes(TDict serviceDesc, const ConfigItemBuilder::Ptr& builder)
{
@ -280,10 +288,19 @@ void Host::OnAttributeChanged(const String& name, const Value&)
{
if (name == "hostgroups")
HostGroup::InvalidateMembersCache();
else if (name == "services")
else if (name == "services") {
ObjectLock olock(this);
UpdateSlaveServices();
else if (name == "notifications") {
BOOST_FOREACH(const Service::Ptr& service, GetServices()) {
} else if (name == "notifications") {
set<Service::Ptr> services;
{
ObjectLock olock(this);
services = GetServices();
}
BOOST_FOREACH(const Service::Ptr& service, services) {
ObjectLock olock(service);
service->UpdateSlaveNotifications();
}
}

View File

@ -56,9 +56,7 @@ public:
set<Host::Ptr> GetParentHosts(void) const;
set<shared_ptr<Service> > GetParentServices(void) const;
bool IsReachable(void);
bool IsInDowntime(void) const;
bool IsUp(void) const;
static bool IsReachable(const Host::Ptr& self);
shared_ptr<Service> GetServiceByShortName(const Value& name) const;
@ -69,6 +67,7 @@ public:
const std::vector<icinga::Value>& arguments);
protected:
void OnInitCompleted(void);
void OnAttributeChanged(const String& name, const Value& oldValue);
private:

View File

@ -95,7 +95,10 @@ void Notification::NotificationCompletedHandler(const ScriptTask::Ptr& task)
m_Tasks.erase(task);
try {
(void) task->GetResult();
{
ObjectLock tlock(task);
(void) task->GetResult();
}
Logger::Write(LogInformation, "icinga", "Completed sending notification for service '" + GetService()->GetName() + "'");
} catch (const exception& ex) {

View File

@ -97,7 +97,10 @@ void PluginNotificationTask::ProcessFinishedHandler(PluginNotificationTask ct)
ProcessResult pr;
try {
pr = ct.m_Process->GetResult();
{
ObjectLock tlock(ct.m_Process);
pr = ct.m_Process->GetResult();
}
if (pr.ExitStatus != 0) {
stringstream msgbuf;

View File

@ -350,9 +350,9 @@ void Service::ApplyCheckResult(const Dictionary::Ptr& cr)
/* Make sure the notification component sees the updated
* state/state_type attributes. */
DynamicObject::NewTx();
Flush();
if (IsReachable() && !IsInDowntime() && !IsAcknowledged())
if (IsReachable(GetSelf()) && !IsInDowntime() && !IsAcknowledged())
RequestNotifications(NotificationStateChange);
}
}
@ -457,7 +457,12 @@ void Service::CheckCompletedHandler(const Dictionary::Ptr& scheduleInfo,
Dictionary::Ptr result;
try {
Value vresult = task->GetResult();
Value vresult;
{
ObjectLock tlock(task);
vresult = task->GetResult();
}
if (vresult.IsObjectType<Dictionary>())
result = vresult;
@ -510,9 +515,9 @@ void Service::ProcessCheckResult(const Dictionary::Ptr& cr)
Service::UpdateStatistics(cr);
/* flush the current transaction so other instances see the service's
/* Flush the object so other instances see the service's
* new state when they receive the CheckResult message */
DynamicObject::NewTx();
Flush();
RequestMessage rm;
rm.SetMethod("checker::CheckResult");

View File

@ -125,7 +125,17 @@ void Service::UpdateSlaveNotifications(void)
newNotifications = boost::make_shared<Dictionary>();
vector<Dictionary::Ptr> notificationDescsList;
notificationDescsList.push_back(GetHost()->Get("notifications"));
String host_name;
{
Host::Ptr host = GetHost();
ObjectLock olock(host);
notificationDescsList.push_back(host->Get("notifications"));
host_name = host->GetName();
}
notificationDescsList.push_back(Get("notifications"));
BOOST_FOREACH(const Dictionary::Ptr& notificationDescs, notificationDescsList) {
@ -145,7 +155,7 @@ void Service::UpdateSlaveNotifications(void)
ConfigItemBuilder::Ptr builder = boost::make_shared<ConfigItemBuilder>(item->GetDebugInfo());
builder->SetType("Notification");
builder->SetName(name);
builder->AddExpression("host_name", OperatorSet, GetHost()->GetName());
builder->AddExpression("host_name", OperatorSet, host_name);
builder->AddExpression("service", OperatorSet, GetName());
CopyNotificationAttributes(this, builder);

View File

@ -47,7 +47,17 @@ REGISTER_TYPE(Service, serviceAttributes);
Service::Service(const Dictionary::Ptr& serializedObject)
: DynamicObject(serializedObject)
{ }
{
ServiceGroup::InvalidateMembersCache();
Host::InvalidateServicesCache();
Service::InvalidateDowntimesCache();
Service::InvalidateCommentsCache();
}
void Service::OnInitCompleted(void)
{
UpdateSlaveNotifications();
}
Service::~Service(void)
{
@ -142,9 +152,18 @@ String Service::GetShortName(void) const
return value;
}
bool Service::IsReachable(void) const
bool Service::IsReachable(const Service::Ptr& self)
{
BOOST_FOREACH(const Service::Ptr& service, GetParentServices()) {
set<Service::Ptr> parentServices;
{
ObjectLock olock(self);
parentServices = self->GetParentServices();
}
BOOST_FOREACH(const Service::Ptr& service, parentServices) {
ObjectLock olock(service);
/* ignore pending services */
if (!service->GetLastCheckResult())
continue;
@ -161,9 +180,23 @@ bool Service::IsReachable(void) const
return false;
}
BOOST_FOREACH(const Host::Ptr& host, GetParentHosts()) {
set<Host::Ptr> parentHosts;
{
ObjectLock olock(self);
parentHosts = self->GetParentHosts();
}
BOOST_FOREACH(const Host::Ptr& host, parentHosts) {
Service::Ptr hc;
{
ObjectLock olock(host);
hc = host->GetHostCheckService();
}
/* ignore hosts that are up */
if (host->IsUp())
if (hc && hc->GetState() == StateOK)
continue;
return false;
@ -222,8 +255,6 @@ void Service::SetAcknowledgementExpiry(double timestamp)
void Service::OnAttributeChanged(const String& name, const Value& oldValue)
{
ObjectLock olock(this);
if (name == "checker")
OnCheckerChanged(GetSelf(), oldValue);
else if (name == "next_check")
@ -232,7 +263,11 @@ void Service::OnAttributeChanged(const String& name, const Value& oldValue)
ServiceGroup::InvalidateMembersCache();
else if (name == "host_name" || name == "short_name") {
Host::InvalidateServicesCache();
UpdateSlaveNotifications();
{
ObjectLock olock(this);
UpdateSlaveNotifications();
}
} else if (name == "downtimes")
Service::InvalidateDowntimesCache();
else if (name == "comments")
@ -240,6 +275,7 @@ void Service::OnAttributeChanged(const String& name, const Value& oldValue)
else if (name == "notifications")
UpdateSlaveNotifications();
else if (name == "check_interval") {
ObjectLock(this);
ConfigItem::Ptr item = ConfigItem::GetObject("Service", GetName());
/* update the next check timestamp if we're the owner of this service */

View File

@ -111,7 +111,7 @@ public:
set<Host::Ptr> GetParentHosts(void) const;
set<Service::Ptr> GetParentServices(void) const;
bool IsReachable(void) const;
bool IsReachable(const Service::Ptr& self);
AcknowledgementType GetAcknowledgement(void);
void SetAcknowledgement(AcknowledgementType acknowledgement);
@ -248,6 +248,7 @@ public:
void SetNextNotification(double time);
protected:
virtual void OnInitCompleted(void);
virtual void OnAttributeChanged(const String& name, const Value& oldValue);
private: