mirror of
https://github.com/Icinga/icinga2.git
synced 2025-07-22 13:14:32 +02:00
Merge pull request #9577 from Icinga/ConfigItem-CommitNewItems
ConfigItem::CommitNewItems(): allow fast search of pending items by type
This commit is contained in:
commit
dd99a5ace9
@ -136,9 +136,10 @@ Value Type::GetField(int id) const
|
|||||||
BOOST_THROW_EXCEPTION(std::runtime_error("Invalid field ID."));
|
BOOST_THROW_EXCEPTION(std::runtime_error("Invalid field ID."));
|
||||||
}
|
}
|
||||||
|
|
||||||
std::vector<String> Type::GetLoadDependencies() const
|
const std::unordered_set<Type*>& Type::GetLoadDependencies() const
|
||||||
{
|
{
|
||||||
return std::vector<String>();
|
static const std::unordered_set<Type*> noDeps;
|
||||||
|
return noDeps;
|
||||||
}
|
}
|
||||||
|
|
||||||
int Type::GetActivationPriority() const
|
int Type::GetActivationPriority() const
|
||||||
|
@ -7,6 +7,7 @@
|
|||||||
#include "base/string.hpp"
|
#include "base/string.hpp"
|
||||||
#include "base/object.hpp"
|
#include "base/object.hpp"
|
||||||
#include "base/initialize.hpp"
|
#include "base/initialize.hpp"
|
||||||
|
#include <unordered_set>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
namespace icinga
|
namespace icinga
|
||||||
@ -85,7 +86,7 @@ public:
|
|||||||
void SetField(int id, const Value& value, bool suppress_events = false, const Value& cookie = Empty) override;
|
void SetField(int id, const Value& value, bool suppress_events = false, const Value& cookie = Empty) override;
|
||||||
Value GetField(int id) const override;
|
Value GetField(int id) const override;
|
||||||
|
|
||||||
virtual std::vector<String> GetLoadDependencies() const;
|
virtual const std::unordered_set<Type*>& GetLoadDependencies() const;
|
||||||
virtual int GetActivationPriority() const;
|
virtual int GetActivationPriority() const;
|
||||||
|
|
||||||
typedef std::function<void (const Object::Ptr&, const Value&)> AttributeHandler;
|
typedef std::function<void (const Object::Ptr&, const Value&)> AttributeHandler;
|
||||||
|
@ -26,6 +26,7 @@
|
|||||||
#include <fstream>
|
#include <fstream>
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
#include <random>
|
#include <random>
|
||||||
|
#include <unordered_map>
|
||||||
|
|
||||||
using namespace icinga;
|
using namespace icinga;
|
||||||
|
|
||||||
@ -387,12 +388,15 @@ ConfigItem::Ptr ConfigItem::GetByTypeAndName(const Type::Ptr& type, const String
|
|||||||
bool ConfigItem::CommitNewItems(const ActivationContext::Ptr& context, WorkQueue& upq, std::vector<ConfigItem::Ptr>& newItems)
|
bool ConfigItem::CommitNewItems(const ActivationContext::Ptr& context, WorkQueue& upq, std::vector<ConfigItem::Ptr>& newItems)
|
||||||
{
|
{
|
||||||
typedef std::pair<ConfigItem::Ptr, bool> ItemPair;
|
typedef std::pair<ConfigItem::Ptr, bool> ItemPair;
|
||||||
std::vector<ItemPair> items;
|
std::unordered_map<Type*, std::vector<ItemPair>> itemsByType;
|
||||||
|
std::vector<ItemPair>::size_type total = 0;
|
||||||
|
|
||||||
{
|
{
|
||||||
std::unique_lock<std::mutex> lock(m_Mutex);
|
std::unique_lock<std::mutex> lock(m_Mutex);
|
||||||
|
|
||||||
for (const TypeMap::value_type& kv : m_Items) {
|
for (const TypeMap::value_type& kv : m_Items) {
|
||||||
|
std::vector<ItemPair> items;
|
||||||
|
|
||||||
for (const ItemMap::value_type& kv2 : kv.second) {
|
for (const ItemMap::value_type& kv2 : kv.second) {
|
||||||
if (kv2.second->m_Abstract || kv2.second->m_Object)
|
if (kv2.second->m_Abstract || kv2.second->m_Object)
|
||||||
continue;
|
continue;
|
||||||
@ -402,6 +406,11 @@ bool ConfigItem::CommitNewItems(const ActivationContext::Ptr& context, WorkQueue
|
|||||||
|
|
||||||
items.emplace_back(kv2.second, false);
|
items.emplace_back(kv2.second, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!items.empty()) {
|
||||||
|
total += items.size();
|
||||||
|
itemsByType.emplace(kv.first.get(), std::move(items));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ItemList newUnnamedItems;
|
ItemList newUnnamedItems;
|
||||||
@ -415,22 +424,25 @@ bool ConfigItem::CommitNewItems(const ActivationContext::Ptr& context, WorkQueue
|
|||||||
if (item->m_Abstract || item->m_Object)
|
if (item->m_Abstract || item->m_Object)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
items.emplace_back(item, true);
|
itemsByType[item->m_Type.get()].emplace_back(item, true);
|
||||||
|
++total;
|
||||||
}
|
}
|
||||||
|
|
||||||
m_UnnamedItems.swap(newUnnamedItems);
|
m_UnnamedItems.swap(newUnnamedItems);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (items.empty())
|
if (!total)
|
||||||
return true;
|
return true;
|
||||||
|
|
||||||
// Shuffle all items to evenly distribute them over the threads of the workqueue. This increases perfomance
|
// Shuffle all items to evenly distribute them over the threads of the workqueue. This increases perfomance
|
||||||
// noticably in environments with lots of objects and available threads.
|
// noticably in environments with lots of objects and available threads.
|
||||||
std::shuffle(std::begin(items), std::end(items), std::default_random_engine {});
|
for (auto& kv : itemsByType) {
|
||||||
|
std::shuffle(std::begin(kv.second), std::end(kv.second), std::default_random_engine{});
|
||||||
|
}
|
||||||
|
|
||||||
#ifdef I2_DEBUG
|
#ifdef I2_DEBUG
|
||||||
Log(LogDebug, "configitem")
|
Log(LogDebug, "configitem")
|
||||||
<< "Committing " << items.size() << " new items.";
|
<< "Committing " << total << " new items.";
|
||||||
#endif /* I2_DEBUG */
|
#endif /* I2_DEBUG */
|
||||||
|
|
||||||
std::set<Type::Ptr> types;
|
std::set<Type::Ptr> types;
|
||||||
@ -450,8 +462,7 @@ bool ConfigItem::CommitNewItems(const ActivationContext::Ptr& context, WorkQueue
|
|||||||
bool unresolved_dep = false;
|
bool unresolved_dep = false;
|
||||||
|
|
||||||
/* skip this type (for now) if there are unresolved load dependencies */
|
/* skip this type (for now) if there are unresolved load dependencies */
|
||||||
for (const String& loadDep : type->GetLoadDependencies()) {
|
for (auto pLoadDep : type->GetLoadDependencies()) {
|
||||||
Type::Ptr pLoadDep = Type::GetByName(loadDep);
|
|
||||||
if (types.find(pLoadDep) != types.end() && completed_types.find(pLoadDep) == completed_types.end()) {
|
if (types.find(pLoadDep) != types.end() && completed_types.find(pLoadDep) == completed_types.end()) {
|
||||||
unresolved_dep = true;
|
unresolved_dep = true;
|
||||||
break;
|
break;
|
||||||
@ -464,11 +475,12 @@ bool ConfigItem::CommitNewItems(const ActivationContext::Ptr& context, WorkQueue
|
|||||||
std::atomic<int> committed_items(0);
|
std::atomic<int> committed_items(0);
|
||||||
std::mutex newItemsMutex;
|
std::mutex newItemsMutex;
|
||||||
|
|
||||||
upq.ParallelFor(items, [&type, &committed_items, &newItems, &newItemsMutex](const ItemPair& ip) {
|
{
|
||||||
const ConfigItem::Ptr& item = ip.first;
|
auto items (itemsByType.find(type.get()));
|
||||||
|
|
||||||
if (item->m_Type != type)
|
if (items != itemsByType.end()) {
|
||||||
return;
|
upq.ParallelFor(items->second, [&committed_items, &newItems, &newItemsMutex](const ItemPair& ip) {
|
||||||
|
const ConfigItem::Ptr& item = ip.first;
|
||||||
|
|
||||||
if (!item->Commit(ip.second)) {
|
if (!item->Commit(ip.second)) {
|
||||||
if (item->IsIgnoreOnError()) {
|
if (item->IsIgnoreOnError()) {
|
||||||
@ -485,6 +497,8 @@ bool ConfigItem::CommitNewItems(const ActivationContext::Ptr& context, WorkQueue
|
|||||||
});
|
});
|
||||||
|
|
||||||
upq.Join();
|
upq.Join();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
itemsCount += committed_items;
|
itemsCount += committed_items;
|
||||||
|
|
||||||
@ -516,8 +530,7 @@ bool ConfigItem::CommitNewItems(const ActivationContext::Ptr& context, WorkQueue
|
|||||||
bool unresolved_dep = false;
|
bool unresolved_dep = false;
|
||||||
|
|
||||||
/* skip this type (for now) if there are unresolved load dependencies */
|
/* skip this type (for now) if there are unresolved load dependencies */
|
||||||
for (const String& loadDep : type->GetLoadDependencies()) {
|
for (auto pLoadDep : type->GetLoadDependencies()) {
|
||||||
Type::Ptr pLoadDep = Type::GetByName(loadDep);
|
|
||||||
if (types.find(pLoadDep) != types.end() && completed_types.find(pLoadDep) == completed_types.end()) {
|
if (types.find(pLoadDep) != types.end() && completed_types.find(pLoadDep) == completed_types.end()) {
|
||||||
unresolved_dep = true;
|
unresolved_dep = true;
|
||||||
break;
|
break;
|
||||||
@ -528,10 +541,15 @@ bool ConfigItem::CommitNewItems(const ActivationContext::Ptr& context, WorkQueue
|
|||||||
continue;
|
continue;
|
||||||
|
|
||||||
std::atomic<int> notified_items(0);
|
std::atomic<int> notified_items(0);
|
||||||
upq.ParallelFor(items, [&type, ¬ified_items](const ItemPair& ip) {
|
|
||||||
|
{
|
||||||
|
auto items (itemsByType.find(type.get()));
|
||||||
|
|
||||||
|
if (items != itemsByType.end()) {
|
||||||
|
upq.ParallelFor(items->second, [¬ified_items](const ItemPair& ip) {
|
||||||
const ConfigItem::Ptr& item = ip.first;
|
const ConfigItem::Ptr& item = ip.first;
|
||||||
|
|
||||||
if (!item->m_Object || item->m_Type != type)
|
if (!item->m_Object)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@ -553,9 +571,11 @@ bool ConfigItem::CommitNewItems(const ActivationContext::Ptr& context, WorkQueue
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
completed_types.insert(type);
|
|
||||||
|
|
||||||
upq.Join();
|
upq.Join();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
completed_types.insert(type);
|
||||||
|
|
||||||
#ifdef I2_DEBUG
|
#ifdef I2_DEBUG
|
||||||
if (notified_items > 0)
|
if (notified_items > 0)
|
||||||
@ -567,11 +587,14 @@ bool ConfigItem::CommitNewItems(const ActivationContext::Ptr& context, WorkQueue
|
|||||||
return false;
|
return false;
|
||||||
|
|
||||||
notified_items = 0;
|
notified_items = 0;
|
||||||
for (const String& loadDep : type->GetLoadDependencies()) {
|
for (auto loadDep : type->GetLoadDependencies()) {
|
||||||
upq.ParallelFor(items, [loadDep, &type, ¬ified_items](const ItemPair& ip) {
|
auto items (itemsByType.find(loadDep));
|
||||||
|
|
||||||
|
if (items != itemsByType.end()) {
|
||||||
|
upq.ParallelFor(items->second, [&type, ¬ified_items](const ItemPair& ip) {
|
||||||
const ConfigItem::Ptr& item = ip.first;
|
const ConfigItem::Ptr& item = ip.first;
|
||||||
|
|
||||||
if (!item->m_Object || item->m_Type->GetName() != loadDep)
|
if (!item->m_Object)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
ActivationScope ascope(item->m_ActivationContext);
|
ActivationScope ascope(item->m_ActivationContext);
|
||||||
@ -579,6 +602,7 @@ bool ConfigItem::CommitNewItems(const ActivationContext::Ptr& context, WorkQueue
|
|||||||
notified_items++;
|
notified_items++;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
upq.Join();
|
upq.Join();
|
||||||
|
|
||||||
|
@ -376,14 +376,16 @@ void ClassCompiler::HandleClass(const Klass& klass, const ClassDebugInfo&)
|
|||||||
<< "}" << std::endl << std::endl;
|
<< "}" << std::endl << std::endl;
|
||||||
|
|
||||||
/* GetLoadDependencies */
|
/* GetLoadDependencies */
|
||||||
m_Header << "\t" << "std::vector<String> GetLoadDependencies() const override;" << std::endl;
|
m_Header << "\t" << "const std::unordered_set<Type*>& GetLoadDependencies() const override;" << std::endl;
|
||||||
|
|
||||||
m_Impl << "std::vector<String> TypeImpl<" << klass.Name << ">::GetLoadDependencies() const" << std::endl
|
m_Impl << "const std::unordered_set<Type*>& TypeImpl<" << klass.Name << ">::GetLoadDependencies() const" << std::endl
|
||||||
<< "{" << std::endl
|
<< "{" << std::endl
|
||||||
<< "\t" << "std::vector<String> deps;" << std::endl;
|
<< "\t" << "static const std::unordered_set<Type*> deps ({" << std::endl;
|
||||||
|
|
||||||
for (const std::string& dep : klass.LoadDependencies)
|
for (const std::string& dep : klass.LoadDependencies)
|
||||||
m_Impl << "\t" << "deps.emplace_back(\"" << dep << "\");" << std::endl;
|
m_Impl << "\t\t" << "GetByName(\"" << dep << "\").get()," << std::endl;
|
||||||
|
|
||||||
|
m_Impl << "\t" << "});" << std::endl;
|
||||||
|
|
||||||
m_Impl << "\t" << "return deps;" << std::endl
|
m_Impl << "\t" << "return deps;" << std::endl
|
||||||
<< "}" << std::endl << std::endl;
|
<< "}" << std::endl << std::endl;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user