diff --git a/lib/base/io-engine.hpp b/lib/base/io-engine.hpp index 1c6d46045..b9e4ee6c3 100644 --- a/lib/base/io-engine.hpp +++ b/lib/base/io-engine.hpp @@ -9,6 +9,8 @@ #include #include #include +#include +#include #include #include #include @@ -77,6 +79,82 @@ public: boost::asio::io_context& GetIoContext(); + /* + * Custom exceptions thrown in a Boost.Coroutine may cause stack corruption. + * Ensure that these are wrapped correctly. + * + * Inspired by https://github.com/niekbouman/commelec-api/blob/master/commelec-api/coroutine-exception.hpp + * Source: http://boost.2283326.n4.nabble.com/coroutine-only-std-exceptions-are-caught-from-coroutines-td4683671.html + */ + static inline boost::exception_ptr convertExceptionPtr(std::exception_ptr ex) { + try { + throw boost::enable_current_exception(ex); + } catch (...) { + return boost::current_exception(); + } + } + + static inline void rethrowBoostExceptionPointer() { + std::exception_ptr sep; + sep = std::current_exception(); + boost::exception_ptr bep = convertExceptionPtr(sep); + boost::rethrow_exception(bep); + } + + static inline size_t GetCoroutineStackSize() { +#ifdef _WIN32 + // Increase the stack size for Windows coroutines to prevent exception corruption. + // Rationale: Low cost Windows agent only & https://github.com/Icinga/icinga2/issues/7431 + return 8 * 1024 * 1024; +#else /* _WIN32 */ + return boost::coroutines::stack_allocator::traits_type::default_size(); // Default 64 KB +#endif /* _WIN32 */ + } + + /* With dedicated strand in *Connection classes. */ + template + static void SpawnCoroutine(Handler h, Function f) { + + boost::asio::spawn(std::forward(h), + [f](boost::asio::yield_context yc) { + + try { + f(yc); + } catch (const boost::coroutines::detail::forced_unwind &) { + // Required for proper stack unwinding when coroutines are destroyed. + // https://github.com/boostorg/coroutine/issues/39 + throw; + } catch (...) { + // Handle uncaught exceptions outside of the coroutine. + rethrowBoostExceptionPointer(); + } + }, + boost::coroutines::attributes(GetCoroutineStackSize()) // Set a pre-defined stack size. + ); + } + + /* Without strand in the IO executor's context. */ + template + static void SpawnCoroutine(boost::asio::io_context& io, Function f) { + + boost::asio::spawn(io, + [f](boost::asio::yield_context yc) { + + try { + f(yc); + } catch (const boost::coroutines::detail::forced_unwind &) { + // Required for proper stack unwinding when coroutines are destroyed. + // https://github.com/boostorg/coroutine/issues/39 + throw; + } catch (...) { + // Handle uncaught exceptions outside of the coroutine. + rethrowBoostExceptionPointer(); + } + }, + boost::coroutines::attributes(GetCoroutineStackSize()) // Set a pre-defined stack size. + ); + } + private: IoEngine(); diff --git a/lib/remote/apilistener.cpp b/lib/remote/apilistener.cpp index 5d82b6dc8..81385e6da 100644 --- a/lib/remote/apilistener.cpp +++ b/lib/remote/apilistener.cpp @@ -416,7 +416,7 @@ bool ApiListener::AddListener(const String& node, const String& service) Log(LogInformation, "ApiListener") << "Started new listener on '[" << localEndpoint.address() << "]:" << localEndpoint.port() << "'"; - asio::spawn(io, [this, acceptor, sslContext](asio::yield_context yc) { ListenerCoroutineProc(yc, acceptor, sslContext); }); + IoEngine::SpawnCoroutine(io, [this, acceptor, sslContext](asio::yield_context yc) { ListenerCoroutineProc(yc, acceptor, sslContext); }); UpdateStatusFile(localEndpoint); @@ -435,7 +435,7 @@ void ApiListener::ListenerCoroutineProc(boost::asio::yield_context yc, const std server->async_accept(sslConn->lowest_layer(), yc); - asio::spawn(io, [this, sslConn](asio::yield_context yc) { NewClientHandler(yc, sslConn, String(), RoleServer); }); + IoEngine::SpawnCoroutine(io, [this, sslConn](asio::yield_context yc) { NewClientHandler(yc, sslConn, String(), RoleServer); }); } catch (const std::exception& ex) { Log(LogCritical, "ApiListener") << "Cannot accept new connection: " << ex.what(); @@ -462,7 +462,7 @@ void ApiListener::AddConnection(const Endpoint::Ptr& endpoint) auto& io (IoEngine::Get().GetIoContext()); - asio::spawn(io, [this, endpoint, &io, sslContext](asio::yield_context yc) { + IoEngine::SpawnCoroutine(io, [this, endpoint, &io, sslContext](asio::yield_context yc) { String host = endpoint->GetHost(); String port = endpoint->GetPort(); @@ -664,11 +664,12 @@ void ApiListener::NewClientHandlerInternal(boost::asio::yield_context yc, const endpoint->AddClient(aclient); - asio::spawn(IoEngine::Get().GetIoContext(), [this, aclient, endpoint, needSync](asio::yield_context yc) { + IoEngine::SpawnCoroutine(IoEngine::Get().GetIoContext(), [this, aclient, endpoint, needSync](asio::yield_context yc) { CpuBoundWork syncClient (yc); SyncClient(aclient, endpoint, needSync); }); + } else if (!AddAnonymousClient(aclient)) { Log(LogNotice, "ApiListener") << "Ignoring anonymous JSON-RPC connection " << conninfo diff --git a/lib/remote/httpserverconnection.cpp b/lib/remote/httpserverconnection.cpp index 556f60857..2589c9d7d 100644 --- a/lib/remote/httpserverconnection.cpp +++ b/lib/remote/httpserverconnection.cpp @@ -63,8 +63,8 @@ void HttpServerConnection::Start() HttpServerConnection::Ptr keepAlive (this); - asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { ProcessMessages(yc); }); - asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { CheckLiveness(yc); }); + IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { ProcessMessages(yc); }); + IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { CheckLiveness(yc); }); } void HttpServerConnection::Disconnect() @@ -73,7 +73,7 @@ void HttpServerConnection::Disconnect() HttpServerConnection::Ptr keepAlive (this); - asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { + IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { if (!m_ShuttingDown) { m_ShuttingDown = true; @@ -117,7 +117,7 @@ void HttpServerConnection::StartStreaming() HttpServerConnection::Ptr keepAlive (this); - asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { + IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { if (!m_ShuttingDown) { char buf[128]; asio::mutable_buffer readBuf (buf, 128); diff --git a/lib/remote/jsonrpcconnection.cpp b/lib/remote/jsonrpcconnection.cpp index eb9a946a4..b6d1d41e6 100644 --- a/lib/remote/jsonrpcconnection.cpp +++ b/lib/remote/jsonrpcconnection.cpp @@ -52,10 +52,10 @@ void JsonRpcConnection::Start() JsonRpcConnection::Ptr keepAlive (this); - asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { HandleIncomingMessages(yc); }); - asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { WriteOutgoingMessages(yc); }); - asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { HandleAndWriteHeartbeats(yc); }); - asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { CheckLiveness(yc); }); + IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { HandleIncomingMessages(yc); }); + IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { WriteOutgoingMessages(yc); }); + IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { HandleAndWriteHeartbeats(yc); }); + IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { CheckLiveness(yc); }); } void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc) @@ -193,7 +193,7 @@ void JsonRpcConnection::Disconnect() JsonRpcConnection::Ptr keepAlive (this); - asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { + IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { if (!m_ShuttingDown) { m_ShuttingDown = true;