Merge pull request #10254 from Icinga/Timeout-Cancel

Timeout: use less resources, clean them up better and make cancellation deterministic
Julian Brost 2025-01-08 16:28:54 +01:00 committed by GitHub
commit fba56f0e61
GPG Key ID: B5690EEEBB952194
9 changed files with 250 additions and 65 deletions
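
In short (an illustrative sketch, not part of the diff): a Timeout used to be a SharedObject allocated with new, its callback ran in a freshly spawned coroutine, and every call site had to cancel it explicitly, typically through a Defer. After this change it is a plain stack object whose callback is posted to the given strand, and its destructor unschedules any callback that has not run yet. The strand, conn and the surrounding coroutine below are assumed context borrowed from the call sites further down.

// Before: heap-allocated, coroutine-based, cancelled explicitly via Defer.
Timeout::Ptr timeout(new Timeout(strand.context(), strand, boost::posix_time::seconds(10),
	[conn](boost::asio::yield_context yc) {
		boost::system::error_code ec;
		conn->lowest_layer().cancel(ec);
	}
));
Defer cancelTimeout([&timeout]() { timeout->Cancel(); });

// After: stack-allocated RAII object; leaving the scope (or calling Cancel()) unschedules the callback.
Timeout timeout (strand, boost::posix_time::seconds(10), [conn] {
	boost::system::error_code ec;
	conn->lowest_layer().cancel(ec);
});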


@@ -146,9 +146,14 @@ void AsioConditionVariable::Wait(boost::asio::yield_context yc)
m_Timer.async_wait(yc[ec]);
}
/**
* Cancels any pending timeout callback.
*
* Must be called in the strand in which the callback was scheduled!
*/
void Timeout::Cancel()
{
m_Cancelled.store(true);
m_Cancelled->store(true);
boost::system::error_code ec;
m_Timer.cancel(ec);
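
The strand requirement above is what makes cancellation deterministic: the callback, Cancel() and the destructor are all serialized by the same strand, so once Cancel() has returned the callback can no longer run. A minimal sketch of a compliant caller (names assumed, not taken from the diff):

IoEngine::SpawnCoroutine(strand, [&](boost::asio::yield_context yc) {
	Timeout timeout (strand, boost::posix_time::seconds(10), [] { /* abort the guarded operation */ });
	// ... await the guarded async operation on this strand ...
	timeout.Cancel(); // allowed: we are running in the strand the callback was scheduled on
});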


@@ -3,10 +3,12 @@
#ifndef IO_ENGINE_H
#define IO_ENGINE_H
#include "base/atomic.hpp"
#include "base/debug.hpp"
#include "base/exception.hpp"
#include "base/lazy-init.hpp"
#include "base/logger.hpp"
#include "base/shared-object.hpp"
#include "base/shared.hpp"
#include <atomic>
#include <exception>
#include <memory>
@@ -163,51 +165,80 @@ private:
/**
* I/O timeout emulator
*
* This class provides a workaround for Boost.ASIO's lack of built-in timeout support.
* While Boost.ASIO handles asynchronous operations, it does not natively support timeouts for these operations.
* This class uses a boost::asio::deadline_timer to emulate a timeout by scheduling a callback to be triggered
* after a specified duration, effectively adding timeout behavior where none exists.
* The callback is executed within the provided strand, ensuring thread-safety.
*
* The constructor returns immediately after scheduling the timeout callback.
* The callback itself is invoked asynchronously when the timeout occurs.
* This allows the caller to continue execution while the timeout is running in the background.
*
* The class provides a Cancel() method to unschedule any pending callback. If the callback has already been run,
* calling Cancel() has no effect. This method can be used to abort the timeout early if the monitored operation
* completes before the callback has been run. The Timeout destructor also automatically cancels any pending callback.
* A callback is considered pending even if the timeout has already expired,
* but the callback has not been executed yet due to a busy strand.
*
* @ingroup base
*/
class Timeout : public SharedObject
class Timeout
{
public:
DECLARE_PTR_TYPEDEFS(Timeout);
using Timer = boost::asio::deadline_timer;
template<class Executor, class TimeoutFromNow, class OnTimeout>
Timeout(boost::asio::io_context& io, Executor& executor, TimeoutFromNow timeoutFromNow, OnTimeout onTimeout)
: m_Timer(io)
/**
* Schedules onTimeout to be triggered after timeoutFromNow on strand.
*
* @param strand The strand in which the callback will be executed.
* The caller must also run in this strand, as well as Cancel() and the destructor!
* @param timeoutFromNow The duration after which the timeout callback will be triggered.
* @param onTimeout The callback to invoke when the timeout occurs.
*/
template<class OnTimeout>
Timeout(boost::asio::io_context::strand& strand, const Timer::duration_type& timeoutFromNow, OnTimeout onTimeout)
: m_Timer(strand.context(), timeoutFromNow), m_Cancelled(Shared<Atomic<bool>>::Make(false))
{
Ptr keepAlive (this);
VERIFY(strand.running_in_this_thread());
m_Cancelled.store(false);
m_Timer.expires_from_now(std::move(timeoutFromNow));
IoEngine::SpawnCoroutine(executor, [this, keepAlive, onTimeout](boost::asio::yield_context yc) {
if (m_Cancelled.load()) {
return;
}
{
boost::system::error_code ec;
m_Timer.async_wait(yc[ec]);
if (ec) {
return;
m_Timer.async_wait(boost::asio::bind_executor(
strand, [cancelled = m_Cancelled, onTimeout = std::move(onTimeout)](boost::system::error_code ec) {
if (!ec && !cancelled->load()) {
onTimeout();
}
}
));
}
if (m_Cancelled.load()) {
return;
}
Timeout(const Timeout&) = delete;
Timeout(Timeout&&) = delete;
Timeout& operator=(const Timeout&) = delete;
Timeout& operator=(Timeout&&) = delete;
auto f (onTimeout);
f(std::move(yc));
});
/**
* Cancels any pending timeout callback.
*
* Must be called in the strand in which the callback was scheduled!
*/
~Timeout()
{
Cancel();
}
void Cancel();
private:
boost::asio::deadline_timer m_Timer;
std::atomic<bool> m_Cancelled;
Timer m_Timer;
/**
* Indicates whether the Timeout has been cancelled.
*
* This must be Shared<> between the lambda in the constructor and Cancel() for the case
* the destructor calls Cancel() while the lambda is already queued in the strand.
* The whole Timeout instance can't be kept alive by the lambda because this would delay the destructor.
*/
Shared<Atomic<bool>>::Ptr m_Cancelled;
};
}
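
Why m_Cancelled is a Shared<Atomic<bool>> rather than a plain member, spelled out as code (a sketch of the scenario described in the comment above; the duration is arbitrary):

{
	Timeout timeout (strand, boost::posix_time::millisec(1), [] { /* ... */ });
	// The deadline can expire while this block still occupies the strand, so the timer's
	// completion handler may already be queued behind us at this point.
}
// ~Timeout() has run Cancel() and set the shared flag. The Timeout object is gone, but the
// queued handler still owns the Shared<Atomic<bool>>, sees it set and returns without
// invoking onTimeout().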


@@ -140,15 +140,12 @@ void AsioTlsStream::GracefulDisconnect(boost::asio::io_context::strand& strand,
}
{
Timeout::Ptr shutdownTimeout(new Timeout(strand.context(), strand, boost::posix_time::seconds(10),
[this](boost::asio::yield_context yc) {
Timeout shutdownTimeout (strand, boost::posix_time::seconds(10),
[this] {
// Forcefully terminate the connection if async_shutdown() blocked more than 10 seconds.
ForceDisconnect();
}
));
Defer cancelTimeout ([&shutdownTimeout]() {
shutdownTimeout->Cancel();
});
);
// Close the TLS connection, effectively uses SSL_shutdown() to send a close_notify shutdown alert to the peer.
boost::system::error_code ec;


@@ -318,7 +318,6 @@ void RedisConnection::Connect(asio::yield_context& yc)
auto conn (Shared<AsioTlsStream>::Make(m_Strand.context(), *m_TLSContext, m_Host));
auto& tlsConn (conn->next_layer());
auto connectTimeout (MakeTimeout(conn));
Defer cancelTimeout ([&connectTimeout]() { connectTimeout->Cancel(); });
icinga::Connect(conn->lowest_layer(), m_Host, Convert::ToString(m_Port), yc);
tlsConn.async_handshake(tlsConn.client, yc);
@@ -348,7 +347,6 @@ void RedisConnection::Connect(asio::yield_context& yc)
auto conn (Shared<TcpConn>::Make(m_Strand.context()));
auto connectTimeout (MakeTimeout(conn));
Defer cancelTimeout ([&connectTimeout]() { connectTimeout->Cancel(); });
icinga::Connect(conn->next_layer(), m_Host, Convert::ToString(m_Port), yc);
Handshake(conn, yc);
@@ -361,7 +359,6 @@ void RedisConnection::Connect(asio::yield_context& yc)
auto conn (Shared<UnixConn>::Make(m_Strand.context()));
auto connectTimeout (MakeTimeout(conn));
Defer cancelTimeout ([&connectTimeout]() { connectTimeout->Cancel(); });
conn->next_layer().async_connect(Unix::endpoint(m_Path.CStr()), yc);
Handshake(conn, yc);


@@ -222,7 +222,7 @@ namespace icinga
void Handshake(StreamPtr& stream, boost::asio::yield_context& yc);
template<class StreamPtr>
Timeout::Ptr MakeTimeout(StreamPtr& stream);
Timeout MakeTimeout(StreamPtr& stream);
String m_Path;
String m_Host;
@@ -512,15 +512,12 @@ void RedisConnection::Handshake(StreamPtr& strm, boost::asio::yield_context& yc)
* @param stream Redis server connection
*/
template<class StreamPtr>
Timeout::Ptr RedisConnection::MakeTimeout(StreamPtr& stream)
Timeout RedisConnection::MakeTimeout(StreamPtr& stream)
{
Ptr keepAlive (this);
return new Timeout(
m_Strand.context(),
return Timeout(
m_Strand,
boost::posix_time::microseconds(intmax_t(m_ConnectTimeout * 1000000)),
[keepAlive, stream](boost::asio::yield_context yc) {
[stream] {
boost::system::error_code ec;
stream->lowest_layer().cancel(ec);
}
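
A side note on MakeTimeout() returning Timeout by value even though the copy and move constructors are deleted: this relies on C++17 guaranteed copy elision, so the return expression must construct the object directly in the caller's storage. A sketch with a hypothetical helper, assuming a C++17 (or later) build:

Timeout MakeExampleTimeout(boost::asio::io_context::strand& strand) // hypothetical helper
{
	// OK: the prvalue is constructed directly in the caller's Timeout (mandatory elision).
	return Timeout(strand, boost::posix_time::seconds(5), [] { /* cancel the guarded I/O */ });

	// Not OK: returning a named local would require the deleted move constructor.
	// Timeout t (strand, boost::posix_time::seconds(5), [] {}); return t;
}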


@@ -456,8 +456,8 @@ void IfwApiCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckRes
IoEngine::SpawnCoroutine(
*strand,
[strand, checkable, cr, psCommand, psHost, expectedSan, psPort, conn, req, checkTimeout, reportResult = std::move(reportResult)](asio::yield_context yc) {
Timeout::Ptr timeout = new Timeout(strand->context(), *strand, boost::posix_time::microseconds(int64_t(checkTimeout * 1e6)),
[&conn, &checkable](boost::asio::yield_context yc) {
Timeout timeout (*strand, boost::posix_time::microseconds(int64_t(checkTimeout * 1e6)),
[&conn, &checkable] {
Log(LogNotice, "IfwApiCheckTask")
<< "Timeout while checking " << checkable->GetReflectionType()->GetName()
<< " '" << checkable->GetName() << "', cancelling attempt";
@@ -467,8 +467,6 @@
}
);
Defer cancelTimeout ([&timeout]() { timeout->Cancel(); });
DoIfwNetIo(yc, cr, psCommand, psHost, expectedSan, psPort, *conn, *req);
cr->SetExecutionEnd(Utility::GetTime());


@@ -534,16 +534,15 @@ void ApiListener::ListenerCoroutineProc(boost::asio::yield_context yc, const Sha
auto strand (Shared<asio::io_context::strand>::Make(io));
IoEngine::SpawnCoroutine(*strand, [this, strand, sslConn, remoteEndpoint](asio::yield_context yc) {
Timeout::Ptr timeout(new Timeout(strand->context(), *strand, boost::posix_time::microseconds(int64_t(GetConnectTimeout() * 1e6)),
[sslConn, remoteEndpoint](asio::yield_context yc) {
Timeout timeout (*strand, boost::posix_time::microseconds(int64_t(GetConnectTimeout() * 1e6)),
[sslConn, remoteEndpoint] {
Log(LogWarning, "ApiListener")
<< "Timeout while processing incoming connection from " << remoteEndpoint;
boost::system::error_code ec;
sslConn->lowest_layer().cancel(ec);
}
));
Defer cancelTimeout([timeout]() { timeout->Cancel(); });
);
NewClientHandler(yc, strand, sslConn, String(), RoleServer);
});
@@ -585,8 +584,8 @@ void ApiListener::AddConnection(const Endpoint::Ptr& endpoint)
lock.unlock();
Timeout::Ptr timeout(new Timeout(strand->context(), *strand, boost::posix_time::microseconds(int64_t(GetConnectTimeout() * 1e6)),
[sslConn, endpoint, host, port](asio::yield_context yc) {
Timeout timeout (*strand, boost::posix_time::microseconds(int64_t(GetConnectTimeout() * 1e6)),
[sslConn, endpoint, host, port] {
Log(LogCritical, "ApiListener")
<< "Timeout while reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host
<< "' and port '" << port << "', cancelling attempt";
@@ -594,8 +593,7 @@ void ApiListener::AddConnection(const Endpoint::Ptr& endpoint)
boost::system::error_code ec;
sslConn->lowest_layer().cancel(ec);
}
));
Defer cancelTimeout([&timeout]() { timeout->Cancel(); });
);
Connect(sslConn->lowest_layer(), host, port, yc);
@@ -683,19 +681,16 @@ void ApiListener::NewClientHandlerInternal(
boost::system::error_code ec;
{
Timeout::Ptr handshakeTimeout (new Timeout(
strand->context(),
Timeout handshakeTimeout (
*strand,
boost::posix_time::microseconds(intmax_t(Configuration::TlsHandshakeTimeout * 1000000)),
[strand, client](asio::yield_context yc) {
[client] {
boost::system::error_code ec;
client->lowest_layer().cancel(ec);
}
));
);
sslConn.async_handshake(role == RoleClient ? sslConn.client : sslConn.server, yc[ec]);
handshakeTimeout->Cancel();
}
if (ec) {


@@ -62,6 +62,7 @@ set(base_test_SOURCES
base-convert.cpp
base-dictionary.cpp
base-fifo.cpp
base-io-engine.cpp
base-json.cpp
base-match.cpp
base-netstring.cpp
@@ -128,6 +129,11 @@ add_boost_test(base
base_dictionary/keys_ordered
base_fifo/construct
base_fifo/io
base_io_engine/timeout_run
base_io_engine/timeout_cancelled
base_io_engine/timeout_scope
base_io_engine/timeout_due_cancelled
base_io_engine/timeout_due_scope
base_json/encode
base_json/decode
base_json/invalid1

test/base-io-engine.cpp (new file, 159 lines)

@@ -0,0 +1,159 @@
/* Icinga 2 | (c) 2024 Icinga GmbH | GPLv2+ */
#include "base/io-engine.hpp"
#include "base/utility.hpp"
#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <BoostTestTargetConfig.h>
#include <thread>
using namespace icinga;
BOOST_AUTO_TEST_SUITE(base_io_engine)
BOOST_AUTO_TEST_CASE(timeout_run)
{
boost::asio::io_context io;
boost::asio::io_context::strand strand (io);
int called = 0;
boost::asio::spawn(strand, [&](boost::asio::yield_context yc) {
boost::asio::deadline_timer timer (io);
Timeout timeout (strand, boost::posix_time::millisec(300), [&called] { ++called; });
BOOST_CHECK_EQUAL(called, 0);
timer.expires_from_now(boost::posix_time::millisec(200));
timer.async_wait(yc);
BOOST_CHECK_EQUAL(called, 0);
timer.expires_from_now(boost::posix_time::millisec(200));
timer.async_wait(yc);
});
std::thread eventLoop ([&io] { io.run(); });
io.run();
eventLoop.join();
BOOST_CHECK_EQUAL(called, 1);
}
BOOST_AUTO_TEST_CASE(timeout_cancelled)
{
boost::asio::io_context io;
boost::asio::io_context::strand strand (io);
int called = 0;
boost::asio::spawn(strand, [&](boost::asio::yield_context yc) {
boost::asio::deadline_timer timer (io);
Timeout timeout (strand, boost::posix_time::millisec(300), [&called] { ++called; });
timer.expires_from_now(boost::posix_time::millisec(200));
timer.async_wait(yc);
timeout.Cancel();
BOOST_CHECK_EQUAL(called, 0);
timer.expires_from_now(boost::posix_time::millisec(200));
timer.async_wait(yc);
});
std::thread eventLoop ([&io] { io.run(); });
io.run();
eventLoop.join();
BOOST_CHECK_EQUAL(called, 0);
}
BOOST_AUTO_TEST_CASE(timeout_scope)
{
boost::asio::io_context io;
boost::asio::io_context::strand strand (io);
int called = 0;
boost::asio::spawn(strand, [&](boost::asio::yield_context yc) {
boost::asio::deadline_timer timer (io);
{
Timeout timeout (strand, boost::posix_time::millisec(300), [&called] { ++called; });
timer.expires_from_now(boost::posix_time::millisec(200));
timer.async_wait(yc);
}
BOOST_CHECK_EQUAL(called, 0);
timer.expires_from_now(boost::posix_time::millisec(200));
timer.async_wait(yc);
});
std::thread eventLoop ([&io] { io.run(); });
io.run();
eventLoop.join();
BOOST_CHECK_EQUAL(called, 0);
}
BOOST_AUTO_TEST_CASE(timeout_due_cancelled)
{
boost::asio::io_context io;
boost::asio::io_context::strand strand (io);
int called = 0;
boost::asio::spawn(strand, [&](boost::asio::yield_context yc) {
boost::asio::deadline_timer timer (io);
Timeout timeout (strand, boost::posix_time::millisec(300), [&called] { ++called; });
// Give the timeout enough time to become due while blocking its strand to prevent it from actually running...
Utility::Sleep(0.4);
BOOST_CHECK_EQUAL(called, 0);
// ... so that this shall still work:
timeout.Cancel();
BOOST_CHECK_EQUAL(called, 0);
timer.expires_from_now(boost::posix_time::millisec(100));
timer.async_wait(yc);
});
std::thread eventLoop ([&io] { io.run(); });
io.run();
eventLoop.join();
BOOST_CHECK_EQUAL(called, 0);
}
BOOST_AUTO_TEST_CASE(timeout_due_scope)
{
boost::asio::io_context io;
boost::asio::io_context::strand strand (io);
int called = 0;
boost::asio::spawn(strand, [&](boost::asio::yield_context yc) {
boost::asio::deadline_timer timer (io);
{
Timeout timeout (strand, boost::posix_time::millisec(300), [&called] { ++called; });
// Give the timeout enough time to become due while blocking its strand to prevent it from actually running...
Utility::Sleep(0.4);
BOOST_CHECK_EQUAL(called, 0);
} // ... so that Timeout#~Timeout() shall still work here.
BOOST_CHECK_EQUAL(called, 0);
timer.expires_from_now(boost::posix_time::millisec(100));
timer.async_wait(yc);
});
std::thread eventLoop ([&io] { io.run(); });
io.run();
eventLoop.join();
BOOST_CHECK_EQUAL(called, 0);
}
BOOST_AUTO_TEST_SUITE_END()
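
A note on the test harness above (not part of the diff): the coroutine and the timer's completion handler are serialized by the strand, but the io_context is driven by two threads, so in the *_due_* cases the timer can expire and queue its handler on the strand while the coroutine keeps the strand busy in Utility::Sleep(). That is what lets those tests check that Cancel() and the destructor still suppress an already-due callback:

std::thread eventLoop ([&io] { io.run(); }); // second runner: lets the timer expire while the
io.run();                                    // coroutine blocks the strand in Utility::Sleep()
eventLoop.join();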