Michael Friedrich 2019-09-09 15:11:38 +02:00
parent 9e16502581
commit 5fa7331cc9
18 changed files with 52 additions and 51 deletions

View File

@ -7,8 +7,9 @@
#include <exception> #include <exception>
#include <memory> #include <memory>
#include <thread> #include <thread>
#include <boost/asio/io_service.hpp> #include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp> #include <boost/asio/spawn.hpp>
#include <boost/asio/post.hpp>
#include <boost/date_time/posix_time/ptime.hpp> #include <boost/date_time/posix_time/ptime.hpp>
#include <boost/system/error_code.hpp> #include <boost/system/error_code.hpp>
@ -78,12 +79,12 @@ IoEngine& IoEngine::Get()
return *m_Instance.Get(); return *m_Instance.Get();
} }
boost::asio::io_service& IoEngine::GetIoService() boost::asio::io_context& IoEngine::GetIoContext()
{ {
return m_IoService; return m_IoContext;
} }
IoEngine::IoEngine() : m_IoService(), m_KeepAlive(m_IoService), m_Threads(decltype(m_Threads)::size_type(std::thread::hardware_concurrency() * 2u)), m_AlreadyExpiredTimer(m_IoService) IoEngine::IoEngine() : m_IoContext(), m_KeepAlive(boost::asio::make_work_guard(m_IoContext)), m_Threads(decltype(m_Threads)::size_type(std::thread::hardware_concurrency() * 2u)), m_AlreadyExpiredTimer(m_IoContext)
{ {
m_AlreadyExpiredTimer.expires_at(boost::posix_time::neg_infin); m_AlreadyExpiredTimer.expires_at(boost::posix_time::neg_infin);
m_CpuBoundSemaphore.store(std::thread::hardware_concurrency() * 3u / 2u); m_CpuBoundSemaphore.store(std::thread::hardware_concurrency() * 3u / 2u);
@ -96,7 +97,7 @@ IoEngine::IoEngine() : m_IoService(), m_KeepAlive(m_IoService), m_Threads(declty
IoEngine::~IoEngine() IoEngine::~IoEngine()
{ {
for (auto& thread : m_Threads) { for (auto& thread : m_Threads) {
m_IoService.post([]() { boost::asio::post(m_IoContext, []() {
throw TerminateIoThread(); throw TerminateIoThread();
}); });
} }
@ -110,7 +111,7 @@ void IoEngine::RunEventLoop()
{ {
for (;;) { for (;;) {
try { try {
m_IoService.run(); m_IoContext.run();
break; break;
} catch (const TerminateIoThread&) { } catch (const TerminateIoThread&) {
@ -122,7 +123,7 @@ void IoEngine::RunEventLoop()
} }
} }
AsioConditionVariable::AsioConditionVariable(boost::asio::io_service& io, bool init) AsioConditionVariable::AsioConditionVariable(boost::asio::io_context& io, bool init)
: m_Timer(io) : m_Timer(io)
{ {
m_Timer.expires_at(init ? boost::posix_time::neg_infin : boost::posix_time::pos_infin); m_Timer.expires_at(init ? boost::posix_time::neg_infin : boost::posix_time::pos_infin);

View File

@ -10,7 +10,7 @@
#include <thread> #include <thread>
#include <vector> #include <vector>
#include <boost/asio/deadline_timer.hpp> #include <boost/asio/deadline_timer.hpp>
#include <boost/asio/io_service.hpp> #include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp> #include <boost/asio/spawn.hpp>
namespace icinga namespace icinga
@ -75,7 +75,7 @@ public:
static IoEngine& Get(); static IoEngine& Get();
boost::asio::io_service& GetIoService(); boost::asio::io_context& GetIoContext();
private: private:
IoEngine(); IoEngine();
@ -84,8 +84,8 @@ private:
static LazyInit<std::unique_ptr<IoEngine>> m_Instance; static LazyInit<std::unique_ptr<IoEngine>> m_Instance;
boost::asio::io_service m_IoService; boost::asio::io_context m_IoContext;
boost::asio::io_service::work m_KeepAlive; boost::asio::executor_work_guard<boost::asio::io_context::executor_type> m_KeepAlive;
std::vector<std::thread> m_Threads; std::vector<std::thread> m_Threads;
boost::asio::deadline_timer m_AlreadyExpiredTimer; boost::asio::deadline_timer m_AlreadyExpiredTimer;
std::atomic_int_fast32_t m_CpuBoundSemaphore; std::atomic_int_fast32_t m_CpuBoundSemaphore;
@ -103,7 +103,7 @@ class TerminateIoThread : public std::exception
class AsioConditionVariable class AsioConditionVariable
{ {
public: public:
AsioConditionVariable(boost::asio::io_service& io, bool init = false); AsioConditionVariable(boost::asio::io_context& io, bool init = false);
void Set(); void Set();
void Clear(); void Clear();

View File

@ -38,7 +38,7 @@ void Connect(Socket& socket, const String& node, const String& service)
{ {
using boost::asio::ip::tcp; using boost::asio::ip::tcp;
tcp::resolver resolver (IoEngine::Get().GetIoService()); tcp::resolver resolver (IoEngine::Get().GetIoContext());
tcp::resolver::query query (node, service); tcp::resolver::query query (node, service);
auto result (resolver.resolve(query)); auto result (resolver.resolve(query));
auto current (result.begin()); auto current (result.begin());
@ -67,7 +67,7 @@ void Connect(Socket& socket, const String& node, const String& service, boost::a
{ {
using boost::asio::ip::tcp; using boost::asio::ip::tcp;
tcp::resolver resolver (IoEngine::Get().GetIoService()); tcp::resolver resolver (IoEngine::Get().GetIoContext());
tcp::resolver::query query (node, service); tcp::resolver::query query (node, service);
auto result (resolver.async_resolve(query, yc)); auto result (resolver.async_resolve(query, yc));
auto current (result.begin()); auto current (result.begin());

View File

@ -11,7 +11,7 @@
#include <memory> #include <memory>
#include <utility> #include <utility>
#include <boost/asio/buffered_stream.hpp> #include <boost/asio/buffered_stream.hpp>
#include <boost/asio/io_service.hpp> #include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp> #include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl/context.hpp> #include <boost/asio/ssl/context.hpp>
#include <boost/asio/ssl/stream.hpp> #include <boost/asio/ssl/stream.hpp>
@ -21,7 +21,7 @@ namespace icinga
struct UnbufferedAsioTlsStreamParams struct UnbufferedAsioTlsStreamParams
{ {
boost::asio::io_service& IoService; boost::asio::io_context& IoContext;
boost::asio::ssl::context& SslContext; boost::asio::ssl::context& SslContext;
const String& Hostname; const String& Hostname;
}; };
@ -33,7 +33,7 @@ class UnbufferedAsioTlsStream : public AsioTcpTlsStream
public: public:
inline inline
UnbufferedAsioTlsStream(UnbufferedAsioTlsStreamParams& init) UnbufferedAsioTlsStream(UnbufferedAsioTlsStreamParams& init)
: stream(init.IoService, init.SslContext), m_VerifyOK(true), m_Hostname(init.Hostname) : stream(init.IoContext, init.SslContext), m_VerifyOK(true), m_Hostname(init.Hostname)
{ {
} }
@ -71,8 +71,8 @@ class AsioTlsStream : public boost::asio::buffered_stream<UnbufferedAsioTlsStrea
{ {
public: public:
inline inline
AsioTlsStream(boost::asio::io_service& ioService, boost::asio::ssl::context& sslContext, const String& hostname = String()) AsioTlsStream(boost::asio::io_context& ioContext, boost::asio::ssl::context& sslContext, const String& hostname = String())
: AsioTlsStream(UnbufferedAsioTlsStreamParams{ioService, sslContext, hostname}) : AsioTlsStream(UnbufferedAsioTlsStreamParams{ioContext, sslContext, hostname})
{ {
} }

View File

@ -537,7 +537,7 @@ std::shared_ptr<AsioTlsStream> ConsoleCommand::Connect()
String host = l_Url->GetHost(); String host = l_Url->GetHost();
String port = l_Url->GetPort(); String port = l_Url->GetPort();
std::shared_ptr<AsioTlsStream> stream = std::make_shared<AsioTlsStream>(IoEngine::Get().GetIoService(), *sslContext, host); std::shared_ptr<AsioTlsStream> stream = std::make_shared<AsioTlsStream>(IoEngine::Get().GetIoContext(), *sslContext, host);
try { try {
icinga::Connect(stream->lowest_layer(), host, port); icinga::Connect(stream->lowest_layer(), host, port);

View File

@ -598,9 +598,9 @@ OptionalTlsStream ElasticsearchWriter::Connect()
throw; throw;
} }
stream.first = std::make_shared<AsioTlsStream>(IoEngine::Get().GetIoService(), *sslContext, GetHost()); stream.first = std::make_shared<AsioTlsStream>(IoEngine::Get().GetIoContext(), *sslContext, GetHost());
} else { } else {
stream.second = std::make_shared<AsioTcpStream>(IoEngine::Get().GetIoService()); stream.second = std::make_shared<AsioTcpStream>(IoEngine::Get().GetIoContext());
} }
try { try {

View File

@ -173,9 +173,9 @@ void GelfWriter::ReconnectInternal()
throw; throw;
} }
m_Stream.first = std::make_shared<AsioTlsStream>(IoEngine::Get().GetIoService(), *sslContext, GetHost()); m_Stream.first = std::make_shared<AsioTlsStream>(IoEngine::Get().GetIoContext(), *sslContext, GetHost());
} else { } else {
m_Stream.second = std::make_shared<AsioTcpStream>(IoEngine::Get().GetIoService()); m_Stream.second = std::make_shared<AsioTcpStream>(IoEngine::Get().GetIoContext());
} }
try { try {

View File

@ -187,7 +187,7 @@ void GraphiteWriter::ReconnectInternal()
Log(LogNotice, "GraphiteWriter") Log(LogNotice, "GraphiteWriter")
<< "Reconnecting to Graphite on host '" << GetHost() << "' port '" << GetPort() << "'."; << "Reconnecting to Graphite on host '" << GetHost() << "' port '" << GetPort() << "'.";
m_Stream = std::make_shared<AsioTcpStream>(IoEngine::Get().GetIoService()); m_Stream = std::make_shared<AsioTcpStream>(IoEngine::Get().GetIoContext());
try { try {
icinga::Connect(m_Stream->lowest_layer(), GetHost(), GetPort()); icinga::Connect(m_Stream->lowest_layer(), GetHost(), GetPort());

View File

@ -187,9 +187,9 @@ OptionalTlsStream InfluxdbWriter::Connect()
throw; throw;
} }
stream.first = std::make_shared<AsioTlsStream>(IoEngine::Get().GetIoService(), *sslContext, GetHost()); stream.first = std::make_shared<AsioTlsStream>(IoEngine::Get().GetIoContext(), *sslContext, GetHost());
} else { } else {
stream.second = std::make_shared<AsioTcpStream>(IoEngine::Get().GetIoService()); stream.second = std::make_shared<AsioTcpStream>(IoEngine::Get().GetIoContext());
} }
try { try {

View File

@ -121,7 +121,7 @@ void OpenTsdbWriter::ReconnectTimerHandler()
* http://opentsdb.net/docs/build/html/user_guide/writing/index.html#telnet * http://opentsdb.net/docs/build/html/user_guide/writing/index.html#telnet
*/ */
m_Stream = std::make_shared<AsioTcpStream>(IoEngine::Get().GetIoService()); m_Stream = std::make_shared<AsioTcpStream>(IoEngine::Get().GetIoContext());
try { try {
icinga::Connect(m_Stream->lowest_layer(), GetHost(), GetPort()); icinga::Connect(m_Stream->lowest_layer(), GetHost(), GetPort());
@ -338,4 +338,4 @@ String OpenTsdbWriter::EscapeMetric(const String& str)
boost::replace_all(result, ":", "_"); boost::replace_all(result, ":", "_");
return result; return result;
} }

View File

@ -363,7 +363,7 @@ bool ApiListener::AddListener(const String& node, const String& service)
return false; return false;
} }
auto& io (IoEngine::Get().GetIoService()); auto& io (IoEngine::Get().GetIoContext());
auto acceptor (std::make_shared<tcp::acceptor>(io)); auto acceptor (std::make_shared<tcp::acceptor>(io));
try { try {
@ -427,7 +427,7 @@ void ApiListener::ListenerCoroutineProc(boost::asio::yield_context yc, const std
{ {
namespace asio = boost::asio; namespace asio = boost::asio;
auto& io (IoEngine::Get().GetIoService()); auto& io (IoEngine::Get().GetIoContext());
for (;;) { for (;;) {
try { try {
@ -460,7 +460,7 @@ void ApiListener::AddConnection(const Endpoint::Ptr& endpoint)
return; return;
} }
auto& io (IoEngine::Get().GetIoService()); auto& io (IoEngine::Get().GetIoContext());
asio::spawn(io, [this, endpoint, &io, sslContext](asio::yield_context yc) { asio::spawn(io, [this, endpoint, &io, sslContext](asio::yield_context yc) {
String host = endpoint->GetHost(); String host = endpoint->GetHost();
@ -664,7 +664,7 @@ void ApiListener::NewClientHandlerInternal(boost::asio::yield_context yc, const
endpoint->AddClient(aclient); endpoint->AddClient(aclient);
asio::spawn(IoEngine::Get().GetIoService(), [this, aclient, endpoint, needSync](asio::yield_context yc) { asio::spawn(IoEngine::Get().GetIoContext(), [this, aclient, endpoint, needSync](asio::yield_context yc) {
CpuBoundWork syncClient (yc); CpuBoundWork syncClient (yc);
SyncClient(aclient, endpoint, needSync); SyncClient(aclient, endpoint, needSync);

View File

@ -131,7 +131,7 @@ std::map<String, EventsInbox::Filter> EventsInbox::m_Filters ({{"", EventsInbox:
EventsRouter EventsRouter::m_Instance; EventsRouter EventsRouter::m_Instance;
EventsInbox::EventsInbox(String filter, const String& filterSource) EventsInbox::EventsInbox(String filter, const String& filterSource)
: m_Timer(IoEngine::Get().GetIoService()) : m_Timer(IoEngine::Get().GetIoContext())
{ {
std::unique_lock<std::mutex> lock (m_FiltersMutex); std::unique_lock<std::mutex> lock (m_FiltersMutex);
m_Filter = m_Filters.find(filter); m_Filter = m_Filters.find(filter);

View File

@ -22,7 +22,7 @@
#include <memory> #include <memory>
#include <stdexcept> #include <stdexcept>
#include <boost/asio/error.hpp> #include <boost/asio/error.hpp>
#include <boost/asio/io_service.hpp> #include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp> #include <boost/asio/spawn.hpp>
#include <boost/beast/core.hpp> #include <boost/beast/core.hpp>
#include <boost/beast/http.hpp> #include <boost/beast/http.hpp>
@ -35,11 +35,11 @@ using namespace icinga;
auto const l_ServerHeader ("Icinga/" + Application::GetAppVersion()); auto const l_ServerHeader ("Icinga/" + Application::GetAppVersion());
HttpServerConnection::HttpServerConnection(const String& identity, bool authenticated, const std::shared_ptr<AsioTlsStream>& stream) HttpServerConnection::HttpServerConnection(const String& identity, bool authenticated, const std::shared_ptr<AsioTlsStream>& stream)
: HttpServerConnection(identity, authenticated, stream, IoEngine::Get().GetIoService()) : HttpServerConnection(identity, authenticated, stream, IoEngine::Get().GetIoContext())
{ {
} }
HttpServerConnection::HttpServerConnection(const String& identity, bool authenticated, const std::shared_ptr<AsioTlsStream>& stream, boost::asio::io_service& io) HttpServerConnection::HttpServerConnection(const String& identity, bool authenticated, const std::shared_ptr<AsioTlsStream>& stream, boost::asio::io_context& io)
: m_Stream(stream), m_Seen(Utility::GetTime()), m_IoStrand(io), m_ShuttingDown(false), m_HasStartedStreaming(false), : m_Stream(stream), m_Seen(Utility::GetTime()), m_IoStrand(io), m_ShuttingDown(false), m_HasStartedStreaming(false),
m_CheckLivenessTimer(io) m_CheckLivenessTimer(io)
{ {

View File

@ -8,8 +8,8 @@
#include "base/tlsstream.hpp" #include "base/tlsstream.hpp"
#include <memory> #include <memory>
#include <boost/asio/deadline_timer.hpp> #include <boost/asio/deadline_timer.hpp>
#include <boost/asio/io_service.hpp> #include <boost/asio/io_context.hpp>
#include <boost/asio/io_service_strand.hpp> #include <boost/asio/io_context_strand.hpp>
#include <boost/asio/spawn.hpp> #include <boost/asio/spawn.hpp>
namespace icinga namespace icinga
@ -38,12 +38,12 @@ private:
std::shared_ptr<AsioTlsStream> m_Stream; std::shared_ptr<AsioTlsStream> m_Stream;
double m_Seen; double m_Seen;
String m_PeerAddress; String m_PeerAddress;
boost::asio::io_service::strand m_IoStrand; boost::asio::io_context::strand m_IoStrand;
bool m_ShuttingDown; bool m_ShuttingDown;
bool m_HasStartedStreaming; bool m_HasStartedStreaming;
boost::asio::deadline_timer m_CheckLivenessTimer; boost::asio::deadline_timer m_CheckLivenessTimer;
HttpServerConnection(const String& identity, bool authenticated, const std::shared_ptr<AsioTlsStream>& stream, boost::asio::io_service& io); HttpServerConnection(const String& identity, bool authenticated, const std::shared_ptr<AsioTlsStream>& stream, boost::asio::io_context& io);
void ProcessMessages(boost::asio::yield_context yc); void ProcessMessages(boost::asio::yield_context yc);
void CheckLiveness(boost::asio::yield_context yc); void CheckLiveness(boost::asio::yield_context yc);

View File

@ -16,7 +16,7 @@
#include "base/tlsstream.hpp" #include "base/tlsstream.hpp"
#include <memory> #include <memory>
#include <utility> #include <utility>
#include <boost/asio/io_service.hpp> #include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp> #include <boost/asio/spawn.hpp>
#include <boost/date_time/posix_time/posix_time_duration.hpp> #include <boost/date_time/posix_time/posix_time_duration.hpp>
#include <boost/system/system_error.hpp> #include <boost/system/system_error.hpp>
@ -31,12 +31,12 @@ static RingBuffer l_TaskStats (15 * 60);
JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated, JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated,
const std::shared_ptr<AsioTlsStream>& stream, ConnectionRole role) const std::shared_ptr<AsioTlsStream>& stream, ConnectionRole role)
: JsonRpcConnection(identity, authenticated, stream, role, IoEngine::Get().GetIoService()) : JsonRpcConnection(identity, authenticated, stream, role, IoEngine::Get().GetIoContext())
{ {
} }
JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated, JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated,
const std::shared_ptr<AsioTlsStream>& stream, ConnectionRole role, boost::asio::io_service& io) const std::shared_ptr<AsioTlsStream>& stream, ConnectionRole role, boost::asio::io_context& io)
: m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream), m_Role(role), : m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream), m_Role(role),
m_Timestamp(Utility::GetTime()), m_Seen(Utility::GetTime()), m_NextHeartbeat(0), m_IoStrand(io), m_Timestamp(Utility::GetTime()), m_Seen(Utility::GetTime()), m_NextHeartbeat(0), m_IoStrand(io),
m_OutgoingMessagesQueued(io), m_WriterDone(io), m_ShuttingDown(false), m_OutgoingMessagesQueued(io), m_WriterDone(io), m_ShuttingDown(false),

View File

@ -11,8 +11,8 @@
#include "base/workqueue.hpp" #include "base/workqueue.hpp"
#include <memory> #include <memory>
#include <vector> #include <vector>
#include <boost/asio/io_service.hpp> #include <boost/asio/io_context.hpp>
#include <boost/asio/io_service_strand.hpp> #include <boost/asio/io_context_strand.hpp>
#include <boost/asio/spawn.hpp> #include <boost/asio/spawn.hpp>
namespace icinga namespace icinga
@ -73,14 +73,14 @@ private:
double m_Timestamp; double m_Timestamp;
double m_Seen; double m_Seen;
double m_NextHeartbeat; double m_NextHeartbeat;
boost::asio::io_service::strand m_IoStrand; boost::asio::io_context::strand m_IoStrand;
std::vector<String> m_OutgoingMessagesQueue; std::vector<String> m_OutgoingMessagesQueue;
AsioConditionVariable m_OutgoingMessagesQueued; AsioConditionVariable m_OutgoingMessagesQueued;
AsioConditionVariable m_WriterDone; AsioConditionVariable m_WriterDone;
bool m_ShuttingDown; bool m_ShuttingDown;
boost::asio::deadline_timer m_CheckLivenessTimer, m_HeartbeatTimer; boost::asio::deadline_timer m_CheckLivenessTimer, m_HeartbeatTimer;
JsonRpcConnection(const String& identity, bool authenticated, const std::shared_ptr<AsioTlsStream>& stream, ConnectionRole role, boost::asio::io_service& io); JsonRpcConnection(const String& identity, bool authenticated, const std::shared_ptr<AsioTlsStream>& stream, ConnectionRole role, boost::asio::io_context& io);
void HandleIncomingMessages(boost::asio::yield_context yc); void HandleIncomingMessages(boost::asio::yield_context yc);
void WriteOutgoingMessages(boost::asio::yield_context yc); void WriteOutgoingMessages(boost::asio::yield_context yc);

View File

@ -93,7 +93,7 @@ std::shared_ptr<X509> PkiUtility::FetchCert(const String& host, const String& po
return std::shared_ptr<X509>(); return std::shared_ptr<X509>();
} }
auto stream (std::make_shared<AsioTlsStream>(IoEngine::Get().GetIoService(), *sslContext, host)); auto stream (std::make_shared<AsioTlsStream>(IoEngine::Get().GetIoContext(), *sslContext, host));
try { try {
Connect(stream->lowest_layer(), host, port); Connect(stream->lowest_layer(), host, port);
@ -161,7 +161,7 @@ int PkiUtility::RequestCertificate(const String& host, const String& port, const
return 1; return 1;
} }
auto stream (std::make_shared<AsioTlsStream>(IoEngine::Get().GetIoService(), *sslContext, host)); auto stream (std::make_shared<AsioTlsStream>(IoEngine::Get().GetIoContext(), *sslContext, host));
try { try {
Connect(stream->lowest_layer(), host, port); Connect(stream->lowest_layer(), host, port);

View File

@ -186,7 +186,7 @@ static std::shared_ptr<AsioTlsStream> Connect(const String& host, const String&
throw; throw;
} }
std::shared_ptr<AsioTlsStream> stream = std::make_shared<AsioTlsStream>(IoEngine::Get().GetIoService(), *sslContext, host); std::shared_ptr<AsioTlsStream> stream = std::make_shared<AsioTlsStream>(IoEngine::Get().GetIoContext(), *sslContext, host);
try { try {
icinga::Connect(stream->lowest_layer(), host, port); icinga::Connect(stream->lowest_layer(), host, port);