mirror of
https://github.com/Icinga/icinga2.git
synced 2025-07-25 22:54:57 +02:00
Avoid mutex contention in the config parser
This commit is contained in:
parent
b30944246f
commit
d9010c7b9f
@ -60,6 +60,40 @@ String WorkQueue::GetName() const
|
|||||||
return m_Name;
|
return m_Name;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
boost::mutex::scoped_lock WorkQueue::AcquireLock()
|
||||||
|
{
|
||||||
|
return boost::mutex::scoped_lock(m_Mutex);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Enqueues a task. Tasks are guaranteed to be executed in the order
|
||||||
|
* they were enqueued in except if there is more than one worker thread.
|
||||||
|
*/
|
||||||
|
void WorkQueue::EnqueueUnlocked(boost::mutex::scoped_lock& lock, std::function<void ()>&& function, WorkQueuePriority priority)
|
||||||
|
{
|
||||||
|
if (!m_Spawned) {
|
||||||
|
Log(LogNotice, "WorkQueue")
|
||||||
|
<< "Spawning WorkQueue threads for '" << m_Name << "'";
|
||||||
|
|
||||||
|
for (int i = 0; i < m_ThreadCount; i++) {
|
||||||
|
m_Threads.create_thread(std::bind(&WorkQueue::WorkerThreadProc, this));
|
||||||
|
}
|
||||||
|
|
||||||
|
m_Spawned = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool wq_thread = IsWorkerThread();
|
||||||
|
|
||||||
|
if (!wq_thread) {
|
||||||
|
while (m_Tasks.size() >= m_MaxItems && m_MaxItems != 0)
|
||||||
|
m_CVFull.wait(lock);
|
||||||
|
}
|
||||||
|
|
||||||
|
m_Tasks.emplace(std::move(function), priority, ++m_NextTaskID);
|
||||||
|
|
||||||
|
m_CVEmpty.notify_one();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Enqueues a task. Tasks are guaranteed to be executed in the order
|
* Enqueues a task. Tasks are guaranteed to be executed in the order
|
||||||
* they were enqueued in except if there is more than one worker thread or when
|
* they were enqueued in except if there is more than one worker thread or when
|
||||||
@ -77,27 +111,8 @@ void WorkQueue::Enqueue(std::function<void ()>&& function, WorkQueuePriority pri
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
boost::mutex::scoped_lock lock(m_Mutex);
|
auto lock = AcquireLock();
|
||||||
|
EnqueueUnlocked(lock, std::move(function), priority);
|
||||||
if (!m_Spawned) {
|
|
||||||
Log(LogNotice, "WorkQueue")
|
|
||||||
<< "Spawning WorkQueue threads for '" << m_Name << "'";
|
|
||||||
|
|
||||||
for (int i = 0; i < m_ThreadCount; i++) {
|
|
||||||
m_Threads.create_thread(std::bind(&WorkQueue::WorkerThreadProc, this));
|
|
||||||
}
|
|
||||||
|
|
||||||
m_Spawned = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!wq_thread) {
|
|
||||||
while (m_Tasks.size() >= m_MaxItems && m_MaxItems != 0)
|
|
||||||
m_CVFull.wait(lock);
|
|
||||||
}
|
|
||||||
|
|
||||||
m_Tasks.emplace(std::move(function), priority, ++m_NextTaskID);
|
|
||||||
|
|
||||||
m_CVEmpty.notify_one();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -231,6 +246,25 @@ void WorkQueue::StatusTimerHandler()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void WorkQueue::RunTaskFunction(const TaskFunction& func)
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
func();
|
||||||
|
} catch (const std::exception&) {
|
||||||
|
boost::exception_ptr eptr = boost::current_exception();
|
||||||
|
|
||||||
|
{
|
||||||
|
boost::mutex::scoped_lock mutex(m_Mutex);
|
||||||
|
|
||||||
|
if (!m_ExceptionCallback)
|
||||||
|
m_Exceptions.push_back(eptr);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (m_ExceptionCallback)
|
||||||
|
m_ExceptionCallback(eptr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void WorkQueue::WorkerThreadProc()
|
void WorkQueue::WorkerThreadProc()
|
||||||
{
|
{
|
||||||
std::ostringstream idbuf;
|
std::ostringstream idbuf;
|
||||||
@ -258,19 +292,7 @@ void WorkQueue::WorkerThreadProc()
|
|||||||
|
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
|
|
||||||
try {
|
RunTaskFunction(task.Function);
|
||||||
task.Function();
|
|
||||||
} catch (const std::exception&) {
|
|
||||||
lock.lock();
|
|
||||||
|
|
||||||
if (!m_ExceptionCallback)
|
|
||||||
m_Exceptions.push_back(boost::current_exception());
|
|
||||||
|
|
||||||
lock.unlock();
|
|
||||||
|
|
||||||
if (m_ExceptionCallback)
|
|
||||||
m_ExceptionCallback(boost::current_exception());
|
|
||||||
}
|
|
||||||
|
|
||||||
/* clear the task so whatever other resources it holds are released _before_ we re-acquire the mutex */
|
/* clear the task so whatever other resources it holds are released _before_ we re-acquire the mutex */
|
||||||
task = Task();
|
task = Task();
|
||||||
|
@ -41,15 +41,17 @@ enum WorkQueuePriority
|
|||||||
PriorityHigh
|
PriorityHigh
|
||||||
};
|
};
|
||||||
|
|
||||||
|
using TaskFunction = std::function<void ()>;
|
||||||
|
|
||||||
struct Task
|
struct Task
|
||||||
{
|
{
|
||||||
Task() = default;
|
Task() = default;
|
||||||
|
|
||||||
Task(std::function<void (void)> function, WorkQueuePriority priority, int id)
|
Task(TaskFunction function, WorkQueuePriority priority, int id)
|
||||||
: Function(std::move(function)), Priority(priority), ID(id)
|
: Function(std::move(function)), Priority(priority), ID(id)
|
||||||
{ }
|
{ }
|
||||||
|
|
||||||
std::function<void (void)> Function;
|
TaskFunction Function;
|
||||||
WorkQueuePriority Priority{PriorityNormal};
|
WorkQueuePriority Priority{PriorityNormal};
|
||||||
int ID{-1};
|
int ID{-1};
|
||||||
};
|
};
|
||||||
@ -72,10 +74,42 @@ public:
|
|||||||
void SetName(const String& name);
|
void SetName(const String& name);
|
||||||
String GetName() const;
|
String GetName() const;
|
||||||
|
|
||||||
void Enqueue(std::function<void (void)>&& function, WorkQueuePriority priority = PriorityNormal,
|
boost::mutex::scoped_lock AcquireLock();
|
||||||
|
void EnqueueUnlocked(boost::mutex::scoped_lock& lock, TaskFunction&& function, WorkQueuePriority priority = PriorityNormal);
|
||||||
|
void Enqueue(TaskFunction&& function, WorkQueuePriority priority = PriorityNormal,
|
||||||
bool allowInterleaved = false);
|
bool allowInterleaved = false);
|
||||||
void Join(bool stop = false);
|
void Join(bool stop = false);
|
||||||
|
|
||||||
|
template<typename VectorType, typename FuncType>
|
||||||
|
void ParallelFor(const VectorType& items, const FuncType& func)
|
||||||
|
{
|
||||||
|
using SizeType = decltype(items.size());
|
||||||
|
|
||||||
|
SizeType totalCount = items.size();
|
||||||
|
|
||||||
|
auto lock = AcquireLock();
|
||||||
|
|
||||||
|
SizeType offset = 0;
|
||||||
|
|
||||||
|
for (int i = 0; i < m_ThreadCount; i++) {
|
||||||
|
SizeType count = totalCount / static_cast<SizeType>(m_ThreadCount);
|
||||||
|
if (static_cast<SizeType>(i) < totalCount % static_cast<SizeType>(m_ThreadCount))
|
||||||
|
count++;
|
||||||
|
|
||||||
|
EnqueueUnlocked(lock, [&items, func, offset, count, this]() {
|
||||||
|
for (SizeType j = offset; j < offset + count; j++) {
|
||||||
|
RunTaskFunction([&func, &items, j]() {
|
||||||
|
func(items[j]);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
offset += count;
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT(offset == items.size());
|
||||||
|
}
|
||||||
|
|
||||||
bool IsWorkerThread() const;
|
bool IsWorkerThread() const;
|
||||||
|
|
||||||
size_t GetLength() const;
|
size_t GetLength() const;
|
||||||
@ -119,6 +153,8 @@ private:
|
|||||||
|
|
||||||
void WorkerThreadProc();
|
void WorkerThreadProc();
|
||||||
void StatusTimerHandler();
|
void StatusTimerHandler();
|
||||||
|
|
||||||
|
void RunTaskFunction(const TaskFunction& func);
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -428,12 +428,12 @@ bool ConfigItem::CommitNewItems(const ActivationContext::Ptr& context, WorkQueue
|
|||||||
if (items.empty())
|
if (items.empty())
|
||||||
return true;
|
return true;
|
||||||
|
|
||||||
for (const ItemPair& ip : items) {
|
for (const auto& ip : items)
|
||||||
newItems.push_back(ip.first);
|
newItems.push_back(ip.first);
|
||||||
upq.Enqueue([&]() {
|
|
||||||
ip.first->Commit(ip.second);
|
upq.ParallelFor(items, [](const ItemPair& ip) {
|
||||||
});
|
ip.first->Commit(ip.second);
|
||||||
}
|
});
|
||||||
|
|
||||||
upq.Join();
|
upq.Join();
|
||||||
|
|
||||||
@ -468,36 +468,29 @@ bool ConfigItem::CommitNewItems(const ActivationContext::Ptr& context, WorkQueue
|
|||||||
if (unresolved_dep)
|
if (unresolved_dep)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
for (const ItemPair& ip : items) {
|
upq.ParallelFor(items, [&type](const ItemPair& ip) {
|
||||||
const ConfigItem::Ptr& item = ip.first;
|
const ConfigItem::Ptr& item = ip.first;
|
||||||
|
|
||||||
if (!item->m_Object)
|
if (!item->m_Object || item->m_Type != type)
|
||||||
continue;
|
return;
|
||||||
|
|
||||||
if (item->m_Type == type) {
|
try {
|
||||||
upq.Enqueue([&]() {
|
item->m_Object->OnAllConfigLoaded();
|
||||||
try {
|
} catch (const std::exception& ex) {
|
||||||
item->m_Object->OnAllConfigLoaded();
|
if (!item->m_IgnoreOnError)
|
||||||
} catch (const std::exception& ex) {
|
throw;
|
||||||
if (item->m_IgnoreOnError) {
|
|
||||||
Log(LogNotice, "ConfigObject")
|
|
||||||
<< "Ignoring config object '" << item->m_Name << "' of type '" << item->m_Type->GetName() << "' due to errors: " << DiagnosticInformation(ex);
|
|
||||||
|
|
||||||
item->Unregister();
|
Log(LogNotice, "ConfigObject")
|
||||||
|
<< "Ignoring config object '" << item->m_Name << "' of type '" << item->m_Type->GetName() << "' due to errors: " << DiagnosticInformation(ex);
|
||||||
|
|
||||||
{
|
item->Unregister();
|
||||||
boost::mutex::scoped_lock lock(item->m_Mutex);
|
|
||||||
item->m_IgnoredItems.push_back(item->m_DebugInfo.Path);
|
|
||||||
}
|
|
||||||
|
|
||||||
return;
|
{
|
||||||
}
|
boost::mutex::scoped_lock lock(item->m_Mutex);
|
||||||
|
item->m_IgnoredItems.push_back(item->m_DebugInfo.Path);
|
||||||
throw;
|
}
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
});
|
||||||
|
|
||||||
completed_types.insert(type);
|
completed_types.insert(type);
|
||||||
|
|
||||||
@ -507,19 +500,15 @@ bool ConfigItem::CommitNewItems(const ActivationContext::Ptr& context, WorkQueue
|
|||||||
return false;
|
return false;
|
||||||
|
|
||||||
for (const String& loadDep : type->GetLoadDependencies()) {
|
for (const String& loadDep : type->GetLoadDependencies()) {
|
||||||
for (const ItemPair& ip : items) {
|
upq.ParallelFor(items, [loadDep, &type](const ItemPair& ip) {
|
||||||
const ConfigItem::Ptr& item = ip.first;
|
const ConfigItem::Ptr& item = ip.first;
|
||||||
|
|
||||||
if (!item->m_Object)
|
if (!item->m_Object || item->m_Type->GetName() != loadDep)
|
||||||
continue;
|
return;
|
||||||
|
|
||||||
if (item->m_Type->GetName() == loadDep) {
|
ActivationScope ascope(item->m_ActivationContext);
|
||||||
upq.Enqueue([&]() {
|
item->m_Object->CreateChildObjects(type);
|
||||||
ActivationScope ascope(item->m_ActivationContext);
|
});
|
||||||
item->m_Object->CreateChildObjects(type);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
upq.Join();
|
upq.Join();
|
||||||
@ -606,14 +595,8 @@ bool ConfigItem::ActivateItems(WorkQueue& upq, const std::vector<ConfigItem::Ptr
|
|||||||
Log(LogDebug, "ConfigItem")
|
Log(LogDebug, "ConfigItem")
|
||||||
<< "Setting 'active' to true for object '" << object->GetName() << "' of type '" << object->GetReflectionType()->GetName() << "'";
|
<< "Setting 'active' to true for object '" << object->GetName() << "' of type '" << object->GetReflectionType()->GetName() << "'";
|
||||||
#endif /* I2_DEBUG */
|
#endif /* I2_DEBUG */
|
||||||
upq.Enqueue(std::bind(&ConfigObject::PreActivate, object));
|
|
||||||
}
|
|
||||||
|
|
||||||
upq.Join();
|
object->PreActivate();
|
||||||
|
|
||||||
if (upq.HasExceptions()) {
|
|
||||||
upq.ReportExceptions("ConfigItem");
|
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!silent)
|
if (!silent)
|
||||||
@ -629,7 +612,8 @@ bool ConfigItem::ActivateItems(WorkQueue& upq, const std::vector<ConfigItem::Ptr
|
|||||||
Log(LogDebug, "ConfigItem")
|
Log(LogDebug, "ConfigItem")
|
||||||
<< "Activating object '" << object->GetName() << "' of type '" << object->GetReflectionType()->GetName() << "'";
|
<< "Activating object '" << object->GetName() << "' of type '" << object->GetReflectionType()->GetName() << "'";
|
||||||
#endif /* I2_DEBUG */
|
#endif /* I2_DEBUG */
|
||||||
upq.Enqueue(std::bind(&ConfigObject::Activate, object, runtimeCreated));
|
|
||||||
|
object->Activate(runtimeCreated);
|
||||||
}
|
}
|
||||||
|
|
||||||
upq.Join();
|
upq.Join();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user