WIP: rework JsonRpcConnection disconnect logic

This commit is contained in:
Julian Brost 2024-11-13 15:28:49 +01:00
parent a6445c782b
commit 9ff3d58535
5 changed files with 209 additions and 65 deletions

View File

@ -185,6 +185,8 @@ add_definitions(-DBOOST_FILESYSTEM_NO_DEPRECATED)
# Required for Boost v1.74+
add_definitions(-DBOOST_ASIO_USE_TS_EXECUTOR_AS_DEFAULT)
#add_definitions(-DBOOST_ASIO_ENABLE_HANDLER_TRACKING)
link_directories(${Boost_LIBRARY_DIRS})
include_directories(${Boost_INCLUDE_DIRS})

View File

@ -368,6 +368,18 @@ void ApiListener::Stop(bool runtimeDeleted)
m_Timer->Stop(true);
m_RenewOwnCertTimer->Stop(true);
for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>()) {
for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
client->Disconnect();
}
}
for (const JsonRpcConnection::Ptr& client : m_AnonymousClients) {
client->Disconnect();
}
Utility::Sleep(3);
ObjectImpl<ApiListener>::Stop(runtimeDeleted);
Log(LogInformation, "ApiListener")
@ -507,7 +519,7 @@ void ApiListener::ListenerCoroutineProc(boost::asio::yield_context yc, const Sha
lastModified = Utility::GetFileCreationTime(crlPath);
}
for (;;) {
while (IsActive()) {
try {
asio::ip::tcp::socket socket (io);

View File

@ -23,22 +23,22 @@ REGISTER_APIFUNCTION(Heartbeat, event, &JsonRpcConnection::HeartbeatAPIHandler);
void JsonRpcConnection::HandleAndWriteHeartbeats(boost::asio::yield_context yc)
{
boost::system::error_code ec;
//boost::system::error_code ec;
for (;;) {
m_HeartbeatTimer.expires_from_now(boost::posix_time::seconds(20));
m_HeartbeatTimer.async_wait(yc[ec]);
//for (;;) {
// m_HeartbeatTimer.expires_from_now(boost::posix_time::seconds(20));
// m_HeartbeatTimer.async_wait(yc[ec]);
if (m_ShuttingDown) {
break;
}
SendMessageInternal(new Dictionary({
{ "jsonrpc", "2.0" },
{ "method", "event::Heartbeat" },
{ "params", new Dictionary() }
}));
}
// if (m_State != State::Active) {
// break;
// }
//
// SendMessageInternal(new Dictionary({
// { "jsonrpc", "2.0" },
// { "method", "event::Heartbeat" },
// { "params", new Dictionary() }
// }));
//}
}
Value JsonRpcConnection::HeartbeatAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params)

View File

@ -39,11 +39,15 @@ JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated,
const AsioTlsStream::Ptr& stream, ConnectionRole role, boost::asio::io_context& io)
: m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream), m_Role(role),
m_Timestamp(Utility::GetTime()), m_Seen(Utility::GetTime()), m_IoStrand(io),
m_OutgoingMessagesQueued(io), m_WriterDone(io), m_ShuttingDown(false),
m_CheckLivenessTimer(io), m_HeartbeatTimer(io)
m_OutgoingMessagesQueued(io), m_ReadLoopDone(io), m_State(State::Active)
// m_CheckLivenessTimer(io), m_HeartbeatTimer(io)
{
if (authenticated)
m_Endpoint = Endpoint::GetByName(identity);
// SSL *ssl = m_Stream->next_layer().native_handle();
// SSL_set_msg_callback(ssl, SSL_trace);
// SSL_set_msg_callback_arg(ssl, BIO_new_fp(stdout,0));
}
void JsonRpcConnection::Start()
@ -54,24 +58,42 @@ void JsonRpcConnection::Start()
IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { HandleIncomingMessages(yc); });
IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { WriteOutgoingMessages(yc); });
IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { HandleAndWriteHeartbeats(yc); });
IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { CheckLiveness(yc); });
//IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { HandleAndWriteHeartbeats(yc); });
//IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { CheckLiveness(yc); });
}
void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
{
Defer signalDone ([this]() { m_ReadLoopDone.Set(); });
m_Stream->next_layer().SetSeen(&m_Seen);
while (!m_ShuttingDown) {
// Log(LogInformation, "JsonRpcConnection") << "Starting read loop for " << m_Identity;
while (true) {
String message;
try {
// Log(LogInformation, "JsonRpcConnection") << "Waiting for message from " << m_Identity;
message = JsonRpc::ReadMessage(m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024);
// Log(LogInformation, "JsonRpcConnection") << "Successfully read message from " << m_Identity;
} catch (const std::exception& ex) {
Log(m_ShuttingDown ? LogDebug : LogNotice, "JsonRpcConnection")
// Log(LogInformation, "JsonRpcConnection") << "Read loop exception for '" << m_Identity << "': " << ex.what();
if (auto err = dynamic_cast<const boost::system::system_error*>(&ex); err && err->code() == boost::asio::error::eof) {
Log(LogInformation, "JsonRpcConnection") << "Remote " << m_Identity << " sent TLS shutdown";
break;
}
Log(LogInformation, "JsonRpcConnection")
<< "Error while reading JSON-RPC message for identity '" << m_Identity
<< "': " << DiagnosticInformation(ex);
ForceDisconnectInternal();
return;
}
if (message == "{}") {
Log(LogInformation, "JsonRpcConnection") << "Remote " << m_Identity << " send JSON-RPC shutdown";
break;
}
@ -84,20 +106,31 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
l_TaskStats.InsertValue(Utility::GetTime(), 1);
} catch (const std::exception& ex) {
Log(m_ShuttingDown ? LogDebug : LogWarning, "JsonRpcConnection")
Log(LogInformation, "JsonRpcConnection")
<< "Error while processing JSON-RPC message for identity '" << m_Identity
<< "': " << DiagnosticInformation(ex);
break;
ForceDisconnectInternal();
return;
}
}
// Log(LogInformation, "JsonRpcConnection") << "Read loop for " << m_Identity << " terminated";
// boost::asio::deadline_timer timer (m_IoStrand);
// timer.expires_from_now(boost::posix_time::seconds(3));
// boost::system::error_code ec;
// timer.async_wait(yc[ec]);
// Utility::Sleep(2);
Log(LogInformation, "JsonRpcConnection") << "Trigger disconnect after read loop teminated for " << m_Identity;
Disconnect();
}
void JsonRpcConnection::WriteOutgoingMessages(boost::asio::yield_context yc)
{
Defer signalWriterDone ([this]() { m_WriterDone.Set(); });
// Log(LogInformation, "JsonRpcConnection") << "Starting write loop for " << m_Identity;
do {
m_OutgoingMessagesQueued.Wait(yc);
@ -119,16 +152,78 @@ void JsonRpcConnection::WriteOutgoingMessages(boost::asio::yield_context yc)
m_Stream->async_flush(yc);
} catch (const std::exception& ex) {
Log(m_ShuttingDown ? LogDebug : LogWarning, "JsonRpcConnection")
Log(LogInformation, "JsonRpcConnection")
<< "Error while sending JSON-RPC message for identity '"
<< m_Identity << "'\n" << DiagnosticInformation(ex);
break;
ForceDisconnectInternal();
return;
}
}
} while (!m_ShuttingDown);
} while (m_State == State::Active);
Disconnect();
// Log(LogInformation, "JsonRpcConnection") << "Write loop for " << m_Identity << " terminated";
bool clean = true;
boost::system::error_code ec;
if (m_State == State::Disconnected) {
return;
}
JsonRpc::SendRawMessage(m_Stream, "{}", yc[ec]);
if (m_State == State::Disconnected) {
return;
} else if (ec) {
Log(LogInformation, "JsonRpcConnection") << "JSON-RPC shutdown for " << m_Identity << " failed (write): " << ec.message();
clean = false;
}
m_Stream->async_flush(yc);
if (m_State == State::Disconnected) {
return;
} else if (ec) {
Log(LogInformation, "JsonRpcConnection") << "JSON-RPC shutdown for " << m_Identity << " failed (flush): " << ec.message();
clean = false;
}
// Log(LogInformation, "JsonRpcConnection") << "Starting TLS shutdown for " << m_Identity;
m_Stream->next_layer().async_shutdown(yc[ec]);
if (m_State == State::Disconnected) {
return;
} else if (ec == boost::asio::error::operation_aborted) {
// async_shutdown() can fail with operation_aborted here (asio doing strange things); clear the error and retry once.
ec.clear();
m_Stream->next_layer().async_shutdown(yc[ec]);
}
if (m_State == State::Disconnected) {
return;
} else if (ec) {
Log(LogInformation, "JsonRpcConnection") << "TLS shutdown for " << m_Identity << " failed: " << ec.message();
clean = false;
}
// async_shutdown() should wait for the close notify from the peer. However, in case of an error, it may exit
// before the read loop finished, so wait for it explicitly.
m_ReadLoopDone.Wait(yc);
// Shut down the TCP connection.
ec.clear();
// Log(LogInformation, "JsonRpcConnection") << "Starting TCP shutdown for " << m_Identity;
m_Stream->lowest_layer().shutdown(AsioTlsStream::lowest_layer_type::shutdown_both, ec);
if (m_State == State::Disconnected) {
return;
} else if (ec) {
Log(LogInformation, "JsonRpcConnection") << "TCP shutdown for " << m_Identity << " failed: " << ec.message();
clean = false;
}
ForceDisconnectInternal(clean);
if (clean) {
Log(LogInformation, "JsonRpcConnection") << "Disconnected " << m_Identity << " cleanly";
}
}
double JsonRpcConnection::GetTimestamp() const
@ -163,7 +258,7 @@ ConnectionRole JsonRpcConnection::GetRole() const
void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message)
{
if (m_ShuttingDown) {
if (m_State != State::Active) {
BOOST_THROW_EXCEPTION(std::runtime_error("Cannot send message to already disconnected API client '" + GetIdentity() + "'!"));
}
@ -174,14 +269,14 @@ void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message)
void JsonRpcConnection::SendRawMessage(const String& message)
{
if (m_ShuttingDown) {
if (m_State != State::Active) {
BOOST_THROW_EXCEPTION(std::runtime_error("Cannot send message to already disconnected API client '" + GetIdentity() + "'!"));
}
Ptr keepAlive (this);
m_IoStrand.post([this, keepAlive, message]() {
if (m_ShuttingDown) {
if (m_State != State::Active) {
return;
}
@ -192,7 +287,7 @@ void JsonRpcConnection::SendRawMessage(const String& message)
void JsonRpcConnection::SendMessageInternal(const Dictionary::Ptr& message)
{
if (m_ShuttingDown) {
if (m_State != State::Active) {
return;
}
@ -202,35 +297,59 @@ void JsonRpcConnection::SendMessageInternal(const Dictionary::Ptr& message)
/**
 * Requests a graceful disconnect of this connection.
 *
 * Only the call that moves m_State from Active to Disconnecting has any effect; if the state is
 * anything else, the call does nothing. The shutdown itself is not performed here: this function
 * only sets m_OutgoingMessagesQueued so that the write loop wakes up and carries out the disconnect.
 *
 * Handles being called both from within m_IoStrand and from outside of it.
 */
void JsonRpcConnection::Disconnect()
{
// NOTE(review): this alias is not used by any of the lines shown for this function - presumably a leftover, confirm.
namespace asio = boost::asio;
// Atomically switch Active -> Disconnecting; the body runs only for the caller whose exchange succeeded.
if (State expected = State::Active; m_State.compare_exchange_strong(expected, State::Disconnecting)) {
// The log line records whether the request came from inside the connection's strand or from elsewhere.
Log(LogInformation, "JsonRpcConnection") << "Disconnecting " << m_Identity << (m_IoStrand.running_in_this_thread() ? " (internal trigger)" : " (external trigger)");
// Wake write loop to initiate the actual disconnect.
if (m_IoStrand.running_in_this_thread()) {
m_OutgoingMessagesQueued.Set();
} else {
// Not running in the strand: post the wake-up to it instead of setting the condition variable from this thread.
// The handler captures a Ptr to this connection, presumably to keep it alive until the handler has run.
m_IoStrand.post([conn=Ptr(this)]() {
conn->m_OutgoingMessagesQueued.Set();
});
}
}
}
if (!m_ShuttingDown.exchange(true)) {
JsonRpcConnection::Ptr keepAlive (this);
void JsonRpcConnection::ForceDisconnectInternal(bool clean)
{
ASSERT(m_IoStrand.running_in_this_thread());
IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
Log(LogWarning, "JsonRpcConnection")
<< "API client disconnected for identity '" << m_Identity << "'";
if (m_State == State::Disconnected) {
return;
}
if (!clean) {
Log(LogWarning, "JsonRpcConnection") << "Forcefully disconnecting API client for identity '" << m_Identity << "'.";
}
m_Stream->ForceDisconnect();
Cleanup();
}
void JsonRpcConnection::Cleanup()
{
ASSERT(m_IoStrand.running_in_this_thread());
if (m_State.exchange(State::Disconnected) == State::Disconnected) {
return;
}
// We need to unregister the endpoint client as soon as possible not to confuse Icinga 2,
// given that Endpoint::GetConnected() is just performing a check that the endpoint's client
// cache is not empty, which could result in an already disconnected endpoint never trying to
// reconnect again. See #7444.
if (m_Endpoint) {
m_Endpoint->RemoveClient(this);
} else {
ApiListener::GetInstance()->RemoveAnonymousClient(this);
}
// Wake WriteOutgoingMessages() if it is waiting for new messages to send.
m_OutgoingMessagesQueued.Set();
m_WriterDone.Wait(yc);
// Signal CheckLiveness() coroutine to terminate.
// m_CheckLivenessTimer.cancel();
m_CheckLivenessTimer.cancel();
m_HeartbeatTimer.cancel();
// Signal HandleAndWriteHeartbeats() coroutine to terminate.
// m_HeartbeatTimer.cancel();
m_Stream->GracefulDisconnect(m_IoStrand, yc);
});
}
Log(LogWarning, "JsonRpcConnection") << "API client disconnected for identity '" << m_Identity << "'";
}
void JsonRpcConnection::MessageHandler(const String& jsonString)
@ -325,6 +444,7 @@ Value SetLogPositionHandler(const MessageOrigin::Ptr& origin, const Dictionary::
void JsonRpcConnection::CheckLiveness(boost::asio::yield_context yc)
{
#if 0
boost::system::error_code ec;
if (!m_Authenticated) {
@ -338,7 +458,7 @@ void JsonRpcConnection::CheckLiveness(boost::asio::yield_context yc)
m_CheckLivenessTimer.expires_from_now(boost::posix_time::seconds(10));
m_CheckLivenessTimer.async_wait(yc[ec]);
if (m_ShuttingDown) {
if (m_State != State::Active) {
return;
}
@ -353,7 +473,7 @@ void JsonRpcConnection::CheckLiveness(boost::asio::yield_context yc)
m_CheckLivenessTimer.expires_from_now(boost::posix_time::seconds(30));
m_CheckLivenessTimer.async_wait(yc[ec]);
if (m_ShuttingDown) {
if (m_State != State::Active) {
break;
}
@ -361,11 +481,12 @@ void JsonRpcConnection::CheckLiveness(boost::asio::yield_context yc)
Log(LogInformation, "JsonRpcConnection")
<< "No messages for identity '" << m_Identity << "' have been received in the last 60 seconds.";
Disconnect();
ForceDisconnectInternal();
break;
}
}
}
#endif
}
double JsonRpcConnection::GetWorkQueueRate()

View File

@ -66,6 +66,13 @@ public:
static void SendCertificateRequest(const JsonRpcConnection::Ptr& aclient, const intrusive_ptr<MessageOrigin>& origin, const String& path);
private:
/**
 * State of a JsonRpcConnection with regard to disconnecting.
 */
enum class State
{
	// Up and running. This is the initial state, as a JsonRpcConnection is constructed from an open connection.
	Active,

	// In the process of being shut down gracefully.
	Disconnecting,

	// Completely shut down.
	Disconnected,
};
String m_Identity;
bool m_Authenticated;
Endpoint::Ptr m_Endpoint;
@ -76,9 +83,9 @@ private:
boost::asio::io_context::strand m_IoStrand;
std::vector<String> m_OutgoingMessagesQueue;
AsioConditionVariable m_OutgoingMessagesQueued;
AsioConditionVariable m_WriterDone;
Atomic<bool> m_ShuttingDown;
boost::asio::deadline_timer m_CheckLivenessTimer, m_HeartbeatTimer;
AsioConditionVariable m_ReadLoopDone;
Atomic<State> m_State;
//boost::asio::deadline_timer m_CheckLivenessTimer, m_HeartbeatTimer;
JsonRpcConnection(const String& identity, bool authenticated, const AsioTlsStream::Ptr& stream, ConnectionRole role, boost::asio::io_context& io);
@ -93,6 +100,8 @@ private:
void CertificateRequestResponseHandler(const Dictionary::Ptr& message);
void SendMessageInternal(const Dictionary::Ptr& request);
void ForceDisconnectInternal(bool clean = false);
void Cleanup();
};
}