Fine-grained locks (WIP, Part 2).

This commit is contained in:
Gunnar Beutner 2013-02-18 14:40:24 +01:00
parent c63c28dd9c
commit a4c7052a6e
41 changed files with 532 additions and 412 deletions

View File

@ -35,10 +35,8 @@ void CheckerComponent::Start(void)
Service::OnNextCheckChanged.connect(bind(&CheckerComponent::NextCheckChangedHandler, this, _1));
DynamicObject::OnUnregistered.connect(bind(&CheckerComponent::ObjectRemovedHandler, this, _1));
m_CheckTimer = boost::make_shared<Timer>();
m_CheckTimer->SetInterval(0.1);
m_CheckTimer->OnTimerExpired.connect(boost::bind(&CheckerComponent::CheckTimerHandler, this));
m_CheckTimer->Start();
boost::thread thread(boost::bind(&CheckerComponent::CheckThreadProc, this));
thread.detach();
m_ResultTimer = boost::make_shared<Timer>();
m_ResultTimer->SetInterval(5);
@ -51,15 +49,8 @@ void CheckerComponent::Stop(void)
m_Endpoint->Unregister();
}
void CheckerComponent::CheckTimerHandler(void)
void CheckerComponent::CheckThreadProc(void)
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
double now = Utility::GetTime();
long tasks = 0;
int missedServices = 0, missedChecks = 0;
for (;;) {
Service::Ptr service;
@ -69,8 +60,8 @@ void CheckerComponent::CheckTimerHandler(void)
typedef nth_index<ServiceSet, 1>::type CheckTimeView;
CheckTimeView& idx = boost::get<1>(m_IdleServices);
if (idx.begin() == idx.end())
break;
while (idx.begin() == idx.end())
m_CV.wait(lock);
CheckTimeView::iterator it = idx.begin();
service = it->lock();
@ -79,18 +70,32 @@ void CheckerComponent::CheckTimerHandler(void)
idx.erase(it);
continue;
}
{
ObjectLock olock(service);
if (service->GetNextCheck() > now)
break;
}
idx.erase(it);
}
ObjectLock olock(service);
double wait;
{
ObjectLock olock(service);
wait = service->GetNextCheck() - Utility::GetTime();
}
if (wait > 0) {
/* Make sure the service we just examined can be destroyed while we're waiting. */
service.reset();
/* Wait for the next check. */
boost::mutex::scoped_lock lock(m_Mutex);
m_CV.timed_wait(lock, boost::posix_time::milliseconds(wait * 1000));
continue;
}
{
boost::mutex::scoped_lock lock(m_Mutex);
m_IdleServices.erase(service);
}
ObjectLock olock(service); /* also required for the key extractor */
/* reschedule the service if checks are currently disabled
* for it and this is not a forced check */
@ -115,51 +120,28 @@ void CheckerComponent::CheckTimerHandler(void)
service->SetForceNextCheck(false);
Dictionary::Ptr cr = service->GetLastCheckResult();
if (cr) {
double lastCheck = cr->Get("execution_end");
int missed = (Utility::GetTime() - lastCheck) / service->GetCheckInterval() - 1;
if (missed > 0 && !service->GetFirstCheck()) {
missedChecks += missed;
missedServices++;
}
}
service->SetFirstCheck(false);
Logger::Write(LogDebug, "checker", "Executing service check for '" + service->GetName() + "'");
m_IdleServices.erase(service);
m_PendingServices.insert(service);
{
boost::mutex::scoped_lock lock(m_Mutex);
m_IdleServices.erase(service);
m_PendingServices.insert(service);
}
try {
service->BeginExecuteCheck(boost::bind(&CheckerComponent::CheckCompletedHandler, this, service));
} catch (const exception& ex) {
Logger::Write(LogCritical, "checker", "Exception occured while checking service '" + service->GetName() + "': " + diagnostic_information(ex));
}
tasks++;
}
if (missedServices > 0) {
stringstream msgbuf;
msgbuf << "Missed " << missedChecks << " checks for " << missedServices << " services";;
Logger::Write(LogWarning, "checker", msgbuf.str());
}
if (tasks > 0) {
stringstream msgbuf;
msgbuf << "CheckTimerHandler: created " << tasks << " task(s)";
Logger::Write(LogDebug, "checker", msgbuf.str());
}
RescheduleCheckTimer();
}
void CheckerComponent::CheckCompletedHandler(const Service::Ptr& service)
{
ObjectLock olock(service); /* required for the key extractor */
{
boost::mutex::scoped_lock lock(m_Mutex);
@ -171,15 +153,11 @@ void CheckerComponent::CheckCompletedHandler(const Service::Ptr& service)
if (it != m_PendingServices.end()) {
m_PendingServices.erase(it);
m_IdleServices.insert(service);
m_CV.notify_all();
}
}
RescheduleCheckTimer();
{
ObjectLock olock(service);
Logger::Write(LogDebug, "checker", "Check finished for service '" + service->GetName() + "'");
}
Logger::Write(LogDebug, "checker", "Check finished for service '" + service->GetName() + "'");
}
void CheckerComponent::ResultTimerHandler(void)
@ -199,12 +177,8 @@ void CheckerComponent::ResultTimerHandler(void)
void CheckerComponent::CheckerChangedHandler(const Service::Ptr& service)
{
String checker;
{
ObjectLock olock(service);
checker = service->GetChecker();
}
ObjectLock olock(service); /* also required for the key extractor */
String checker = service->GetChecker();
if (checker == EndpointManager::GetInstance()->GetIdentity() || checker == m_Endpoint->GetName()) {
boost::mutex::scoped_lock lock(m_Mutex);
@ -213,17 +187,20 @@ void CheckerComponent::CheckerChangedHandler(const Service::Ptr& service)
return;
m_IdleServices.insert(service);
m_CV.notify_all();
} else {
boost::mutex::scoped_lock lock(m_Mutex);
m_IdleServices.erase(service);
m_PendingServices.erase(service);
m_CV.notify_all();
}
}
void CheckerComponent::NextCheckChangedHandler(const Service::Ptr& service)
{
{
ObjectLock olock(service); /* required for the key extractor */
boost::mutex::scoped_lock lock(m_Mutex);
/* remove and re-insert the service from the set in order to force an index update */
@ -234,11 +211,9 @@ void CheckerComponent::NextCheckChangedHandler(const Service::Ptr& service)
if (it == idx.end())
return;
idx.erase(it);
idx.insert(service);
idx.replace(it, service);
m_CV.notify_all();
}
RescheduleCheckTimer();
}
void CheckerComponent::ObjectRemovedHandler(const DynamicObject::Ptr& object)
@ -254,35 +229,6 @@ void CheckerComponent::ObjectRemovedHandler(const DynamicObject::Ptr& object)
m_IdleServices.erase(service);
m_PendingServices.erase(service);
m_CV.notify_all();
}
}
void CheckerComponent::RescheduleCheckTimer(void)
{
Service::Ptr service;
{
boost::mutex::scoped_lock lock(m_Mutex);
if (m_IdleServices.empty())
return;
typedef nth_index<ServiceSet, 1>::type CheckTimeView;
CheckTimeView& idx = boost::get<1>(m_IdleServices);
do {
CheckTimeView::iterator it = idx.begin();
if (it == idx.end())
return;
service = it->lock();
if (!service)
idx.erase(it);
} while (!service);
}
ObjectLock olock(service);
m_CheckTimer->Reschedule(service->GetNextCheck());
}

View File

@ -30,6 +30,9 @@ struct ServiceNextCheckExtractor
{
typedef double result_type;
/**
* @threadsafety Caller must hold the mutex for the service.
*/
double operator()(const Service::WeakPtr& wservice)
{
Service::Ptr service = wservice.lock();
@ -37,10 +40,7 @@ struct ServiceNextCheckExtractor
if (!service)
return 0;
{
ObjectLock olock(service);
return service->GetNextCheck();
}
return service->GetNextCheck();
}
};
@ -68,15 +68,14 @@ private:
Endpoint::Ptr m_Endpoint;
boost::mutex m_Mutex;
boost::condition_variable m_CV;
ServiceSet m_IdleServices;
ServiceSet m_PendingServices;
Timer::Ptr m_CheckTimer;
Timer::Ptr m_ResultTimer;
void CheckTimerHandler(void);
void CheckThreadProc(void);
void ResultTimerHandler(void);
void CheckCompletedHandler(const Service::Ptr& service);

