mirror of
https://github.com/Icinga/icinga2.git
synced 2025-07-25 14:44:32 +02:00
Merge pull request #10146 from Icinga/JsonRpcConnection-2.14
JsonRpcConnection#Send*(): discard messages ASAP once shutting down
This commit is contained in:
commit
27a4a25c9f
@ -22,6 +22,8 @@ public:
|
|||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Defer() = default;
|
||||||
|
|
||||||
Defer(const Defer&) = delete;
|
Defer(const Defer&) = delete;
|
||||||
Defer(Defer&&) = delete;
|
Defer(Defer&&) = delete;
|
||||||
Defer& operator=(const Defer&) = delete;
|
Defer& operator=(const Defer&) = delete;
|
||||||
@ -39,6 +41,11 @@ public:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inline void SetFunc(std::function<void()> func)
|
||||||
|
{
|
||||||
|
m_Func = std::move(func);
|
||||||
|
}
|
||||||
|
|
||||||
inline
|
inline
|
||||||
void Cancel()
|
void Cancel()
|
||||||
{
|
{
|
||||||
|
@ -1020,17 +1020,22 @@ void ApiListener::ApiTimerHandler()
|
|||||||
maxTs = client->GetTimestamp();
|
maxTs = client->GetTimestamp();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Log(LogNotice, "ApiListener")
|
||||||
|
<< "Setting log position for identity '" << endpoint->GetName() << "': "
|
||||||
|
<< Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", ts);
|
||||||
|
|
||||||
for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
|
for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
|
||||||
if (client->GetTimestamp() == maxTs) {
|
if (client->GetTimestamp() == maxTs) {
|
||||||
client->SendMessage(lmessage);
|
try {
|
||||||
|
client->SendMessage(lmessage);
|
||||||
|
} catch (const std::runtime_error& ex) {
|
||||||
|
Log(LogNotice, "ApiListener")
|
||||||
|
<< "Error while setting log position for identity '" << endpoint->GetName() << "': " << DiagnosticInformation(ex, false);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
client->Disconnect();
|
client->Disconnect();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Log(LogNotice, "ApiListener")
|
|
||||||
<< "Setting log position for identity '" << endpoint->GetName() << "': "
|
|
||||||
<< Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", ts);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1194,7 +1199,12 @@ void ApiListener::SyncSendMessage(const Endpoint::Ptr& endpoint, const Dictionar
|
|||||||
if (client->GetTimestamp() != maxTs)
|
if (client->GetTimestamp() != maxTs)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
client->SendMessage(message);
|
try {
|
||||||
|
client->SendMessage(message);
|
||||||
|
} catch (const std::runtime_error& ex) {
|
||||||
|
Log(LogNotice, "ApiListener")
|
||||||
|
<< "Error while sending message to endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex, false);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1434,10 +1444,12 @@ void ApiListener::LogGlobHandler(std::vector<int>& files, const String& file)
|
|||||||
void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client)
|
void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client)
|
||||||
{
|
{
|
||||||
Endpoint::Ptr endpoint = client->GetEndpoint();
|
Endpoint::Ptr endpoint = client->GetEndpoint();
|
||||||
|
Defer resetEndpointSyncing ([&endpoint]() {
|
||||||
|
ObjectLock olock(endpoint);
|
||||||
|
endpoint->SetSyncing(false);
|
||||||
|
});
|
||||||
|
|
||||||
if (endpoint->GetLogDuration() == 0) {
|
if (endpoint->GetLogDuration() == 0) {
|
||||||
ObjectLock olock2(endpoint);
|
|
||||||
endpoint->SetSyncing(false);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1454,8 +1466,6 @@ void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client)
|
|||||||
Zone::Ptr target_zone = target_endpoint->GetZone();
|
Zone::Ptr target_zone = target_endpoint->GetZone();
|
||||||
|
|
||||||
if (!target_zone) {
|
if (!target_zone) {
|
||||||
ObjectLock olock2(endpoint);
|
|
||||||
endpoint->SetSyncing(false);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1463,12 +1473,14 @@ void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client)
|
|||||||
std::unique_lock<std::mutex> lock(m_LogLock);
|
std::unique_lock<std::mutex> lock(m_LogLock);
|
||||||
|
|
||||||
CloseLogFile();
|
CloseLogFile();
|
||||||
|
Defer reopenLog;
|
||||||
|
|
||||||
if (count == -1 || count > 50000) {
|
if (count == -1 || count > 50000) {
|
||||||
OpenLogFile();
|
OpenLogFile();
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
} else {
|
} else {
|
||||||
last_sync = true;
|
last_sync = true;
|
||||||
|
reopenLog.SetFunc([this]() { OpenLogFile(); });
|
||||||
}
|
}
|
||||||
|
|
||||||
count = 0;
|
count = 0;
|
||||||
@ -1541,8 +1553,7 @@ void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client)
|
|||||||
|
|
||||||
Log(LogDebug, "ApiListener")
|
Log(LogDebug, "ApiListener")
|
||||||
<< "Error while replaying log for endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex);
|
<< "Error while replaying log for endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex);
|
||||||
|
return;
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
peer_ts = pmessage->Get("timestamp");
|
peer_ts = pmessage->Get("timestamp");
|
||||||
@ -1575,14 +1586,7 @@ void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (last_sync) {
|
if (last_sync) {
|
||||||
{
|
return;
|
||||||
ObjectLock olock2(endpoint);
|
|
||||||
endpoint->SetSyncing(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
OpenLogFile();
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -165,6 +165,10 @@ ConnectionRole JsonRpcConnection::GetRole() const
|
|||||||
|
|
||||||
void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message)
|
void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message)
|
||||||
{
|
{
|
||||||
|
if (m_ShuttingDown) {
|
||||||
|
BOOST_THROW_EXCEPTION(std::runtime_error("Cannot send message to already disconnected API client '" + GetIdentity() + "'!"));
|
||||||
|
}
|
||||||
|
|
||||||
Ptr keepAlive (this);
|
Ptr keepAlive (this);
|
||||||
|
|
||||||
m_IoStrand.post([this, keepAlive, message]() { SendMessageInternal(message); });
|
m_IoStrand.post([this, keepAlive, message]() { SendMessageInternal(message); });
|
||||||
@ -172,9 +176,17 @@ void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message)
|
|||||||
|
|
||||||
void JsonRpcConnection::SendRawMessage(const String& message)
|
void JsonRpcConnection::SendRawMessage(const String& message)
|
||||||
{
|
{
|
||||||
|
if (m_ShuttingDown) {
|
||||||
|
BOOST_THROW_EXCEPTION(std::runtime_error("Cannot send message to already disconnected API client '" + GetIdentity() + "'!"));
|
||||||
|
}
|
||||||
|
|
||||||
Ptr keepAlive (this);
|
Ptr keepAlive (this);
|
||||||
|
|
||||||
m_IoStrand.post([this, keepAlive, message]() {
|
m_IoStrand.post([this, keepAlive, message]() {
|
||||||
|
if (m_ShuttingDown) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
m_OutgoingMessagesQueue.emplace_back(message);
|
m_OutgoingMessagesQueue.emplace_back(message);
|
||||||
m_OutgoingMessagesQueued.Set();
|
m_OutgoingMessagesQueued.Set();
|
||||||
});
|
});
|
||||||
@ -182,6 +194,10 @@ void JsonRpcConnection::SendRawMessage(const String& message)
|
|||||||
|
|
||||||
void JsonRpcConnection::SendMessageInternal(const Dictionary::Ptr& message)
|
void JsonRpcConnection::SendMessageInternal(const Dictionary::Ptr& message)
|
||||||
{
|
{
|
||||||
|
if (m_ShuttingDown) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
m_OutgoingMessagesQueue.emplace_back(JsonEncode(message));
|
m_OutgoingMessagesQueue.emplace_back(JsonEncode(message));
|
||||||
m_OutgoingMessagesQueued.Set();
|
m_OutgoingMessagesQueued.Set();
|
||||||
}
|
}
|
||||||
@ -190,12 +206,10 @@ void JsonRpcConnection::Disconnect()
|
|||||||
{
|
{
|
||||||
namespace asio = boost::asio;
|
namespace asio = boost::asio;
|
||||||
|
|
||||||
JsonRpcConnection::Ptr keepAlive (this);
|
if (!m_ShuttingDown.exchange(true)) {
|
||||||
|
JsonRpcConnection::Ptr keepAlive (this);
|
||||||
IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
|
|
||||||
if (!m_ShuttingDown) {
|
|
||||||
m_ShuttingDown = true;
|
|
||||||
|
|
||||||
|
IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
|
||||||
Log(LogWarning, "JsonRpcConnection")
|
Log(LogWarning, "JsonRpcConnection")
|
||||||
<< "API client disconnected for identity '" << m_Identity << "'";
|
<< "API client disconnected for identity '" << m_Identity << "'";
|
||||||
|
|
||||||
@ -243,8 +257,8 @@ void JsonRpcConnection::Disconnect()
|
|||||||
shutdownTimeout->Cancel();
|
shutdownTimeout->Cancel();
|
||||||
|
|
||||||
m_Stream->lowest_layer().shutdown(m_Stream->lowest_layer().shutdown_both, ec);
|
m_Stream->lowest_layer().shutdown(m_Stream->lowest_layer().shutdown_both, ec);
|
||||||
}
|
});
|
||||||
});
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void JsonRpcConnection::MessageHandler(const String& jsonString)
|
void JsonRpcConnection::MessageHandler(const String& jsonString)
|
||||||
|
@ -5,6 +5,7 @@
|
|||||||
|
|
||||||
#include "remote/i2-remote.hpp"
|
#include "remote/i2-remote.hpp"
|
||||||
#include "remote/endpoint.hpp"
|
#include "remote/endpoint.hpp"
|
||||||
|
#include "base/atomic.hpp"
|
||||||
#include "base/io-engine.hpp"
|
#include "base/io-engine.hpp"
|
||||||
#include "base/tlsstream.hpp"
|
#include "base/tlsstream.hpp"
|
||||||
#include "base/timer.hpp"
|
#include "base/timer.hpp"
|
||||||
@ -77,7 +78,7 @@ private:
|
|||||||
std::vector<String> m_OutgoingMessagesQueue;
|
std::vector<String> m_OutgoingMessagesQueue;
|
||||||
AsioConditionVariable m_OutgoingMessagesQueued;
|
AsioConditionVariable m_OutgoingMessagesQueued;
|
||||||
AsioConditionVariable m_WriterDone;
|
AsioConditionVariable m_WriterDone;
|
||||||
bool m_ShuttingDown;
|
Atomic<bool> m_ShuttingDown;
|
||||||
boost::asio::deadline_timer m_CheckLivenessTimer, m_HeartbeatTimer;
|
boost::asio::deadline_timer m_CheckLivenessTimer, m_HeartbeatTimer;
|
||||||
|
|
||||||
JsonRpcConnection(const String& identity, bool authenticated, const Shared<AsioTlsStream>::Ptr& stream, ConnectionRole role, boost::asio::io_context& io);
|
JsonRpcConnection(const String& identity, bool authenticated, const Shared<AsioTlsStream>::Ptr& stream, ConnectionRole role, boost::asio::io_context& io);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user