mirror of https://github.com/Icinga/icinga2.git
Introduce IoEngine::SpawnCoroutine wrapping asio::spawn and Boost exceptions
This is required to - catch all exceptions and wrap them into Boost exceptions. They are the only ones allowed with Boost.Coroutine. - set a dedicated coroutine stack size for Windows. refs #7431
This commit is contained in:
parent
0dbbba44dd
commit
2c0e0da2d9
|
@ -9,6 +9,8 @@
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
#include <stdexcept>
|
||||||
|
#include <boost/exception/all.hpp>
|
||||||
#include <boost/asio/deadline_timer.hpp>
|
#include <boost/asio/deadline_timer.hpp>
|
||||||
#include <boost/asio/io_context.hpp>
|
#include <boost/asio/io_context.hpp>
|
||||||
#include <boost/asio/spawn.hpp>
|
#include <boost/asio/spawn.hpp>
|
||||||
|
@ -77,6 +79,82 @@ public:
|
||||||
|
|
||||||
boost::asio::io_context& GetIoContext();
|
boost::asio::io_context& GetIoContext();
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Custom exceptions thrown in a Boost.Coroutine may cause stack corruption.
|
||||||
|
* Ensure that these are wrapped correctly.
|
||||||
|
*
|
||||||
|
* Inspired by https://github.com/niekbouman/commelec-api/blob/master/commelec-api/coroutine-exception.hpp
|
||||||
|
* Source: http://boost.2283326.n4.nabble.com/coroutine-only-std-exceptions-are-caught-from-coroutines-td4683671.html
|
||||||
|
*/
|
||||||
|
static inline boost::exception_ptr convertExceptionPtr(std::exception_ptr ex) {
|
||||||
|
try {
|
||||||
|
throw boost::enable_current_exception(ex);
|
||||||
|
} catch (...) {
|
||||||
|
return boost::current_exception();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline void rethrowBoostExceptionPointer() {
|
||||||
|
std::exception_ptr sep;
|
||||||
|
sep = std::current_exception();
|
||||||
|
boost::exception_ptr bep = convertExceptionPtr(sep);
|
||||||
|
boost::rethrow_exception(bep);
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline size_t GetCoroutineStackSize() {
|
||||||
|
#ifdef _WIN32
|
||||||
|
// Increase the stack size for Windows coroutines to prevent exception corruption.
|
||||||
|
// Rationale: Low cost Windows agent only & https://github.com/Icinga/icinga2/issues/7431
|
||||||
|
return 8 * 1024 * 1024;
|
||||||
|
#else /* _WIN32 */
|
||||||
|
return boost::coroutines::stack_allocator::traits_type::default_size(); // Default 64 KB
|
||||||
|
#endif /* _WIN32 */
|
||||||
|
}
|
||||||
|
|
||||||
|
/* With dedicated strand in *Connection classes. */
|
||||||
|
template <typename Handler, typename Function>
|
||||||
|
static void SpawnCoroutine(Handler h, Function f) {
|
||||||
|
|
||||||
|
boost::asio::spawn(std::forward<Handler>(h),
|
||||||
|
[f](boost::asio::yield_context yc) {
|
||||||
|
|
||||||
|
try {
|
||||||
|
f(yc);
|
||||||
|
} catch (const boost::coroutines::detail::forced_unwind &) {
|
||||||
|
// Required for proper stack unwinding when coroutines are destroyed.
|
||||||
|
// https://github.com/boostorg/coroutine/issues/39
|
||||||
|
throw;
|
||||||
|
} catch (...) {
|
||||||
|
// Handle uncaught exceptions outside of the coroutine.
|
||||||
|
rethrowBoostExceptionPointer();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
boost::coroutines::attributes(GetCoroutineStackSize()) // Set a pre-defined stack size.
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Without strand in the IO executor's context. */
|
||||||
|
template <typename Function>
|
||||||
|
static void SpawnCoroutine(boost::asio::io_context& io, Function f) {
|
||||||
|
|
||||||
|
boost::asio::spawn(io,
|
||||||
|
[f](boost::asio::yield_context yc) {
|
||||||
|
|
||||||
|
try {
|
||||||
|
f(yc);
|
||||||
|
} catch (const boost::coroutines::detail::forced_unwind &) {
|
||||||
|
// Required for proper stack unwinding when coroutines are destroyed.
|
||||||
|
// https://github.com/boostorg/coroutine/issues/39
|
||||||
|
throw;
|
||||||
|
} catch (...) {
|
||||||
|
// Handle uncaught exceptions outside of the coroutine.
|
||||||
|
rethrowBoostExceptionPointer();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
boost::coroutines::attributes(GetCoroutineStackSize()) // Set a pre-defined stack size.
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
IoEngine();
|
IoEngine();
|
||||||
|
|
||||||
|
|
|
@ -416,7 +416,7 @@ bool ApiListener::AddListener(const String& node, const String& service)
|
||||||
Log(LogInformation, "ApiListener")
|
Log(LogInformation, "ApiListener")
|
||||||
<< "Started new listener on '[" << localEndpoint.address() << "]:" << localEndpoint.port() << "'";
|
<< "Started new listener on '[" << localEndpoint.address() << "]:" << localEndpoint.port() << "'";
|
||||||
|
|
||||||
asio::spawn(io, [this, acceptor, sslContext](asio::yield_context yc) { ListenerCoroutineProc(yc, acceptor, sslContext); });
|
IoEngine::SpawnCoroutine(io, [this, acceptor, sslContext](asio::yield_context yc) { ListenerCoroutineProc(yc, acceptor, sslContext); });
|
||||||
|
|
||||||
UpdateStatusFile(localEndpoint);
|
UpdateStatusFile(localEndpoint);
|
||||||
|
|
||||||
|
@ -435,7 +435,7 @@ void ApiListener::ListenerCoroutineProc(boost::asio::yield_context yc, const std
|
||||||
|
|
||||||
server->async_accept(sslConn->lowest_layer(), yc);
|
server->async_accept(sslConn->lowest_layer(), yc);
|
||||||
|
|
||||||
asio::spawn(io, [this, sslConn](asio::yield_context yc) { NewClientHandler(yc, sslConn, String(), RoleServer); });
|
IoEngine::SpawnCoroutine(io, [this, sslConn](asio::yield_context yc) { NewClientHandler(yc, sslConn, String(), RoleServer); });
|
||||||
} catch (const std::exception& ex) {
|
} catch (const std::exception& ex) {
|
||||||
Log(LogCritical, "ApiListener")
|
Log(LogCritical, "ApiListener")
|
||||||
<< "Cannot accept new connection: " << ex.what();
|
<< "Cannot accept new connection: " << ex.what();
|
||||||
|
@ -462,7 +462,7 @@ void ApiListener::AddConnection(const Endpoint::Ptr& endpoint)
|
||||||
|
|
||||||
auto& io (IoEngine::Get().GetIoContext());
|
auto& io (IoEngine::Get().GetIoContext());
|
||||||
|
|
||||||
asio::spawn(io, [this, endpoint, &io, sslContext](asio::yield_context yc) {
|
IoEngine::SpawnCoroutine(io, [this, endpoint, &io, sslContext](asio::yield_context yc) {
|
||||||
String host = endpoint->GetHost();
|
String host = endpoint->GetHost();
|
||||||
String port = endpoint->GetPort();
|
String port = endpoint->GetPort();
|
||||||
|
|
||||||
|
@ -664,11 +664,12 @@ void ApiListener::NewClientHandlerInternal(boost::asio::yield_context yc, const
|
||||||
|
|
||||||
endpoint->AddClient(aclient);
|
endpoint->AddClient(aclient);
|
||||||
|
|
||||||
asio::spawn(IoEngine::Get().GetIoContext(), [this, aclient, endpoint, needSync](asio::yield_context yc) {
|
IoEngine::SpawnCoroutine(IoEngine::Get().GetIoContext(), [this, aclient, endpoint, needSync](asio::yield_context yc) {
|
||||||
CpuBoundWork syncClient (yc);
|
CpuBoundWork syncClient (yc);
|
||||||
|
|
||||||
SyncClient(aclient, endpoint, needSync);
|
SyncClient(aclient, endpoint, needSync);
|
||||||
});
|
});
|
||||||
|
|
||||||
} else if (!AddAnonymousClient(aclient)) {
|
} else if (!AddAnonymousClient(aclient)) {
|
||||||
Log(LogNotice, "ApiListener")
|
Log(LogNotice, "ApiListener")
|
||||||
<< "Ignoring anonymous JSON-RPC connection " << conninfo
|
<< "Ignoring anonymous JSON-RPC connection " << conninfo
|
||||||
|
|
|
@ -63,8 +63,8 @@ void HttpServerConnection::Start()
|
||||||
|
|
||||||
HttpServerConnection::Ptr keepAlive (this);
|
HttpServerConnection::Ptr keepAlive (this);
|
||||||
|
|
||||||
asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { ProcessMessages(yc); });
|
IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { ProcessMessages(yc); });
|
||||||
asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { CheckLiveness(yc); });
|
IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { CheckLiveness(yc); });
|
||||||
}
|
}
|
||||||
|
|
||||||
void HttpServerConnection::Disconnect()
|
void HttpServerConnection::Disconnect()
|
||||||
|
@ -73,7 +73,7 @@ void HttpServerConnection::Disconnect()
|
||||||
|
|
||||||
HttpServerConnection::Ptr keepAlive (this);
|
HttpServerConnection::Ptr keepAlive (this);
|
||||||
|
|
||||||
asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
|
IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
|
||||||
if (!m_ShuttingDown) {
|
if (!m_ShuttingDown) {
|
||||||
m_ShuttingDown = true;
|
m_ShuttingDown = true;
|
||||||
|
|
||||||
|
@ -117,7 +117,7 @@ void HttpServerConnection::StartStreaming()
|
||||||
|
|
||||||
HttpServerConnection::Ptr keepAlive (this);
|
HttpServerConnection::Ptr keepAlive (this);
|
||||||
|
|
||||||
asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
|
IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
|
||||||
if (!m_ShuttingDown) {
|
if (!m_ShuttingDown) {
|
||||||
char buf[128];
|
char buf[128];
|
||||||
asio::mutable_buffer readBuf (buf, 128);
|
asio::mutable_buffer readBuf (buf, 128);
|
||||||
|
|
|
@ -52,10 +52,10 @@ void JsonRpcConnection::Start()
|
||||||
|
|
||||||
JsonRpcConnection::Ptr keepAlive (this);
|
JsonRpcConnection::Ptr keepAlive (this);
|
||||||
|
|
||||||
asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { HandleIncomingMessages(yc); });
|
IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { HandleIncomingMessages(yc); });
|
||||||
asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { WriteOutgoingMessages(yc); });
|
IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { WriteOutgoingMessages(yc); });
|
||||||
asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { HandleAndWriteHeartbeats(yc); });
|
IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { HandleAndWriteHeartbeats(yc); });
|
||||||
asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { CheckLiveness(yc); });
|
IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { CheckLiveness(yc); });
|
||||||
}
|
}
|
||||||
|
|
||||||
void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
|
void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
|
||||||
|
@ -193,7 +193,7 @@ void JsonRpcConnection::Disconnect()
|
||||||
|
|
||||||
JsonRpcConnection::Ptr keepAlive (this);
|
JsonRpcConnection::Ptr keepAlive (this);
|
||||||
|
|
||||||
asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
|
IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
|
||||||
if (!m_ShuttingDown) {
|
if (!m_ShuttingDown) {
|
||||||
m_ShuttingDown = true;
|
m_ShuttingDown = true;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue