Re-add JsonRpcConnection#Disconnect()

This commit is contained in:
Alexander A. Klimov 2019-02-20 12:00:11 +01:00
parent 2d16b02520
commit 84b411501b
3 changed files with 82 additions and 46 deletions

View File

@ -213,6 +213,16 @@ void ApiListener::UpdateSSLContext()
}
m_SSLContext = context;
for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>()) {
for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
client->Disconnect();
}
}
for (const JsonRpcConnection::Ptr& client : m_AnonymousClients) {
client->Disconnect();
}
}
void ApiListener::OnAllConfigLoaded()
@ -841,6 +851,8 @@ void ApiListener::ApiTimerHandler()
for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
if (client->GetTimestamp() == maxTs) {
client->SendMessage(lmessage);
} else {
client->Disconnect();
}
}

View File

@ -27,32 +27,28 @@ JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated,
const std::shared_ptr<AsioTlsStream>& stream, ConnectionRole role)
: m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream),
m_Role(role), m_Timestamp(Utility::GetTime()), m_IoStrand(stream->get_io_service()),
m_OutgoingMessagesQueued(stream->get_io_service()), m_ReaderHasError(false), m_RunningCoroutines(0)
m_OutgoingMessagesQueued(stream->get_io_service()), m_WriterDone(stream->get_io_service()), m_ShuttingDown(false)
{
if (authenticated)
m_Endpoint = Endpoint::GetByName(identity);
m_OutgoingMessagesQueued.expires_at(boost::posix_time::pos_infin);
m_WriterDone.expires_at(boost::posix_time::pos_infin);
}
void JsonRpcConnection::Start()
{
namespace asio = boost::asio;
m_RunningCoroutines = 2;
JsonRpcConnection::Ptr preventGc (this);
asio::spawn(m_IoStrand, [this](asio::yield_context yc) { HandleIncomingMessages(yc); });
asio::spawn(m_IoStrand, [this](asio::yield_context yc) { WriteOutgoingMessages(yc); });
asio::spawn(m_IoStrand, [this, preventGc](asio::yield_context yc) { HandleIncomingMessages(yc); });
asio::spawn(m_IoStrand, [this, preventGc](asio::yield_context yc) { WriteOutgoingMessages(yc); });
}
void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
{
Defer shutdownStreamOnce ([this, &yc]() {
m_ReaderHasError = true;
m_OutgoingMessagesQueued.expires_at(boost::posix_time::neg_infin);
ShutdownStreamOnce(yc);
});
Defer disconnect ([this]() { Disconnect(); });
for (;;) {
String message;
@ -60,9 +56,11 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
try {
message = JsonRpc::ReadMessage(m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024);
} catch (const std::exception& ex) {
Log(LogWarning, "JsonRpcConnection")
<< "Error while reading JSON-RPC message for identity '" << m_Identity
<< "': " << DiagnosticInformation(ex);
if (!m_ShuttingDown) {
Log(LogWarning, "JsonRpcConnection")
<< "Error while reading JSON-RPC message for identity '" << m_Identity
<< "': " << DiagnosticInformation(ex);
}
break;
}
@ -72,9 +70,11 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
MessageHandler(message);
} catch (const std::exception& ex) {
Log(LogWarning, "JsonRpcConnection")
<< "Error while processing JSON-RPC message for identity '" << m_Identity
<< "': " << DiagnosticInformation(ex);
if (!m_ShuttingDown) {
Log(LogWarning, "JsonRpcConnection")
<< "Error while processing JSON-RPC message for identity '" << m_Identity
<< "': " << DiagnosticInformation(ex);
}
break;
}
@ -83,7 +83,9 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
void JsonRpcConnection::WriteOutgoingMessages(boost::asio::yield_context yc)
{
Defer shutdownStreamOnce ([this, &yc]() { ShutdownStreamOnce(yc); });
Defer disconnect ([this]() { Disconnect(); });
Defer signalWriterDone ([this]() { m_WriterDone.expires_at(boost::posix_time::neg_infin); });
do {
try {
@ -108,36 +110,17 @@ void JsonRpcConnection::WriteOutgoingMessages(boost::asio::yield_context yc)
m_Stream->async_flush(yc);
} catch (const std::exception& ex) {
std::ostringstream info;
info << "Error while sending JSON-RPC message for identity '" << m_Identity << "'";
Log(LogWarning, "JsonRpcConnection")
<< info.str() << "\n" << DiagnosticInformation(ex);
if (!m_ShuttingDown) {
std::ostringstream info;
info << "Error while sending JSON-RPC message for identity '" << m_Identity << "'";
Log(LogWarning, "JsonRpcConnection")
<< info.str() << "\n" << DiagnosticInformation(ex);
}
break;
}
}
} while (!m_ReaderHasError);
}
void JsonRpcConnection::ShutdownStreamOnce(boost::asio::yield_context& yc)
{
if (!--m_RunningCoroutines) {
try {
m_Stream->next_layer().async_shutdown(yc);
} catch (...) {
// https://stackoverflow.com/questions/130117/throwing-exceptions-out-of-a-destructor
}
Log(LogWarning, "JsonRpcConnection")
<< "API client disconnected for identity '" << m_Identity << "'";
if (m_Endpoint) {
m_Endpoint->RemoveClient(this);
} else {
auto listener (ApiListener::GetInstance());
listener->RemoveAnonymousClient(this);
}
}
} while (!m_ShuttingDown);
}
double JsonRpcConnection::GetTimestamp() const
@ -178,6 +161,46 @@ void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message)
});
}
void JsonRpcConnection::Disconnect()
{
namespace asio = boost::asio;
JsonRpcConnection::Ptr preventGc (this);
asio::spawn(m_IoStrand, [this, preventGc](asio::yield_context yc) {
if (!m_ShuttingDown) {
m_ShuttingDown = true;
Log(LogWarning, "JsonRpcConnection")
<< "API client disconnected for identity '" << m_Identity << "'";
m_OutgoingMessagesQueued.expires_at(boost::posix_time::neg_infin);
try {
m_WriterDone.async_wait(yc);
} catch (...) {
}
try {
m_Stream->next_layer().async_shutdown(yc);
} catch (...) {
}
try {
m_Stream->lowest_layer().shutdown(m_Stream->lowest_layer().shutdown_both);
} catch (...) {
}
if (m_Endpoint) {
m_Endpoint->RemoveClient(this);
} else {
auto listener (ApiListener::GetInstance());
listener->RemoveAnonymousClient(this);
}
}
});
}
void JsonRpcConnection::MessageHandler(const String& jsonString)
{
Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString);

View File

@ -52,6 +52,8 @@ public:
std::shared_ptr<AsioTlsStream> GetStream() const;
ConnectionRole GetRole() const;
void Disconnect();
void SendMessage(const Dictionary::Ptr& request);
static Value HeartbeatAPIHandler(const intrusive_ptr<MessageOrigin>& origin, const Dictionary::Ptr& params);
@ -68,12 +70,11 @@ private:
boost::asio::io_service::strand m_IoStrand;
std::vector<Dictionary::Ptr> m_OutgoingMessagesQueue;
boost::asio::deadline_timer m_OutgoingMessagesQueued;
bool m_ReaderHasError;
unsigned char m_RunningCoroutines;
boost::asio::deadline_timer m_WriterDone;
bool m_ShuttingDown;
void HandleIncomingMessages(boost::asio::yield_context yc);
void WriteOutgoingMessages(boost::asio::yield_context yc);
void ShutdownStreamOnce(boost::asio::yield_context& yc);
bool ProcessMessage();
void MessageHandler(const String& jsonString);