diff --git a/lib/config/configitem.cpp b/lib/config/configitem.cpp index cd23cd858..8beaa4463 100644 --- a/lib/config/configitem.cpp +++ b/lib/config/configitem.cpp @@ -26,6 +26,7 @@ #include #include #include +#include using namespace icinga; @@ -387,12 +388,15 @@ ConfigItem::Ptr ConfigItem::GetByTypeAndName(const Type::Ptr& type, const String bool ConfigItem::CommitNewItems(const ActivationContext::Ptr& context, WorkQueue& upq, std::vector& newItems) { typedef std::pair ItemPair; - std::vector items; + std::unordered_map> itemsByType; + std::vector::size_type total = 0; { std::unique_lock lock(m_Mutex); for (const TypeMap::value_type& kv : m_Items) { + std::vector items; + for (const ItemMap::value_type& kv2 : kv.second) { if (kv2.second->m_Abstract || kv2.second->m_Object) continue; @@ -402,6 +406,11 @@ bool ConfigItem::CommitNewItems(const ActivationContext::Ptr& context, WorkQueue items.emplace_back(kv2.second, false); } + + if (!items.empty()) { + total += items.size(); + itemsByType.emplace(kv.first.get(), std::move(items)); + } } ItemList newUnnamedItems; @@ -415,22 +424,25 @@ bool ConfigItem::CommitNewItems(const ActivationContext::Ptr& context, WorkQueue if (item->m_Abstract || item->m_Object) continue; - items.emplace_back(item, true); + itemsByType[item->m_Type.get()].emplace_back(item, true); + ++total; } m_UnnamedItems.swap(newUnnamedItems); } - if (items.empty()) + if (!total) return true; // Shuffle all items to evenly distribute them over the threads of the workqueue. This increases perfomance // noticably in environments with lots of objects and available threads. - std::shuffle(std::begin(items), std::end(items), std::default_random_engine {}); + for (auto& kv : itemsByType) { + std::shuffle(std::begin(kv.second), std::end(kv.second), std::default_random_engine{}); + } #ifdef I2_DEBUG Log(LogDebug, "configitem") - << "Committing " << items.size() << " new items."; + << "Committing " << total << " new items."; #endif /* I2_DEBUG */ std::set types; @@ -463,27 +475,30 @@ bool ConfigItem::CommitNewItems(const ActivationContext::Ptr& context, WorkQueue std::atomic committed_items(0); std::mutex newItemsMutex; - upq.ParallelFor(items, [&type, &committed_items, &newItems, &newItemsMutex](const ItemPair& ip) { - const ConfigItem::Ptr& item = ip.first; + { + auto items (itemsByType.find(type.get())); - if (item->m_Type != type) - return; + if (items != itemsByType.end()) { + upq.ParallelFor(items->second, [&committed_items, &newItems, &newItemsMutex](const ItemPair& ip) { + const ConfigItem::Ptr& item = ip.first; - if (!item->Commit(ip.second)) { - if (item->IsIgnoreOnError()) { - item->Unregister(); - } + if (!item->Commit(ip.second)) { + if (item->IsIgnoreOnError()) { + item->Unregister(); + } - return; + return; + } + + committed_items++; + + std::unique_lock lock(newItemsMutex); + newItems.emplace_back(item); + }); + + upq.Join(); } - - committed_items++; - - std::unique_lock lock(newItemsMutex); - newItems.emplace_back(item); - }); - - upq.Join(); + } itemsCount += committed_items; @@ -526,35 +541,42 @@ bool ConfigItem::CommitNewItems(const ActivationContext::Ptr& context, WorkQueue continue; std::atomic notified_items(0); - upq.ParallelFor(items, [&type, ¬ified_items](const ItemPair& ip) { - const ConfigItem::Ptr& item = ip.first; - if (!item->m_Object || item->m_Type != type) - return; + { + auto items (itemsByType.find(type.get())); - try { - item->m_Object->OnAllConfigLoaded(); - notified_items++; - } catch (const std::exception& ex) { - if (!item->m_IgnoreOnError) - throw; + if (items != itemsByType.end()) { + upq.ParallelFor(items->second, [¬ified_items](const ItemPair& ip) { + const ConfigItem::Ptr& item = ip.first; - Log(LogNotice, "ConfigObject") - << "Ignoring config object '" << item->m_Name << "' of type '" << item->m_Type->GetName() << "' due to errors: " << DiagnosticInformation(ex); + if (!item->m_Object) + return; - item->Unregister(); + try { + item->m_Object->OnAllConfigLoaded(); + notified_items++; + } catch (const std::exception& ex) { + if (!item->m_IgnoreOnError) + throw; - { - std::unique_lock lock(item->m_Mutex); - item->m_IgnoredItems.push_back(item->m_DebugInfo.Path); - } + Log(LogNotice, "ConfigObject") + << "Ignoring config object '" << item->m_Name << "' of type '" << item->m_Type->GetName() << "' due to errors: " << DiagnosticInformation(ex); + + item->Unregister(); + + { + std::unique_lock lock(item->m_Mutex); + item->m_IgnoredItems.push_back(item->m_DebugInfo.Path); + } + } + }); + + upq.Join(); } - }); + } completed_types.insert(type); - upq.Join(); - #ifdef I2_DEBUG if (notified_items > 0) Log(LogDebug, "configitem") @@ -566,16 +588,20 @@ bool ConfigItem::CommitNewItems(const ActivationContext::Ptr& context, WorkQueue notified_items = 0; for (auto loadDep : type->GetLoadDependencies()) { - upq.ParallelFor(items, [loadDep, &type, ¬ified_items](const ItemPair& ip) { - const ConfigItem::Ptr& item = ip.first; + auto items (itemsByType.find(loadDep)); - if (!item->m_Object || item->m_Type.get() != loadDep) - return; + if (items != itemsByType.end()) { + upq.ParallelFor(items->second, [&type, ¬ified_items](const ItemPair& ip) { + const ConfigItem::Ptr& item = ip.first; - ActivationScope ascope(item->m_ActivationContext); - item->m_Object->CreateChildObjects(type); - notified_items++; - }); + if (!item->m_Object) + return; + + ActivationScope ascope(item->m_ActivationContext); + item->m_Object->CreateChildObjects(type); + notified_items++; + }); + } } upq.Join();