Merge pull request #5955 from Icinga/feature/workqueue-lock-contention

Avoid mutex contention in the config parser
This commit is contained in:
Michael Friedrich 2018-01-08 17:11:43 +01:00 committed by GitHub
commit 17edb17784
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 125 additions and 83 deletions

View File

@ -60,6 +60,40 @@ String WorkQueue::GetName() const
return m_Name;
}
boost::mutex::scoped_lock WorkQueue::AcquireLock()
{
return boost::mutex::scoped_lock(m_Mutex);
}
/**
* Enqueues a task. Tasks are guaranteed to be executed in the order
* they were enqueued in except if there is more than one worker thread.
*/
void WorkQueue::EnqueueUnlocked(boost::mutex::scoped_lock& lock, std::function<void ()>&& function, WorkQueuePriority priority)
{
if (!m_Spawned) {
Log(LogNotice, "WorkQueue")
<< "Spawning WorkQueue threads for '" << m_Name << "'";
for (int i = 0; i < m_ThreadCount; i++) {
m_Threads.create_thread(std::bind(&WorkQueue::WorkerThreadProc, this));
}
m_Spawned = true;
}
bool wq_thread = IsWorkerThread();
if (!wq_thread) {
while (m_Tasks.size() >= m_MaxItems && m_MaxItems != 0)
m_CVFull.wait(lock);
}
m_Tasks.emplace(std::move(function), priority, ++m_NextTaskID);
m_CVEmpty.notify_one();
}
/**
* Enqueues a task. Tasks are guaranteed to be executed in the order
* they were enqueued in except if there is more than one worker thread or when
@ -77,27 +111,8 @@ void WorkQueue::Enqueue(std::function<void ()>&& function, WorkQueuePriority pri
return;
}
boost::mutex::scoped_lock lock(m_Mutex);
if (!m_Spawned) {
Log(LogNotice, "WorkQueue")
<< "Spawning WorkQueue threads for '" << m_Name << "'";
for (int i = 0; i < m_ThreadCount; i++) {
m_Threads.create_thread(std::bind(&WorkQueue::WorkerThreadProc, this));
}
m_Spawned = true;
}
if (!wq_thread) {
while (m_Tasks.size() >= m_MaxItems && m_MaxItems != 0)
m_CVFull.wait(lock);
}
m_Tasks.emplace(std::move(function), priority, ++m_NextTaskID);
m_CVEmpty.notify_one();
auto lock = AcquireLock();
EnqueueUnlocked(lock, std::move(function), priority);
}
/**
@ -231,6 +246,25 @@ void WorkQueue::StatusTimerHandler()
}
}
void WorkQueue::RunTaskFunction(const TaskFunction& func)
{
try {
func();
} catch (const std::exception&) {
boost::exception_ptr eptr = boost::current_exception();
{
boost::mutex::scoped_lock mutex(m_Mutex);
if (!m_ExceptionCallback)
m_Exceptions.push_back(eptr);
}
if (m_ExceptionCallback)
m_ExceptionCallback(eptr);
}
}
void WorkQueue::WorkerThreadProc()
{
std::ostringstream idbuf;
@ -258,19 +292,7 @@ void WorkQueue::WorkerThreadProc()
lock.unlock();
try {
task.Function();
} catch (const std::exception&) {
lock.lock();
if (!m_ExceptionCallback)
m_Exceptions.push_back(boost::current_exception());
lock.unlock();
if (m_ExceptionCallback)
m_ExceptionCallback(boost::current_exception());
}
RunTaskFunction(task.Function);
/* clear the task so whatever other resources it holds are released _before_ we re-acquire the mutex */
task = Task();

View File

@ -41,15 +41,17 @@ enum WorkQueuePriority
PriorityHigh
};
using TaskFunction = std::function<void ()>;
struct Task
{
Task() = default;
Task(std::function<void (void)> function, WorkQueuePriority priority, int id)
Task(TaskFunction function, WorkQueuePriority priority, int id)
: Function(std::move(function)), Priority(priority), ID(id)
{ }
std::function<void (void)> Function;
TaskFunction Function;
WorkQueuePriority Priority{PriorityNormal};
int ID{-1};
};
@ -72,10 +74,42 @@ public:
void SetName(const String& name);
String GetName() const;
void Enqueue(std::function<void (void)>&& function, WorkQueuePriority priority = PriorityNormal,
boost::mutex::scoped_lock AcquireLock();
void EnqueueUnlocked(boost::mutex::scoped_lock& lock, TaskFunction&& function, WorkQueuePriority priority = PriorityNormal);
void Enqueue(TaskFunction&& function, WorkQueuePriority priority = PriorityNormal,
bool allowInterleaved = false);
void Join(bool stop = false);
template<typename VectorType, typename FuncType>
void ParallelFor(const VectorType& items, const FuncType& func)
{
using SizeType = decltype(items.size());
SizeType totalCount = items.size();
auto lock = AcquireLock();
SizeType offset = 0;
for (int i = 0; i < m_ThreadCount; i++) {
SizeType count = totalCount / static_cast<SizeType>(m_ThreadCount);
if (static_cast<SizeType>(i) < totalCount % static_cast<SizeType>(m_ThreadCount))
count++;
EnqueueUnlocked(lock, [&items, func, offset, count, this]() {
for (SizeType j = offset; j < offset + count; j++) {
RunTaskFunction([&func, &items, j]() {
func(items[j]);
});
}
});
offset += count;
}
ASSERT(offset == items.size());
}
bool IsWorkerThread() const;
size_t GetLength() const;
@ -119,6 +153,8 @@ private:
void WorkerThreadProc();
void StatusTimerHandler();
void RunTaskFunction(const TaskFunction& func);
};
}

