Fine-grained locks (WIP, Part 5).

This commit is contained in:
Gunnar Beutner 2013-02-19 12:17:31 +01:00
parent ecc95b3dc0
commit 1daeb8c010
11 changed files with 81 additions and 35 deletions

View File

@ -160,7 +160,7 @@ void ReplicationComponent::LocalObjectUnregisteredHandler(const DynamicObject::P
MakeObjectMessage(object, "config::ObjectRemoved", 0, false));
}
void ReplicationComponent::TransactionClosingHandler(const set<DynamicObject *>& modifiedObjects)
void ReplicationComponent::TransactionClosingHandler(const set<DynamicObject::WeakPtr>& modifiedObjects)
{
if (modifiedObjects.empty())
return;
@ -169,11 +169,14 @@ void ReplicationComponent::TransactionClosingHandler(const set<DynamicObject *>&
msgbuf << "Sending " << modifiedObjects.size() << " replication updates.";
Logger::Write(LogDebug, "replication", msgbuf.str());
BOOST_FOREACH(DynamicObject *robject, modifiedObjects) {
DynamicObject::Ptr object = robject->GetSelf();
BOOST_FOREACH(const DynamicObject::WeakPtr& wobject, modifiedObjects) {
DynamicObject::Ptr object = wobject.lock();
if (!object)
continue;
if (!ShouldReplicateObject(object))
continue;
continue;
RequestMessage request = MakeObjectMessage(object, "config::ObjectUpdate", DynamicObject::GetCurrentTx(), true);
EndpointManager::GetInstance()->SendMulticastMessage(m_Endpoint, request);

View File

@ -41,7 +41,7 @@ private:
void LocalObjectRegisteredHandler(const DynamicObject::Ptr& object);
void LocalObjectUnregisteredHandler(const DynamicObject::Ptr& object);
void TransactionClosingHandler(const set<DynamicObject *>& modifiedObjects);
void TransactionClosingHandler(const set<DynamicObject::WeakPtr>& modifiedObjects);
void RemoteObjectUpdateHandler(const RequestMessage& request);
void RemoteObjectRemovedHandler(const RequestMessage& request);

View File

@ -86,6 +86,7 @@ public:
*/
TResult GetResult(void)
{
boost::mutex::scoped_lock lock(m_Mutex);
if (!m_Finished)
BOOST_THROW_EXCEPTION(runtime_error("GetResult called on an unfinished AsyncTask"));
@ -109,6 +110,7 @@ public:
*/
void FinishException(const boost::exception_ptr& ex)
{
boost::mutex::scoped_lock lock(m_Mutex);
m_Exception = ex;
FinishInternal();
}
@ -120,6 +122,7 @@ public:
*/
void FinishResult(const TResult& result)
{
boost::mutex::scoped_lock lock(m_Mutex);
m_Result = result;
FinishInternal();
}
@ -146,20 +149,19 @@ private:
/**
* Finishes the task and causes the completion callback to be invoked. This
* function must be called before the object is destroyed.
*
* @threadsafety Caller must hold m_Mutex.
*/
void FinishInternal(void)
{
{
boost::mutex::scoped_lock lock(m_Mutex);
assert(!m_Finished);
assert(!m_Finished);
m_Finished = true;
m_Finished = true;
m_CV.notify_all();
}
m_CV.notify_all();
if (!m_CompletionCallback.empty()) {
m_CompletionCallback(GetSelf());
Utility::QueueAsyncCallback(boost::bind(m_CompletionCallback, GetSelf()));
/* Clear callback because the bound function might hold a
* reference to this task. */

View File

@ -22,14 +22,14 @@
using namespace icinga;
double DynamicObject::m_CurrentTx = 0;
set<DynamicObject *> DynamicObject::m_ModifiedObjects;
set<DynamicObject::WeakPtr> DynamicObject::m_ModifiedObjects;
boost::mutex DynamicObject::m_TransactionMutex;
boost::once_flag DynamicObject::m_TransactionOnce;
Timer::Ptr DynamicObject::m_TransactionTimer;
signals2::signal<void (const DynamicObject::Ptr&)> DynamicObject::OnRegistered;
signals2::signal<void (const DynamicObject::Ptr&)> DynamicObject::OnUnregistered;
signals2::signal<void (double, const set<DynamicObject *>&)> DynamicObject::OnTransactionClosing;
signals2::signal<void (double, const set<DynamicObject::WeakPtr>&)> DynamicObject::OnTransactionClosing;
DynamicObject::DynamicObject(const Dictionary::Ptr& serializedObject)
: m_Events(false), m_ConfigTx(0)
@ -63,10 +63,7 @@ DynamicObject::DynamicObject(const Dictionary::Ptr& serializedObject)
* @threadsafety Always.
*/
DynamicObject::~DynamicObject(void)
{
boost::mutex::scoped_lock lock(m_TransactionMutex);
m_ModifiedObjects.erase(this);
}
{ }
void DynamicObject::Initialize(void)
{
@ -82,15 +79,19 @@ void DynamicObject::Initialize(void)
*/
void DynamicObject::SendLocalUpdateEvents(void)
{
map<String, Value, string_iless> attrs;
{
ObjectLock olock(this);
attrs.swap(m_ModifiedAttributes);
}
/* Check if it's safe to send events. */
if (GetEvents()) {
map<String, Value, string_iless>::iterator it;
for (it = m_ModifiedAttributes.begin(); it != m_ModifiedAttributes.end(); it++) {
for (it = attrs.begin(); it != attrs.end(); it++)
OnAttributeChanged(it->first, it->second);
}
}
m_ModifiedAttributes.clear();
}
Dictionary::Ptr DynamicObject::BuildUpdate(double sinceTx, int attributeTypes) const
@ -235,9 +236,19 @@ void DynamicObject::InternalSetAttribute(const String& name, const Value& data,
if (tt.first->second.Type & Attribute_Config)
m_ConfigTx = tx;
{
DynamicObject::Ptr self;
try {
self = GetSelf();
} catch (const boost::bad_weak_ptr& ex) {
/* We're being called from the constructor. Ignore
* the exception. The OnInitCompleted() function
* will take care of adding this object to the list
* of modified objects. */
}
if (self) {
boost::mutex::scoped_lock lock(m_TransactionMutex);
m_ModifiedObjects.insert(this);
m_ModifiedObjects.insert(self);
}
/* Use insert() rather than [] so we don't overwrite
@ -527,7 +538,7 @@ double DynamicObject::GetCurrentTx(void)
void DynamicObject::NewTx(void)
{
double tx;
set<DynamicObject *> objects;
set<DynamicObject::WeakPtr> objects;
{
boost::mutex::scoped_lock lock(m_TransactionMutex);
@ -537,8 +548,12 @@ void DynamicObject::NewTx(void)
m_CurrentTx = Utility::GetTime();
}
BOOST_FOREACH(DynamicObject *object, objects) {
ObjectLock olock(object);
BOOST_FOREACH(const DynamicObject::WeakPtr& wobject, objects) {
DynamicObject::Ptr object = wobject.lock();
if (!object)
continue;
object->SendLocalUpdateEvents();
}
@ -546,7 +561,13 @@ void DynamicObject::NewTx(void)
}
void DynamicObject::OnInitCompleted(void)
{ }
{
/* Add this new object to the list of modified objects.
* We're doing this here because we can't construct
* a while WeakPtr from within the object's constructor. */
boost::mutex::scoped_lock lock(m_TransactionMutex);
m_ModifiedObjects.insert(GetSelf());
}
void DynamicObject::OnAttributeChanged(const String&, const Value&)
{ }

View File

@ -98,7 +98,7 @@ public:
static signals2::signal<void (const DynamicObject::Ptr&)> OnRegistered;
static signals2::signal<void (const DynamicObject::Ptr&)> OnUnregistered;
static signals2::signal<void (double, const set<DynamicObject *>&)> OnTransactionClosing;
static signals2::signal<void (double, const set<DynamicObject::WeakPtr>&)> OnTransactionClosing;
ScriptTask::Ptr MakeMethodTask(const String& method,
const vector<Value>& arguments);
@ -153,7 +153,7 @@ private:
/* This has to be a set of raw pointers because the DynamicObject
* constructor has to be able to insert objects into this list. */
static set<DynamicObject *> m_ModifiedObjects;
static set<DynamicObject::WeakPtr> m_ModifiedObjects;
static boost::mutex m_TransactionMutex;
static boost::once_flag m_TransactionOnce;
static Timer::Ptr m_TransactionTimer;

View File

@ -27,12 +27,14 @@ using namespace icinga;
EventQueue::EventQueue(void)
: m_Stopped(false)
{
int cpus = thread::hardware_concurrency();
int thread_count = thread::hardware_concurrency();
if (cpus < 4)
cpus = 4;
if (thread_count < 4)
thread_count = 4;
for (int i = 0; i < cpus; i++)
thread_count *= 8;
for (int i = 0; i < thread_count; i++)
m_Threads.create_thread(boost::bind(&EventQueue::QueueThreadProc, this));
}

View File

@ -34,6 +34,8 @@ Script::Script(const Dictionary::Ptr& properties)
void Script::OnInitCompleted(void)
{
DynamicObject::OnInitCompleted();
SpawnInterpreter();
}

View File

@ -21,6 +21,8 @@
using namespace icinga;
boost::mutex StreamLogger::m_Mutex;
/**
* Constructor for the StreamLogger class.
*/
@ -94,6 +96,7 @@ void StreamLogger::ProcessLogEntry(ostream& stream, bool tty, const LogEntry& en
}
}
boost::mutex::scoped_lock lock(m_Mutex);
stream << "[" << timestamp << "] "
<< Logger::SeverityToString(entry.Severity) << "/" << entry.Facility << ": "
<< entry.Message;

View File

@ -47,6 +47,7 @@ protected:
virtual void ProcessLogEntry(const LogEntry& entry);
private:
static boost::mutex m_Mutex;
ostream *m_Stream;
bool m_OwnsStream;
bool m_Tty;

View File

@ -138,13 +138,23 @@ bool Host::IsReachable(void)
bool Host::IsInDowntime(void) const
{
Service::Ptr service = GetHostCheckService();
if (!service)
return false;
ObjectLock olock(service);
return (service || service->IsInDowntime());
}
bool Host::IsUp(void) const
{
Service::Ptr service = GetHostCheckService();
return (!service || service->GetState() == StateOK || service->GetState() == StateWarning);
if (!service)
return true;
ObjectLock olock(service);
return (service->GetState() == StateOK || service->GetState() == StateWarning);
}
template<bool copyServiceAttrs, typename TDict>

View File

@ -222,6 +222,8 @@ void Service::SetAcknowledgementExpiry(double timestamp)
void Service::OnAttributeChanged(const String& name, const Value& oldValue)
{
ObjectLock olock(this);
if (name == "checker")
OnCheckerChanged(GetSelf(), oldValue);
else if (name == "next_check")