Refactored object locking code.

This commit is contained in:
Gunnar Beutner 2013-03-01 12:07:52 +01:00
parent 365b7a7ba3
commit 572a477da3
52 changed files with 618 additions and 300 deletions

View File

@ -27,9 +27,13 @@ void CheckerComponent::Start(void)
{ {
m_Endpoint = Endpoint::MakeEndpoint("checker", false); m_Endpoint = Endpoint::MakeEndpoint("checker", false);
/* dummy registration so the delegation module knows this is a checker {
TODO: figure out a better way for this */ ObjectLock olock(m_Endpoint);
m_Endpoint->RegisterSubscription("checker");
/* dummy registration so the delegation module knows this is a checker
TODO: figure out a better way for this */
m_Endpoint->RegisterSubscription("checker");
}
Service::OnCheckerChanged.connect(bind(&CheckerComponent::CheckerChangedHandler, this, _1)); Service::OnCheckerChanged.connect(bind(&CheckerComponent::CheckerChangedHandler, this, _1));
Service::OnNextCheckChanged.connect(bind(&CheckerComponent::NextCheckChangedHandler, this, _1)); Service::OnNextCheckChanged.connect(bind(&CheckerComponent::NextCheckChangedHandler, this, _1));
@ -46,7 +50,10 @@ void CheckerComponent::Start(void)
void CheckerComponent::Stop(void) void CheckerComponent::Stop(void)
{ {
m_Endpoint->Unregister(); {
ObjectLock olock(m_Endpoint);
m_Endpoint->Unregister();
}
{ {
boost::mutex::scoped_lock lock(m_Mutex); boost::mutex::scoped_lock lock(m_Mutex);
@ -74,12 +81,13 @@ void CheckerComponent::CheckThreadProc(void)
CheckTimeView::iterator it = idx.begin(); CheckTimeView::iterator it = idx.begin();
Service::Ptr service = *it; Service::Ptr service = *it;
ObjectLock olock(service); /* also required for the key extractor. */
if (!service->IsRegistered()) { if (!service->IsRegistered()) {
idx.erase(it); idx.erase(it);
continue; continue;
} }
ObjectLock olock(service); /* also required for the key extractor. */
double wait; double wait;
{ {
@ -129,7 +137,15 @@ void CheckerComponent::CheckThreadProc(void)
try { try {
olock.Unlock(); olock.Unlock();
Service::BeginExecuteCheck(service, boost::bind(&CheckerComponent::CheckCompletedHandler, static_cast<CheckerComponent::Ptr>(GetSelf()), service));
CheckerComponent::Ptr self;
{
ObjectLock olock(this);
self = GetSelf();
}
Service::BeginExecuteCheck(service, boost::bind(&CheckerComponent::CheckCompletedHandler, static_cast<CheckerComponent::Ptr>(self), service));
} catch (const exception& ex) { } catch (const exception& ex) {
olock.Lock(); olock.Lock();
Logger::Write(LogCritical, "checker", "Exception occured while checking service '" + service->GetName() + "': " + diagnostic_information(ex)); Logger::Write(LogCritical, "checker", "Exception occured while checking service '" + service->GetName() + "': " + diagnostic_information(ex));
@ -178,7 +194,16 @@ void CheckerComponent::CheckerChangedHandler(const Service::Ptr& service)
ObjectLock olock(service); /* also required for the key extractor */ ObjectLock olock(service); /* also required for the key extractor */
String checker = service->GetCurrentChecker(); String checker = service->GetCurrentChecker();
if (checker == EndpointManager::GetInstance()->GetIdentity() || checker == m_Endpoint->GetName()) { EndpointManager::Ptr em = EndpointManager::GetInstance();
String identity;
{
ObjectLock elock(em);
identity = em->GetIdentity();
}
if (checker == identity || Endpoint::GetByName(checker) == m_Endpoint) {
if (m_PendingServices.find(service) != m_PendingServices.end()) if (m_PendingServices.find(service) != m_PendingServices.end())
return; return;

View File

@ -36,7 +36,11 @@ REGISTER_COMPONENT("compat", CompatComponent);
*/ */
String CompatComponent::GetStatusPath(void) const String CompatComponent::GetStatusPath(void) const
{ {
Value statusPath = GetConfig()->Get("status_path"); DynamicObject::Ptr config = GetConfig();
ObjectLock olock(config);
Value statusPath = config->Get("status_path");
if (statusPath.IsEmpty()) if (statusPath.IsEmpty())
return Application::GetLocalStateDir() + "/cache/icinga2/status.dat"; return Application::GetLocalStateDir() + "/cache/icinga2/status.dat";
else else
@ -50,7 +54,11 @@ String CompatComponent::GetStatusPath(void) const
*/ */
String CompatComponent::GetObjectsPath(void) const String CompatComponent::GetObjectsPath(void) const
{ {
Value objectsPath = GetConfig()->Get("objects_path"); DynamicObject::Ptr config = GetConfig();
ObjectLock olock(config);
Value objectsPath = config->Get("objects_path");
if (objectsPath.IsEmpty()) if (objectsPath.IsEmpty())
return Application::GetLocalStateDir() + "/cache/icinga2/objects.cache"; return Application::GetLocalStateDir() + "/cache/icinga2/objects.cache";
else else
@ -64,7 +72,11 @@ String CompatComponent::GetObjectsPath(void) const
*/ */
String CompatComponent::GetLogPath(void) const String CompatComponent::GetLogPath(void) const
{ {
Value logPath = GetConfig()->Get("log_path"); DynamicObject::Ptr config = GetConfig();
ObjectLock olock(config);
Value logPath = config->Get("log_path");
if (logPath.IsEmpty()) if (logPath.IsEmpty())
return Application::GetLocalStateDir() + "/log/icinga2/compat"; return Application::GetLocalStateDir() + "/log/icinga2/compat";
else else
@ -78,7 +90,11 @@ String CompatComponent::GetLogPath(void) const
*/ */
String CompatComponent::GetCommandPath(void) const String CompatComponent::GetCommandPath(void) const
{ {
Value commandPath = GetConfig()->Get("command_path"); DynamicObject::Ptr config = GetConfig();
ObjectLock olock(config);
Value commandPath = config->Get("command_path");
if (commandPath.IsEmpty()) if (commandPath.IsEmpty())
return Application::GetLocalStateDir() + "/run/icinga.cmd"; return Application::GetLocalStateDir() + "/run/icinga.cmd";
else else
@ -214,31 +230,48 @@ void CompatComponent::DumpComments(ostream& fp, const Service::Ptr& owner, Compa
void CompatComponent::DumpDowntimes(ostream& fp, const Service::Ptr& owner, CompatObjectType type) void CompatComponent::DumpDowntimes(ostream& fp, const Service::Ptr& owner, CompatObjectType type)
{ {
ObjectLock olock(owner); Dictionary::Ptr downtimes;
String short_name, host_name;
Host::Ptr host;
Dictionary::Ptr downtimes = owner->GetDowntimes(); {
ObjectLock olock(owner);
downtimes = owner->GetDowntimes();
short_name = owner->GetShortName();
host = owner->GetHost();
}
{
ObjectLock olock(host);
host_name = host->GetName();
}
if (!downtimes) if (!downtimes)
return; return;
ObjectLock dlock(downtimes);
String id; String id;
Dictionary::Ptr downtime; Dictionary::Ptr downtime;
BOOST_FOREACH(tie(id, downtime), downtimes) { BOOST_FOREACH(tie(id, downtime), downtimes) {
if (Service::IsDowntimeExpired(downtime)) if (Service::IsDowntimeExpired(downtime))
continue; continue;
ObjectLock olock(downtime);
if (type == CompatTypeHost) if (type == CompatTypeHost)
fp << "hostdowntime {" << "\n"; fp << "hostdowntime {" << "\n";
else else
fp << "servicedowntime {" << "\n" fp << "servicedowntime {" << "\n"
<< "\t" << "service_description=" << owner->GetShortName() << "\n"; << "\t" << "service_description=" << short_name << "\n";
Dictionary::Ptr triggeredByObj = Service::GetDowntimeByID(downtime->Get("triggered_by")); Dictionary::Ptr triggeredByObj = Service::GetDowntimeByID(downtime->Get("triggered_by"));
int triggeredByLegacy = 0; int triggeredByLegacy = 0;
if (triggeredByObj) if (triggeredByObj)
triggeredByLegacy = triggeredByObj->Get("legacy_id"); triggeredByLegacy = triggeredByObj->Get("legacy_id");
fp << "\t" << "host_name=" << owner->GetHost()->GetName() << "\n" fp << "\t" << "host_name=" << host_name << "\n"
<< "\t" << "downtime_id=" << static_cast<String>(downtime->Get("legacy_id")) << "\n" << "\t" << "downtime_id=" << static_cast<String>(downtime->Get("legacy_id")) << "\n"
<< "\t" << "entry_time=" << static_cast<double>(downtime->Get("entry_time")) << "\n" << "\t" << "entry_time=" << static_cast<double>(downtime->Get("entry_time")) << "\n"
<< "\t" << "start_time=" << static_cast<double>(downtime->Get("start_time")) << "\n" << "\t" << "start_time=" << static_cast<double>(downtime->Get("start_time")) << "\n"
@ -357,6 +390,8 @@ void CompatComponent::DumpServiceStatusAttrs(ostream& fp, const Service::Ptr& se
} }
if (cr) { if (cr) {
ObjectLock olock(cr);
output = cr->Get("output"); output = cr->Get("output");
schedule_end = cr->Get("schedule_end"); schedule_end = cr->Get("schedule_end");
perfdata = cr->Get("performance_data_raw"); perfdata = cr->Get("performance_data_raw");

View File

@ -27,22 +27,14 @@ REGISTER_COMPONENT("delegation", DelegationComponent);
void DelegationComponent::Start(void) void DelegationComponent::Start(void)
{ {
m_DelegationTimer = boost::make_shared<Timer>(); m_DelegationTimer = boost::make_shared<Timer>();
// TODO: implement a handler for config changes for the delegation_interval variable // TODO: implement a handler for config changes for the delegation_interval variable
m_DelegationTimer->SetInterval(GetDelegationInterval()); m_DelegationTimer->SetInterval(30);
m_DelegationTimer->OnTimerExpired.connect(boost::bind(&DelegationComponent::DelegationTimerHandler, this)); m_DelegationTimer->OnTimerExpired.connect(boost::bind(&DelegationComponent::DelegationTimerHandler, this));
m_DelegationTimer->Start(); m_DelegationTimer->Start();
m_DelegationTimer->Reschedule(Utility::GetTime() + 10); m_DelegationTimer->Reschedule(Utility::GetTime() + 10);
} }
double DelegationComponent::GetDelegationInterval(void) const
{
Value interval = GetConfig()->Get("delegation_interval");
if (interval.IsEmpty())
return 30;
else
return interval;
}
bool DelegationComponent::IsEndpointChecker(const Endpoint::Ptr& endpoint) bool DelegationComponent::IsEndpointChecker(const Endpoint::Ptr& endpoint)
{ {
return (endpoint->HasSubscription("checker")); return (endpoint->HasSubscription("checker"));
@ -211,6 +203,8 @@ void DelegationComponent::DelegationTimerHandler(void)
Endpoint::Ptr endpoint; Endpoint::Ptr endpoint;
int count; int count;
BOOST_FOREACH(tie(endpoint, count), histogram) { BOOST_FOREACH(tie(endpoint, count), histogram) {
ObjectLock olock(endpoint);
stringstream msgbuf; stringstream msgbuf;
msgbuf << "histogram: " << endpoint->GetName() << " - " << count; msgbuf << "histogram: " << endpoint->GetName() << " - " << count;
Logger::Write(LogInformation, "delegation", msgbuf.str()); Logger::Write(LogInformation, "delegation", msgbuf.str());

View File

@ -39,8 +39,6 @@ private:
set<Endpoint::Ptr> GetCheckerCandidates(const Service::Ptr& service) const; set<Endpoint::Ptr> GetCheckerCandidates(const Service::Ptr& service) const;
static bool IsEndpointChecker(const Endpoint::Ptr& endpoint); static bool IsEndpointChecker(const Endpoint::Ptr& endpoint);
double GetDelegationInterval(void) const;
}; };
} }

View File

@ -30,10 +30,12 @@ void NotificationComponent::Start(void)
{ {
m_Endpoint = Endpoint::MakeEndpoint("notification", false); m_Endpoint = Endpoint::MakeEndpoint("notification", false);
ObjectLock olock(m_Endpoint); {
m_Endpoint->RegisterTopicHandler("icinga::SendNotifications", ObjectLock olock(m_Endpoint);
boost::bind(&NotificationComponent::SendNotificationsRequestHandler, this, _2, m_Endpoint->RegisterTopicHandler("icinga::SendNotifications",
_3)); boost::bind(&NotificationComponent::SendNotificationsRequestHandler, this, _2,
_3));
}
m_NotificationTimer = boost::make_shared<Timer>(); m_NotificationTimer = boost::make_shared<Timer>();
m_NotificationTimer->SetInterval(5); m_NotificationTimer->SetInterval(5);

View File

@ -222,7 +222,7 @@ void ReplicationComponent::RemoteObjectUpdateHandler(const RequestMessage& reque
// TODO: sanitize update, disallow __local // TODO: sanitize update, disallow __local
if (!object) { if (!object) {
object = dtype->CreateObject(update); object = DynamicType::CreateObject(dtype, update);
if (source == EndpointManager::GetInstance()->GetIdentity()) { if (source == EndpointManager::GetInstance()->GetIdentity()) {
/* the peer sent us an object that was originally created by us - /* the peer sent us an object that was originally created by us -

View File

@ -79,12 +79,9 @@ Application::~Application(void)
* *
* @returns The application object. * @returns The application object.
*/ */
Application::Ptr Application::GetInstance(void) Application *Application::GetInstance(void)
{ {
if (m_Instance) return m_Instance;
return m_Instance->GetSelf();
else
return Application::Ptr();
} }
int Application::GetArgC(void) int Application::GetArgC(void)
@ -110,6 +107,7 @@ void Application::SetArgV(char **argv)
void Application::ShutdownTimerHandler(void) void Application::ShutdownTimerHandler(void)
{ {
if (m_ShuttingDown) { if (m_ShuttingDown) {
Logger::Write(LogInformation, "base", "Shutting down Icinga...");
Application::GetInstance()->OnShutdown(); Application::GetInstance()->OnShutdown();
DynamicObject::DeactivateObjects(); DynamicObject::DeactivateObjects();
GetEQ().Stop(); GetEQ().Stop();
@ -177,14 +175,6 @@ void Application::RequestShutdown(void)
m_ShuttingDown = true; m_ShuttingDown = true;
} }
/**
* Terminates the application.
*/
void Application::Terminate(int exitCode)
{
_exit(exitCode);
}
/** /**
* Retrieves the full path of the executable. * Retrieves the full path of the executable.
* *
@ -295,7 +285,7 @@ void Application::SigIntHandler(int signum)
{ {
assert(signum == SIGINT); assert(signum == SIGINT);
Application::Ptr instance = Application::GetInstance(); Application *instance = Application::GetInstance();
if (!instance) if (!instance)
return; return;
@ -330,12 +320,12 @@ void Application::SigAbrtHandler(int signum)
*/ */
BOOL WINAPI Application::CtrlHandler(DWORD type) BOOL WINAPI Application::CtrlHandler(DWORD type)
{ {
Application::Ptr instance = Application::GetInstance(); Application *instance = Application::GetInstance();
if (!instance) if (!instance)
return TRUE; return TRUE;
instance->GetInstance()->RequestShutdown(); instance->RequestShutdown();
SetConsoleCtrlHandler(NULL, FALSE); SetConsoleCtrlHandler(NULL, FALSE);
return TRUE; return TRUE;

View File

@ -37,7 +37,7 @@ public:
Application(const Dictionary::Ptr& serializedUpdate); Application(const Dictionary::Ptr& serializedUpdate);
~Application(void); ~Application(void);
static Application::Ptr GetInstance(void); static Application *GetInstance(void);
int Run(void); int Run(void);
@ -57,7 +57,6 @@ public:
static void InstallExceptionHandlers(void); static void InstallExceptionHandlers(void);
static void RequestShutdown(void); static void RequestShutdown(void);
static void Terminate(int exitCode);
static void SetDebugging(bool debug); static void SetDebugging(bool debug);
static bool IsDebugging(void); static bool IsDebugging(void);

View File

@ -62,32 +62,44 @@ public:
/** /**
* Starts the async task. The caller must hold a reference to the AsyncTask * Starts the async task. The caller must hold a reference to the AsyncTask
* object until the completion callback is invoked. * object until the completion callback is invoked.
*
* @threadsafety Always.
*/ */
void Start(const CompletionCallback& completionCallback = CompletionCallback()) void Start(const CompletionCallback& completionCallback = CompletionCallback())
{ {
assert(!OwnsLock());
boost::mutex::scoped_lock lock(m_Mutex);
m_CompletionCallback = completionCallback; m_CompletionCallback = completionCallback;
Utility::QueueAsyncCallback(boost::bind(&AsyncTask<TClass, TResult>::RunInternal, this)); Utility::QueueAsyncCallback(boost::bind(&AsyncTask<TClass, TResult>::RunInternal, this));
} }
/** /**
* Checks whether the task is finished. * Checks whether the task is finished.
*
* @threadsafety Always.
*/ */
bool IsFinished(void) const bool IsFinished(void) const
{ {
assert(!OwnsLock());
boost::mutex::scoped_lock lock(m_Mutex); boost::mutex::scoped_lock lock(m_Mutex);
return m_Finished; return m_Finished;
} }
/** /**
* Retrieves the result of the task. Throws an exception if one is stored in * Blocks until the task is completed and retrieves the result. Throws an exception if one is stored in
* the AsyncTask object. * the AsyncTask object.
* *
* @returns The task's result. * @returns The task's result.
* @threadsafety Always.
*/ */
TResult GetResult(void) TResult GetResult(void)
{ {
if (!m_Finished) assert(!OwnsLock());
BOOST_THROW_EXCEPTION(runtime_error("GetResult called on an unfinished AsyncTask")); boost::mutex::scoped_lock lock(m_Mutex);
while (!m_Finished)
m_CV.wait(lock);
if (m_ResultRetrieved) if (m_ResultRetrieved)
BOOST_THROW_EXCEPTION(runtime_error("GetResult called on an AsyncTask whose result was already retrieved.")); BOOST_THROW_EXCEPTION(runtime_error("GetResult called on an AsyncTask whose result was already retrieved."));
@ -106,9 +118,13 @@ public:
* Finishes the task using an exception. * Finishes the task using an exception.
* *
* @param ex The exception. * @param ex The exception.
* @threadsafety Always.
*/ */
void FinishException(const boost::exception_ptr& ex) void FinishException(const boost::exception_ptr& ex)
{ {
assert(!OwnsLock());
boost::mutex::scoped_lock lock(m_Mutex);
m_Exception = ex; m_Exception = ex;
FinishInternal(); FinishInternal();
} }
@ -117,23 +133,17 @@ public:
* Finishes the task using an ordinary result. * Finishes the task using an ordinary result.
* *
* @param result The result. * @param result The result.
* @threadsafety Always.
*/ */
void FinishResult(const TResult& result) void FinishResult(const TResult& result)
{ {
assert(!OwnsLock());
boost::mutex::scoped_lock lock(m_Mutex);
m_Result = result; m_Result = result;
FinishInternal(); FinishInternal();
} }
/**
* Blocks until the task is completed.
*/
void Wait(void)
{
boost::mutex::scoped_lock lock(m_Mutex);
while (!m_Finished)
m_CV.wait(lock);
}
protected: protected:
/** /**
* Begins executing the task. The Run method must ensure * Begins executing the task. The Run method must ensure
@ -146,24 +156,22 @@ private:
/** /**
* Finishes the task and causes the completion callback to be invoked. This * Finishes the task and causes the completion callback to be invoked. This
* function must be called before the object is destroyed. * function must be called before the object is destroyed.
*
* @threadsafety Caller must hold m_Mutex.
*/ */
void FinishInternal(void) void FinishInternal(void)
{ {
CompletionCallback callback; assert(!m_Finished);
m_Finished = true;
m_CV.notify_all();
{
boost::mutex::scoped_lock lock(m_Mutex);
assert(!m_Finished);
m_Finished = true;
m_CV.notify_all();
if (!m_CompletionCallback.empty()) {
CompletionCallback callback;
m_CompletionCallback.swap(callback); m_CompletionCallback.swap(callback);
}
if (!callback.empty())
Utility::QueueAsyncCallback(boost::bind(callback, GetSelf())); Utility::QueueAsyncCallback(boost::bind(callback, GetSelf()));
}
} }
/** /**

View File

@ -21,7 +21,7 @@
using namespace icinga; using namespace icinga;
REGISTER_TYPE(Component, NULL); REGISTER_TYPE(Component);
map<String, Component::Factory> Component::m_Factories; map<String, Component::Factory> Component::m_Factories;

View File

@ -152,6 +152,8 @@ String Dictionary::Add(const Value& value)
*/ */
Dictionary::Iterator Dictionary::Begin(void) Dictionary::Iterator Dictionary::Begin(void)
{ {
assert(OwnsLock());
return m_Data.begin(); return m_Data.begin();
} }
@ -162,6 +164,8 @@ Dictionary::Iterator Dictionary::Begin(void)
*/ */
Dictionary::Iterator Dictionary::End(void) Dictionary::Iterator Dictionary::End(void)
{ {
assert(OwnsLock());
return m_Data.end(); return m_Data.end();
} }
@ -218,6 +222,8 @@ void Dictionary::Remove(const String& key)
*/ */
void Dictionary::Remove(Dictionary::Iterator it) void Dictionary::Remove(Dictionary::Iterator it)
{ {
ObjectLock olock(this);
String key = it->first; String key = it->first;
m_Data.erase(it); m_Data.erase(it);
} }
@ -228,9 +234,23 @@ void Dictionary::Remove(Dictionary::Iterator it)
*/ */
void Dictionary::Seal(void) void Dictionary::Seal(void)
{ {
ObjectLock olock(this);
m_Sealed = true; m_Sealed = true;
} }
/**
* Checks whether the dictionary is sealed.
*
* @returns true if the dictionary is sealed, false otherwise.
*/
bool Dictionary::IsSealed(void) const
{
ObjectLock olock(this);
return m_Sealed;
}
/** /**
* Makes a shallow copy of a dictionary. * Makes a shallow copy of a dictionary.
* *
@ -270,6 +290,8 @@ Dictionary::Ptr Dictionary::FromJson(cJSON *json)
dictionary->Set(i->string, Value::FromJson(i)); dictionary->Set(i->string, Value::FromJson(i));
} }
dictionary->Seal();
return dictionary; return dictionary;
} }

View File

@ -46,7 +46,9 @@ public:
void Set(const String& key, const Value& value); void Set(const String& key, const Value& value);
String Add(const Value& value); String Add(const Value& value);
bool Contains(const String& key) const; bool Contains(const String& key) const;
void Seal(void); void Seal(void);
bool IsSealed(void) const;
Iterator Begin(void); Iterator Begin(void);
Iterator End(void); Iterator End(void);

View File

@ -42,20 +42,13 @@ DynamicObject::DynamicObject(const Dictionary::Ptr& serializedObject)
RegisterAttribute("__source", Attribute_Local, &m_Source); RegisterAttribute("__source", Attribute_Local, &m_Source);
RegisterAttribute("methods", Attribute_Config, &m_Methods); RegisterAttribute("methods", Attribute_Config, &m_Methods);
{ if (!serializedObject->Contains("configTx"))
ObjectLock olock(serializedObject); BOOST_THROW_EXCEPTION(invalid_argument("Serialized object must contain a config snapshot."));
if (!serializedObject->Contains("configTx"))
BOOST_THROW_EXCEPTION(invalid_argument("Serialized object must contain a config snapshot."));
}
/* apply config state from the config item/remote update; /* apply config state from the config item/remote update;
* The DynamicType::CreateObject function takes care of restoring * The DynamicType::CreateObject function takes care of restoring
* non-config state after the object has been fully constructed */ * non-config state after the object has been fully constructed */
{ ApplyUpdate(serializedObject, Attribute_Config);
ObjectLock olock(this);
ApplyUpdate(serializedObject, Attribute_Config);
}
boost::call_once(m_TransactionOnce, &DynamicObject::Initialize); boost::call_once(m_TransactionOnce, &DynamicObject::Initialize);
} }
@ -77,6 +70,8 @@ void DynamicObject::Initialize(void)
Dictionary::Ptr DynamicObject::BuildUpdate(double sinceTx, int attributeTypes) const Dictionary::Ptr DynamicObject::BuildUpdate(double sinceTx, int attributeTypes) const
{ {
assert(OwnsLock());
DynamicObject::AttributeConstIterator it; DynamicObject::AttributeConstIterator it;
Dictionary::Ptr attrs = boost::make_shared<Dictionary>(); Dictionary::Ptr attrs = boost::make_shared<Dictionary>();
@ -120,23 +115,22 @@ Dictionary::Ptr DynamicObject::BuildUpdate(double sinceTx, int attributeTypes) c
void DynamicObject::ApplyUpdate(const Dictionary::Ptr& serializedUpdate, void DynamicObject::ApplyUpdate(const Dictionary::Ptr& serializedUpdate,
int allowedTypes) int allowedTypes)
{ {
Dictionary::Ptr attrs; assert(OwnsLock());
assert(serializedUpdate->IsSealed());
{ Value configTxValue = serializedUpdate->Get("configTx");
ObjectLock olock(serializedUpdate);
double configTx = 0; if ((allowedTypes & Attribute_Config) != 0 && !configTxValue.IsEmpty()) {
if ((allowedTypes & Attribute_Config) != 0 && double configTx = configTxValue;
serializedUpdate->Contains("configTx")) {
configTx = serializedUpdate->Get("configTx");
if (configTx > m_ConfigTx) if (configTx > m_ConfigTx)
ClearAttributesByType(Attribute_Config); ClearAttributesByType(Attribute_Config);
}
attrs = serializedUpdate->Get("attrs");
} }
Dictionary::Ptr attrs = serializedUpdate->Get("attrs");
assert(attrs->IsSealed());
{ {
ObjectLock olock(attrs); ObjectLock olock(attrs);
@ -146,7 +140,8 @@ void DynamicObject::ApplyUpdate(const Dictionary::Ptr& serializedUpdate,
continue; continue;
Dictionary::Ptr attr = it->second; Dictionary::Ptr attr = it->second;
ObjectLock alock(attr);
assert(attr->IsSealed());
int type = attr->Get("type"); int type = attr->Get("type");
@ -170,6 +165,8 @@ void DynamicObject::ApplyUpdate(const Dictionary::Ptr& serializedUpdate,
void DynamicObject::RegisterAttribute(const String& name, void DynamicObject::RegisterAttribute(const String& name,
AttributeType type, AttributeBase *boundAttribute) AttributeType type, AttributeBase *boundAttribute)
{ {
assert(OwnsLock());
AttributeHolder attr(type, boundAttribute); AttributeHolder attr(type, boundAttribute);
pair<DynamicObject::AttributeIterator, bool> tt; pair<DynamicObject::AttributeIterator, bool> tt;
@ -183,24 +180,38 @@ void DynamicObject::RegisterAttribute(const String& name,
} }
} }
/**
* @threadsafety Always.
*/
void DynamicObject::Set(const String& name, const Value& data) void DynamicObject::Set(const String& name, const Value& data)
{ {
InternalSetAttribute(name, data, GetCurrentTx()); InternalSetAttribute(name, data, GetCurrentTx());
} }
/**
* @threadsafety Always.
*/
void DynamicObject::Touch(const String& name) void DynamicObject::Touch(const String& name)
{ {
InternalSetAttribute(name, InternalGetAttribute(name), GetCurrentTx()); InternalSetAttribute(name, InternalGetAttribute(name), GetCurrentTx());
} }
/**
* @threadsafety Always.
*/
Value DynamicObject::Get(const String& name) const Value DynamicObject::Get(const String& name) const
{ {
return InternalGetAttribute(name); return InternalGetAttribute(name);
} }
/**
* @threadsafety Always.
*/
void DynamicObject::InternalSetAttribute(const String& name, const Value& data, void DynamicObject::InternalSetAttribute(const String& name, const Value& data,
double tx, bool allowEditConfig) double tx, bool allowEditConfig)
{ {
ObjectLock olock(this);
DynamicObject::AttributeIterator it; DynamicObject::AttributeIterator it;
it = m_Attributes.find(name); it = m_Attributes.find(name);
@ -242,8 +253,13 @@ void DynamicObject::InternalSetAttribute(const String& name, const Value& data,
m_ModifiedAttributes.insert(make_pair(name, oldValue)); m_ModifiedAttributes.insert(make_pair(name, oldValue));
} }
/**
* @threadsafety Always.
*/
Value DynamicObject::InternalGetAttribute(const String& name) const Value DynamicObject::InternalGetAttribute(const String& name) const
{ {
ObjectLock olock(this);
DynamicObject::AttributeConstIterator it; DynamicObject::AttributeConstIterator it;
it = m_Attributes.find(name); it = m_Attributes.find(name);
@ -253,13 +269,20 @@ Value DynamicObject::InternalGetAttribute(const String& name) const
return it->second.GetValue(); return it->second.GetValue();
} }
/**
* @threadsafety Always.
*/
bool DynamicObject::HasAttribute(const String& name) const bool DynamicObject::HasAttribute(const String& name) const
{ {
ObjectLock olock(this);
return (m_Attributes.find(name) != m_Attributes.end()); return (m_Attributes.find(name) != m_Attributes.end());
} }
void DynamicObject::ClearAttributesByType(AttributeType type) void DynamicObject::ClearAttributesByType(AttributeType type)
{ {
assert(OwnsLock());
DynamicObject::AttributeIterator at; DynamicObject::AttributeIterator at;
for (at = m_Attributes.begin(); at != m_Attributes.end(); at++) { for (at = m_Attributes.begin(); at != m_Attributes.end(); at++) {
if (at->second.GetType() != type) if (at->second.GetType() != type)
@ -269,44 +292,81 @@ void DynamicObject::ClearAttributesByType(AttributeType type)
} }
} }
/**
* @threadsafety Always.
*/
DynamicType::Ptr DynamicObject::GetType(void) const DynamicType::Ptr DynamicObject::GetType(void) const
{ {
ObjectLock olock(this);
return DynamicType::GetByName(m_Type); return DynamicType::GetByName(m_Type);
} }
/**
* @threadsafety Always.
*/
String DynamicObject::GetName(void) const String DynamicObject::GetName(void) const
{ {
ObjectLock olock(this);
return m_Name; return m_Name;
} }
/**
* @threadsafety Always.
*/
bool DynamicObject::IsLocal(void) const bool DynamicObject::IsLocal(void) const
{ {
ObjectLock olock(this);
return m_Local; return m_Local;
} }
/**
* @threadsafety Always.
*/
bool DynamicObject::IsAbstract(void) const bool DynamicObject::IsAbstract(void) const
{ {
ObjectLock olock(this);
return m_Abstract; return m_Abstract;
} }
/**
* @threadsafety Always.
*/
bool DynamicObject::IsRegistered(void) const bool DynamicObject::IsRegistered(void) const
{ {
ObjectLock olock(this);
return m_Registered; return m_Registered;
} }
/**
* @threadsafety Always.
*/
void DynamicObject::SetSource(const String& value) void DynamicObject::SetSource(const String& value)
{ {
ObjectLock olock(this);
m_Source = value; m_Source = value;
Touch("__source"); Touch("__source");
} }
/**
* @threadsafety Always.
*/
String DynamicObject::GetSource(void) const String DynamicObject::GetSource(void) const
{ {
ObjectLock olock(this);
return m_Source; return m_Source;
} }
void DynamicObject::Register(void) void DynamicObject::Register(void)
{ {
assert(OwnsLock());
/* It's now safe to send us attribute events. */ /* It's now safe to send us attribute events. */
SetEventSafe(true); SetEventSafe(true);
@ -352,11 +412,15 @@ void DynamicObject::OnRegistrationCompleted(void)
void DynamicObject::Start(void) void DynamicObject::Start(void)
{ {
assert(OwnsLock());
/* Nothing to do here. */ /* Nothing to do here. */
} }
void DynamicObject::Unregister(void) void DynamicObject::Unregister(void)
{ {
assert(OwnsLock());
DynamicType::Ptr dtype = GetType(); DynamicType::Ptr dtype = GetType();
ObjectLock olock(dtype); ObjectLock olock(dtype);
@ -371,17 +435,14 @@ void DynamicObject::Unregister(void)
ScriptTask::Ptr DynamicObject::MakeMethodTask(const String& method, ScriptTask::Ptr DynamicObject::MakeMethodTask(const String& method,
const vector<Value>& arguments) const vector<Value>& arguments)
{ {
String funcName; assert(OwnsLock());
Dictionary::Ptr methods = m_Methods; Dictionary::Ptr methods = m_Methods;
{ String funcName = methods->Get(method);
ObjectLock olock(methods);
if (!methods->Contains(method))
return ScriptTask::Ptr();
funcName = methods->Get(method); if (funcName.IsEmpty())
} return ScriptTask::Ptr();
ScriptFunction::Ptr func = ScriptFunction::GetByName(funcName); ScriptFunction::Ptr func = ScriptFunction::GetByName(funcName);
@ -409,15 +470,23 @@ void DynamicObject::DumpObjects(const String& filename)
StdioStream::Ptr sfp = boost::make_shared<StdioStream>(&fp, false); StdioStream::Ptr sfp = boost::make_shared<StdioStream>(&fp, false);
sfp->Start(); sfp->Start();
;
BOOST_FOREACH(const DynamicType::Ptr& type, DynamicType::GetTypes()) { BOOST_FOREACH(const DynamicType::Ptr& type, DynamicType::GetTypes()) {
String type_name;
{
ObjectLock olock(type);
type_name = type->GetName();
}
BOOST_FOREACH(const DynamicObject::Ptr& object, type->GetObjects()) { BOOST_FOREACH(const DynamicObject::Ptr& object, type->GetObjects()) {
ObjectLock olock(object);
if (object->IsLocal()) if (object->IsLocal())
continue; continue;
Dictionary::Ptr persistentObject = boost::make_shared<Dictionary>(); Dictionary::Ptr persistentObject = boost::make_shared<Dictionary>();
persistentObject->Set("type", object->GetType()->GetName()); persistentObject->Set("type", type_name);
persistentObject->Set("name", object->GetName()); persistentObject->Set("name", object->GetName());
int types = Attribute_Local | Attribute_Replicated; int types = Attribute_Local | Attribute_Replicated;
@ -472,6 +541,8 @@ void DynamicObject::RestoreObjects(const String& filename)
while (NetString::ReadStringFromStream(sfp, &message)) { while (NetString::ReadStringFromStream(sfp, &message)) {
Dictionary::Ptr persistentObject = Value::Deserialize(message); Dictionary::Ptr persistentObject = Value::Deserialize(message);
assert(persistentObject->IsSealed());
String type = persistentObject->Get("type"); String type = persistentObject->Get("type");
String name = persistentObject->Get("name"); String name = persistentObject->Get("name");
Dictionary::Ptr update = persistentObject->Get("update"); Dictionary::Ptr update = persistentObject->Get("update");
@ -479,7 +550,6 @@ void DynamicObject::RestoreObjects(const String& filename)
bool hasConfig = update->Contains("configTx"); bool hasConfig = update->Contains("configTx");
DynamicType::Ptr dt = DynamicType::GetByName(type); DynamicType::Ptr dt = DynamicType::GetByName(type);
ObjectLock dlock(dt);
if (!dt) if (!dt)
BOOST_THROW_EXCEPTION(invalid_argument("Invalid type: " + type)); BOOST_THROW_EXCEPTION(invalid_argument("Invalid type: " + type));
@ -487,9 +557,11 @@ void DynamicObject::RestoreObjects(const String& filename)
DynamicObject::Ptr object = dt->GetObject(name); DynamicObject::Ptr object = dt->GetObject(name);
if (hasConfig && !object) { if (hasConfig && !object) {
object = dt->CreateObject(update); object = DynamicType::CreateObject(dt, update);
ObjectLock olock(object);
object->Register(); object->Register();
} else if (object) { } else if (object) {
ObjectLock olock(object);
object->ApplyUpdate(update, Attribute_All); object->ApplyUpdate(update, Attribute_All);
} }
@ -506,14 +578,7 @@ void DynamicObject::RestoreObjects(const String& filename)
void DynamicObject::DeactivateObjects(void) void DynamicObject::DeactivateObjects(void)
{ {
BOOST_FOREACH(const DynamicType::Ptr& dt, DynamicType::GetTypes()) { BOOST_FOREACH(const DynamicType::Ptr& dt, DynamicType::GetTypes()) {
set<DynamicObject::Ptr> objects; BOOST_FOREACH(const DynamicObject::Ptr& object, dt->GetObjects()) {
{
ObjectLock olock(dt);
objects = dt->GetObjects();
}
BOOST_FOREACH(const DynamicObject::Ptr& object, objects) {
ObjectLock olock(object); ObjectLock olock(object);
object->Unregister(); object->Unregister();
} }
@ -563,14 +628,16 @@ void DynamicObject::NewTx(void)
continue; continue;
map<String, Value, string_iless> attrs; map<String, Value, string_iless> attrs;
bool event_safe;
{ {
ObjectLock olock(object); ObjectLock olock(object);
attrs.swap(object->m_ModifiedAttributes); attrs.swap(object->m_ModifiedAttributes);
event_safe = object->GetEventSafe();
} }
/* Check if it's safe to send events. */ /* Send attribute events if it's safe to do so. */
if (object->GetEventSafe()) { if (event_safe) {
map<String, Value, string_iless>::iterator it; map<String, Value, string_iless>::iterator it;
for (it = attrs.begin(); it != attrs.end(); it++) for (it = attrs.begin(); it != attrs.end(); it++)
object->OnAttributeChanged(it->first, it->second); object->OnAttributeChanged(it->first, it->second);
@ -589,24 +656,26 @@ void DynamicObject::OnAttributeChanged(const String&, const Value&)
DynamicObject::Ptr DynamicObject::GetObject(const String& type, const String& name) DynamicObject::Ptr DynamicObject::GetObject(const String& type, const String& name)
{ {
DynamicType::Ptr dtype = DynamicType::GetByName(type); DynamicType::Ptr dtype = DynamicType::GetByName(type);
return dtype->GetObject(name);
{
ObjectLock olock(dtype);
return dtype->GetObject(name);
}
} }
const DynamicObject::AttributeMap& DynamicObject::GetAttributes(void) const const DynamicObject::AttributeMap& DynamicObject::GetAttributes(void) const
{ {
assert(OwnsLock());
return m_Attributes; return m_Attributes;
} }
void DynamicObject::SetEventSafe(bool safe) void DynamicObject::SetEventSafe(bool safe)
{ {
assert(OwnsLock());
m_EventSafe = safe; m_EventSafe = safe;
} }
bool DynamicObject::GetEventSafe(void) const bool DynamicObject::GetEventSafe(void) const
{ {
assert(OwnsLock());
return m_EventSafe; return m_EventSafe;
} }

View File

@ -64,29 +64,28 @@ DynamicType::TypeSet DynamicType::GetTypes(void)
set<DynamicObject::Ptr> DynamicType::GetObjects(const String& type) set<DynamicObject::Ptr> DynamicType::GetObjects(const String& type)
{ {
DynamicType::Ptr dt = GetByName(type); DynamicType::Ptr dt = GetByName(type);
ObjectLock olock(dt);
return dt->GetObjects(); return dt->GetObjects();
} }
set<DynamicObject::Ptr> DynamicType::GetObjects(void) const set<DynamicObject::Ptr> DynamicType::GetObjects(void) const
{ {
ObjectLock olock(this);
return m_ObjectSet; /* Making a copy of the set here. */ return m_ObjectSet; /* Making a copy of the set here. */
} }
String DynamicType::GetName(void) const String DynamicType::GetName(void) const
{ {
ObjectLock olock(this);
return m_Name; return m_Name;
} }
void DynamicType::RegisterObject(const DynamicObject::Ptr& object) void DynamicType::RegisterObject(const DynamicObject::Ptr& object)
{ {
String name; String name = object->GetName();
{
ObjectLock olock(object);
name = object->GetName();
}
assert(OwnsLock());
ObjectMap::iterator it = m_ObjectMap.find(name); ObjectMap::iterator it = m_ObjectMap.find(name);
if (it != m_ObjectMap.end()) { if (it != m_ObjectMap.end()) {
@ -107,12 +106,18 @@ void DynamicType::UnregisterObject(const DynamicObject::Ptr& object)
ObjectLock olock(object); ObjectLock olock(object);
object->SetEventSafe(false); object->SetEventSafe(false);
assert(OwnsLock());
m_ObjectMap.erase(object->GetName()); m_ObjectMap.erase(object->GetName());
m_ObjectSet.erase(object); m_ObjectSet.erase(object);
} }
/**
* @threadsafety Always.
*/
DynamicObject::Ptr DynamicType::GetObject(const String& name) const DynamicObject::Ptr DynamicType::GetObject(const String& name) const
{ {
ObjectLock olock(this);
DynamicType::ObjectMap::const_iterator nt = m_ObjectMap.find(name); DynamicType::ObjectMap::const_iterator nt = m_ObjectMap.find(name);
if (nt == m_ObjectMap.end()) if (nt == m_ObjectMap.end())
@ -138,19 +143,20 @@ void DynamicType::RegisterType(const DynamicType::Ptr& type)
InternalGetTypeSet().insert(type); InternalGetTypeSet().insert(type);
} }
DynamicObject::Ptr DynamicType::CreateObject(const Dictionary::Ptr& serializedUpdate) const DynamicObject::Ptr DynamicType::CreateObject(const DynamicType::Ptr& self, const Dictionary::Ptr& serializedUpdate)
{ {
DynamicObject::Ptr object = m_ObjectFactory(serializedUpdate); ObjectFactory factory;
{
ObjectLock olock(self);
factory = self->m_ObjectFactory;
}
DynamicObject::Ptr object = factory(serializedUpdate);
{ {
ObjectLock olock(object); ObjectLock olock(object);
/* register attributes */
String name;
AttributeType type;
BOOST_FOREACH(tuples::tie(name, type), m_Attributes)
object->RegisterAttribute(name, type);
/* apply the object's non-config attributes */ /* apply the object's non-config attributes */
object->ApplyUpdate(serializedUpdate, Attribute_All & ~Attribute_Config); object->ApplyUpdate(serializedUpdate, Attribute_All & ~Attribute_Config);
} }
@ -158,35 +164,6 @@ DynamicObject::Ptr DynamicType::CreateObject(const Dictionary::Ptr& serializedUp
return object; return object;
} }
/**
* @threadsafety Always.
*/
bool DynamicType::TypeExists(const String& name)
{
return (GetByName(name));
}
void DynamicType::AddAttribute(const String& name, AttributeType type)
{
m_Attributes[name] = type;
}
void DynamicType::RemoveAttribute(const String& name)
{
m_Attributes.erase(name);
}
bool DynamicType::HasAttribute(const String& name)
{
return (m_Attributes.find(name) != m_Attributes.end());
}
void DynamicType::AddAttributes(const AttributeDescription *attributes, int attributeCount)
{
for (int i = 0; i < attributeCount; i++)
AddAttribute(attributes[i].Name, attributes[i].Type);
}
boost::mutex& DynamicType::GetStaticMutex(void) boost::mutex& DynamicType::GetStaticMutex(void)
{ {
static boost::mutex mutex; static boost::mutex mutex;

View File

@ -23,12 +23,6 @@
namespace icinga namespace icinga
{ {
struct AttributeDescription
{
String Name;
AttributeType Type;
};
class I2_BASE_API DynamicType : public Object class I2_BASE_API DynamicType : public Object
{ {
public: public:
@ -44,9 +38,8 @@ public:
static DynamicType::Ptr GetByName(const String& name); static DynamicType::Ptr GetByName(const String& name);
static void RegisterType(const DynamicType::Ptr& type); static void RegisterType(const DynamicType::Ptr& type);
static bool TypeExists(const String& name);
DynamicObject::Ptr CreateObject(const Dictionary::Ptr& serializedUpdate) const; static DynamicObject::Ptr CreateObject(const DynamicType::Ptr& self, const Dictionary::Ptr& serializedUpdate);
DynamicObject::Ptr GetObject(const String& name) const; DynamicObject::Ptr GetObject(const String& name) const;
void RegisterObject(const DynamicObject::Ptr& object); void RegisterObject(const DynamicObject::Ptr& object);
@ -57,16 +50,9 @@ public:
static set<DynamicObject::Ptr> GetObjects(const String& type); static set<DynamicObject::Ptr> GetObjects(const String& type);
void AddAttribute(const String& name, AttributeType type);
void RemoveAttribute(const String& name);
bool HasAttribute(const String& name);
void AddAttributes(const AttributeDescription *attributes, int attributeCount);
private: private:
String m_Name; String m_Name;
ObjectFactory m_ObjectFactory; ObjectFactory m_ObjectFactory;
map<String, AttributeType> m_Attributes;
typedef map<String, DynamicObject::Ptr, string_iless> ObjectMap; typedef map<String, DynamicObject::Ptr, string_iless> ObjectMap;
typedef set<DynamicObject::Ptr> ObjectSet; typedef set<DynamicObject::Ptr> ObjectSet;
@ -90,13 +76,10 @@ private:
class RegisterTypeHelper class RegisterTypeHelper
{ {
public: public:
RegisterTypeHelper(const String& name, const DynamicType::ObjectFactory& factory, const AttributeDescription* attributes, int attributeCount) RegisterTypeHelper(const String& name, const DynamicType::ObjectFactory& factory)
{ {
if (!DynamicType::TypeExists(name)) { DynamicType::Ptr type = boost::make_shared<DynamicType>(name, factory);
DynamicType::Ptr type = boost::make_shared<DynamicType>(name, factory); DynamicType::RegisterType(type);
type->AddAttributes(attributes, attributeCount);
DynamicType::RegisterType(type);
}
} }
}; };
@ -111,11 +94,11 @@ shared_ptr<T> DynamicObjectFactory(const Dictionary::Ptr& serializedUpdate)
return boost::make_shared<T>(serializedUpdate); return boost::make_shared<T>(serializedUpdate);
} }
#define REGISTER_TYPE_ALIAS(type, alias, attributeDesc) \ #define REGISTER_TYPE_ALIAS(type, alias) \
static RegisterTypeHelper g_RegisterDT_ ## type(alias, DynamicObjectFactory<type>, attributeDesc, (attributeDesc == NULL) ? 0 : sizeof(attributeDesc) / sizeof((static_cast<AttributeDescription *>(attributeDesc))[0])) static RegisterTypeHelper g_RegisterDT_ ## type(alias, DynamicObjectFactory<type>)
#define REGISTER_TYPE(type, attributeDesc) \ #define REGISTER_TYPE(type) \
REGISTER_TYPE_ALIAS(type, #type, attributeDesc) REGISTER_TYPE_ALIAS(type, #type)
} }

View File

@ -21,7 +21,7 @@
using namespace icinga; using namespace icinga;
REGISTER_TYPE(Logger, NULL); REGISTER_TYPE(Logger);
/** /**
* Constructor for the Logger class. * Constructor for the Logger class.
@ -37,7 +37,10 @@ Logger::Logger(const Dictionary::Ptr& properties)
if (!IsLocal()) if (!IsLocal())
BOOST_THROW_EXCEPTION(runtime_error("Logger objects must be local.")); BOOST_THROW_EXCEPTION(runtime_error("Logger objects must be local."));
}
void Logger::Start(void)
{
String type = m_Type; String type = m_Type;
if (type.IsEmpty()) if (type.IsEmpty())
BOOST_THROW_EXCEPTION(runtime_error("Logger objects must have a 'type' property.")); BOOST_THROW_EXCEPTION(runtime_error("Logger objects must have a 'type' property."));
@ -65,8 +68,9 @@ Logger::Logger(const Dictionary::Ptr& properties)
BOOST_THROW_EXCEPTION(runtime_error("Unknown log type: " + type)); BOOST_THROW_EXCEPTION(runtime_error("Unknown log type: " + type));
} }
impl->m_Config = this; impl->m_Config = GetSelf();
m_Impl = impl; m_Impl = impl;
} }
/** /**
@ -185,5 +189,5 @@ LogSeverity Logger::StringToSeverity(const String& severity)
*/ */
DynamicObject::Ptr ILogger::GetConfig(void) const DynamicObject::Ptr ILogger::GetConfig(void) const
{ {
return m_Config->GetSelf(); return m_Config.lock();
} }

View File

@ -71,7 +71,7 @@ protected:
DynamicObject::Ptr GetConfig(void) const; DynamicObject::Ptr GetConfig(void) const;
private: private:
DynamicObject *m_Config; DynamicObject::WeakPtr m_Config;
friend class Logger; friend class Logger;
}; };
@ -97,6 +97,9 @@ public:
LogSeverity GetMinSeverity(void) const; LogSeverity GetMinSeverity(void) const;
protected:
virtual void Start(void);
private: private:
Attribute<String> m_Type; Attribute<String> m_Type;
Attribute<String> m_Path; Attribute<String> m_Path;

View File

@ -21,10 +21,13 @@
using namespace icinga; using namespace icinga;
boost::mutex Object::m_DebugMutex;
/** /**
* Default constructor for the Object class. * Default constructor for the Object class.
*/ */
Object::Object(void) Object::Object(void)
: m_LockCount(0)
{ } { }
/** /**
@ -41,17 +44,32 @@ Object::~Object(void)
*/ */
Object::SharedPtrHolder Object::GetSelf(void) Object::SharedPtrHolder Object::GetSelf(void)
{ {
ObjectLock olock(this);
return Object::SharedPtrHolder(shared_from_this()); return Object::SharedPtrHolder(shared_from_this());
} }
/** /**
* Returns the mutex that must be held while calling non-static methods * Checks if the calling thread owns the lock on this object or is currently
* which have not been explicitly marked as thread-safe. * in the constructor or destructor and therefore implicitly owns the lock.
* *
* @returns The object's mutex. * @returns True if the calling thread owns the lock, false otherwise.
* @threadsafety Always.
*/ */
recursive_mutex& Object::GetMutex(void) const bool Object::OwnsLock(void) const
{ {
return m_Mutex; boost::mutex::scoped_lock lock(m_DebugMutex);
if (m_LockCount == 0 || m_LockOwner != boost::this_thread::get_id()) {
try {
shared_from_this();
} catch (const boost::bad_weak_ptr& ex) {
/* There's no shared_ptr to this object. Either someone created the object
* directly (e.g. on the stack) or we're in the constructor or destructor. Not holding the lock is ok here. */
return true;
}
return false;
}
return true;
} }

View File

@ -91,19 +91,28 @@ public:
holder instance */ holder instance */
}; };
SharedPtrHolder GetSelf(void); void VerifyLocked(void) const;
void WarnIfLocked(void) const;
recursive_mutex& GetMutex(void) const;
protected: protected:
Object(void); Object(void);
virtual ~Object(void); virtual ~Object(void);
SharedPtrHolder GetSelf(void);
bool OwnsLock(void) const;
private: private:
Object(const Object& other); Object(const Object& other);
Object& operator=(const Object& rhs); Object& operator=(const Object& rhs);
mutable recursive_mutex m_Mutex; mutable recursive_mutex m_Mutex;
mutable unsigned int m_LockCount;
mutable boost::thread::id m_LockOwner;
static boost::mutex m_DebugMutex;
friend class ObjectLock;
}; };
/** /**

View File

@ -25,6 +25,11 @@ ObjectLock::ObjectLock(void)
: m_Object(NULL), m_Lock() : m_Object(NULL), m_Lock()
{ } { }
ObjectLock::~ObjectLock(void)
{
Unlock();
}
ObjectLock::ObjectLock(const Object::Ptr& object) ObjectLock::ObjectLock(const Object::Ptr& object)
: m_Object(object.get()), m_Lock() : m_Object(object.get()), m_Lock()
{ {
@ -43,10 +48,23 @@ void ObjectLock::Lock(void)
{ {
assert(!m_Lock.owns_lock() && m_Object != NULL); assert(!m_Lock.owns_lock() && m_Object != NULL);
m_Lock = recursive_mutex::scoped_lock(m_Object->GetMutex()); m_Lock = recursive_mutex::scoped_lock(m_Object->m_Mutex);
{
boost::mutex::scoped_lock lock(Object::m_DebugMutex);
m_Object->m_LockCount++;
m_Object->m_LockOwner = boost::this_thread::get_id();
}
} }
void ObjectLock::Unlock(void) void ObjectLock::Unlock(void)
{ {
{
boost::mutex::scoped_lock lock(Object::m_DebugMutex);
if (m_Lock.owns_lock())
m_Object->m_LockCount--;
}
m_Lock = recursive_mutex::scoped_lock(); m_Lock = recursive_mutex::scoped_lock();
} }

View File

@ -31,6 +31,7 @@ public:
ObjectLock(void); ObjectLock(void);
ObjectLock(const Object::Ptr& object); ObjectLock(const Object::Ptr& object);
ObjectLock(const Object *object); ObjectLock(const Object *object);
~ObjectLock(void);
void Lock(void); void Lock(void);
void Unlock(void); void Unlock(void);

View File

@ -140,7 +140,6 @@ void Process::WorkerThreadProc(int taskFd)
if (fd >= 0) if (fd >= 0)
tasks[fd] = task; tasks[fd] = task;
} catch (...) { } catch (...) {
ObjectLock olock(task);
task->FinishException(boost::current_exception()); task->FinishException(boost::current_exception());
} }
} }
@ -156,7 +155,6 @@ void Process::WorkerThreadProc(int taskFd)
prev = it; prev = it;
tasks.erase(prev); tasks.erase(prev);
ObjectLock olock(task);
task->FinishResult(task->m_Result); task->FinishResult(task->m_Result);
} }
} }
@ -218,6 +216,8 @@ void Process::InitTask(void)
envp[i] = strdup(environ[i]); envp[i] = strdup(environ[i]);
if (m_ExtraEnvironment) { if (m_ExtraEnvironment) {
ObjectLock olock(m_ExtraEnvironment);
String key; String key;
Value value; Value value;
int index = envc; int index = envc;

View File

@ -44,6 +44,8 @@ vector<String> Process::SplitCommand(const Value& command)
if (command.IsObjectType<Dictionary>()) { if (command.IsObjectType<Dictionary>()) {
Dictionary::Ptr dict = command; Dictionary::Ptr dict = command;
ObjectLock olock(dict);
Value arg; Value arg;
BOOST_FOREACH(tie(tuples::ignore, arg), dict) { BOOST_FOREACH(tie(tuples::ignore, arg), dict) {
args.push_back(arg); args.push_back(arg);

View File

@ -22,16 +22,26 @@
using namespace icinga; using namespace icinga;
RingBuffer::RingBuffer(RingBuffer::SizeType slots) RingBuffer::RingBuffer(RingBuffer::SizeType slots)
: m_Slots(slots, 0), m_TimeValue(0) : Object(), m_Slots(slots, 0), m_TimeValue(0)
{ } { }
/**
* @threadsafety Always.
*/
RingBuffer::SizeType RingBuffer::GetLength(void) const RingBuffer::SizeType RingBuffer::GetLength(void) const
{ {
ObjectLock olock(this);
return m_Slots.size(); return m_Slots.size();
} }
/**
* @threadsafety Always.
*/
void RingBuffer::InsertValue(RingBuffer::SizeType tv, int num) void RingBuffer::InsertValue(RingBuffer::SizeType tv, int num)
{ {
ObjectLock olock(this);
vector<int>::size_type offsetTarget = tv % m_Slots.size(); vector<int>::size_type offsetTarget = tv % m_Slots.size();
if (tv > m_TimeValue) { if (tv > m_TimeValue) {
@ -53,8 +63,13 @@ void RingBuffer::InsertValue(RingBuffer::SizeType tv, int num)
m_Slots[offsetTarget] += num; m_Slots[offsetTarget] += num;
} }
/**
* @threadsafety Always.
*/
int RingBuffer::GetValues(RingBuffer::SizeType span) const int RingBuffer::GetValues(RingBuffer::SizeType span) const
{ {
ObjectLock olock(this);
if (span > m_Slots.size()) if (span > m_Slots.size())
span = m_Slots.size(); span = m_Slots.size();

View File

@ -28,9 +28,12 @@ namespace icinga
* *
* @ingroup base * @ingroup base
*/ */
class I2_BASE_API RingBuffer class I2_BASE_API RingBuffer : public Object
{ {
public: public:
typedef shared_ptr<RingBuffer> Ptr;
typedef weak_ptr<RingBuffer> WeakPtr;
typedef vector<int>::size_type SizeType; typedef vector<int>::size_type SizeType;
RingBuffer(SizeType slots); RingBuffer(SizeType slots);

View File

@ -21,7 +21,7 @@
using namespace icinga; using namespace icinga;
REGISTER_TYPE(Script, NULL); REGISTER_TYPE(Script);
/** /**
* Constructor for the Script class. * Constructor for the Script class.
@ -37,16 +37,22 @@ Script::Script(const Dictionary::Ptr& properties)
void Script::Start(void) void Script::Start(void)
{ {
assert(OwnsLock());
SpawnInterpreter(); SpawnInterpreter();
} }
String Script::GetLanguage(void) const String Script::GetLanguage(void) const
{ {
ObjectLock olock(this);
return m_Language; return m_Language;
} }
String Script::GetCode(void) const String Script::GetCode(void) const
{ {
ObjectLock olock(this);
return m_Code; return m_Code;
} }

View File

@ -30,13 +30,15 @@ ScriptFunction::ScriptFunction(const Callback& function)
void ScriptFunction::Register(const String& name, const ScriptFunction::Ptr& function) void ScriptFunction::Register(const String& name, const ScriptFunction::Ptr& function)
{ {
GetFunctions()[name] = function; boost::mutex::scoped_lock lock(GetMutex());
InternalGetFunctions()[name] = function;
OnRegistered(name, function); OnRegistered(name, function);
} }
void ScriptFunction::Unregister(const String& name) void ScriptFunction::Unregister(const String& name)
{ {
GetFunctions().erase(name); boost::mutex::scoped_lock lock(GetMutex());
InternalGetFunctions().erase(name);
OnUnregistered(name); OnUnregistered(name);
} }
@ -44,9 +46,10 @@ ScriptFunction::Ptr ScriptFunction::GetByName(const String& name)
{ {
map<String, ScriptFunction::Ptr>::iterator it; map<String, ScriptFunction::Ptr>::iterator it;
it = GetFunctions().find(name); boost::mutex::scoped_lock lock(GetMutex());
it = InternalGetFunctions().find(name);
if (it == GetFunctions().end()) if (it == InternalGetFunctions().end())
return ScriptFunction::Ptr(); return ScriptFunction::Ptr();
return it->second; return it->second;
@ -54,11 +57,28 @@ ScriptFunction::Ptr ScriptFunction::GetByName(const String& name)
void ScriptFunction::Invoke(const ScriptTask::Ptr& task, const vector<Value>& arguments) void ScriptFunction::Invoke(const ScriptTask::Ptr& task, const vector<Value>& arguments)
{ {
ObjectLock olock(this);
m_Callback(task, arguments); m_Callback(task, arguments);
} }
map<String, ScriptFunction::Ptr>& ScriptFunction::GetFunctions(void) map<String, ScriptFunction::Ptr> ScriptFunction::GetFunctions(void)
{
boost::mutex::scoped_lock lock(GetMutex());
return InternalGetFunctions(); /* makes a copy of the map */
}
/**
* @threadsafety Caller must hold the mutex returned by GetMutex().
*/
map<String, ScriptFunction::Ptr>& ScriptFunction::InternalGetFunctions(void)
{ {
static map<String, ScriptFunction::Ptr> functions; static map<String, ScriptFunction::Ptr> functions;
return functions; return functions;
} }
boost::mutex& ScriptFunction::GetMutex(void)
{
static boost::mutex mtx;
return mtx;
}

View File

@ -46,13 +46,16 @@ public:
void Invoke(const shared_ptr<ScriptTask>& task, const vector<Value>& arguments); void Invoke(const shared_ptr<ScriptTask>& task, const vector<Value>& arguments);
/* TODO(thread) make private */ static map<String, ScriptFunction::Ptr>& GetFunctions(void); static map<String, ScriptFunction::Ptr> GetFunctions(void);
static signals2::signal<void (const String&, const ScriptFunction::Ptr&)> OnRegistered; static signals2::signal<void (const String&, const ScriptFunction::Ptr&)> OnRegistered;
static signals2::signal<void (const String&)> OnUnregistered; static signals2::signal<void (const String&)> OnUnregistered;
private: private:
Callback m_Callback; Callback m_Callback;
static map<String, ScriptFunction::Ptr>& InternalGetFunctions(void);
static boost::mutex& GetMutex(void);
}; };
/** /**

View File

@ -33,6 +33,8 @@ ScriptInterpreter::~ScriptInterpreter(void)
void ScriptInterpreter::SubscribeFunction(const String& name) void ScriptInterpreter::SubscribeFunction(const String& name)
{ {
ObjectLock olock(this);
m_SubscribedFunctions.insert(name); m_SubscribedFunctions.insert(name);
ScriptFunction::Ptr sf = boost::make_shared<ScriptFunction>(boost::bind(&ScriptInterpreter::ProcessCall, this, _1, name, _2)); ScriptFunction::Ptr sf = boost::make_shared<ScriptFunction>(boost::bind(&ScriptInterpreter::ProcessCall, this, _1, name, _2));
@ -41,6 +43,8 @@ void ScriptInterpreter::SubscribeFunction(const String& name)
void ScriptInterpreter::UnsubscribeFunction(const String& name) void ScriptInterpreter::UnsubscribeFunction(const String& name)
{ {
ObjectLock olock(this);
m_SubscribedFunctions.erase(name); m_SubscribedFunctions.erase(name);
ScriptFunction::Unregister(name); ScriptFunction::Unregister(name);
} }

View File

@ -29,5 +29,12 @@ ScriptTask::ScriptTask(const ScriptFunction::Ptr& function,
void ScriptTask::Run(void) void ScriptTask::Run(void)
{ {
m_Function->Invoke(GetSelf(), m_Arguments); ScriptTask::Ptr self;
{
ObjectLock olock(this);
self = GetSelf();
}
m_Function->Invoke(self, m_Arguments);
} }

View File

@ -52,6 +52,8 @@ void StdioStream::Start(void)
size_t StdioStream::GetAvailableBytes(void) const size_t StdioStream::GetAvailableBytes(void) const
{ {
ObjectLock olock(this);
if (m_InnerStream->eof() && m_ReadAheadBuffer->GetAvailableBytes() == 0) if (m_InnerStream->eof() && m_ReadAheadBuffer->GetAvailableBytes() == 0)
return 0; return 0;
else else
@ -60,6 +62,8 @@ size_t StdioStream::GetAvailableBytes(void) const
size_t StdioStream::Read(void *buffer, size_t size) size_t StdioStream::Read(void *buffer, size_t size)
{ {
ObjectLock olock(this);
size_t peek_len, read_len; size_t peek_len, read_len;
peek_len = m_ReadAheadBuffer->GetAvailableBytes(); peek_len = m_ReadAheadBuffer->GetAvailableBytes();
@ -73,6 +77,8 @@ size_t StdioStream::Read(void *buffer, size_t size)
size_t StdioStream::Peek(void *buffer, size_t size) size_t StdioStream::Peek(void *buffer, size_t size)
{ {
ObjectLock olock(this);
size_t peek_len, read_len; size_t peek_len, read_len;
peek_len = m_ReadAheadBuffer->GetAvailableBytes(); peek_len = m_ReadAheadBuffer->GetAvailableBytes();
@ -87,11 +93,15 @@ size_t StdioStream::Peek(void *buffer, size_t size)
void StdioStream::Write(const void *buffer, size_t size) void StdioStream::Write(const void *buffer, size_t size)
{ {
ObjectLock olock(this);
m_InnerStream->write(static_cast<const char *>(buffer), size); m_InnerStream->write(static_cast<const char *>(buffer), size);
} }
void StdioStream::Close(void) void StdioStream::Close(void)
{ {
ObjectLock olock(this);
if (m_OwnsStream) if (m_OwnsStream)
delete m_InnerStream; delete m_InnerStream;

View File

@ -86,13 +86,12 @@ void Timer::Uninitialize(void)
* *
* @threadsafety Always. * @threadsafety Always.
*/ */
void Timer::Call(void) void Timer::Call(const Timer::Ptr& self)
{ {
OnTimerExpired(GetSelf()); self->OnTimerExpired(self);
/* Re-enable the timer so it can be called again. */ /* Re-enable the timer so it can be called again. */
m_Started = true; self->Start();
Reschedule();
} }
/** /**
@ -103,6 +102,8 @@ void Timer::Call(void)
*/ */
void Timer::SetInterval(double interval) void Timer::SetInterval(double interval)
{ {
assert(!OwnsLock());
boost::mutex::scoped_lock lock(m_Mutex); boost::mutex::scoped_lock lock(m_Mutex);
m_Interval = interval; m_Interval = interval;
} }
@ -115,6 +116,8 @@ void Timer::SetInterval(double interval)
*/ */
double Timer::GetInterval(void) const double Timer::GetInterval(void) const
{ {
assert(!OwnsLock());
boost::mutex::scoped_lock lock(m_Mutex); boost::mutex::scoped_lock lock(m_Mutex);
return m_Interval; return m_Interval;
} }
@ -126,7 +129,12 @@ double Timer::GetInterval(void) const
*/ */
void Timer::Start(void) void Timer::Start(void)
{ {
m_Started = true; assert(!OwnsLock());
{
boost::mutex::scoped_lock lock(m_Mutex);
m_Started = true;
}
Reschedule(); Reschedule();
} }
@ -138,6 +146,8 @@ void Timer::Start(void)
*/ */
void Timer::Stop(void) void Timer::Stop(void)
{ {
assert(!OwnsLock());
boost::mutex::scoped_lock lock(m_Mutex); boost::mutex::scoped_lock lock(m_Mutex);
m_Started = false; m_Started = false;
@ -156,6 +166,8 @@ void Timer::Stop(void)
*/ */
void Timer::Reschedule(double next) void Timer::Reschedule(double next)
{ {
assert(!OwnsLock());
boost::mutex::scoped_lock lock(m_Mutex); boost::mutex::scoped_lock lock(m_Mutex);
if (next < 0) { if (next < 0) {
@ -188,6 +200,8 @@ void Timer::Reschedule(double next)
*/ */
double Timer::GetNext(void) const double Timer::GetNext(void) const
{ {
assert(!OwnsLock());
boost::mutex::scoped_lock lock(m_Mutex); boost::mutex::scoped_lock lock(m_Mutex);
return m_Next; return m_Next;
} }

View File

@ -86,7 +86,7 @@ private:
static bool m_StopThread; static bool m_StopThread;
static TimerSet m_Timers; static TimerSet m_Timers;
void Call(void); static void Call(const Timer::Ptr& self);
static void TimerThreadProc(void); static void TimerThreadProc(void);

View File

@ -144,6 +144,7 @@ void ConfigItem::InternalLink(const Dictionary::Ptr& dictionary) const
BOOST_THROW_EXCEPTION(domain_error(message.str())); BOOST_THROW_EXCEPTION(domain_error(message.str()));
} }
ObjectLock olock(parent);
parent->InternalLink(dictionary); parent->InternalLink(dictionary);
} }
@ -208,10 +209,8 @@ DynamicObject::Ptr ConfigItem::Commit(const ConfigItem::Ptr& self)
m_Items[ikey] = self; m_Items[ikey] = self;
} }
if (!dobj) { if (!dobj)
ObjectLock olock(dtype);
dobj = dtype->GetObject(name); dobj = dtype->GetObject(name);
}
/* Register this item with its parents. */ /* Register this item with its parents. */
BOOST_FOREACH(const String& parentName, parents) { BOOST_FOREACH(const String& parentName, parents) {
@ -224,26 +223,36 @@ DynamicObject::Ptr ConfigItem::Commit(const ConfigItem::Ptr& self)
* DynamicObject::ApplyUpdate expects. */ * DynamicObject::ApplyUpdate expects. */
Dictionary::Ptr attrs = boost::make_shared<Dictionary>(); Dictionary::Ptr attrs = boost::make_shared<Dictionary>();
String key; double tx = DynamicObject::GetCurrentTx();
Value data;
BOOST_FOREACH(tie(key, data), properties) { {
Dictionary::Ptr attr = boost::make_shared<Dictionary>(); ObjectLock olock(properties);
attr->Set("data", data);
attr->Set("type", Attribute_Config); String key;
attr->Set("tx", DynamicObject::GetCurrentTx()); Value data;
attrs->Set(key, attr); BOOST_FOREACH(tie(key, data), properties) {
Dictionary::Ptr attr = boost::make_shared<Dictionary>();
attr->Set("data", data);
attr->Set("type", Attribute_Config);
attr->Set("tx", tx);
attr->Seal();
attrs->Set(key, attr);
}
} }
attrs->Seal();
Dictionary::Ptr update = boost::make_shared<Dictionary>(); Dictionary::Ptr update = boost::make_shared<Dictionary>();
update->Set("attrs", attrs); update->Set("attrs", attrs);
update->Set("configTx", DynamicObject::GetCurrentTx()); update->Set("configTx", DynamicObject::GetCurrentTx());
update->Seal();
/* Update or create the object and apply the configuration settings. */ /* Update or create the object and apply the configuration settings. */
bool was_null = false; bool was_null = false;
if (!dobj) { if (!dobj) {
ObjectLock dlock(dtype); dobj = DynamicType::CreateObject(dtype, update);
dobj = dtype->CreateObject(update);
was_null = true; was_null = true;
} }

View File

@ -52,14 +52,24 @@ DebugInfo ConfigType::GetDebugInfo(void) const
void ConfigType::ValidateItem(const ConfigItem::Ptr& item) const void ConfigType::ValidateItem(const ConfigItem::Ptr& item) const
{ {
Dictionary::Ptr attrs = item->Link(); String type, name;
Dictionary::Ptr attrs;
{
ObjectLock olock(item);
attrs = item->Link();
type = item->GetType();
name = item->GetName();
}
ObjectLock olock(attrs);
/* Don't validate abstract items. */ /* Don't validate abstract items. */
if (attrs->Get("__abstract")) if (attrs->Get("__abstract"))
return; return;
vector<String> locations; vector<String> locations;
locations.push_back("Object '" + item->GetName() + "' (Type: '" + item->GetType() + "')"); locations.push_back("Object '" + name + "' (Type: '" + type + "')");
ConfigType::Ptr parent; ConfigType::Ptr parent;
if (m_Parent.IsEmpty()) { if (m_Parent.IsEmpty()) {
@ -70,8 +80,10 @@ void ConfigType::ValidateItem(const ConfigItem::Ptr& item) const
} }
vector<TypeRuleList::Ptr> ruleLists; vector<TypeRuleList::Ptr> ruleLists;
if (parent) if (parent) {
ObjectLock plock(parent);
ruleLists.push_back(parent->m_RuleList); ruleLists.push_back(parent->m_RuleList);
}
ruleLists.push_back(m_RuleList); ruleLists.push_back(m_RuleList);
@ -131,15 +143,12 @@ void ConfigType::ValidateDictionary(const Dictionary::Ptr& dictionary,
ScriptTask::Ptr task = boost::make_shared<ScriptTask>(func, arguments); ScriptTask::Ptr task = boost::make_shared<ScriptTask>(func, arguments);
task->Start(); task->Start();
task->Wait(); task->GetResult();
{
ObjectLock olock(task);
task->GetResult();
}
} }
} }
ObjectLock olock(dictionary);
String key; String key;
Value value; Value value;
BOOST_FOREACH(tie(key, value), dictionary) { BOOST_FOREACH(tie(key, value), dictionary) {

View File

@ -54,6 +54,7 @@ void Expression::Execute(const Dictionary::Ptr& dictionary) const
case OperatorSet: case OperatorSet:
if (valueExprl) { if (valueExprl) {
ObjectLock olock(valueExprl);
dict = boost::make_shared<Dictionary>(); dict = boost::make_shared<Dictionary>();
valueExprl->Execute(dict); valueExprl->Execute(dict);
newValue = dict; newValue = dict;
@ -62,7 +63,10 @@ void Expression::Execute(const Dictionary::Ptr& dictionary) const
break; break;
case OperatorPlus: case OperatorPlus:
oldValue = dictionary->Get(m_Key); {
ObjectLock olock(dictionary);
oldValue = dictionary->Get(m_Key);
}
if (oldValue.IsObjectType<Dictionary>()) if (oldValue.IsObjectType<Dictionary>())
dict = oldValue; dict = oldValue;
@ -83,8 +87,13 @@ void Expression::Execute(const Dictionary::Ptr& dictionary) const
newValue = dict; newValue = dict;
if (valueExprl) { if (valueExprl) {
ObjectLock olock(valueExprl);
valueExprl->Execute(dict); valueExprl->Execute(dict);
} else if (valueDict) { } else if (valueDict) {
ObjectLock olock(valueDict);
ObjectLock dlock(dict);
String key; String key;
Value value; Value value;
BOOST_FOREACH(tie(key, value), valueDict) { BOOST_FOREACH(tie(key, value), valueDict) {
@ -103,6 +112,7 @@ void Expression::Execute(const Dictionary::Ptr& dictionary) const
BOOST_THROW_EXCEPTION(runtime_error("Not yet implemented.")); BOOST_THROW_EXCEPTION(runtime_error("Not yet implemented."));
} }
ObjectLock olock(dictionary);
if (m_Key.IsEmpty()) if (m_Key.IsEmpty())
dictionary->Add(newValue); dictionary->Add(newValue);
else else

View File

@ -21,7 +21,6 @@
using namespace icinga; using namespace icinga;
boost::mutex CIB::m_Mutex;
RingBuffer CIB::m_ActiveChecksStatistics(15 * 60); RingBuffer CIB::m_ActiveChecksStatistics(15 * 60);
RingBuffer CIB::m_PassiveChecksStatistics(15 * 60); RingBuffer CIB::m_PassiveChecksStatistics(15 * 60);
@ -30,7 +29,6 @@ RingBuffer CIB::m_PassiveChecksStatistics(15 * 60);
*/ */
void CIB::UpdateActiveChecksStatistics(long tv, int num) void CIB::UpdateActiveChecksStatistics(long tv, int num)
{ {
boost::mutex::scoped_lock lock(m_Mutex);
m_ActiveChecksStatistics.InsertValue(tv, num); m_ActiveChecksStatistics.InsertValue(tv, num);
} }
@ -39,7 +37,6 @@ void CIB::UpdateActiveChecksStatistics(long tv, int num)
*/ */
int CIB::GetActiveChecksStatistics(long timespan) int CIB::GetActiveChecksStatistics(long timespan)
{ {
boost::mutex::scoped_lock lock(m_Mutex);
return m_ActiveChecksStatistics.GetValues(timespan); return m_ActiveChecksStatistics.GetValues(timespan);
} }
@ -48,7 +45,6 @@ int CIB::GetActiveChecksStatistics(long timespan)
*/ */
void CIB::UpdatePassiveChecksStatistics(long tv, int num) void CIB::UpdatePassiveChecksStatistics(long tv, int num)
{ {
boost::mutex::scoped_lock lock(m_Mutex);
m_PassiveChecksStatistics.InsertValue(tv, num); m_PassiveChecksStatistics.InsertValue(tv, num);
} }
@ -57,6 +53,5 @@ void CIB::UpdatePassiveChecksStatistics(long tv, int num)
*/ */
int CIB::GetPassiveChecksStatistics(long timespan) int CIB::GetPassiveChecksStatistics(long timespan)
{ {
boost::mutex::scoped_lock lock(m_Mutex);
return m_PassiveChecksStatistics.GetValues(timespan); return m_PassiveChecksStatistics.GetValues(timespan);
} }

View File

@ -27,7 +27,7 @@ bool Host::m_ServicesCacheValid = true;
REGISTER_SCRIPTFUNCTION("ValidateServiceDictionary", &Host::ValidateServiceDictionary); REGISTER_SCRIPTFUNCTION("ValidateServiceDictionary", &Host::ValidateServiceDictionary);
REGISTER_TYPE(Host, NULL); REGISTER_TYPE(Host);
Host::Host(const Dictionary::Ptr& properties) Host::Host(const Dictionary::Ptr& properties)
: DynamicObject(properties) : DynamicObject(properties)
@ -146,6 +146,8 @@ bool Host::IsReachable(const Host::Ptr& self)
template<bool copyServiceAttrs, typename TDict> template<bool copyServiceAttrs, typename TDict>
static void CopyServiceAttributes(TDict serviceDesc, const ConfigItemBuilder::Ptr& builder) static void CopyServiceAttributes(TDict serviceDesc, const ConfigItemBuilder::Ptr& builder)
{ {
ObjectLock olock(serviceDesc);
/* TODO: we only need to copy macros if this is an inline definition, /* TODO: we only need to copy macros if this is an inline definition,
* i.e. "typeid(serviceDesc)" != Service, however for now we just * i.e. "typeid(serviceDesc)" != Service, however for now we just
* copy them anyway. */ * copy them anyway. */
@ -211,6 +213,7 @@ void Host::UpdateSlaveServices(const Host::Ptr& self)
} }
newServices = boost::make_shared<Dictionary>(); newServices = boost::make_shared<Dictionary>();
ObjectLock nlock(newServices);
DebugInfo debug_info; DebugInfo debug_info;
@ -220,6 +223,7 @@ void Host::UpdateSlaveServices(const Host::Ptr& self)
} }
if (serviceDescs) { if (serviceDescs) {
ObjectLock olock(serviceDescs);
String svcname; String svcname;
Value svcdesc; Value svcdesc;
BOOST_FOREACH(tie(svcname, svcdesc), serviceDescs) { BOOST_FOREACH(tie(svcname, svcdesc), serviceDescs) {
@ -244,9 +248,16 @@ void Host::UpdateSlaveServices(const Host::Ptr& self)
} else if (svcdesc.IsObjectType<Dictionary>()) { } else if (svcdesc.IsObjectType<Dictionary>()) {
Dictionary::Ptr service = svcdesc; Dictionary::Ptr service = svcdesc;
Dictionary::Ptr templates = service->Get("templates"); Dictionary::Ptr templates;
{
ObjectLock olock(service);
templates = service->Get("templates");
}
if (templates) { if (templates) {
ObjectLock olock(templates);
String tmpl; String tmpl;
BOOST_FOREACH(tie(tuples::ignore, tmpl), templates) { BOOST_FOREACH(tie(tuples::ignore, tmpl), templates) {
builder->AddParent(tmpl); builder->AddParent(tmpl);
@ -268,6 +279,8 @@ void Host::UpdateSlaveServices(const Host::Ptr& self)
} }
if (oldServices) { if (oldServices) {
ObjectLock olock(oldServices);
ConfigItem::Ptr service; ConfigItem::Ptr service;
BOOST_FOREACH(tie(tuples::ignore, service), oldServices) { BOOST_FOREACH(tie(tuples::ignore, service), oldServices) {
if (!service) if (!service)
@ -280,6 +293,7 @@ void Host::UpdateSlaveServices(const Host::Ptr& self)
newServices->Seal(); newServices->Seal();
ObjectLock olock(self);
self->Set("slave_services", newServices); self->Set("slave_services", newServices);
} }
@ -288,7 +302,14 @@ void Host::OnAttributeChanged(const String& name, const Value&)
if (name == "hostgroups") if (name == "hostgroups")
HostGroup::InvalidateMembersCache(); HostGroup::InvalidateMembersCache();
else if (name == "services") { else if (name == "services") {
UpdateSlaveServices(GetSelf()); Host::Ptr self;
{
ObjectLock olock(this);
self = GetSelf();
}
UpdateSlaveServices(self);
} else if (name == "notifications") { } else if (name == "notifications") {
set<Service::Ptr> services; set<Service::Ptr> services;
@ -388,6 +409,7 @@ void Host::ValidateServiceDictionary(const ScriptTask::Ptr& task, const vector<V
String location = arguments[0]; String location = arguments[0];
Dictionary::Ptr attrs = arguments[1]; Dictionary::Ptr attrs = arguments[1];
ObjectLock olock(attrs);
String key; String key;
Value value; Value value;
@ -399,9 +421,9 @@ void Host::ValidateServiceDictionary(const ScriptTask::Ptr& task, const vector<V
} else if (value.IsObjectType<Dictionary>()) { } else if (value.IsObjectType<Dictionary>()) {
Dictionary::Ptr serviceDesc = value; Dictionary::Ptr serviceDesc = value;
if (serviceDesc->Contains("service")) name = serviceDesc->Get("service");
name = serviceDesc->Get("service");
else if (name.IsEmpty())
name = key; name = key;
} else { } else {
continue; continue;
@ -538,6 +560,7 @@ set<Service::Ptr> Host::GetParentServices(const Host::Ptr& self)
Dictionary::Ptr Host::CalculateDynamicMacros(const Host::Ptr& self) Dictionary::Ptr Host::CalculateDynamicMacros(const Host::Ptr& self)
{ {
Dictionary::Ptr macros = boost::make_shared<Dictionary>(); Dictionary::Ptr macros = boost::make_shared<Dictionary>();
ObjectLock mlock(macros);
{ {
ObjectLock olock(self); ObjectLock olock(self);

View File

@ -25,7 +25,7 @@ boost::mutex HostGroup::m_Mutex;
map<String, vector<Host::WeakPtr> > HostGroup::m_MembersCache; map<String, vector<Host::WeakPtr> > HostGroup::m_MembersCache;
bool HostGroup::m_MembersCacheValid = true; bool HostGroup::m_MembersCacheValid = true;
REGISTER_TYPE(HostGroup, NULL); REGISTER_TYPE(HostGroup);
HostGroup::HostGroup(const Dictionary::Ptr& properties) HostGroup::HostGroup(const Dictionary::Ptr& properties)
: DynamicObject(properties) : DynamicObject(properties)

View File

@ -21,7 +21,7 @@
using namespace icinga; using namespace icinga;
REGISTER_TYPE(IcingaApplication, NULL); REGISTER_TYPE(IcingaApplication);
#ifndef _WIN32 #ifndef _WIN32
# include "icinga-version.h" # include "icinga-version.h"
@ -156,6 +156,7 @@ shared_ptr<SSL_CTX> IcingaApplication::GetSSLContext(void) const
Dictionary::Ptr IcingaApplication::CalculateDynamicMacros(const IcingaApplication::Ptr& self) Dictionary::Ptr IcingaApplication::CalculateDynamicMacros(const IcingaApplication::Ptr& self)
{ {
Dictionary::Ptr macros = boost::make_shared<Dictionary>(); Dictionary::Ptr macros = boost::make_shared<Dictionary>();
ObjectLock mlock(macros);
double now = Utility::GetTime(); double now = Utility::GetTime();

View File

@ -21,7 +21,7 @@
using namespace icinga; using namespace icinga;
REGISTER_TYPE(Notification, NULL); REGISTER_TYPE(Notification);
Notification::Notification(const Dictionary::Ptr& properties) Notification::Notification(const Dictionary::Ptr& properties)
: DynamicObject(properties) : DynamicObject(properties)
@ -76,6 +76,8 @@ set<User::Ptr> Notification::GetUsers(void) const
Dictionary::Ptr users = m_Users; Dictionary::Ptr users = m_Users;
if (users) { if (users) {
ObjectLock olock(users);
String name; String name;
BOOST_FOREACH(tie(tuples::ignore, name), users) { BOOST_FOREACH(tie(tuples::ignore, name), users) {
User::Ptr user = User::GetByName(name); User::Ptr user = User::GetByName(name);
@ -97,6 +99,8 @@ set<UserGroup::Ptr> Notification::GetGroups(void) const
Dictionary::Ptr groups = m_Groups; Dictionary::Ptr groups = m_Groups;
if (groups) { if (groups) {
ObjectLock olock(groups);
String name; String name;
BOOST_FOREACH(tie(tuples::ignore, name), groups) { BOOST_FOREACH(tie(tuples::ignore, name), groups) {
UserGroup::Ptr ug = UserGroup::GetByName(name); UserGroup::Ptr ug = UserGroup::GetByName(name);
@ -256,10 +260,7 @@ void Notification::NotificationCompletedHandler(const ScriptTask::Ptr& task)
m_Tasks.erase(task); m_Tasks.erase(task);
try { try {
{ task->GetResult();
ObjectLock tlock(task);
(void) task->GetResult();
}
Logger::Write(LogInformation, "icinga", "Completed sending notification for service '" + GetService()->GetName() + "'"); Logger::Write(LogInformation, "icinga", "Completed sending notification for service '" + GetService()->GetName() + "'");
} catch (const exception& ex) { } catch (const exception& ex) {

View File

@ -21,7 +21,7 @@
using namespace icinga; using namespace icinga;
REGISTER_TYPE(PerfdataWriter, NULL); REGISTER_TYPE(PerfdataWriter);
PerfdataWriter::PerfdataWriter(const Dictionary::Ptr& properties) PerfdataWriter::PerfdataWriter(const Dictionary::Ptr& properties)
: DynamicObject(properties) : DynamicObject(properties)
@ -49,6 +49,7 @@ void PerfdataWriter::Start(void)
{ {
ObjectLock olock(m_Endpoint); ObjectLock olock(m_Endpoint);
m_Endpoint->RegisterTopicHandler("checker::CheckResult", m_Endpoint->RegisterTopicHandler("checker::CheckResult",
boost::bind(&PerfdataWriter::CheckResultRequestHandler, this, _3)); boost::bind(&PerfdataWriter::CheckResultRequestHandler, this, _3));
} }

View File

@ -59,10 +59,8 @@ void PluginCheckTask::ProcessFinishedHandler(PluginCheckTask ct)
ProcessResult pr; ProcessResult pr;
try { try {
ObjectLock olock(ct.m_Process);
pr = ct.m_Process->GetResult(); pr = ct.m_Process->GetResult();
} catch (...) { } catch (...) {
ObjectLock olock(ct.m_Task);
ct.m_Task->FinishException(boost::current_exception()); ct.m_Task->FinishException(boost::current_exception());
return; return;
@ -76,7 +74,6 @@ void PluginCheckTask::ProcessFinishedHandler(PluginCheckTask ct)
result->Set("execution_start", pr.ExecutionStart); result->Set("execution_start", pr.ExecutionStart);
result->Set("execution_end", pr.ExecutionEnd); result->Set("execution_end", pr.ExecutionEnd);
ObjectLock olock(ct.m_Task);
ct.m_Task->FinishResult(result); ct.m_Task->FinishResult(result);
} }

View File

@ -79,7 +79,6 @@ void PluginNotificationTask::ProcessFinishedHandler(PluginNotificationTask ct)
try { try {
{ {
ObjectLock tlock(ct.m_Process);
pr = ct.m_Process->GetResult(); pr = ct.m_Process->GetResult();
} }
@ -91,15 +90,9 @@ void PluginNotificationTask::ProcessFinishedHandler(PluginNotificationTask ct)
Logger::Write(LogWarning, "icinga", msgbuf.str()); Logger::Write(LogWarning, "icinga", msgbuf.str());
} }
{ ct.m_Task->FinishResult(Empty);
ObjectLock olock(ct.m_Task);
ct.m_Task->FinishResult(Empty);
}
} catch (...) { } catch (...) {
{ ct.m_Task->FinishException(boost::current_exception());
ObjectLock olock(ct.m_Task);
ct.m_Task->FinishException(boost::current_exception());
}
return; return;
} }

View File

@ -407,8 +407,12 @@ void Service::BeginExecuteCheck(const Service::Ptr& self, const function<void (v
/* keep track of scheduling info in case the check type doesn't provide its own information */ /* keep track of scheduling info in case the check type doesn't provide its own information */
Dictionary::Ptr checkInfo = boost::make_shared<Dictionary>(); Dictionary::Ptr checkInfo = boost::make_shared<Dictionary>();
checkInfo->Set("schedule_start", self->GetNextCheck());
checkInfo->Set("execution_start", Utility::GetTime()); {
ObjectLock olock(checkInfo);
checkInfo->Set("schedule_start", self->GetNextCheck());
checkInfo->Set("execution_start", Utility::GetTime());
}
vector<Dictionary::Ptr> macroDicts; vector<Dictionary::Ptr> macroDicts;
macroDicts.push_back(self->GetMacros()); macroDicts.push_back(self->GetMacros());
@ -460,16 +464,12 @@ void Service::CheckCompletedHandler(const Dictionary::Ptr& checkInfo,
{ {
checkInfo->Set("execution_end", Utility::GetTime()); checkInfo->Set("execution_end", Utility::GetTime());
checkInfo->Set("schedule_end", Utility::GetTime()); checkInfo->Set("schedule_end", Utility::GetTime());
checkInfo->Seal();
Dictionary::Ptr result; Dictionary::Ptr result;
try { try {
Value vresult; Value vresult = task->GetResult();
{
ObjectLock tlock(task);
vresult = task->GetResult();
}
if (vresult.IsObjectType<Dictionary>()) if (vresult.IsObjectType<Dictionary>())
result = vresult; result = vresult;
@ -511,6 +511,8 @@ void Service::CheckCompletedHandler(const Dictionary::Ptr& checkInfo,
result->Set("current_checker", em->GetIdentity()); result->Set("current_checker", em->GetIdentity());
} }
result->Seal();
} }
{ {

View File

@ -158,6 +158,8 @@ set<Notification::Ptr> Service::GetNotifications(const Service::Ptr& self)
template<typename TDict> template<typename TDict>
static void CopyNotificationAttributes(TDict notificationDesc, const ConfigItemBuilder::Ptr& builder) static void CopyNotificationAttributes(TDict notificationDesc, const ConfigItemBuilder::Ptr& builder)
{ {
ObjectLock olock(notificationDesc);
/* TODO: we only need to copy macros if this is an inline definition, /* TODO: we only need to copy macros if this is an inline definition,
* i.e. "typeid(notificationDesc)" != Notification, however for now we just * i.e. "typeid(notificationDesc)" != Notification, however for now we just
* copy them anyway. */ * copy them anyway. */
@ -214,6 +216,8 @@ void Service::UpdateSlaveNotifications(const Service::Ptr& self)
Dictionary::Ptr newNotifications; Dictionary::Ptr newNotifications;
newNotifications = boost::make_shared<Dictionary>(); newNotifications = boost::make_shared<Dictionary>();
ObjectLock nlock(newNotifications);
String host_name; String host_name;
{ {
@ -256,6 +260,8 @@ void Service::UpdateSlaveNotifications(const Service::Ptr& self)
Dictionary::Ptr templates = notification->Get("templates"); Dictionary::Ptr templates = notification->Get("templates");
if (templates) { if (templates) {
ObjectLock tlock(templates);
String tmpl; String tmpl;
BOOST_FOREACH(tie(tuples::ignore, tmpl), templates) { BOOST_FOREACH(tie(tuples::ignore, tmpl), templates) {
builder->AddParent(tmpl); builder->AddParent(tmpl);
@ -277,6 +283,8 @@ void Service::UpdateSlaveNotifications(const Service::Ptr& self)
} }
if (oldNotifications) { if (oldNotifications) {
ObjectLock olock(oldNotifications);
ConfigItem::Ptr notification; ConfigItem::Ptr notification;
BOOST_FOREACH(tie(tuples::ignore, notification), oldNotifications) { BOOST_FOREACH(tie(tuples::ignore, notification), oldNotifications) {
if (!notification) if (!notification)

View File

@ -21,7 +21,7 @@
using namespace icinga; using namespace icinga;
REGISTER_TYPE(Service, NULL); REGISTER_TYPE(Service);
Service::Service(const Dictionary::Ptr& serializedObject) Service::Service(const Dictionary::Ptr& serializedObject)
: DynamicObject(serializedObject) : DynamicObject(serializedObject)
@ -274,28 +274,39 @@ void Service::ClearAcknowledgement(void)
void Service::OnAttributeChanged(const String& name, const Value& oldValue) void Service::OnAttributeChanged(const String& name, const Value& oldValue)
{ {
Service::Ptr self;
String service_name;
bool abstract;
{
ObjectLock olock(this);
self = GetSelf();
service_name = GetName();
abstract = IsAbstract();
}
if (name == "current_checker") if (name == "current_checker")
OnCheckerChanged(GetSelf(), oldValue); OnCheckerChanged(self, oldValue);
else if (name == "next_check") else if (name == "next_check")
OnNextCheckChanged(GetSelf(), oldValue); OnNextCheckChanged(self, oldValue);
else if (name == "servicegroups") else if (name == "servicegroups")
ServiceGroup::InvalidateMembersCache(); ServiceGroup::InvalidateMembersCache();
else if (name == "host_name" || name == "short_name") { else if (name == "host_name" || name == "short_name") {
Host::InvalidateServicesCache(); Host::InvalidateServicesCache();
UpdateSlaveNotifications(GetSelf()); UpdateSlaveNotifications(self);
} else if (name == "downtimes") } else if (name == "downtimes")
Service::InvalidateDowntimesCache(); Service::InvalidateDowntimesCache();
else if (name == "comments") else if (name == "comments")
Service::InvalidateCommentsCache(); Service::InvalidateCommentsCache();
else if (name == "notifications") else if (name == "notifications")
UpdateSlaveNotifications(GetSelf()); UpdateSlaveNotifications(self);
else if (name == "check_interval") { else if (name == "check_interval") {
ObjectLock(this); ObjectLock olock(this);
ConfigItem::Ptr item = ConfigItem::GetObject("Service", GetName()); ConfigItem::Ptr item = ConfigItem::GetObject("Service", service_name);
/* update the next check timestamp if we're the owner of this service */ /* update the next check timestamp if we're the owner of this service */
if (item && !IsAbstract()) if (item && !abstract)
UpdateNextCheck(); UpdateNextCheck();
} }
} }
@ -366,6 +377,7 @@ set<Service::Ptr> Service::GetParentServices(const Service::Ptr& self)
Dictionary::Ptr Service::CalculateDynamicMacros(const Service::Ptr& self) Dictionary::Ptr Service::CalculateDynamicMacros(const Service::Ptr& self)
{ {
Dictionary::Ptr macros = boost::make_shared<Dictionary>(); Dictionary::Ptr macros = boost::make_shared<Dictionary>();
ObjectLock mlock(macros);
Dictionary::Ptr cr; Dictionary::Ptr cr;

View File

@ -25,7 +25,7 @@ boost::mutex ServiceGroup::m_Mutex;
map<String, vector<Service::WeakPtr> > ServiceGroup::m_MembersCache; map<String, vector<Service::WeakPtr> > ServiceGroup::m_MembersCache;
bool ServiceGroup::m_MembersCacheValid = true; bool ServiceGroup::m_MembersCacheValid = true;
REGISTER_TYPE(ServiceGroup, NULL); REGISTER_TYPE(ServiceGroup);
ServiceGroup::ServiceGroup(const Dictionary::Ptr& properties) ServiceGroup::ServiceGroup(const Dictionary::Ptr& properties)
: DynamicObject(properties) : DynamicObject(properties)

View File

@ -21,7 +21,7 @@
using namespace icinga; using namespace icinga;
REGISTER_TYPE(User, NULL); REGISTER_TYPE(User);
User::User(const Dictionary::Ptr& properties) User::User(const Dictionary::Ptr& properties)
: DynamicObject(properties) : DynamicObject(properties)

View File

@ -25,7 +25,7 @@ boost::mutex UserGroup::m_Mutex;
map<String, vector<User::WeakPtr> > UserGroup::m_MembersCache; map<String, vector<User::WeakPtr> > UserGroup::m_MembersCache;
bool UserGroup::m_MembersCacheValid = true; bool UserGroup::m_MembersCacheValid = true;
REGISTER_TYPE(UserGroup, NULL); REGISTER_TYPE(UserGroup);
UserGroup::UserGroup(const Dictionary::Ptr& properties) UserGroup::UserGroup(const Dictionary::Ptr& properties)
: DynamicObject(properties) : DynamicObject(properties)

View File

@ -21,7 +21,7 @@
using namespace icinga; using namespace icinga;
REGISTER_TYPE(Endpoint, NULL); REGISTER_TYPE(Endpoint);
signals2::signal<void (const Endpoint::Ptr&)> Endpoint::OnConnected; signals2::signal<void (const Endpoint::Ptr&)> Endpoint::OnConnected;
signals2::signal<void (const Endpoint::Ptr&)> Endpoint::OnDisconnected; signals2::signal<void (const Endpoint::Ptr&)> Endpoint::OnDisconnected;
@ -128,8 +128,11 @@ void Endpoint::RegisterSubscription(const String& topic)
if (!subscriptions) if (!subscriptions)
subscriptions = boost::make_shared<Dictionary>(); subscriptions = boost::make_shared<Dictionary>();
ObjectLock olock(subscriptions);
if (!subscriptions->Contains(topic)) { if (!subscriptions->Contains(topic)) {
Dictionary::Ptr newSubscriptions = subscriptions->ShallowClone(); Dictionary::Ptr newSubscriptions = subscriptions->ShallowClone();
ObjectLock nlock(newSubscriptions);
newSubscriptions->Set(topic, topic); newSubscriptions->Set(topic, topic);
SetSubscriptions(newSubscriptions); SetSubscriptions(newSubscriptions);
} }
@ -144,8 +147,14 @@ void Endpoint::UnregisterSubscription(const String& topic)
{ {
Dictionary::Ptr subscriptions = GetSubscriptions(); Dictionary::Ptr subscriptions = GetSubscriptions();
if (subscriptions && subscriptions->Contains(topic)) { if (!subscriptions)
return;
ObjectLock olock(subscriptions);
if (subscriptions->Contains(topic)) {
Dictionary::Ptr newSubscriptions = subscriptions->ShallowClone(); Dictionary::Ptr newSubscriptions = subscriptions->ShallowClone();
ObjectLock nlock(newSubscriptions);
newSubscriptions->Remove(topic); newSubscriptions->Remove(topic);
SetSubscriptions(newSubscriptions); SetSubscriptions(newSubscriptions);
} }
@ -222,6 +231,9 @@ void Endpoint::OnAttributeChanged(const String& name, const Value& oldValue)
newSubscriptions = GetSubscriptions(); newSubscriptions = GetSubscriptions();
ObjectLock olock(oldSubscriptions);
ObjectLock nlock(newSubscriptions);
if (oldSubscriptions) { if (oldSubscriptions) {
String subscription; String subscription;
BOOST_FOREACH(tie(tuples::ignore, subscription), oldSubscriptions) { BOOST_FOREACH(tie(tuples::ignore, subscription), oldSubscriptions) {

View File

@ -331,7 +331,11 @@ void EndpointManager::SubscriptionTimerHandler(void)
if (!endpoint->IsLocalEndpoint() || endpoint == m_Endpoint) if (!endpoint->IsLocalEndpoint() || endpoint == m_Endpoint)
continue; continue;
if (endpoint->GetSubscriptions()) { Dictionary::Ptr endpointSubscriptions = endpoint->GetSubscriptions();
if (endpointSubscriptions) {
ObjectLock olock(endpointSubscriptions);
String topic; String topic;
BOOST_FOREACH(tie(tuples::ignore, topic), endpoint->GetSubscriptions()) { BOOST_FOREACH(tie(tuples::ignore, topic), endpoint->GetSubscriptions()) {
subscriptions->Set(topic, topic); subscriptions->Set(topic, topic);