Merge pull request #9643 from Icinga/hardware_concurrency

Always use Configuration#Concurrency, not `std:🧵:hardware_concurrency()`
This commit is contained in:
Julian Brost 2023-05-23 19:23:14 +02:00 committed by GitHub
commit 2470e930eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 36 additions and 8 deletions

View File

@ -439,6 +439,12 @@ static int Main()
Configuration::SetReadOnly(true); Configuration::SetReadOnly(true);
if (!Configuration::ConcurrencyWasModified) {
Configuration::Concurrency = std::thread::hardware_concurrency();
}
Application::GetTP().Restart();
/* Ensure that all defined constants work in the way we expect them. */ /* Ensure that all defined constants work in the way we expect them. */
HandleLegacyDefines(); HandleLegacyDefines();

View File

@ -25,7 +25,8 @@ String Configuration::ApiBindHost = []() {
String Configuration::ApiBindPort{"5665"}; String Configuration::ApiBindPort{"5665"};
bool Configuration::AttachDebugger{false}; bool Configuration::AttachDebugger{false};
String Configuration::CacheDir; String Configuration::CacheDir;
int Configuration::Concurrency{static_cast<int>(std::thread::hardware_concurrency())}; int Configuration::Concurrency{1};
bool Configuration::ConcurrencyWasModified{false};
String Configuration::ConfigDir; String Configuration::ConfigDir;
String Configuration::DataDir; String Configuration::DataDir;
String Configuration::EventEngine; String Configuration::EventEngine;
@ -114,6 +115,7 @@ int Configuration::GetConcurrency() const
void Configuration::SetConcurrency(int val, bool suppress_events, const Value& cookie) void Configuration::SetConcurrency(int val, bool suppress_events, const Value& cookie)
{ {
HandleUserWrite("Concurrency", &Configuration::Concurrency, val, m_ReadOnly); HandleUserWrite("Concurrency", &Configuration::Concurrency, val, m_ReadOnly);
Configuration::ConcurrencyWasModified = true;
} }
String Configuration::GetConfigDir() const String Configuration::GetConfigDir() const

View File

@ -118,6 +118,7 @@ public:
static bool AttachDebugger; static bool AttachDebugger;
static String CacheDir; static String CacheDir;
static int Concurrency; static int Concurrency;
static bool ConcurrencyWasModified;
static String ConfigDir; static String ConfigDir;
static String DataDir; static String DataDir;
static String EventEngine; static String EventEngine;

View File

@ -1,5 +1,6 @@
/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
#include "base/configuration.hpp"
#include "base/exception.hpp" #include "base/exception.hpp"
#include "base/io-engine.hpp" #include "base/io-engine.hpp"
#include "base/lazy-init.hpp" #include "base/lazy-init.hpp"
@ -84,10 +85,10 @@ boost::asio::io_context& IoEngine::GetIoContext()
return m_IoContext; return m_IoContext;
} }
IoEngine::IoEngine() : m_IoContext(), m_KeepAlive(boost::asio::make_work_guard(m_IoContext)), m_Threads(decltype(m_Threads)::size_type(std::thread::hardware_concurrency() * 2u)), m_AlreadyExpiredTimer(m_IoContext) IoEngine::IoEngine() : m_IoContext(), m_KeepAlive(boost::asio::make_work_guard(m_IoContext)), m_Threads(decltype(m_Threads)::size_type(Configuration::Concurrency * 2u)), m_AlreadyExpiredTimer(m_IoContext)
{ {
m_AlreadyExpiredTimer.expires_at(boost::posix_time::neg_infin); m_AlreadyExpiredTimer.expires_at(boost::posix_time::neg_infin);
m_CpuBoundSemaphore.store(std::thread::hardware_concurrency() * 3u / 2u); m_CpuBoundSemaphore.store(Configuration::Concurrency * 3u / 2u);
for (auto& thread : m_Threads) { for (auto& thread : m_Threads) {
thread = std::thread(&IoEngine::RunEventLoop, this); thread = std::thread(&IoEngine::RunEventLoop, this);

View File

@ -5,8 +5,7 @@
using namespace icinga; using namespace icinga;
ThreadPool::ThreadPool(size_t threads) ThreadPool::ThreadPool() : m_Pending(0)
: m_Threads(threads), m_Pending(0)
{ {
Start(); Start();
} }
@ -21,10 +20,15 @@ void ThreadPool::Start()
boost::unique_lock<decltype(m_Mutex)> lock (m_Mutex); boost::unique_lock<decltype(m_Mutex)> lock (m_Mutex);
if (!m_Pool) { if (!m_Pool) {
m_Pool = decltype(m_Pool)(new boost::asio::thread_pool(m_Threads)); InitializePool();
} }
} }
void ThreadPool::InitializePool()
{
m_Pool = decltype(m_Pool)(new boost::asio::thread_pool(Configuration::Concurrency * 2u));
}
void ThreadPool::Stop() void ThreadPool::Stop()
{ {
boost::unique_lock<decltype(m_Mutex)> lock (m_Mutex); boost::unique_lock<decltype(m_Mutex)> lock (m_Mutex);
@ -34,3 +38,14 @@ void ThreadPool::Stop()
m_Pool = nullptr; m_Pool = nullptr;
} }
} }
void ThreadPool::Restart()
{
boost::unique_lock<decltype(m_Mutex)> lock (m_Mutex);
if (m_Pool) {
m_Pool->join();
}
InitializePool();
}

View File

@ -4,6 +4,7 @@
#define THREADPOOL_H #define THREADPOOL_H
#include "base/atomic.hpp" #include "base/atomic.hpp"
#include "base/configuration.hpp"
#include "base/exception.hpp" #include "base/exception.hpp"
#include "base/logger.hpp" #include "base/logger.hpp"
#include <cstddef> #include <cstddef>
@ -36,11 +37,12 @@ class ThreadPool
public: public:
typedef std::function<void ()> WorkFunction; typedef std::function<void ()> WorkFunction;
ThreadPool(size_t threads = std::thread::hardware_concurrency() * 2u); ThreadPool();
~ThreadPool(); ~ThreadPool();
void Start(); void Start();
void Stop(); void Stop();
void Restart();
/** /**
* Appends a work item to the work queue. Work items will be processed in FIFO order. * Appends a work item to the work queue. Work items will be processed in FIFO order.
@ -89,8 +91,9 @@ public:
private: private:
boost::shared_mutex m_Mutex; boost::shared_mutex m_Mutex;
std::unique_ptr<boost::asio::thread_pool> m_Pool; std::unique_ptr<boost::asio::thread_pool> m_Pool;
size_t m_Threads;
Atomic<uint_fast64_t> m_Pending; Atomic<uint_fast64_t> m_Pending;
void InitializePool();
}; };
} }