diff --git a/lib/base/workqueue.cpp b/lib/base/workqueue.cpp index a18c9818b..e55eab2ad 100644 --- a/lib/base/workqueue.cpp +++ b/lib/base/workqueue.cpp @@ -60,6 +60,40 @@ String WorkQueue::GetName() const return m_Name; } +boost::mutex::scoped_lock WorkQueue::AcquireLock() +{ + return boost::mutex::scoped_lock(m_Mutex); +} + +/** + * Enqueues a task. Tasks are guaranteed to be executed in the order + * they were enqueued in except if there is more than one worker thread. + */ +void WorkQueue::EnqueueUnlocked(boost::mutex::scoped_lock& lock, std::function&& function, WorkQueuePriority priority) +{ + if (!m_Spawned) { + Log(LogNotice, "WorkQueue") + << "Spawning WorkQueue threads for '" << m_Name << "'"; + + for (int i = 0; i < m_ThreadCount; i++) { + m_Threads.create_thread(std::bind(&WorkQueue::WorkerThreadProc, this)); + } + + m_Spawned = true; + } + + bool wq_thread = IsWorkerThread(); + + if (!wq_thread) { + while (m_Tasks.size() >= m_MaxItems && m_MaxItems != 0) + m_CVFull.wait(lock); + } + + m_Tasks.emplace(std::move(function), priority, ++m_NextTaskID); + + m_CVEmpty.notify_one(); +} + /** * Enqueues a task. Tasks are guaranteed to be executed in the order * they were enqueued in except if there is more than one worker thread or when @@ -77,27 +111,8 @@ void WorkQueue::Enqueue(std::function&& function, WorkQueuePriority pri return; } - boost::mutex::scoped_lock lock(m_Mutex); - - if (!m_Spawned) { - Log(LogNotice, "WorkQueue") - << "Spawning WorkQueue threads for '" << m_Name << "'"; - - for (int i = 0; i < m_ThreadCount; i++) { - m_Threads.create_thread(std::bind(&WorkQueue::WorkerThreadProc, this)); - } - - m_Spawned = true; - } - - if (!wq_thread) { - while (m_Tasks.size() >= m_MaxItems && m_MaxItems != 0) - m_CVFull.wait(lock); - } - - m_Tasks.emplace(std::move(function), priority, ++m_NextTaskID); - - m_CVEmpty.notify_one(); + auto lock = AcquireLock(); + EnqueueUnlocked(lock, std::move(function), priority); } /** @@ -231,6 +246,25 @@ void WorkQueue::StatusTimerHandler() } } +void WorkQueue::RunTaskFunction(const TaskFunction& func) +{ + try { + func(); + } catch (const std::exception&) { + boost::exception_ptr eptr = boost::current_exception(); + + { + boost::mutex::scoped_lock mutex(m_Mutex); + + if (!m_ExceptionCallback) + m_Exceptions.push_back(eptr); + } + + if (m_ExceptionCallback) + m_ExceptionCallback(eptr); + } +} + void WorkQueue::WorkerThreadProc() { std::ostringstream idbuf; @@ -258,19 +292,7 @@ void WorkQueue::WorkerThreadProc() lock.unlock(); - try { - task.Function(); - } catch (const std::exception&) { - lock.lock(); - - if (!m_ExceptionCallback) - m_Exceptions.push_back(boost::current_exception()); - - lock.unlock(); - - if (m_ExceptionCallback) - m_ExceptionCallback(boost::current_exception()); - } + RunTaskFunction(task.Function); /* clear the task so whatever other resources it holds are released _before_ we re-acquire the mutex */ task = Task(); diff --git a/lib/base/workqueue.hpp b/lib/base/workqueue.hpp index 14d0dd5fb..cdb6c437b 100644 --- a/lib/base/workqueue.hpp +++ b/lib/base/workqueue.hpp @@ -41,15 +41,17 @@ enum WorkQueuePriority PriorityHigh }; +using TaskFunction = std::function; + struct Task { Task() = default; - Task(std::function function, WorkQueuePriority priority, int id) + Task(TaskFunction function, WorkQueuePriority priority, int id) : Function(std::move(function)), Priority(priority), ID(id) { } - std::function Function; + TaskFunction Function; WorkQueuePriority Priority{PriorityNormal}; int ID{-1}; }; @@ -72,10 +74,42 @@ public: void SetName(const String& name); String GetName() const; - void Enqueue(std::function&& function, WorkQueuePriority priority = PriorityNormal, + boost::mutex::scoped_lock AcquireLock(); + void EnqueueUnlocked(boost::mutex::scoped_lock& lock, TaskFunction&& function, WorkQueuePriority priority = PriorityNormal); + void Enqueue(TaskFunction&& function, WorkQueuePriority priority = PriorityNormal, bool allowInterleaved = false); void Join(bool stop = false); + template + void ParallelFor(const VectorType& items, const FuncType& func) + { + using SizeType = decltype(items.size()); + + SizeType totalCount = items.size(); + + auto lock = AcquireLock(); + + SizeType offset = 0; + + for (int i = 0; i < m_ThreadCount; i++) { + SizeType count = totalCount / static_cast(m_ThreadCount); + if (static_cast(i) < totalCount % static_cast(m_ThreadCount)) + count++; + + EnqueueUnlocked(lock, [&items, func, offset, count, this]() { + for (SizeType j = offset; j < offset + count; j++) { + RunTaskFunction([&func, &items, j]() { + func(items[j]); + }); + } + }); + + offset += count; + } + + ASSERT(offset == items.size()); + } + bool IsWorkerThread() const; size_t GetLength() const; @@ -119,6 +153,8 @@ private: void WorkerThreadProc(); void StatusTimerHandler(); + + void RunTaskFunction(const TaskFunction& func); }; } diff --git a/lib/config/configitem.cpp b/lib/config/configitem.cpp index 87037e92f..e1b9da781 100644 --- a/lib/config/configitem.cpp +++ b/lib/config/configitem.cpp @@ -428,12 +428,12 @@ bool ConfigItem::CommitNewItems(const ActivationContext::Ptr& context, WorkQueue if (items.empty()) return true; - for (const ItemPair& ip : items) { + for (const auto& ip : items) newItems.push_back(ip.first); - upq.Enqueue([&]() { - ip.first->Commit(ip.second); - }); - } + + upq.ParallelFor(items, [](const ItemPair& ip) { + ip.first->Commit(ip.second); + }); upq.Join(); @@ -468,36 +468,29 @@ bool ConfigItem::CommitNewItems(const ActivationContext::Ptr& context, WorkQueue if (unresolved_dep) continue; - for (const ItemPair& ip : items) { + upq.ParallelFor(items, [&type](const ItemPair& ip) { const ConfigItem::Ptr& item = ip.first; - if (!item->m_Object) - continue; + if (!item->m_Object || item->m_Type != type) + return; - if (item->m_Type == type) { - upq.Enqueue([&]() { - try { - item->m_Object->OnAllConfigLoaded(); - } catch (const std::exception& ex) { - if (item->m_IgnoreOnError) { - Log(LogNotice, "ConfigObject") - << "Ignoring config object '" << item->m_Name << "' of type '" << item->m_Type->GetName() << "' due to errors: " << DiagnosticInformation(ex); + try { + item->m_Object->OnAllConfigLoaded(); + } catch (const std::exception& ex) { + if (!item->m_IgnoreOnError) + throw; - item->Unregister(); + Log(LogNotice, "ConfigObject") + << "Ignoring config object '" << item->m_Name << "' of type '" << item->m_Type->GetName() << "' due to errors: " << DiagnosticInformation(ex); - { - boost::mutex::scoped_lock lock(item->m_Mutex); - item->m_IgnoredItems.push_back(item->m_DebugInfo.Path); - } + item->Unregister(); - return; - } - - throw; - } - }); + { + boost::mutex::scoped_lock lock(item->m_Mutex); + item->m_IgnoredItems.push_back(item->m_DebugInfo.Path); + } } - } + }); completed_types.insert(type); @@ -507,19 +500,15 @@ bool ConfigItem::CommitNewItems(const ActivationContext::Ptr& context, WorkQueue return false; for (const String& loadDep : type->GetLoadDependencies()) { - for (const ItemPair& ip : items) { + upq.ParallelFor(items, [loadDep, &type](const ItemPair& ip) { const ConfigItem::Ptr& item = ip.first; - if (!item->m_Object) - continue; + if (!item->m_Object || item->m_Type->GetName() != loadDep) + return; - if (item->m_Type->GetName() == loadDep) { - upq.Enqueue([&]() { - ActivationScope ascope(item->m_ActivationContext); - item->m_Object->CreateChildObjects(type); - }); - } - } + ActivationScope ascope(item->m_ActivationContext); + item->m_Object->CreateChildObjects(type); + }); } upq.Join(); @@ -606,14 +595,8 @@ bool ConfigItem::ActivateItems(WorkQueue& upq, const std::vectorGetName() << "' of type '" << object->GetReflectionType()->GetName() << "'"; #endif /* I2_DEBUG */ - upq.Enqueue(std::bind(&ConfigObject::PreActivate, object)); - } - upq.Join(); - - if (upq.HasExceptions()) { - upq.ReportExceptions("ConfigItem"); - return false; + object->PreActivate(); } if (!silent) @@ -629,7 +612,8 @@ bool ConfigItem::ActivateItems(WorkQueue& upq, const std::vectorGetName() << "' of type '" << object->GetReflectionType()->GetName() << "'"; #endif /* I2_DEBUG */ - upq.Enqueue(std::bind(&ConfigObject::Activate, object, runtimeCreated)); + + object->Activate(runtimeCreated); } upq.Join();