Merge pull request #10293 from Icinga/graceful-tls-disconnect-214
Add a dedicated method for disconnecting TLS connections
This commit is contained in: 2c0925cedd
lib/base/io-engine.cpp
@@ -146,9 +146,14 @@ void AsioConditionVariable::Wait(boost::asio::yield_context yc)
 	m_Timer.async_wait(yc[ec]);
 }
 
+/**
+ * Cancels any pending timeout callback.
+ *
+ * Must be called in the strand in which the callback was scheduled!
+ */
 void Timeout::Cancel()
 {
-	m_Cancelled.store(true);
+	m_Cancelled->store(true);
 
 	boost::system::error_code ec;
 	m_Timer.cancel(ec);
lib/base/io-engine.hpp
@@ -3,10 +3,12 @@
 #ifndef IO_ENGINE_H
 #define IO_ENGINE_H
 
+#include "base/atomic.hpp"
+#include "base/debug.hpp"
 #include "base/exception.hpp"
 #include "base/lazy-init.hpp"
 #include "base/logger.hpp"
-#include "base/shared-object.hpp"
+#include "base/shared.hpp"
 #include <atomic>
 #include <exception>
 #include <memory>
lib/base/io-engine.hpp
@@ -163,51 +165,80 @@ private:
 /**
  * I/O timeout emulator
  *
+ * This class provides a workaround for Boost.ASIO's lack of built-in timeout support.
+ * While Boost.ASIO handles asynchronous operations, it does not natively support timeouts for these operations.
+ * This class uses a boost::asio::deadline_timer to emulate a timeout by scheduling a callback to be triggered
+ * after a specified duration, effectively adding timeout behavior where none exists.
+ * The callback is executed within the provided strand, ensuring thread-safety.
+ *
+ * The constructor returns immediately after scheduling the timeout callback.
+ * The callback itself is invoked asynchronously when the timeout occurs.
+ * This allows the caller to continue execution while the timeout is running in the background.
+ *
+ * The class provides a Cancel() method to unschedule any pending callback. If the callback has already been run,
+ * calling Cancel() has no effect. This method can be used to abort the timeout early if the monitored operation
+ * completes before the callback has been run. The Timeout destructor also automatically cancels any pending callback.
+ * A callback is considered pending even if the timeout has already expired,
+ * but the callback has not been executed yet due to a busy strand.
+ *
  * @ingroup base
  */
-class Timeout : public SharedObject
+class Timeout
 {
 public:
-	DECLARE_PTR_TYPEDEFS(Timeout);
+	using Timer = boost::asio::deadline_timer;
 
-	template<class Executor, class TimeoutFromNow, class OnTimeout>
-	Timeout(boost::asio::io_context& io, Executor& executor, TimeoutFromNow timeoutFromNow, OnTimeout onTimeout)
-		: m_Timer(io)
+	/**
+	 * Schedules onTimeout to be triggered after timeoutFromNow on strand.
+	 *
+	 * @param strand The strand in which the callback will be executed.
+	 *   The caller must also run in this strand, as well as Cancel() and the destructor!
+	 * @param timeoutFromNow The duration after which the timeout callback will be triggered.
+	 * @param onTimeout The callback to invoke when the timeout occurs.
+	 */
+	template<class OnTimeout>
+	Timeout(boost::asio::io_context::strand& strand, const Timer::duration_type& timeoutFromNow, OnTimeout onTimeout)
+		: m_Timer(strand.context(), timeoutFromNow), m_Cancelled(Shared<Atomic<bool>>::Make(false))
 	{
-		Ptr keepAlive (this);
+		VERIFY(strand.running_in_this_thread());
 
-		m_Cancelled.store(false);
-		m_Timer.expires_from_now(std::move(timeoutFromNow));
-
-		IoEngine::SpawnCoroutine(executor, [this, keepAlive, onTimeout](boost::asio::yield_context yc) {
-			if (m_Cancelled.load()) {
-				return;
-			}
-
-			{
-				boost::system::error_code ec;
-
-				m_Timer.async_wait(yc[ec]);
-
-				if (ec) {
-					return;
-				}
-			}
-
-			if (m_Cancelled.load()) {
-				return;
-			}
-
-			auto f (onTimeout);
-			f(std::move(yc));
-		});
+		m_Timer.async_wait(boost::asio::bind_executor(
+			strand, [cancelled = m_Cancelled, onTimeout = std::move(onTimeout)](boost::system::error_code ec) {
+				if (!ec && !cancelled->load()) {
+					onTimeout();
+				}
+			}
+		));
 	}
 
+	Timeout(const Timeout&) = delete;
+	Timeout(Timeout&&) = delete;
+	Timeout& operator=(const Timeout&) = delete;
+	Timeout& operator=(Timeout&&) = delete;
+
+	/**
+	 * Cancels any pending timeout callback.
+	 *
+	 * Must be called in the strand in which the callback was scheduled!
+	 */
+	~Timeout()
+	{
+		Cancel();
+	}
+
 	void Cancel();
 
 private:
-	boost::asio::deadline_timer m_Timer;
-	std::atomic<bool> m_Cancelled;
+	Timer m_Timer;
+
+	/**
+	 * Indicates whether the Timeout has been cancelled.
+	 *
+	 * This must be Shared<> between the lambda in the constructor and Cancel() for the case
+	 * the destructor calls Cancel() while the lambda is already queued in the strand.
+	 * The whole Timeout instance can't be kept alive by the lambda because this would delay the destructor.
+	 */
+	Shared<Atomic<bool>>::Ptr m_Cancelled;
 };
 
 }
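The header comment above spells out the lifetime rules of the reworked Timeout. A minimal usage sketch, illustrative only and not part of this diff (the function name and the two-second duration are made up):

#include "base/io-engine.hpp"
#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>

using namespace icinga;

// Must itself execute in `strand` (e.g. spawned via IoEngine::SpawnCoroutine(strand, ...)),
// since the Timeout constructor VERIFYs strand.running_in_this_thread().
void ExampleOperation(boost::asio::io_context::strand& strand, boost::asio::yield_context yc)
{
	bool timedOut = false;

	{
		// Schedules the callback to run on `strand` after two seconds unless cancelled first.
		Timeout timeout (strand, boost::posix_time::seconds(2), [&timedOut] { timedOut = true; });

		// ... await the monitored async operation with yc here ...

	} // ~Timeout() cancels the callback, even if it is already due but still queued in the strand.

	if (timedOut) {
		// React to the expired timeout, e.g. by closing the connection.
	}
}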
lib/base/tlsstream.cpp
@@ -7,6 +7,8 @@
 #include "base/logger.hpp"
 #include "base/configuration.hpp"
 #include "base/convert.hpp"
+#include "base/defer.hpp"
+#include "base/io-engine.hpp"
 #include <boost/asio/ssl/context.hpp>
 #include <boost/asio/ssl/verify_context.hpp>
 #include <boost/asio/ssl/verify_mode.hpp>
lib/base/tlsstream.cpp
@@ -103,3 +105,62 @@ void UnbufferedAsioTlsStream::BeforeHandshake(handshake_type type)
 	}
 #endif /* SSL_CTRL_SET_TLSEXT_HOSTNAME */
 }
+
+/**
+ * Forcefully close the connection, typically (details are up to the operating system) using a TCP RST.
+ */
+void AsioTlsStream::ForceDisconnect()
+{
+	if (!lowest_layer().is_open()) {
+		// Already disconnected, nothing to do.
+		return;
+	}
+
+	boost::system::error_code ec;
+
+	// Close the socket. In case the connection wasn't shut down cleanly by GracefulDisconnect(), the operating system
+	// will typically terminate the connection with a TCP RST. Otherwise, this just releases the file descriptor.
+	lowest_layer().close(ec);
+}
+
+/**
+ * Try to cleanly shut down the connection. This involves sending a TLS close_notify shutdown alert and terminating the
+ * underlying TCP connection. Sending these additional messages can block, hence the method takes a yield context and
+ * internally implements a timeout of 10 seconds for the operation after which the connection is forcefully terminated
+ * using ForceDisconnect().
+ *
+ * @param strand Asio strand used for other operations on this connection.
+ * @param yc Yield context for Asio coroutines
+ */
+void AsioTlsStream::GracefulDisconnect(boost::asio::io_context::strand& strand, boost::asio::yield_context& yc)
+{
+	if (!lowest_layer().is_open()) {
+		// Already disconnected, nothing to do.
+		return;
+	}
+
+	{
+		Timeout shutdownTimeout (strand, boost::posix_time::seconds(10),
+			[this] {
+				// Forcefully terminate the connection if async_shutdown() blocked more than 10 seconds.
+				ForceDisconnect();
+			}
+		);
+
+		// Close the TLS connection, effectively uses SSL_shutdown() to send a close_notify shutdown alert to the peer.
+		boost::system::error_code ec;
+		next_layer().async_shutdown(yc[ec]);
+	}
+
+	if (!lowest_layer().is_open()) {
+		// Connection got closed in the meantime, most likely by the timeout, so nothing more to do.
+		return;
+	}
+
+	// Shut down the TCP connection.
+	boost::system::error_code ec;
+	lowest_layer().shutdown(lowest_layer_type::shutdown_both, ec);
+
+	// Clean up the connection (closes the file descriptor).
+	ForceDisconnect();
+}
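For reference, a caller-side sketch of the new pair of methods (hypothetical function and variable names; the real call sites appear below in httpserverconnection.cpp and jsonrpcconnection.cpp):

#include "base/io-engine.hpp"
#include "base/shared.hpp"
#include "base/tlsstream.hpp"

using namespace icinga;

// Runs as a coroutine on the connection's strand, like the Disconnect() methods below.
void DisconnectExample(const Shared<AsioTlsStream>::Ptr& stream,
	boost::asio::io_context::strand& strand, boost::asio::yield_context yc)
{
	// Attempts close_notify + TCP shutdown; the internal 10 second Timeout falls
	// back to ForceDisconnect() (typically a TCP RST) if the peer stalls the shutdown.
	stream->GracefulDisconnect(strand, yc);
}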
lib/base/tlsstream.hpp
@@ -111,6 +111,9 @@ public:
 	{
 	}
 
+	void ForceDisconnect();
+	void GracefulDisconnect(boost::asio::io_context::strand& strand, boost::asio::yield_context& yc);
+
 private:
 	inline
 	AsioTlsStream(UnbufferedAsioTlsStreamParams init)
lib/icingadb/redisconnection.cpp
@@ -318,7 +318,6 @@ void RedisConnection::Connect(asio::yield_context& yc)
 				auto conn (Shared<AsioTlsStream>::Make(m_Strand.context(), *m_TLSContext, m_Host));
 				auto& tlsConn (conn->next_layer());
 				auto connectTimeout (MakeTimeout(conn));
-				Defer cancelTimeout ([&connectTimeout]() { connectTimeout->Cancel(); });
 
 				icinga::Connect(conn->lowest_layer(), m_Host, Convert::ToString(m_Port), yc);
 				tlsConn.async_handshake(tlsConn.client, yc);
lib/icingadb/redisconnection.cpp
@@ -348,7 +347,6 @@ void RedisConnection::Connect(asio::yield_context& yc)
 
 				auto conn (Shared<TcpConn>::Make(m_Strand.context()));
 				auto connectTimeout (MakeTimeout(conn));
-				Defer cancelTimeout ([&connectTimeout]() { connectTimeout->Cancel(); });
 
 				icinga::Connect(conn->next_layer(), m_Host, Convert::ToString(m_Port), yc);
 				Handshake(conn, yc);
lib/icingadb/redisconnection.cpp
@@ -361,7 +359,6 @@ void RedisConnection::Connect(asio::yield_context& yc)
 
 			auto conn (Shared<UnixConn>::Make(m_Strand.context()));
 			auto connectTimeout (MakeTimeout(conn));
-			Defer cancelTimeout ([&connectTimeout]() { connectTimeout->Cancel(); });
 
 			conn->next_layer().async_connect(Unix::endpoint(m_Path.CStr()), yc);
 			Handshake(conn, yc);
lib/icingadb/redisconnection.hpp
@@ -222,7 +222,7 @@ namespace icinga
 		void Handshake(StreamPtr& stream, boost::asio::yield_context& yc);
 
 		template<class StreamPtr>
-		Timeout::Ptr MakeTimeout(StreamPtr& stream);
+		Timeout MakeTimeout(StreamPtr& stream);
 
 		String m_Path;
 		String m_Host;
lib/icingadb/redisconnection.cpp
@@ -509,15 +509,12 @@ void RedisConnection::Handshake(StreamPtr& strm, boost::asio::yield_context& yc)
  * @param stream Redis server connection
  */
 template<class StreamPtr>
-Timeout::Ptr RedisConnection::MakeTimeout(StreamPtr& stream)
+Timeout RedisConnection::MakeTimeout(StreamPtr& stream)
 {
-	Ptr keepAlive (this);
-
-	return new Timeout(
-		m_Strand.context(),
+	return Timeout(
 		m_Strand,
 		boost::posix_time::microseconds(intmax_t(m_ConnectTimeout * 1000000)),
-		[keepAlive, stream](boost::asio::yield_context yc) {
+		[stream] {
 			boost::system::error_code ec;
 			stream->lowest_layer().cancel(ec);
 		}
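Note that MakeTimeout() now returns Timeout by value although the class deletes its copy and move constructors; this compiles because the prvalue in the return statement is subject to C++17's guaranteed copy elision and is constructed directly in the caller's storage. A standalone illustration of that language rule (not Icinga code):

// C++17: the prvalue in the return statement is materialized directly in the
// caller's object, so no copy or move constructor is needed.
struct NonMovable {
	NonMovable() = default;
	NonMovable(const NonMovable&) = delete;
	NonMovable& operator=(const NonMovable&) = delete;
};

NonMovable Make() { return NonMovable(); } // OK since C++17

int main()
{
	NonMovable n = Make(); // constructed in place, exactly once
}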
lib/methods/ifwapichecktask.cpp
@@ -102,6 +102,8 @@ static void DoIfwNetIo(
 	}
 
 	{
+		// Using async_shutdown() instead of AsioTlsStream::GracefulDisconnect() as this whole function
+		// is already guarded by a timeout based on the check timeout.
 		boost::system::error_code ec;
 		sslConn.async_shutdown(yc[ec]);
 	}
lib/methods/ifwapichecktask.cpp
@@ -454,8 +456,8 @@ void IfwApiCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckRes
 	IoEngine::SpawnCoroutine(
 		*strand,
 		[strand, checkable, cr, psCommand, psHost, expectedSan, psPort, conn, req, checkTimeout, reportResult = std::move(reportResult)](asio::yield_context yc) {
-			Timeout::Ptr timeout = new Timeout(strand->context(), *strand, boost::posix_time::microseconds(int64_t(checkTimeout * 1e6)),
-				[&conn, &checkable](boost::asio::yield_context yc) {
+			Timeout timeout (*strand, boost::posix_time::microseconds(int64_t(checkTimeout * 1e6)),
+				[&conn, &checkable] {
 					Log(LogNotice, "IfwApiCheckTask")
 						<< "Timeout while checking " << checkable->GetReflectionType()->GetName()
 						<< " '" << checkable->GetName() << "', cancelling attempt";
@@ -465,8 +467,6 @@ void IfwApiCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckRes
 				}
 			);
 
-			Defer cancelTimeout ([&timeout]() { timeout->Cancel(); });
-
 			DoIfwNetIo(yc, cr, psCommand, psHost, expectedSan, psPort, *conn, *req);
 
 			cr->SetExecutionEnd(Utility::GetTime());
lib/remote/apilistener.cpp
@@ -534,16 +534,15 @@ void ApiListener::ListenerCoroutineProc(boost::asio::yield_context yc, const Sha
 		auto strand (Shared<asio::io_context::strand>::Make(io));
 
 		IoEngine::SpawnCoroutine(*strand, [this, strand, sslConn, remoteEndpoint](asio::yield_context yc) {
-			Timeout::Ptr timeout(new Timeout(strand->context(), *strand, boost::posix_time::microseconds(int64_t(GetConnectTimeout() * 1e6)),
-				[sslConn, remoteEndpoint](asio::yield_context yc) {
+			Timeout timeout (*strand, boost::posix_time::microseconds(int64_t(GetConnectTimeout() * 1e6)),
+				[sslConn, remoteEndpoint] {
 					Log(LogWarning, "ApiListener")
 						<< "Timeout while processing incoming connection from " << remoteEndpoint;
 
 					boost::system::error_code ec;
 					sslConn->lowest_layer().cancel(ec);
 				}
-			));
-			Defer cancelTimeout([timeout]() { timeout->Cancel(); });
+			);
 
 			NewClientHandler(yc, strand, sslConn, String(), RoleServer);
 		});
lib/remote/apilistener.cpp
@@ -585,8 +584,8 @@ void ApiListener::AddConnection(const Endpoint::Ptr& endpoint)
 
 		lock.unlock();
 
-		Timeout::Ptr timeout(new Timeout(strand->context(), *strand, boost::posix_time::microseconds(int64_t(GetConnectTimeout() * 1e6)),
-			[sslConn, endpoint, host, port](asio::yield_context yc) {
+		Timeout timeout (*strand, boost::posix_time::microseconds(int64_t(GetConnectTimeout() * 1e6)),
+			[sslConn, endpoint, host, port] {
 				Log(LogCritical, "ApiListener")
 					<< "Timeout while reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host
 					<< "' and port '" << port << "', cancelling attempt";
@@ -594,8 +593,7 @@ void ApiListener::AddConnection(const Endpoint::Ptr& endpoint)
 				boost::system::error_code ec;
 				sslConn->lowest_layer().cancel(ec);
 			}
-		));
-		Defer cancelTimeout([&timeout]() { timeout->Cancel(); });
+		);
 
 		Connect(sslConn->lowest_layer(), host, port, yc);
 
lib/remote/apilistener.cpp
@@ -683,19 +681,16 @@ void ApiListener::NewClientHandlerInternal(
 	boost::system::error_code ec;
 
 	{
-		Timeout::Ptr handshakeTimeout (new Timeout(
-			strand->context(),
+		Timeout handshakeTimeout (
 			*strand,
 			boost::posix_time::microseconds(intmax_t(Configuration::TlsHandshakeTimeout * 1000000)),
-			[strand, client](asio::yield_context yc) {
+			[client] {
 				boost::system::error_code ec;
 				client->lowest_layer().cancel(ec);
 			}
-		));
+		);
 
 		sslConn.async_handshake(role == RoleClient ? sslConn.client : sslConn.server, yc[ec]);
-
-		handshakeTimeout->Cancel();
 	}
 
 	if (ec) {
@@ -719,6 +714,9 @@ void ApiListener::NewClientHandlerInternal(
 			// Ignore the error, but do not throw an exception being swallowed at all cost.
 			// https://github.com/Icinga/icinga2/issues/7351
 			boost::system::error_code ec;
+
+			// Using async_shutdown() instead of AsioTlsStream::GracefulDisconnect() as this whole function
+			// is already guarded by a timeout based on the connect timeout.
 			sslConn.async_shutdown(yc[ec]);
 		}
 	});
lib/remote/httpserverconnection.cpp
@@ -88,23 +88,9 @@ void HttpServerConnection::Disconnect(boost::asio::yield_context yc)
 		Log(LogInformation, "HttpServerConnection")
 			<< "HTTP client disconnected (from " << m_PeerAddress << ")";
 
-		/*
-		 * Do not swallow exceptions in a coroutine.
-		 * https://github.com/Icinga/icinga2/issues/7351
-		 * We must not catch `detail::forced_unwind exception` as
-		 * this is used for unwinding the stack.
-		 *
-		 * Just use the error_code dummy here.
-		 */
-		boost::system::error_code ec;
-
 		m_CheckLivenessTimer.cancel();
 
-		m_Stream->lowest_layer().cancel(ec);
-
-		m_Stream->next_layer().async_shutdown(yc[ec]);
-
-		m_Stream->lowest_layer().shutdown(m_Stream->lowest_layer().shutdown_both, ec);
+		m_Stream->GracefulDisconnect(m_IoStrand, yc);
 
 		auto listener (ApiListener::GetInstance());
lib/remote/jsonrpcconnection.cpp
@@ -250,54 +250,42 @@ void JsonRpcConnection::Disconnect()
 	if (!m_ShuttingDown.exchange(true)) {
 		JsonRpcConnection::Ptr keepAlive (this);
 
+		Log(LogNotice, "JsonRpcConnection")
+			<< "Disconnecting API client for identity '" << m_Identity << "'";
+
 		IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
-			Log(LogWarning, "JsonRpcConnection")
-				<< "API client disconnected for identity '" << m_Identity << "'";
+			m_OutgoingMessagesQueued.Set();
 
-			// We need to unregister the endpoint client as soon as possible not to confuse Icinga 2,
-			// given that Endpoint::GetConnected() is just performing a check that the endpoint's client
-			// cache is not empty, which could result in an already disconnected endpoint never trying to
-			// reconnect again. See #7444.
+			{
+				Timeout writerTimeout(
+					m_IoStrand,
+					boost::posix_time::seconds(5),
+					[this]() {
+						// The writer coroutine could not finish soon enough to unblock the waiter down below,
+						// so we have to do this on our own, and the coroutine will be terminated forcibly when
+						// the ops on the underlying socket are cancelled.
+						boost::system::error_code ec;
+						m_Stream->lowest_layer().cancel(ec);
+					}
+				);
+
+				m_WriterDone.Wait(yc);
+				// We don't need to explicitly cancel the timer here; its destructor will handle it for us.
+			}
+
+			m_CheckLivenessTimer.cancel();
+			m_HeartbeatTimer.cancel();
+
+			m_Stream->GracefulDisconnect(m_IoStrand, yc);
+
 			if (m_Endpoint) {
 				m_Endpoint->RemoveClient(this);
 			} else {
 				ApiListener::GetInstance()->RemoveAnonymousClient(this);
 			}
 
-			m_OutgoingMessagesQueued.Set();
-
-			m_WriterDone.Wait(yc);
-
-			/*
-			 * Do not swallow exceptions in a coroutine.
-			 * https://github.com/Icinga/icinga2/issues/7351
-			 * We must not catch `detail::forced_unwind exception` as
-			 * this is used for unwinding the stack.
-			 *
-			 * Just use the error_code dummy here.
-			 */
-			boost::system::error_code ec;
-
-			m_CheckLivenessTimer.cancel();
-			m_HeartbeatTimer.cancel();
-
-			m_Stream->lowest_layer().cancel(ec);
-
-			Timeout::Ptr shutdownTimeout (new Timeout(
-				m_IoStrand.context(),
-				m_IoStrand,
-				boost::posix_time::seconds(10),
-				[this, keepAlive](asio::yield_context yc) {
-					boost::system::error_code ec;
-					m_Stream->lowest_layer().cancel(ec);
-				}
-			));
-
-			m_Stream->next_layer().async_shutdown(yc[ec]);
-
-			shutdownTimeout->Cancel();
-
-			m_Stream->lowest_layer().shutdown(m_Stream->lowest_layer().shutdown_both, ec);
+			Log(LogWarning, "JsonRpcConnection")
+				<< "API client disconnected for identity '" << m_Identity << "'";
 		});
 	}
 }
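The writer shutdown above is an instance of a reusable pattern: bound a potentially blocking coroutine wait with a Timeout whose callback cancels the underlying socket operations, and let the Timeout destructor clean up. A generic sketch (hypothetical helper, assuming the stream outlives the call):

#include "base/io-engine.hpp"
#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>

using namespace icinga;

// Wait for `done`, but cancel pending ops on `stream` if it does not fire within `limit`.
// Must run in `strand`, mirroring the writerTimeout above.
template<class Stream>
void WaitBounded(AsioConditionVariable& done, Stream& stream, boost::asio::io_context::strand& strand,
	const boost::posix_time::time_duration& limit, boost::asio::yield_context yc)
{
	Timeout timeout (strand, limit, [&stream] {
		boost::system::error_code ec;
		stream.lowest_layer().cancel(ec);
	});

	done.Wait(yc); // ~Timeout() unschedules the callback if we finish in time
}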
test/CMakeLists.txt
@@ -9,6 +9,7 @@ set(base_test_SOURCES
   base-convert.cpp
   base-dictionary.cpp
   base-fifo.cpp
+  base-io-engine.cpp
   base-json.cpp
   base-match.cpp
   base-netstring.cpp
@@ -76,6 +77,11 @@ add_boost_test(base
     base_dictionary/keys_ordered
     base_fifo/construct
     base_fifo/io
+    base_io_engine/timeout_run
+    base_io_engine/timeout_cancelled
+    base_io_engine/timeout_scope
+    base_io_engine/timeout_due_cancelled
+    base_io_engine/timeout_due_scope
     base_json/encode
     base_json/decode
     base_json/invalid1
test/base-io-engine.cpp (new file, 159 lines)
@@ -0,0 +1,159 @@
+/* Icinga 2 | (c) 2024 Icinga GmbH | GPLv2+ */
+
+#include "base/io-engine.hpp"
+#include "base/utility.hpp"
+#include <boost/asio.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
+#include <BoostTestTargetConfig.h>
+#include <thread>
+
+using namespace icinga;
+
+BOOST_AUTO_TEST_SUITE(base_io_engine)
+
+BOOST_AUTO_TEST_CASE(timeout_run)
+{
+	boost::asio::io_context io;
+	boost::asio::io_context::strand strand (io);
+	int called = 0;
+
+	boost::asio::spawn(strand, [&](boost::asio::yield_context yc) {
+		boost::asio::deadline_timer timer (io);
+
+		Timeout timeout (strand, boost::posix_time::millisec(300), [&called] { ++called; });
+		BOOST_CHECK_EQUAL(called, 0);
+
+		timer.expires_from_now(boost::posix_time::millisec(200));
+		timer.async_wait(yc);
+		BOOST_CHECK_EQUAL(called, 0);
+
+		timer.expires_from_now(boost::posix_time::millisec(200));
+		timer.async_wait(yc);
+	});
+
+	std::thread eventLoop ([&io] { io.run(); });
+	io.run();
+	eventLoop.join();
+
+	BOOST_CHECK_EQUAL(called, 1);
+}
+
+BOOST_AUTO_TEST_CASE(timeout_cancelled)
+{
+	boost::asio::io_context io;
+	boost::asio::io_context::strand strand (io);
+	int called = 0;
+
+	boost::asio::spawn(strand, [&](boost::asio::yield_context yc) {
+		boost::asio::deadline_timer timer (io);
+		Timeout timeout (strand, boost::posix_time::millisec(300), [&called] { ++called; });
+
+		timer.expires_from_now(boost::posix_time::millisec(200));
+		timer.async_wait(yc);
+
+		timeout.Cancel();
+		BOOST_CHECK_EQUAL(called, 0);
+
+		timer.expires_from_now(boost::posix_time::millisec(200));
+		timer.async_wait(yc);
+	});
+
+	std::thread eventLoop ([&io] { io.run(); });
+	io.run();
+	eventLoop.join();
+
+	BOOST_CHECK_EQUAL(called, 0);
+}
+
+BOOST_AUTO_TEST_CASE(timeout_scope)
+{
+	boost::asio::io_context io;
+	boost::asio::io_context::strand strand (io);
+	int called = 0;
+
+	boost::asio::spawn(strand, [&](boost::asio::yield_context yc) {
+		boost::asio::deadline_timer timer (io);
+
+		{
+			Timeout timeout (strand, boost::posix_time::millisec(300), [&called] { ++called; });
+
+			timer.expires_from_now(boost::posix_time::millisec(200));
+			timer.async_wait(yc);
+		}
+
+		BOOST_CHECK_EQUAL(called, 0);
+
+		timer.expires_from_now(boost::posix_time::millisec(200));
+		timer.async_wait(yc);
+	});
+
+	std::thread eventLoop ([&io] { io.run(); });
+	io.run();
+	eventLoop.join();
+
+	BOOST_CHECK_EQUAL(called, 0);
+}
+
+BOOST_AUTO_TEST_CASE(timeout_due_cancelled)
+{
+	boost::asio::io_context io;
+	boost::asio::io_context::strand strand (io);
+	int called = 0;
+
+	boost::asio::spawn(strand, [&](boost::asio::yield_context yc) {
+		boost::asio::deadline_timer timer (io);
+		Timeout timeout (strand, boost::posix_time::millisec(300), [&called] { ++called; });
+
+		// Give the timeout enough time to become due while blocking its strand to prevent it from actually running...
+		Utility::Sleep(0.4);
+
+		BOOST_CHECK_EQUAL(called, 0);
+
+		// ... so that this shall still work:
+		timeout.Cancel();
+
+		BOOST_CHECK_EQUAL(called, 0);
+
+		timer.expires_from_now(boost::posix_time::millisec(100));
+		timer.async_wait(yc);
+	});
+
+	std::thread eventLoop ([&io] { io.run(); });
+	io.run();
+	eventLoop.join();
+
+	BOOST_CHECK_EQUAL(called, 0);
+}
+
+BOOST_AUTO_TEST_CASE(timeout_due_scope)
+{
+	boost::asio::io_context io;
+	boost::asio::io_context::strand strand (io);
+	int called = 0;
+
+	boost::asio::spawn(strand, [&](boost::asio::yield_context yc) {
+		boost::asio::deadline_timer timer (io);
+
+		{
+			Timeout timeout (strand, boost::posix_time::millisec(300), [&called] { ++called; });
+
+			// Give the timeout enough time to become due while blocking its strand to prevent it from actually running...
+			Utility::Sleep(0.4);
+
+			BOOST_CHECK_EQUAL(called, 0);
+		} // ... so that Timeout#~Timeout() shall still work here.
+
+		BOOST_CHECK_EQUAL(called, 0);
+
+		timer.expires_from_now(boost::posix_time::millisec(100));
+		timer.async_wait(yc);
+	});
+
+	std::thread eventLoop ([&io] { io.run(); });
+	io.run();
+	eventLoop.join();
+
+	BOOST_CHECK_EQUAL(called, 0);
+}
+
+BOOST_AUTO_TEST_SUITE_END()