From 282f8fd17397bb026de4845fed1de34c84716fac Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Fri, 15 Feb 2019 15:43:58 +0100 Subject: [PATCH] IoEngine: explicitly join I/O threads --- lib/base/io-engine.cpp | 21 ++++++++++++++++++--- lib/base/io-engine.hpp | 9 +++++++++ 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/lib/base/io-engine.cpp b/lib/base/io-engine.cpp index daad42df0..0079ca251 100644 --- a/lib/base/io-engine.cpp +++ b/lib/base/io-engine.cpp @@ -99,7 +99,7 @@ boost::asio::io_service& IoEngine::GetIoService() return m_IoService; } -IoEngine::IoEngine() : m_IoService(), m_KeepAlive(m_IoService), m_AlreadyExpiredTimer(m_IoService) +IoEngine::IoEngine() : m_IoService(), m_KeepAlive(m_IoService), m_Threads(decltype(m_Threads)::size_type(std::thread::hardware_concurrency())), m_AlreadyExpiredTimer(m_IoService) { m_AlreadyExpiredTimer.expires_at(boost::posix_time::neg_infin); @@ -111,8 +111,21 @@ IoEngine::IoEngine() : m_IoService(), m_KeepAlive(m_IoService), m_AlreadyExpired m_CpuBoundSemaphore.store(concurrency - 1u); } - for (auto i (std::thread::hardware_concurrency()); i; --i) { - std::thread(&IoEngine::RunEventLoop, this).detach(); + for (auto& thread : m_Threads) { + thread = std::thread(&IoEngine::RunEventLoop, this); + } +} + +IoEngine::~IoEngine() +{ + for (auto& thread : m_Threads) { + m_IoService.post([]() { + throw TerminateIoThread(); + }); + } + + for (auto& thread : m_Threads) { + thread.join(); } } @@ -122,6 +135,8 @@ void IoEngine::RunEventLoop() try { m_IoService.run(); + break; + } catch (const TerminateIoThread&) { break; } catch (const std::exception& e) { Log(LogCritical, "IoEngine", "Exception during I/O operation!"); diff --git a/lib/base/io-engine.hpp b/lib/base/io-engine.hpp index e383b2e42..05610ca6f 100644 --- a/lib/base/io-engine.hpp +++ b/lib/base/io-engine.hpp @@ -22,7 +22,10 @@ #include "base/lazy-init.hpp" #include +#include #include +#include +#include #include #include #include @@ -82,6 +85,7 @@ public: IoEngine(IoEngine&&) = delete; IoEngine& operator=(const IoEngine&) = delete; IoEngine& operator=(IoEngine&&) = delete; + ~IoEngine(); static IoEngine& Get(); @@ -96,8 +100,13 @@ private: boost::asio::io_service m_IoService; boost::asio::io_service::work m_KeepAlive; + std::vector m_Threads; boost::asio::deadline_timer m_AlreadyExpiredTimer; std::atomic_uint_fast32_t m_CpuBoundSemaphore; }; +class TerminateIoThread : public std::exception +{ +}; + #endif /* IO_ENGINE_H */