View File

@ -154,11 +154,7 @@ void CompatComponent::CommandPipeThread(const String& commandPath)
String command = line;
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
ProcessCommand(command);
}
ProcessCommand(command);
}
fclose(fp);
@ -181,6 +177,8 @@ void CompatComponent::ProcessCommand(const String& command)
void CompatComponent::DumpComments(ofstream& fp, const Service::Ptr& owner, CompatObjectType type)
{
ObjectLock olock(owner);
Service::Ptr service;
Host::Ptr host;
Dictionary::Ptr comments = owner->GetComments();
@ -216,6 +214,8 @@ void CompatComponent::DumpComments(ofstream& fp, const Service::Ptr& owner, Comp
void CompatComponent::DumpDowntimes(ofstream& fp, const Service::Ptr& owner, CompatObjectType type)
{
ObjectLock olock(owner);
Dictionary::Ptr downtimes = owner->GetDowntimes();
if (!downtimes)
@ -257,6 +257,8 @@ void CompatComponent::DumpDowntimes(ofstream& fp, const Service::Ptr& owner, Com
void CompatComponent::DumpHostStatus(ofstream& fp, const Host::Ptr& host)
{
ObjectLock olock(host);
int state;
if (!host->IsReachable())
state = 2; /* unreachable */
@ -285,6 +287,8 @@ void CompatComponent::DumpHostStatus(ofstream& fp, const Host::Ptr& host)
void CompatComponent::DumpHostObject(ofstream& fp, const Host::Ptr& host)
{
ObjectLock olock(host);
fp << "define host {" << "\n"
<< "\t" << "host_name" << "\t" << host->GetName() << "\n"
<< "\t" << "display_name" << "\t" << host->GetDisplayName() << "\n"
@ -308,6 +312,8 @@ void CompatComponent::DumpHostObject(ofstream& fp, const Host::Ptr& host)
void CompatComponent::DumpServiceStatusAttrs(ofstream& fp, const Service::Ptr& service, CompatObjectType type)
{
ObjectLock olock(service);
String output;
String perfdata;
double schedule_start = -1, schedule_end = -1;
@ -370,6 +376,8 @@ void CompatComponent::DumpServiceStatusAttrs(ofstream& fp, const Service::Ptr& s
void CompatComponent::DumpServiceStatus(ofstream& fp, const Service::Ptr& service)
{
ObjectLock olock(service);
fp << "servicestatus {" << "\n"
<< "\t" << "host_name=" << service->GetHost()->GetName() << "\n"
<< "\t" << "service_description=" << service->GetShortName() << "\n";
@ -385,6 +393,8 @@ void CompatComponent::DumpServiceStatus(ofstream& fp, const Service::Ptr& servic
void CompatComponent::DumpServiceObject(ofstream& fp, const Service::Ptr& service)
{
ObjectLock olock(service);
fp << "define service {" << "\n"
<< "\t" << "host_name" << "\t" << service->GetHost()->GetName() << "\n"
<< "\t" << "service_description" << "\t" << service->GetShortName() << "\n"
@ -399,6 +409,8 @@ void CompatComponent::DumpServiceObject(ofstream& fp, const Service::Ptr& servic
<< "\n";
BOOST_FOREACH(const Service::Ptr& parent, service->GetParentServices()) {
ObjectLock plock(parent);
fp << "define servicedependency {" << "\n"
<< "\t" << "dependent_host_name" << "\t" << service->GetHost()->GetName() << "\n"
<< "\t" << "dependent_service_description" << "\t" << service->GetShortName() << "\n"
@ -416,8 +428,6 @@ void CompatComponent::DumpServiceObject(ofstream& fp, const Service::Ptr& servic
*/
void CompatComponent::StatusTimerHandler(void)
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
Logger::Write(LogInformation, "compat", "Writing compat status information");
String statuspath = GetStatusPath();
@ -468,55 +478,85 @@ void CompatComponent::StatusTimerHandler(void)
<< "# This file is auto-generated. Do not modify this file." << "\n"
<< "\n";
DynamicObject::Ptr object;
BOOST_FOREACH(tie(tuples::ignore, object), DynamicType::GetByName("Host")->GetObjects()) {
const Host::Ptr& host = static_pointer_cast<Host>(object);
{
DynamicType::Ptr dt = DynamicType::GetByName("Host");
ObjectLock dlock(dt);
DumpHostStatus(statusfp, host);
DumpHostObject(objectfp, host);
}
DynamicObject::Ptr object;
BOOST_FOREACH(tie(tuples::ignore, object), dt->GetObjects()) {
Host::Ptr host = static_pointer_cast<Host>(object);
BOOST_FOREACH(tie(tuples::ignore, object), DynamicType::GetByName("HostGroup")->GetObjects()) {
const HostGroup::Ptr& hg = static_pointer_cast<HostGroup>(object);
objectfp << "define hostgroup {" << "\n"
<< "\t" << "hostgroup_name" << "\t" << hg->GetName() << "\n"
<< "\t" << "notes_url" << "\t" << hg->GetNotesUrl() << "\n"
<< "\t" << "action_url" << "\t" << hg->GetActionUrl() << "\n";
objectfp << "\t" << "members" << "\t";
DumpNameList(objectfp, hg->GetMembers());
objectfp << "\n"
<< "}" << "\n";
}
BOOST_FOREACH(tie(tuples::ignore, object), DynamicType::GetByName("Service")->GetObjects()) {
const Service::Ptr& service = static_pointer_cast<Service>(object);
DumpServiceStatus(statusfp, service);
DumpServiceObject(objectfp, service);
}
BOOST_FOREACH(tie(tuples::ignore, object), DynamicType::GetByName("ServiceGroup")->GetObjects()) {
const ServiceGroup::Ptr& sg = static_pointer_cast<ServiceGroup>(object);
objectfp << "define servicegroup {" << "\n"
<< "\t" << "servicegroup_name" << "\t" << sg->GetName() << "\n"
<< "\t" << "notes_url" << "\t" << sg->GetNotesUrl() << "\n"
<< "\t" << "action_url" << "\t" << sg->GetActionUrl() << "\n";
objectfp << "\t" << "members" << "\t";
vector<String> sglist;
BOOST_FOREACH(const Service::Ptr& service, sg->GetMembers()) {
sglist.push_back(service->GetHost()->GetName());
sglist.push_back(service->GetShortName());
DumpHostStatus(statusfp, host);
DumpHostObject(objectfp, host);
}
}
DumpStringList(objectfp, sglist);
{
DynamicType::Ptr dt = DynamicType::GetByName("Host");
ObjectLock dlock(dt);
objectfp << "\n"
<< "}" << "\n";
DynamicObject::Ptr object;
BOOST_FOREACH(tie(tuples::ignore, object), dt->GetObjects()) {
HostGroup::Ptr hg = static_pointer_cast<HostGroup>(object);
ObjectLock olock(hg);
objectfp << "define hostgroup {" << "\n"
<< "\t" << "hostgroup_name" << "\t" << hg->GetName() << "\n"
<< "\t" << "notes_url" << "\t" << hg->GetNotesUrl() << "\n"
<< "\t" << "action_url" << "\t" << hg->GetActionUrl() << "\n";
objectfp << "\t" << "members" << "\t";
DumpNameList(objectfp, hg->GetMembers());
objectfp << "\n"
<< "}" << "\n";
}
}
{
DynamicType::Ptr dt = DynamicType::GetByName("Service");
ObjectLock dlock(dt);
DynamicObject::Ptr object;
BOOST_FOREACH(tie(tuples::ignore, object), dt->GetObjects()) {
Service::Ptr service = static_pointer_cast<Service>(object);
DumpServiceStatus(statusfp, service);
DumpServiceObject(objectfp, service);
}
}
{
DynamicType::Ptr dt = DynamicType::GetByName("ServiceGroup");
ObjectLock dlock(dt);
DynamicObject::Ptr object;
BOOST_FOREACH(tie(tuples::ignore, object), dt->GetObjects()) {
ServiceGroup::Ptr sg = static_pointer_cast<ServiceGroup>(object);
ObjectLock olock(sg);
objectfp << "define servicegroup {" << "\n"
<< "\t" << "servicegroup_name" << "\t" << sg->GetName() << "\n"
<< "\t" << "notes_url" << "\t" << sg->GetNotesUrl() << "\n"
<< "\t" << "action_url" << "\t" << sg->GetActionUrl() << "\n";
objectfp << "\t" << "members" << "\t";
vector<String> sglist;
BOOST_FOREACH(const Service::Ptr& service, sg->GetMembers()) {
ObjectLock slock(service);
Host::Ptr host = service->GetHost();
ObjectLock hlock(host);
sglist.push_back(host->GetName());
sglist.push_back(service->GetShortName());
}
DumpStringList(objectfp, sglist);
objectfp << "\n"
<< "}" << "\n";
}
}
statusfp.close();

View File

@ -71,6 +71,7 @@ private:
else
first = false;
ObjectLock olock(*it);
fp << (*it)->GetName();
}
}

View File

@ -48,13 +48,17 @@ bool DelegationComponent::IsEndpointChecker(const Endpoint::Ptr& endpoint)
return (endpoint->HasSubscription("checker"));
}
vector<Endpoint::Ptr> DelegationComponent::GetCheckerCandidates(const Service::Ptr& service) const
set<Endpoint::Ptr> DelegationComponent::GetCheckerCandidates(const Service::Ptr& service) const
{
vector<Endpoint::Ptr> candidates;
set<Endpoint::Ptr> candidates;
DynamicType::Ptr dt = DynamicType::GetByName("Endpoint");
ObjectLock dlock(dt);
DynamicObject::Ptr object;
BOOST_FOREACH(tie(tuples::ignore, object), DynamicType::GetByName("Endpoint")->GetObjects()) {
BOOST_FOREACH(tie(tuples::ignore, object), dt->GetObjects()) {
Endpoint::Ptr endpoint = dynamic_pointer_cast<Endpoint>(object);
ObjectLock olock(endpoint);
String myIdentity = EndpointManager::GetInstance()->GetIdentity();
@ -74,7 +78,7 @@ vector<Endpoint::Ptr> DelegationComponent::GetCheckerCandidates(const Service::P
if (!service->IsAllowedChecker(endpoint->GetName()))
continue;
candidates.push_back(endpoint);
candidates.insert(endpoint);
}
return candidates;
@ -82,59 +86,71 @@ vector<Endpoint::Ptr> DelegationComponent::GetCheckerCandidates(const Service::P
void DelegationComponent::DelegationTimerHandler(void)
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
map<Endpoint::Ptr, int> histogram;
DynamicObject::Ptr object;
BOOST_FOREACH(tie(tuples::ignore, object), DynamicType::GetByName("Endpoint")->GetObjects()) {
Endpoint::Ptr endpoint = dynamic_pointer_cast<Endpoint>(object);
{
DynamicType::Ptr dt = DynamicType::GetByName("Endpoint");
ObjectLock dlock(dt);
histogram[endpoint] = 0;
DynamicObject::Ptr object;
BOOST_FOREACH(tie(tuples::ignore, object), dt->GetObjects()) {
Endpoint::Ptr endpoint = dynamic_pointer_cast<Endpoint>(object);
histogram[endpoint] = 0;
}
}
vector<Service::Ptr> services;
/* build "checker -> service count" histogram */
BOOST_FOREACH(tie(tuples::ignore, object), DynamicType::GetByName("Service")->GetObjects()) {
Service::Ptr service = dynamic_pointer_cast<Service>(object);
{
/* build "checker -> service count" histogram */
DynamicType::Ptr dt = DynamicType::GetByName("Service");
ObjectLock dlock(dt);
if (!service)
continue;
DynamicObject::Ptr object;
BOOST_FOREACH(tie(tuples::ignore, object), dt->GetObjects()) {
Service::Ptr service = dynamic_pointer_cast<Service>(object);
services.push_back(service);
if (!service)
continue;
String checker = service->GetChecker();
if (checker.IsEmpty())
continue;
services.push_back(service);
if (!Endpoint::Exists(checker))
continue;
ObjectLock olock(service);
String checker = service->GetChecker();
if (checker.IsEmpty())
continue;
Endpoint::Ptr endpoint = Endpoint::GetByName(checker);
if (!Endpoint::Exists(checker))
continue;
histogram[endpoint]++;
Endpoint::Ptr endpoint = Endpoint::GetByName(checker);
histogram[endpoint]++;
}
}
std::random_shuffle(services.begin(), services.end());
//std::random_shuffle(services.begin(), services.end());
int delegated = 0;
/* re-assign services */
BOOST_FOREACH(const Service::Ptr& service, services) {
ObjectLock olock(service);
String checker = service->GetChecker();
Endpoint::Ptr oldEndpoint;
if (Endpoint::Exists(checker))
oldEndpoint = Endpoint::GetByName(checker);
vector<Endpoint::Ptr> candidates = GetCheckerCandidates(service);
set<Endpoint::Ptr> candidates = GetCheckerCandidates(service);
int avg_services = 0, overflow_tolerance = 0;
vector<Endpoint::Ptr>::iterator cit;
if (candidates.size() > 0) {
std::random_shuffle(candidates.begin(), candidates.end());
//std::random_shuffle(candidates.begin(), candidates.end());
stringstream msgbuf;
msgbuf << "Service: " << service->GetName() << ", candidates: " << candidates.size();
@ -150,8 +166,11 @@ void DelegationComponent::DelegationTimerHandler(void)
/* don't re-assign service if the checker is still valid
* and doesn't have too many services */
ObjectLock elock(oldEndpoint);
if (oldEndpoint && oldEndpoint->IsConnected() &&
find(candidates.begin(), candidates.end(), oldEndpoint) != candidates.end() &&
candidates.find(oldEndpoint) != candidates.end() &&
histogram[oldEndpoint] <= avg_services + overflow_tolerance)
continue;
@ -169,6 +188,7 @@ void DelegationComponent::DelegationTimerHandler(void)
if (histogram[candidate] > avg_services)
continue;
ObjectLock clock(candidate);
service->SetChecker(candidate->GetName());
histogram[candidate]++;

View File

@ -36,7 +36,7 @@ private:
void DelegationTimerHandler(void);
vector<Endpoint::Ptr> GetCheckerCandidates(const Service::Ptr& service) const;
set<Endpoint::Ptr> GetCheckerCandidates(const Service::Ptr& service) const;
static bool IsEndpointChecker(const Endpoint::Ptr& endpoint);

View File

@ -54,16 +54,16 @@ void DemoComponent::Stop(void)
*/
void DemoComponent::DemoTimerHandler(void)
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
Logger::Write(LogInformation, "demo", "Sending multicast 'hello"
" world' message.");
RequestMessage request;
request.SetMethod("demo::HelloWorld");
EndpointManager::GetInstance()->SendMulticastMessage(m_Endpoint,
request);
EndpointManager::Ptr em = EndpointManager::GetInstance();
ObjectLock olock(em);
em->SendMulticastMessage(m_Endpoint, request);
}
/**

View File

@ -29,6 +29,8 @@ EXPORT_COMPONENT(notification, NotificationComponent);
void NotificationComponent::Start(void)
{
m_Endpoint = Endpoint::MakeEndpoint("notification", false);
ObjectLock olock(m_Endpoint);
m_Endpoint->RegisterTopicHandler("icinga::SendNotifications",
boost::bind(&NotificationComponent::SendNotificationsRequestHandler, this, _2,
_3));
@ -53,8 +55,6 @@ void NotificationComponent::Stop(void)
*/
void NotificationComponent::NotificationTimerHandler(void)
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
// TODO: implement
}
@ -78,5 +78,7 @@ void NotificationComponent::SendNotificationsRequestHandler(const Endpoint::Ptr&
return;
Service::Ptr service = Service::GetByName(svc);
ObjectLock olock(service);
service->SendNotifications(static_cast<NotificationType>(type));
}

View File

@ -88,12 +88,8 @@ static bool LoadConfigFiles(bool validateOnly)
static void ReloadConfigTimerHandler(void)
{
if (g_ReloadConfig) {
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
Logger::Write(LogInformation, "icinga-app", "Received SIGHUP. Reloading config files.");
LoadConfigFiles(false);
}
Logger::Write(LogInformation, "icinga-app", "Received SIGHUP. Reloading config files.");
LoadConfigFiles(false);
g_ReloadConfig = false;
}

View File

@ -21,7 +21,6 @@
using namespace icinga;
recursive_mutex Application::m_Mutex;
Application *Application::m_Instance = NULL;
bool Application::m_ShuttingDown = false;
bool Application::m_Debugging = false;
@ -110,11 +109,6 @@ void Application::SetArgV(char **argv)
m_ArgV = argv;
}
void Application::NewTxTimerHandler(void)
{
DynamicObject::NewTx();
}
#ifdef _DEBUG
void Application::ProfileTimerHandler(void)
{
@ -142,12 +136,6 @@ void Application::RunEventLoop(void) const
thread t(&Application::TimeWatchThreadProc);
t.detach();
/* Set up a timer to periodically flush the tx. */
Timer::Ptr newTxTimer = boost::make_shared<Timer>();
newTxTimer->OnTimerExpired.connect(boost::bind(&Application::NewTxTimerHandler));
newTxTimer->SetInterval(0.5);
newTxTimer->Start();
/* Set up a timer that watches the m_Shutdown flag. */
Timer::Ptr shutdownTimer = boost::make_shared<Timer>();
shutdownTimer->OnTimerExpired.connect(boost::bind(&Application::ShutdownTimerHandler));
@ -162,7 +150,7 @@ void Application::RunEventLoop(void) const
flushTxTimer->Start();
#endif /* _DEBUG */
GetEQ().Run();
GetEQ().Join();
}
/**
@ -569,16 +557,6 @@ void Application::SetPkgDataDir(const String& path)
m_PkgDataDir = path;
}
/**
* Returns the global mutex.
*
* @returns The mutex.
*/
recursive_mutex& Application::GetMutex(void)
{
return m_Mutex;
}
/**
* Returns the main thread's event queue.
*

View File

@ -79,15 +79,12 @@ public:
static String GetPkgDataDir(void);
static void SetPkgDataDir(const String& path);
static recursive_mutex& GetMutex(void);
static EventQueue& GetEQ(void);
protected:
void RunEventLoop(void) const;
private:
static recursive_mutex m_Mutex; /**< The global mutex. */
static Application *m_Instance; /**< The application instance. */
static bool m_ShuttingDown; /**< Whether the application is in the process of

View File

@ -66,12 +66,7 @@ public:
void Start(const CompletionCallback& completionCallback = CompletionCallback())
{
m_CompletionCallback = completionCallback;
try {
Run();
} catch (...) {
FinishException(boost::current_exception());
}
Utility::QueueAsyncCallback(boost::bind(&AsyncTask<TClass, TResult>::Run, this));
}
/**

View File

@ -48,4 +48,4 @@ private:
}
#endif /* CONNECTION_H */
#endif /* CONNECTION_H */

View File

@ -59,9 +59,12 @@ struct DictionaryKeyLessComparer
*
* @param key The key whose value should be retrieved.
* @returns The value of an empty value if the key was not found.
* @threadsafety Always.
*/
Value Dictionary::Get(const char *key) const
{
ObjectLock olock(this);
map<String, Value>::const_iterator it;
it = std::lower_bound(m_Data.begin(), m_Data.end(), key, DictionaryKeyLessComparer());
@ -77,6 +80,7 @@ Value Dictionary::Get(const char *key) const
*
* @param key The key whose value should be retrieved.
* @returns The value or an empty value if the key was not found.
* @threadsafety Always.
*/
Value Dictionary::Get(const String& key) const
{
@ -88,9 +92,12 @@ Value Dictionary::Get(const String& key) const
*
* @param key The key.
* @param value The value.
* @threadsafety Always.
*/
void Dictionary::Set(const String& key, const Value& value)
{
ObjectLock olock(this);
if (value.IsEmpty()) {
Remove(key);
return;
@ -107,9 +114,12 @@ void Dictionary::Set(const String& key, const Value& value)
*
* @param value The value.
* @returns The key that was used to add the new item.
* @threadsafety Always.
*/
String Dictionary::Add(const Value& value)
{
ObjectLock olock(this);
Dictionary::Iterator it;
String key;
long index = GetLength();
@ -150,9 +160,12 @@ Dictionary::Iterator Dictionary::End(void)
* Returns the number of elements in the dictionary.
*
* @returns Number of elements.
* @threadsafety Always.
*/
size_t Dictionary::GetLength(void) const
{
ObjectLock olock(this);
return m_Data.size();
}
@ -161,9 +174,12 @@ size_t Dictionary::GetLength(void) const
*
* @param key The key.
* @returns true if the dictionary contains the key, false otherwise.
* @threadsafety Always.
*/
bool Dictionary::Contains(const String& key) const
{
ObjectLock olock(this);
return (m_Data.find(key) != m_Data.end());
}
@ -171,9 +187,12 @@ bool Dictionary::Contains(const String& key) const
* Removes the specified key from the dictionary.
*
* @param key The key.
* @threadsafety Always.
*/
void Dictionary::Remove(const String& key)
{
ObjectLock olock(this);
Dictionary::Iterator it;
it = m_Data.find(key);
@ -198,9 +217,12 @@ void Dictionary::Remove(Dictionary::Iterator it)
* Makes a shallow copy of a dictionary.
*
* @returns a copy of the dictionary.
* @threadsafety Always.
*/
Dictionary::Ptr Dictionary::ShallowClone(void) const
{
ObjectLock olock(this);
Dictionary::Ptr clone = boost::make_shared<Dictionary>();
String key;
@ -217,6 +239,7 @@ Dictionary::Ptr Dictionary::ShallowClone(void) const
*
* @param json The JSON object.
* @returns A dictionary that is equivalent to the JSON object.
* @threadsafety Always.
*/
Dictionary::Ptr Dictionary::FromJson(cJSON *json)
{
@ -237,12 +260,15 @@ Dictionary::Ptr Dictionary::FromJson(cJSON *json)
*
* @returns A JSON object that is equivalent to the dictionary. Values that
* cannot be represented in JSON are omitted.
* @threadsafety Always.
*/
cJSON *Dictionary::ToJson(void) const
{
cJSON *json = cJSON_CreateObject();
try {
ObjectLock olock(this);
String key;
Value value;
BOOST_FOREACH(tie(key, value), m_Data) {

View File

@ -23,7 +23,9 @@ using namespace icinga;
double DynamicObject::m_CurrentTx = 0;
set<DynamicObject *> DynamicObject::m_ModifiedObjects;
boost::mutex DynamicObject::m_ModifiedObjectsMutex;
boost::mutex DynamicObject::m_TransactionMutex;
boost::once_flag DynamicObject::m_TransactionOnce;
Timer::Ptr DynamicObject::m_TransactionTimer;
signals2::signal<void (const DynamicObject::Ptr&)> DynamicObject::OnRegistered;
signals2::signal<void (const DynamicObject::Ptr&)> DynamicObject::OnUnregistered;
@ -46,6 +48,8 @@ DynamicObject::DynamicObject(const Dictionary::Ptr& serializedObject)
* The DynamicObject::Create function takes care of restoring
* non-config state after the object has been fully constructed */
ApplyUpdate(serializedObject, Attribute_Config);
boost::call_once(m_TransactionOnce, &DynamicObject::Initialize);
}
/*
@ -53,10 +57,22 @@ DynamicObject::DynamicObject(const Dictionary::Ptr& serializedObject)
*/
DynamicObject::~DynamicObject(void)
{
boost::mutex::scoped_lock lock(m_ModifiedObjectsMutex);
boost::mutex::scoped_lock lock(m_TransactionMutex);
m_ModifiedObjects.erase(this);
}
void DynamicObject::Initialize(void)
{
/* Set up a timer to periodically create a new transaction. */
m_TransactionTimer = boost::make_shared<Timer>();
m_TransactionTimer->SetInterval(0.5);
m_TransactionTimer->OnTimerExpired.connect(boost::bind(&DynamicObject::NewTx));
m_TransactionTimer->Start();
}
/**
* @threadsafety Always.
*/
void DynamicObject::SendLocalUpdateEvents(void)
{
map<String, Value, string_iless>::iterator it;
@ -199,7 +215,7 @@ void DynamicObject::InternalSetAttribute(const String& name, const Value& data,
m_ConfigTx = tx;
{
boost::mutex::scoped_lock lock(m_ModifiedObjectsMutex);
boost::mutex::scoped_lock lock(m_TransactionMutex);
m_ModifiedObjects.insert(this);
}
@ -280,8 +296,6 @@ String DynamicObject::GetSource(void) const
void DynamicObject::Register(void)
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
DynamicType::Ptr dtype = GetType();
DynamicObject::Ptr dobj = dtype->GetObject(GetName());
@ -302,9 +316,8 @@ void DynamicObject::Start(void)
void DynamicObject::Unregister(void)
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
DynamicType::Ptr dtype = GetType();
ObjectLock olock(dtype);
if (!dtype || !dtype->GetObject(GetName()))
return;
@ -322,11 +335,16 @@ ScriptTask::Ptr DynamicObject::InvokeMethod(const String& method,
if (!value.IsObjectType<Dictionary>())
return ScriptTask::Ptr();
String funcName;
Dictionary::Ptr methods = value;
if (!methods->Contains(method))
return ScriptTask::Ptr();
String funcName = methods->Get(method);
{
ObjectLock olock(methods);
if (!methods->Contains(method))
return ScriptTask::Ptr();
funcName = methods->Get(method);
}
ScriptFunction::Ptr func = ScriptFunction::GetByName(funcName);
@ -344,8 +362,6 @@ ScriptTask::Ptr DynamicObject::InvokeMethod(const String& method,
*/
void DynamicObject::DumpObjects(const String& filename)
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
Logger::Write(LogInformation, "base", "Dumping program state to file '" + filename + "'");
String tempFilename = filename + ".tmp";
@ -409,8 +425,6 @@ void DynamicObject::DumpObjects(const String& filename)
*/
void DynamicObject::RestoreObjects(const String& filename)
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
Logger::Write(LogInformation, "base", "Restoring program state from file '" + filename + "'");
std::fstream fp;
@ -432,6 +446,7 @@ void DynamicObject::RestoreObjects(const String& filename)
bool hasConfig = update->Contains("configTx");
DynamicType::Ptr dt = DynamicType::GetByName(type);
ObjectLock dlock(dt);
if (!dt)
BOOST_THROW_EXCEPTION(invalid_argument("Invalid type: " + type));
@ -455,13 +470,8 @@ void DynamicObject::RestoreObjects(const String& filename)
Logger::Write(LogDebug, "base", msgbuf.str());
}
/*
* @threadsafety Always.
*/
void DynamicObject::DeactivateObjects(void)
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
DynamicType::TypeMap::iterator tt;
for (tt = DynamicType::GetTypes().begin(); tt != DynamicType::GetTypes().end(); tt++) {
DynamicType::NameMap::iterator nt;
@ -479,7 +489,7 @@ void DynamicObject::DeactivateObjects(void)
*/
double DynamicObject::GetCurrentTx(void)
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
boost::mutex::scoped_lock lock(m_TransactionMutex);
assert(m_CurrentTx != 0);
@ -487,29 +497,27 @@ double DynamicObject::GetCurrentTx(void)
}
/*
* @threadsafety Always.
* @threadsafety Always. Caller must not hold any Object locks.
*/
void DynamicObject::NewTx(void)
{
double tx;
set<DynamicObject *> objects;
{
boost::mutex::scoped_lock lock(m_ModifiedObjectsMutex);
boost::mutex::scoped_lock lock(m_TransactionMutex);
/* Some objects may accidentally bleed into the next transaction because
* we're not holding the global mutex while "stealing" the modified objects,
* but that's entirely ok. */
tx = m_CurrentTx;
m_ModifiedObjects.swap(objects);
m_CurrentTx = Utility::GetTime();
}
recursive_mutex::scoped_lock lock(Application::GetMutex());
BOOST_FOREACH(DynamicObject *object, objects) {
ObjectLock olock(object);
object->SendLocalUpdateEvents();
}
OnTransactionClosing(m_CurrentTx, objects);
m_CurrentTx = Utility::GetTime();
OnTransactionClosing(tx, objects);
}
void DynamicObject::OnInitCompleted(void)
@ -523,10 +531,12 @@ void DynamicObject::OnAttributeChanged(const String&, const Value&)
*/
DynamicObject::Ptr DynamicObject::GetObject(const String& type, const String& name)
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
DynamicType::Ptr dtype = DynamicType::GetByName(type);
return dtype->GetObject(name);
{
ObjectLock olock(dtype);
return dtype->GetObject(name);
}
}
const DynamicObject::AttributeMap& DynamicObject::GetAttributes(void) const

View File

@ -81,6 +81,8 @@ public:
DynamicObject(const Dictionary::Ptr& serializedObject);
~DynamicObject(void);
static void Initialize(void);
Dictionary::Ptr BuildUpdate(double sinceTx, int attributeTypes) const;
void ApplyUpdate(const Dictionary::Ptr& serializedUpdate, int allowedTypes);
@ -147,7 +149,9 @@ private:
/* This has to be a set of raw pointers because the DynamicObject
* constructor has to be able to insert objects into this list. */
static set<DynamicObject *> m_ModifiedObjects;
static boost::mutex m_ModifiedObjectsMutex;
static boost::mutex m_TransactionMutex;
static boost::once_flag m_TransactionOnce;
static Timer::Ptr m_TransactionTimer;
friend class DynamicType; /* for OnInitCompleted. */
};

View File

@ -21,8 +21,6 @@
using namespace icinga;
boost::mutex DynamicType::m_Mutex;
DynamicType::DynamicType(const String& name, const DynamicType::ObjectFactory& factory)
: m_Name(name), m_ObjectFactory(factory)
{ }
@ -32,7 +30,7 @@ DynamicType::DynamicType(const String& name, const DynamicType::ObjectFactory& f
*/
DynamicType::Ptr DynamicType::GetByName(const String& name)
{
boost::mutex::scoped_lock lock(m_Mutex);
boost::mutex::scoped_lock lock(GetStaticMutex());
DynamicType::TypeMap::const_iterator tt = GetTypes().find(name);
@ -43,7 +41,7 @@ DynamicType::Ptr DynamicType::GetByName(const String& name)
}
/**
* @threadsafety Caller must hold DynamicType::m_Mutex while using the map.
* @threadsafety Caller must hold DynamicType::GetStaticMutex() while using the map.
*/
DynamicType::TypeMap& DynamicType::GetTypes(void)
{
@ -52,7 +50,7 @@ DynamicType::TypeMap& DynamicType::GetTypes(void)
}
/**
* @threadsafety Caller must hold DynamicType::m_Mutex while using the map.
* @threadsafety Caller must hold DynamicType::GetStaticMutex() while using the map.
*/
DynamicType::NameMap& DynamicType::GetObjects(void)
{
@ -89,7 +87,7 @@ DynamicObject::Ptr DynamicType::GetObject(const String& name) const
*/
void DynamicType::RegisterType(const DynamicType::Ptr& type)
{
boost::mutex::scoped_lock lock(m_Mutex);
boost::mutex::scoped_lock lock(GetStaticMutex());
DynamicType::TypeMap::const_iterator tt = GetTypes().find(type->GetName());
@ -147,3 +145,9 @@ void DynamicType::AddAttributes(const AttributeDescription *attributes, int attr
for (int i = 0; i < attributeCount; i++)
AddAttribute(attributes[i].Name, attributes[i].Type);
}
boost::mutex& DynamicType::GetStaticMutex(void)
{
static boost::mutex mutex;
return mutex;
}

View File

@ -64,12 +64,13 @@ public:
void AddAttributes(const AttributeDescription *attributes, int attributeCount);
private:
static boost::mutex m_Mutex;
String m_Name;
ObjectFactory m_ObjectFactory;
map<String, DynamicAttributeType> m_Attributes;
NameMap m_Objects;
static boost::mutex& GetStaticMutex(void);
};
/**

View File

@ -26,7 +26,15 @@ using namespace icinga;
*/
EventQueue::EventQueue(void)
: m_Stopped(false)
{ }
{
int cpus = thread::hardware_concurrency();
if (cpus < 4)
cpus = 4;
for (int i = 0; i < cpus; i++)
m_Threads.create_thread(boost::bind(&EventQueue::QueueThreadProc, this));
}
/**
* @threadsafety Always.
@ -34,6 +42,7 @@ EventQueue::EventQueue(void)
EventQueue::~EventQueue(void)
{
Stop();
Join();
}
/**
@ -47,23 +56,13 @@ void EventQueue::Stop(void)
}
/**
* Spawns worker threads and waits for them to complete.
* Waits for all worker threads to finish.
*
* @threadsafety Always.
*/
void EventQueue::Run(void)
void EventQueue::Join(void)
{
thread_group threads;
int cpus = thread::hardware_concurrency();
if (cpus == 0)
cpus = 4;
for (int i = 0; i < cpus * 4; i++)
threads.create_thread(boost::bind(&EventQueue::QueueThreadProc, this));
threads.join_all();
m_Threads.join_all();
}
/**

View File

@ -36,13 +36,13 @@ public:
EventQueue(void);
~EventQueue(void);
void Run(void);
void Stop(void);
void Join(void);
void Post(const Callback& callback);
void Stop(void);
private:
boost::thread::id m_Owner;
thread_group m_Threads;
boost::mutex m_Mutex;
condition_variable m_CV;

View File

@ -81,10 +81,7 @@ void Logger::Write(LogSeverity severity, const String& facility,
entry.Facility = facility;
entry.Message = message;
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
ForwardLogEntry(entry);
}
ForwardLogEntry(entry);
}
/**
@ -113,13 +110,21 @@ void Logger::ForwardLogEntry(const LogEntry& entry)
DynamicType::Ptr dt = DynamicType::GetByName("Logger");
DynamicObject::Ptr object;
BOOST_FOREACH(tie(tuples::ignore, object), dt->GetObjects()) {
Logger::Ptr logger = dynamic_pointer_cast<Logger>(object);
if (entry.Severity >= logger->GetMinSeverity())
logger->m_Impl->ProcessLogEntry(entry);
{
ObjectLock olock(dt);
BOOST_FOREACH(tie(tuples::ignore, object), dt->GetObjects()) {
Logger::Ptr logger = dynamic_pointer_cast<Logger>(object);
processed = true;
{
ObjectLock llock(logger);
if (entry.Severity >= logger->GetMinSeverity())
logger->m_Impl->ProcessLogEntry(entry);
}
processed = true;
}
}
LogSeverity defaultLogLevel;

View File

@ -37,9 +37,11 @@ Object::~Object(void)
* Returns a reference-counted pointer to this object.
*
* @returns A shared_ptr object that points to this object
* @threadsafety Always.
*/
Object::SharedPtrHolder Object::GetSelf(void)
{
ObjectLock olock(this);
return Object::SharedPtrHolder(shared_from_this());
}
@ -50,7 +52,7 @@ Object::SharedPtrHolder Object::GetSelf(void)
* @returns The object's mutex.
* @threadsafety Always.
*/
recursive_mutex& Object::GetMutex(void)
recursive_mutex& Object::GetMutex(void) const
{
return m_Mutex;
}

View File

@ -93,7 +93,7 @@ public:
SharedPtrHolder GetSelf(void);
recursive_mutex& GetMutex(void);
recursive_mutex& GetMutex(void) const;
protected:
Object(void);
@ -103,7 +103,7 @@ private:
Object(const Object& other);
Object& operator=(const Object& rhs);
recursive_mutex m_Mutex;
mutable recursive_mutex m_Mutex;
};
/**
@ -112,15 +112,35 @@ private:
struct ObjectLock {
public:
ObjectLock(const Object::Ptr& object)
: m_Lock(object->GetMutex())
{ }
#ifdef _DEBUG
: m_Lock(), m_Object(object)
#endif /* _DEBUG */
{
if (object)
m_Lock = recursive_mutex::scoped_lock(object->GetMutex());
}
ObjectLock(Object *object)
: m_Lock(object->GetMutex())
{ }
ObjectLock(const Object *object)
#ifdef _DEBUG
: m_Lock(), m_Object(object->GetSelf())
#endif /* _DEBUG */
{
if (object)
m_Lock = recursive_mutex::scoped_lock(object->GetMutex());
}
#ifdef _DEBUG
~ObjectLock(void)
{
assert(m_Object.lock());
}
#endif /* _DEBUG */
private:
recursive_mutex::scoped_lock m_Lock;
#ifdef _DEBUG
Object::WeakPtr m_Object;
#endif /* _DEBUG */
};
/**

View File

@ -48,4 +48,4 @@ private:
}
#endif /* STDIOSTREAM_H */
#endif /* STDIOSTREAM_H */

View File

@ -82,6 +82,8 @@ void Timer::Call(void)
Logger::Write(LogWarning, "base", msgbuf.str());
}
/* Re-enable the timer so it can be called again. */
m_Started = true;
Reschedule();
}
@ -118,6 +120,8 @@ void Timer::Start(void)
{
boost::call_once(&Timer::Initialize, m_ThreadOnce);
m_Started = true;
Reschedule();
}
@ -129,6 +133,8 @@ void Timer::Start(void)
void Timer::Stop(void)
{
boost::mutex::scoped_lock lock(m_Mutex);
m_Started = false;
m_Timers.erase(GetSelf());
/* Notify the worker thread that we've disabled a timer. */
@ -158,12 +164,14 @@ void Timer::Reschedule(double next)
m_Next = next;
/* Remove and re-add the timer to update the index. */
m_Timers.erase(GetSelf());
m_Timers.insert(GetSelf());
if (m_Started) {
/* Remove and re-add the timer to update the index. */
m_Timers.erase(GetSelf());
m_Timers.insert(GetSelf());
/* Notify the worker that we've rescheduled a timer. */
m_CV.notify_all();
/* Notify the worker that we've rescheduled a timer. */
m_CV.notify_all();
}
}
/**
@ -250,6 +258,7 @@ void Timer::TimerThreadProc(void)
/* Remove the timer from the list so it doesn't get called again
* until the current call is completed. */
timer->m_Started = false;
m_Timers.erase(timer);
/* Asynchronously call the timer. */

View File

@ -67,6 +67,7 @@ public:
private:
double m_Interval; /**< The interval of the timer. */
double m_Next; /**< When the next event should happen. */
bool m_Started; /**< Whether the timer is enabled. */
typedef multi_index_container<
Timer::WeakPtr,

View File

@ -548,3 +548,8 @@ void Utility::SetNonBlockingSocket(SOCKET s)
ioctlsocket(s, FIONBIO, &lTrue);
#endif /* _WIN32 */
}
void Utility::QueueAsyncCallback(const boost::function<void (void)>& callback)
{
Application::GetEQ().Post(callback);
}

View File

@ -58,6 +58,8 @@ public:
static bool Glob(const String& pathSpec, const function<void (const String&)>& callback);
static void QueueAsyncCallback(const boost::function<void (void)>& callback);
static
#ifdef _WIN32
HMODULE

View File

@ -120,7 +120,17 @@ Dictionary::Ptr ConfigItem::Link(void) const
void ConfigItem::InternalLink(const Dictionary::Ptr& dictionary) const
{
BOOST_FOREACH(const String& name, m_Parents) {
ConfigItem::Ptr parent = ConfigItem::GetObject(GetType(), name);
ConfigItem::Ptr parent;
ConfigCompilerContext *context = ConfigCompilerContext::GetContext();
if (context)
parent = context->GetItem(GetType(), name);
/* ignore already active objects while we're in the compiler
* context and linking to existing items is disabled. */
if (!parent && (!context || (context->GetFlags() & CompilerLinkExisting)))
parent = ConfigItem::GetObject(GetType(), name);
if (!parent) {
stringstream message;
@ -158,6 +168,7 @@ DynamicObject::Ptr ConfigItem::Commit(void)
if (it != m_Items.end()) {
/* Unregister the old item from its parents. */
ConfigItem::Ptr oldItem = it->second;
ObjectLock olock(oldItem);
oldItem->UnregisterFromParents();
/* Steal the old item's children. */
@ -167,6 +178,7 @@ DynamicObject::Ptr ConfigItem::Commit(void)
/* Register this item with its parents. */
BOOST_FOREACH(const String& parentName, m_Parents) {
ConfigItem::Ptr parent = GetObject(GetType(), parentName);
ObjectLock olock(parent);
parent->RegisterChild(GetSelf());
}
@ -196,20 +208,32 @@ DynamicObject::Ptr ConfigItem::Commit(void)
/* Update or create the object and apply the configuration settings. */
DynamicObject::Ptr dobj = m_DynamicObject.lock();
if (!dobj)
if (!dobj) {
ObjectLock dlock(dtype);
dobj = dtype->GetObject(GetName());
}
if (!dobj)
bool was_null = false;
if (!dobj) {
ObjectLock dlock(dtype);
dobj = dtype->CreateObject(update);
else
dobj->ApplyUpdate(update, Attribute_Config);
was_null = true;
}
m_DynamicObject = dobj;
{
ObjectLock olock(dobj);
if (dobj->IsAbstract())
dobj->Unregister();
else
dobj->Register();
if (!was_null)
dobj->ApplyUpdate(update, Attribute_Config);
m_DynamicObject = dobj;
if (dobj->IsAbstract())
dobj->Unregister();
else
dobj->Register();
}
/* We need to make a copy of the child objects because the
* OnParentCommitted() handler is going to update the list. */
@ -222,6 +246,7 @@ DynamicObject::Ptr ConfigItem::Commit(void)
if (!child)
continue;
ObjectLock olock(child);
child->OnParentCommitted();
}
@ -237,8 +262,10 @@ void ConfigItem::Unregister(void)
{
DynamicObject::Ptr dobj = m_DynamicObject.lock();
if (dobj)
if (dobj) {
ObjectLock olock(dobj);
dobj->Unregister();
}
ConfigItem::ItemMap::iterator it;
it = m_Items.find(make_pair(GetType(), GetName()));
@ -266,8 +293,10 @@ void ConfigItem::UnregisterFromParents(void)
BOOST_FOREACH(const String& parentName, m_Parents) {
ConfigItem::Ptr parent = GetObject(GetType(), parentName);
if (parent)
if (parent) {
ObjectLock olock(parent);
parent->UnregisterChild(GetSelf());
}
}
}
@ -300,24 +329,6 @@ DynamicObject::Ptr ConfigItem::GetDynamicObject(void) const
*/
ConfigItem::Ptr ConfigItem::GetObject(const String& type, const String& name)
{
{
recursive_mutex::scoped_lock lockg(Application::GetMutex());
ConfigCompilerContext *context = ConfigCompilerContext::GetContext();
if (context) {
ConfigItem::Ptr item = context->GetItem(type, name);
if (item)
return item;
/* ignore already active objects while we're in the compiler
* context and linking to existing items is disabled. */
if ((context->GetFlags() & CompilerLinkExisting) == 0)
return ConfigItem::Ptr();
}
}
{
boost::mutex::scoped_lock lock(m_Mutex);
@ -362,7 +373,7 @@ void ConfigItem::Dump(ostream& fp) const
}
/**
* @threadsafety Caller must hold the global mutex.
* @threadsafety Always.
*/
void ConfigItem::UnloadUnit(const String& unit)
{
@ -372,15 +383,22 @@ void ConfigItem::UnloadUnit(const String& unit)
vector<ConfigItem::Ptr> obsoleteItems;
ConfigItem::Ptr item;
BOOST_FOREACH(tie(tuples::ignore, item), m_Items) {
if (item->GetUnit() != unit)
continue;
{
boost::mutex::scoped_lock lock(m_Mutex);
obsoleteItems.push_back(item);
ConfigItem::Ptr item;
BOOST_FOREACH(tie(tuples::ignore, item), m_Items) {
ObjectLock olock(item);
if (item->GetUnit() != unit)
continue;
obsoleteItems.push_back(item);
}
}
BOOST_FOREACH(item, obsoleteItems) {
BOOST_FOREACH(const ConfigItem::Ptr& item, obsoleteItems) {
ObjectLock olock(item);
item->Unregister();
}
}

View File

@ -106,7 +106,17 @@ ConfigItem::Ptr ConfigItemBuilder::Compile(void)
}
BOOST_FOREACH(const String& parent, m_Parents) {
ConfigItem::Ptr item = ConfigItem::GetObject(m_Type, parent);
ConfigItem::Ptr item;
ConfigCompilerContext *context = ConfigCompilerContext::GetContext();
if (context)
item = context->GetItem(m_Type, parent);
/* ignore already active objects while we're in the compiler
* context and linking to existing items is disabled. */
if (!item && (!context || (context->GetFlags() & CompilerLinkExisting)))
item = ConfigItem::GetObject(m_Type, parent);
if (!item) {
stringstream msgbuf;

View File

@ -79,11 +79,7 @@ void ExternalCommandProcessor::Execute(double time, const String& command, const
callback = it->second;
}
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
callback(time, arguments);
}
callback(time, arguments);
}
/**
@ -723,7 +719,8 @@ void ExternalCommandProcessor::ScheduleServicegroupHostDowntime(double, const ve
set<Service::Ptr> services;
BOOST_FOREACH(const Service::Ptr& service, sg->GetMembers()) {
Service::Ptr hcService = service->GetHost()->GetHostCheckService();
Host::Ptr host = service->GetHost();
Service::Ptr hcService = host->GetHostCheckService();
if (hcService)
services.insert(hcService);
}
@ -765,9 +762,8 @@ void ExternalCommandProcessor::AddHostComment(double, const vector<String>& argu
Logger::Write(LogInformation, "icinga", "Creating comment for host " + host->GetName());
Service::Ptr service = host->GetHostCheckService();
if (service) {
if (service)
(void) service->AddComment(CommentUser, arguments[2], arguments[3], 0);
}
}
void ExternalCommandProcessor::DelHostComment(double, const vector<String>& arguments)
@ -813,9 +809,8 @@ void ExternalCommandProcessor::DelAllHostComments(double, const vector<String>&
Logger::Write(LogInformation, "icinga", "Removing all comments for host " + host->GetName());
Service::Ptr service = host->GetHostCheckService();
if (service) {
if (service)
service->RemoveAllComments();
}
}
void ExternalCommandProcessor::DelAllSvcComments(double, const vector<String>& arguments)

View File

@ -352,7 +352,19 @@ void Host::ValidateServiceDictionary(const ScriptTask::Ptr& task, const vector<V
continue;
}
if (!ConfigItem::GetObject("Service", name)) {
ConfigItem::Ptr item;
ConfigCompilerContext *context = ConfigCompilerContext::GetContext();
if (context)
item = context->GetItem("Service", name);
/* ignore already active objects while we're in the compiler
* context and linking to existing items is disabled. */
if (!item && (!context || (context->GetFlags() & CompilerLinkExisting)))
item = ConfigItem::GetObject("Service", name);
if (!item) {
ConfigCompilerContext::GetContext()->AddError(false, "Validation failed for " +
location + ": Service '" + name + "' not found.");
}

View File

@ -27,13 +27,8 @@ PluginCheckTask::PluginCheckTask(const ScriptTask::Ptr& task, const Process::Ptr
: m_Task(task), m_Process(process)
{ }
/**
* @threadsafety Always.
*/
void PluginCheckTask::ScriptFunc(const ScriptTask::Ptr& task, const vector<Value>& arguments)
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
if (arguments.size() < 1)
BOOST_THROW_EXCEPTION(invalid_argument("Missing argument: Service must be specified."));
@ -41,17 +36,34 @@ void PluginCheckTask::ScriptFunc(const ScriptTask::Ptr& task, const vector<Value
if (!vservice.IsObjectType<Service>())
BOOST_THROW_EXCEPTION(invalid_argument("Argument must be a service."));
Service::Ptr service = vservice;
vector<Dictionary::Ptr> macroDicts;
macroDicts.push_back(service->GetMacros());
macroDicts.push_back(service->CalculateDynamicMacros());
macroDicts.push_back(service->GetHost()->GetMacros());
macroDicts.push_back(service->GetHost()->CalculateDynamicMacros());
macroDicts.push_back(IcingaApplication::GetInstance()->GetMacros());
Value raw_command;
Host::Ptr host;
{
Service::Ptr service = vservice;
ObjectLock olock(service);
macroDicts.push_back(service->GetMacros());
macroDicts.push_back(service->CalculateDynamicMacros());
raw_command = service->GetCheckCommand();
host = service->GetHost();
}
{
ObjectLock olock(host);
macroDicts.push_back(host->GetMacros());
macroDicts.push_back(host->CalculateDynamicMacros());
}
{
IcingaApplication::Ptr app = IcingaApplication::GetInstance();
ObjectLock olock(app);
macroDicts.push_back(app->GetMacros());
}
Dictionary::Ptr macros = MacroProcessor::MergeMacroDicts(macroDicts);
Value command = MacroProcessor::ResolveMacros(service->GetCheckCommand(), macros);
Value command = MacroProcessor::ResolveMacros(raw_command, macros);
Process::Ptr process = boost::make_shared<Process>(Process::SplitCommand(command), macros);
@ -60,13 +72,8 @@ void PluginCheckTask::ScriptFunc(const ScriptTask::Ptr& task, const vector<Value
process->Start(boost::bind(&PluginCheckTask::ProcessFinishedHandler, ct));
}
/**
* @threadsafety Always.
*/
void PluginCheckTask::ProcessFinishedHandler(PluginCheckTask ct)
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
ProcessResult pr;
try {

View File

@ -33,8 +33,6 @@ PluginNotificationTask::PluginNotificationTask(const ScriptTask::Ptr& task, cons
*/
void PluginNotificationTask::ScriptFunc(const ScriptTask::Ptr& task, const vector<Value>& arguments)
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
if (arguments.size() < 1)
BOOST_THROW_EXCEPTION(invalid_argument("Missing argument: Notification target must be specified."));
@ -44,23 +42,49 @@ void PluginNotificationTask::ScriptFunc(const ScriptTask::Ptr& task, const vecto
if (!arguments[0].IsObjectType<Notification>())
BOOST_THROW_EXCEPTION(invalid_argument("Argument must be a service."));
Notification::Ptr notification = arguments[0];
NotificationType type = static_cast<NotificationType>(static_cast<int>(arguments[1]));
vector<Dictionary::Ptr> macroDicts;
macroDicts.push_back(notification->GetMacros());
macroDicts.push_back(notification->GetService()->GetMacros());
macroDicts.push_back(notification->GetService()->CalculateDynamicMacros());
macroDicts.push_back(notification->GetService()->GetHost()->GetMacros());
macroDicts.push_back(notification->GetService()->GetHost()->CalculateDynamicMacros());
macroDicts.push_back(IcingaApplication::GetInstance()->GetMacros());
Value raw_command;
Service::Ptr service;
Host::Ptr host;
String service_name;
{
Notification::Ptr notification = arguments[0];
ObjectLock olock(notification);
macroDicts.push_back(notification->GetMacros());
raw_command = notification->GetNotificationCommand();
service = notification->GetService();
}
{
ObjectLock olock(service);
macroDicts.push_back(service->GetMacros());
macroDicts.push_back(service->CalculateDynamicMacros());
service_name = service->GetName();
host = service->GetHost();
}
{
ObjectLock olock(host);
macroDicts.push_back(host->GetMacros());
macroDicts.push_back(host->CalculateDynamicMacros());
}
{
IcingaApplication::Ptr app = IcingaApplication::GetInstance();
ObjectLock olock(app);
macroDicts.push_back(app->GetMacros());
}
Dictionary::Ptr macros = MacroProcessor::MergeMacroDicts(macroDicts);
Value command = MacroProcessor::ResolveMacros(notification->GetNotificationCommand(), macros);
Value command = MacroProcessor::ResolveMacros(raw_command, macros);
Process::Ptr process = boost::make_shared<Process>(Process::SplitCommand(command), macros);
PluginNotificationTask ct(task, process, notification->GetService()->GetName(), command);
PluginNotificationTask ct(task, process, service_name, command);
process->Start(boost::bind(&PluginNotificationTask::ProcessFinishedHandler, ct));
}
@ -70,8 +94,6 @@ void PluginNotificationTask::ScriptFunc(const ScriptTask::Ptr& task, const vecto
*/
void PluginNotificationTask::ProcessFinishedHandler(PluginNotificationTask ct)
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
ProcessResult pr;
try {

View File

@ -535,7 +535,9 @@ void Service::ProcessCheckResult(const Dictionary::Ptr& cr)
rm.SetParams(params);
EndpointManager::GetInstance()->SendMulticastMessage(rm);
EndpointManager::Ptr em = EndpointManager::GetInstance();
ObjectLock olock(em);
em->SendMulticastMessage(rm);
}
void Service::UpdateStatistics(const Dictionary::Ptr& cr)

View File

@ -210,11 +210,13 @@ void Service::RemoveExpiredComments(void)
void Service::CommentsExpireTimerHandler(void)
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
DynamicType::Ptr dt = DynamicType::GetByName("Service");
ObjectLock dlock(dt);
DynamicObject::Ptr object;
BOOST_FOREACH(tie(tuples::ignore, object), DynamicType::GetByName("Service")->GetObjects()) {
BOOST_FOREACH(tie(tuples::ignore, object), dt->GetObjects()) {
Service::Ptr service = dynamic_pointer_cast<Service>(object);
ObjectLock olock(service);
service->RemoveExpiredComments();
}
}

View File

@ -275,11 +275,13 @@ void Service::RemoveExpiredDowntimes(void)
void Service::DowntimesExpireTimerHandler(void)
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
DynamicType::Ptr dt = DynamicType::GetByName("Service");
ObjectLock dlock(dt);
DynamicObject::Ptr object;
BOOST_FOREACH(tie(tuples::ignore, object), DynamicType::GetByName("Service")->GetObjects()) {
BOOST_FOREACH(tie(tuples::ignore, object), dt->GetObjects()) {
Service::Ptr service = dynamic_pointer_cast<Service>(object);
ObjectLock slock(service);
service->RemoveExpiredDowntimes();
}
}

View File

@ -94,9 +94,8 @@ Service::Ptr Service::GetByName(const String& name)
Service::Ptr Service::GetByNamePair(const String& hostName, const String& serviceName)
{
if (!hostName.IsEmpty()) {
recursive_mutex::scoped_lock lock(Application::GetMutex());
Host::Ptr host = Host::GetByName(hostName);
ObjectLock olock(host);
return host->GetServiceByShortName(serviceName);
} else {
return Service::GetByName(serviceName);

View File

@ -365,11 +365,7 @@ PyObject *PythonLanguage::PyRegisterFunction(PyObject *self, PyObject *args)
return NULL;
}
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
interp->RegisterPythonFunction(name, object);
}
interp->RegisterPythonFunction(name, object);
Py_INCREF(Py_None);
return Py_None;

View File

@ -277,7 +277,7 @@ void Endpoint::ProcessRequest(const Endpoint::Ptr& sender, const RequestMessage&
if (it == m_TopicHandlers.end())
return;
(*it->second)(GetSelf(), sender, request);
Application::GetEQ().Post(boost::bind(boost::ref(*it->second), GetSelf(), sender, request));
} else {
GetClient()->SendMessage(request);
}
@ -360,4 +360,3 @@ String Endpoint::GetService(void) const
{
return Get("service");
}

View File

@ -325,8 +325,6 @@ bool EndpointManager::RequestTimeoutLessComparer(const pair<String, PendingReque
void EndpointManager::SubscriptionTimerHandler(void)
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
Dictionary::Ptr subscriptions = boost::make_shared<Dictionary>();
DynamicObject::Ptr object;
@ -351,8 +349,6 @@ void EndpointManager::SubscriptionTimerHandler(void)
void EndpointManager::ReconnectTimerHandler(void)
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
DynamicObject::Ptr object;
BOOST_FOREACH(tie(tuples::ignore, object), DynamicType::GetByName("Endpoint")->GetObjects()) {
Endpoint::Ptr endpoint = dynamic_pointer_cast<Endpoint>(object);
@ -377,8 +373,6 @@ void EndpointManager::ReconnectTimerHandler(void)
void EndpointManager::RequestTimerHandler(void)
{
recursive_mutex::scoped_lock lock(Application::GetMutex());
map<String, PendingRequest>::iterator it;
for (it = m_Requests.begin(); it != m_Requests.end(); it++) {
if (it->second.HasTimedOut()) {