View File

@ -428,12 +428,12 @@ bool ConfigItem::CommitNewItems(const ActivationContext::Ptr& context, WorkQueue
if (items.empty())
return true;
for (const ItemPair& ip : items) {
for (const auto& ip : items)
newItems.push_back(ip.first);
upq.Enqueue([&]() {
ip.first->Commit(ip.second);
});
}
upq.ParallelFor(items, [](const ItemPair& ip) {
ip.first->Commit(ip.second);
});
upq.Join();
@ -468,36 +468,29 @@ bool ConfigItem::CommitNewItems(const ActivationContext::Ptr& context, WorkQueue
if (unresolved_dep)
continue;
for (const ItemPair& ip : items) {
upq.ParallelFor(items, [&type](const ItemPair& ip) {
const ConfigItem::Ptr& item = ip.first;
if (!item->m_Object)
continue;
if (!item->m_Object || item->m_Type != type)
return;
if (item->m_Type == type) {
upq.Enqueue([&]() {
try {
item->m_Object->OnAllConfigLoaded();
} catch (const std::exception& ex) {
if (item->m_IgnoreOnError) {
Log(LogNotice, "ConfigObject")
<< "Ignoring config object '" << item->m_Name << "' of type '" << item->m_Type->GetName() << "' due to errors: " << DiagnosticInformation(ex);
try {
item->m_Object->OnAllConfigLoaded();
} catch (const std::exception& ex) {
if (!item->m_IgnoreOnError)
throw;
item->Unregister();
Log(LogNotice, "ConfigObject")
<< "Ignoring config object '" << item->m_Name << "' of type '" << item->m_Type->GetName() << "' due to errors: " << DiagnosticInformation(ex);
{
boost::mutex::scoped_lock lock(item->m_Mutex);
item->m_IgnoredItems.push_back(item->m_DebugInfo.Path);
}
item->Unregister();
return;
}
throw;
}
});
{
boost::mutex::scoped_lock lock(item->m_Mutex);
item->m_IgnoredItems.push_back(item->m_DebugInfo.Path);
}
}
}
});
completed_types.insert(type);
@ -507,19 +500,15 @@ bool ConfigItem::CommitNewItems(const ActivationContext::Ptr& context, WorkQueue
return false;
for (const String& loadDep : type->GetLoadDependencies()) {
for (const ItemPair& ip : items) {
upq.ParallelFor(items, [loadDep, &type](const ItemPair& ip) {
const ConfigItem::Ptr& item = ip.first;
if (!item->m_Object)
continue;
if (!item->m_Object || item->m_Type->GetName() != loadDep)
return;
if (item->m_Type->GetName() == loadDep) {
upq.Enqueue([&]() {
ActivationScope ascope(item->m_ActivationContext);
item->m_Object->CreateChildObjects(type);
});
}
}
ActivationScope ascope(item->m_ActivationContext);
item->m_Object->CreateChildObjects(type);
});
}
upq.Join();
@ -606,14 +595,8 @@ bool ConfigItem::ActivateItems(WorkQueue& upq, const std::vector<ConfigItem::Ptr
Log(LogDebug, "ConfigItem")
<< "Setting 'active' to true for object '" << object->GetName() << "' of type '" << object->GetReflectionType()->GetName() << "'";
#endif /* I2_DEBUG */
upq.Enqueue(std::bind(&ConfigObject::PreActivate, object));
}
upq.Join();
if (upq.HasExceptions()) {
upq.ReportExceptions("ConfigItem");
return false;
object->PreActivate();
}
if (!silent)
@ -629,7 +612,8 @@ bool ConfigItem::ActivateItems(WorkQueue& upq, const std::vector<ConfigItem::Ptr
Log(LogDebug, "ConfigItem")
<< "Activating object '" << object->GetName() << "' of type '" << object->GetReflectionType()->GetName() << "'";
#endif /* I2_DEBUG */
upq.Enqueue(std::bind(&ConfigObject::Activate, object, runtimeCreated));
object->Activate(runtimeCreated);
}
upq.Join();