Merge pull request #9991 from Icinga/JsonRpcConnection-9985

JsonRpcConnection#Send*(): discard messages ASAP once shutting down
This commit is contained in:
Alexander Aleksandrovič Klimov 2024-09-03 15:13:30 +02:00 committed by GitHub
commit 0951230ce1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 54 additions and 28 deletions

View File

@ -22,6 +22,8 @@ public:
{
}
Defer() = default;
Defer(const Defer&) = delete;
Defer(Defer&&) = delete;
Defer& operator=(const Defer&) = delete;
@ -39,6 +41,11 @@ public:
}
}
inline void SetFunc(std::function<void()> func)
{
m_Func = std::move(func);
}
inline
void Cancel()
{

View File

@ -1021,17 +1021,22 @@ void ApiListener::ApiTimerHandler()
maxTs = client->GetTimestamp();
}
Log(LogNotice, "ApiListener")
<< "Setting log position for identity '" << endpoint->GetName() << "': "
<< Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", ts);
for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
if (client->GetTimestamp() == maxTs) {
client->SendMessage(lmessage);
try {
client->SendMessage(lmessage);
} catch (const std::runtime_error& ex) {
Log(LogNotice, "ApiListener")
<< "Error while setting log position for identity '" << endpoint->GetName() << "': " << DiagnosticInformation(ex, false);
}
} else {
client->Disconnect();
}
}
Log(LogNotice, "ApiListener")
<< "Setting log position for identity '" << endpoint->GetName() << "': "
<< Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", ts);
}
}
@ -1195,7 +1200,12 @@ void ApiListener::SyncSendMessage(const Endpoint::Ptr& endpoint, const Dictionar
if (client->GetTimestamp() != maxTs)
continue;
client->SendMessage(message);
try {
client->SendMessage(message);
} catch (const std::runtime_error& ex) {
Log(LogNotice, "ApiListener")
<< "Error while sending message to endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex, false);
}
}
}
}
@ -1437,10 +1447,12 @@ void ApiListener::LogGlobHandler(std::vector<int>& files, const String& file)
void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client)
{
Endpoint::Ptr endpoint = client->GetEndpoint();
Defer resetEndpointSyncing ([&endpoint]() {
ObjectLock olock(endpoint);
endpoint->SetSyncing(false);
});
if (endpoint->GetLogDuration() == 0) {
ObjectLock olock2(endpoint);
endpoint->SetSyncing(false);
return;
}
@ -1457,8 +1469,6 @@ void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client)
Zone::Ptr target_zone = target_endpoint->GetZone();
if (!target_zone) {
ObjectLock olock2(endpoint);
endpoint->SetSyncing(false);
return;
}
@ -1466,12 +1476,14 @@ void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client)
std::unique_lock<std::mutex> lock(m_LogLock);
CloseLogFile();
Defer reopenLog;
if (count == -1 || count > 50000) {
OpenLogFile();
lock.unlock();
} else {
last_sync = true;
reopenLog.SetFunc([this]() { OpenLogFile(); });
}
count = 0;
@ -1544,8 +1556,7 @@ void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client)
Log(LogDebug, "ApiListener")
<< "Error while replaying log for endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex);
break;
return;
}
peer_ts = pmessage->Get("timestamp");
@ -1578,14 +1589,7 @@ void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client)
}
if (last_sync) {
{
ObjectLock olock2(endpoint);
endpoint->SetSyncing(false);
}
OpenLogFile();
break;
return;
}
}
}

View File

@ -163,6 +163,10 @@ ConnectionRole JsonRpcConnection::GetRole() const
void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message)
{
if (m_ShuttingDown) {
BOOST_THROW_EXCEPTION(std::runtime_error("Cannot send message to already disconnected API client '" + GetIdentity() + "'!"));
}
Ptr keepAlive (this);
m_IoStrand.post([this, keepAlive, message]() { SendMessageInternal(message); });
@ -170,9 +174,17 @@ void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message)
void JsonRpcConnection::SendRawMessage(const String& message)
{
if (m_ShuttingDown) {
BOOST_THROW_EXCEPTION(std::runtime_error("Cannot send message to already disconnected API client '" + GetIdentity() + "'!"));
}
Ptr keepAlive (this);
m_IoStrand.post([this, keepAlive, message]() {
if (m_ShuttingDown) {
return;
}
m_OutgoingMessagesQueue.emplace_back(message);
m_OutgoingMessagesQueued.Set();
});
@ -180,6 +192,10 @@ void JsonRpcConnection::SendRawMessage(const String& message)
void JsonRpcConnection::SendMessageInternal(const Dictionary::Ptr& message)
{
if (m_ShuttingDown) {
return;
}
m_OutgoingMessagesQueue.emplace_back(JsonEncode(message));
m_OutgoingMessagesQueued.Set();
}
@ -188,12 +204,10 @@ void JsonRpcConnection::Disconnect()
{
namespace asio = boost::asio;
JsonRpcConnection::Ptr keepAlive (this);
IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
if (!m_ShuttingDown) {
m_ShuttingDown = true;
if (!m_ShuttingDown.exchange(true)) {
JsonRpcConnection::Ptr keepAlive (this);
IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
Log(LogWarning, "JsonRpcConnection")
<< "API client disconnected for identity '" << m_Identity << "'";
@ -241,8 +255,8 @@ void JsonRpcConnection::Disconnect()
shutdownTimeout->Cancel();
m_Stream->lowest_layer().shutdown(m_Stream->lowest_layer().shutdown_both, ec);
}
});
});
}
}
void JsonRpcConnection::MessageHandler(const String& jsonString)

View File

@ -5,6 +5,7 @@
#include "remote/i2-remote.hpp"
#include "remote/endpoint.hpp"
#include "base/atomic.hpp"
#include "base/io-engine.hpp"
#include "base/tlsstream.hpp"
#include "base/timer.hpp"
@ -77,7 +78,7 @@ private:
std::vector<String> m_OutgoingMessagesQueue;
AsioConditionVariable m_OutgoingMessagesQueued;
AsioConditionVariable m_WriterDone;
bool m_ShuttingDown;
Atomic<bool> m_ShuttingDown;
boost::asio::deadline_timer m_CheckLivenessTimer, m_HeartbeatTimer;
JsonRpcConnection(const String& identity, bool authenticated, const Shared<AsioTlsStream>::Ptr& stream, ConnectionRole role, boost::asio::io_context& io);