mirror of
https://github.com/Icinga/icinga2.git
synced 2025-04-07 20:25:08 +02:00
ConfigItem::CommitNewItems(): allow fast search of pending items by type
This commit is contained in:
parent
33e609d791
commit
ae693cb7e1
@ -26,6 +26,7 @@
|
||||
#include <fstream>
|
||||
#include <algorithm>
|
||||
#include <random>
|
||||
#include <unordered_map>
|
||||
|
||||
using namespace icinga;
|
||||
|
||||
@ -387,12 +388,15 @@ ConfigItem::Ptr ConfigItem::GetByTypeAndName(const Type::Ptr& type, const String
|
||||
bool ConfigItem::CommitNewItems(const ActivationContext::Ptr& context, WorkQueue& upq, std::vector<ConfigItem::Ptr>& newItems)
|
||||
{
|
||||
typedef std::pair<ConfigItem::Ptr, bool> ItemPair;
|
||||
std::vector<ItemPair> items;
|
||||
std::unordered_map<Type*, std::vector<ItemPair>> itemsByType;
|
||||
std::vector<ItemPair>::size_type total = 0;
|
||||
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(m_Mutex);
|
||||
|
||||
for (const TypeMap::value_type& kv : m_Items) {
|
||||
std::vector<ItemPair> items;
|
||||
|
||||
for (const ItemMap::value_type& kv2 : kv.second) {
|
||||
if (kv2.second->m_Abstract || kv2.second->m_Object)
|
||||
continue;
|
||||
@ -402,6 +406,11 @@ bool ConfigItem::CommitNewItems(const ActivationContext::Ptr& context, WorkQueue
|
||||
|
||||
items.emplace_back(kv2.second, false);
|
||||
}
|
||||
|
||||
if (!items.empty()) {
|
||||
total += items.size();
|
||||
itemsByType.emplace(kv.first.get(), std::move(items));
|
||||
}
|
||||
}
|
||||
|
||||
ItemList newUnnamedItems;
|
||||
@ -415,22 +424,25 @@ bool ConfigItem::CommitNewItems(const ActivationContext::Ptr& context, WorkQueue
|
||||
if (item->m_Abstract || item->m_Object)
|
||||
continue;
|
||||
|
||||
items.emplace_back(item, true);
|
||||
itemsByType[item->m_Type.get()].emplace_back(item, true);
|
||||
++total;
|
||||
}
|
||||
|
||||
m_UnnamedItems.swap(newUnnamedItems);
|
||||
}
|
||||
|
||||
if (items.empty())
|
||||
if (!total)
|
||||
return true;
|
||||
|
||||
// Shuffle all items to evenly distribute them over the threads of the workqueue. This increases perfomance
|
||||
// noticably in environments with lots of objects and available threads.
|
||||
std::shuffle(std::begin(items), std::end(items), std::default_random_engine {});
|
||||
for (auto& kv : itemsByType) {
|
||||
std::shuffle(std::begin(kv.second), std::end(kv.second), std::default_random_engine{});
|
||||
}
|
||||
|
||||
#ifdef I2_DEBUG
|
||||
Log(LogDebug, "configitem")
|
||||
<< "Committing " << items.size() << " new items.";
|
||||
<< "Committing " << total << " new items.";
|
||||
#endif /* I2_DEBUG */
|
||||
|
||||
std::set<Type::Ptr> types;
|
||||
@ -463,27 +475,30 @@ bool ConfigItem::CommitNewItems(const ActivationContext::Ptr& context, WorkQueue
|
||||
std::atomic<int> committed_items(0);
|
||||
std::mutex newItemsMutex;
|
||||
|
||||
upq.ParallelFor(items, [&type, &committed_items, &newItems, &newItemsMutex](const ItemPair& ip) {
|
||||
const ConfigItem::Ptr& item = ip.first;
|
||||
{
|
||||
auto items (itemsByType.find(type.get()));
|
||||
|
||||
if (item->m_Type != type)
|
||||
return;
|
||||
if (items != itemsByType.end()) {
|
||||
upq.ParallelFor(items->second, [&committed_items, &newItems, &newItemsMutex](const ItemPair& ip) {
|
||||
const ConfigItem::Ptr& item = ip.first;
|
||||
|
||||
if (!item->Commit(ip.second)) {
|
||||
if (item->IsIgnoreOnError()) {
|
||||
item->Unregister();
|
||||
}
|
||||
if (!item->Commit(ip.second)) {
|
||||
if (item->IsIgnoreOnError()) {
|
||||
item->Unregister();
|
||||
}
|
||||
|
||||
return;
|
||||
return;
|
||||
}
|
||||
|
||||
committed_items++;
|
||||
|
||||
std::unique_lock<std::mutex> lock(newItemsMutex);
|
||||
newItems.emplace_back(item);
|
||||
});
|
||||
|
||||
upq.Join();
|
||||
}
|
||||
|
||||
committed_items++;
|
||||
|
||||
std::unique_lock<std::mutex> lock(newItemsMutex);
|
||||
newItems.emplace_back(item);
|
||||
});
|
||||
|
||||
upq.Join();
|
||||
}
|
||||
|
||||
itemsCount += committed_items;
|
||||
|
||||
@ -526,35 +541,42 @@ bool ConfigItem::CommitNewItems(const ActivationContext::Ptr& context, WorkQueue
|
||||
continue;
|
||||
|
||||
std::atomic<int> notified_items(0);
|
||||
upq.ParallelFor(items, [&type, ¬ified_items](const ItemPair& ip) {
|
||||
const ConfigItem::Ptr& item = ip.first;
|
||||
|
||||
if (!item->m_Object || item->m_Type != type)
|
||||
return;
|
||||
{
|
||||
auto items (itemsByType.find(type.get()));
|
||||
|
||||
try {
|
||||
item->m_Object->OnAllConfigLoaded();
|
||||
notified_items++;
|
||||
} catch (const std::exception& ex) {
|
||||
if (!item->m_IgnoreOnError)
|
||||
throw;
|
||||
if (items != itemsByType.end()) {
|
||||
upq.ParallelFor(items->second, [¬ified_items](const ItemPair& ip) {
|
||||
const ConfigItem::Ptr& item = ip.first;
|
||||
|
||||
Log(LogNotice, "ConfigObject")
|
||||
<< "Ignoring config object '" << item->m_Name << "' of type '" << item->m_Type->GetName() << "' due to errors: " << DiagnosticInformation(ex);
|
||||
if (!item->m_Object)
|
||||
return;
|
||||
|
||||
item->Unregister();
|
||||
try {
|
||||
item->m_Object->OnAllConfigLoaded();
|
||||
notified_items++;
|
||||
} catch (const std::exception& ex) {
|
||||
if (!item->m_IgnoreOnError)
|
||||
throw;
|
||||
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(item->m_Mutex);
|
||||
item->m_IgnoredItems.push_back(item->m_DebugInfo.Path);
|
||||
}
|
||||
Log(LogNotice, "ConfigObject")
|
||||
<< "Ignoring config object '" << item->m_Name << "' of type '" << item->m_Type->GetName() << "' due to errors: " << DiagnosticInformation(ex);
|
||||
|
||||
item->Unregister();
|
||||
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(item->m_Mutex);
|
||||
item->m_IgnoredItems.push_back(item->m_DebugInfo.Path);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
upq.Join();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
completed_types.insert(type);
|
||||
|
||||
upq.Join();
|
||||
|
||||
#ifdef I2_DEBUG
|
||||
if (notified_items > 0)
|
||||
Log(LogDebug, "configitem")
|
||||
@ -566,16 +588,20 @@ bool ConfigItem::CommitNewItems(const ActivationContext::Ptr& context, WorkQueue
|
||||
|
||||
notified_items = 0;
|
||||
for (auto loadDep : type->GetLoadDependencies()) {
|
||||
upq.ParallelFor(items, [loadDep, &type, ¬ified_items](const ItemPair& ip) {
|
||||
const ConfigItem::Ptr& item = ip.first;
|
||||
auto items (itemsByType.find(loadDep));
|
||||
|
||||
if (!item->m_Object || item->m_Type.get() != loadDep)
|
||||
return;
|
||||
if (items != itemsByType.end()) {
|
||||
upq.ParallelFor(items->second, [&type, ¬ified_items](const ItemPair& ip) {
|
||||
const ConfigItem::Ptr& item = ip.first;
|
||||
|
||||
ActivationScope ascope(item->m_ActivationContext);
|
||||
item->m_Object->CreateChildObjects(type);
|
||||
notified_items++;
|
||||
});
|
||||
if (!item->m_Object)
|
||||
return;
|
||||
|
||||
ActivationScope ascope(item->m_ActivationContext);
|
||||
item->m_Object->CreateChildObjects(type);
|
||||
notified_items++;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
upq.Join();
|
||||
|
Loading…
x
Reference in New Issue
Block a user