Implement support for cleaning up expired API callbacks

refs #5450
This commit is contained in:
Gunnar Beutner 2017-08-31 11:10:14 +02:00
parent cc43dc734b
commit 03f5ccd252
2 changed files with 34 additions and 6 deletions

View File

@ -194,13 +194,18 @@ void JsonRpcConnection::MessageHandler(const String& jsonString)
String id = vid;
auto it = m_ApiCallbacks.find(id);
ApiCallbackInfo aci;
if (it == m_ApiCallbacks.end())
return;
{
boost::mutex::scoped_lock lock(m_ApiCallbacksMutex);
auto it = m_ApiCallbacks.find(id);
ApiCallbackInfo aci = it->second;
m_ApiCallbacks.erase(it);
if (it == m_ApiCallbacks.end())
return;
aci = it->second;
m_ApiCallbacks.erase(it);
}
try {
aci.Callback(message);
@ -301,6 +306,11 @@ Value SetLogPositionHandler(const MessageOrigin::Ptr& origin, const Dictionary::
return Empty;
}
bool ApiCallbackInfo::IsExpired(void) const
{
return Timestamp < Utility::GetTime() - 300;
}
void JsonRpcConnection::CheckLiveness(void)
{
if (m_Seen < Utility::GetTime() - 60 && (!m_Endpoint || !m_Endpoint->GetSyncing())) {
@ -308,6 +318,18 @@ void JsonRpcConnection::CheckLiveness(void)
<< "No messages for identity '" << m_Identity << "' have been received in the last 60 seconds.";
Disconnect();
}
{
boost::mutex::scoped_lock lock(m_ApiCallbacksMutex);
for (auto it = m_ApiCallbacks.begin(), last = m_ApiCallbacks.end(); it != last; ) {
if (it->second.IsExpired()) {
it = m_ApiCallbacks.erase(it);
} else {
++it;
}
}
}
}
void JsonRpcConnection::TimeoutTimerHandler(void)
@ -363,5 +385,8 @@ void JsonRpcConnection::RegisterCallback(const String& id, const boost::function
aci.Timestamp = Utility::GetTime();
aci.Callback = callback;
m_ApiCallbacks[id] = aci;
{
boost::mutex::scoped_lock lock(m_ApiCallbacksMutex);
m_ApiCallbacks[id] = aci;
}
}

View File

@ -47,6 +47,8 @@ struct ApiCallbackInfo
{
double Timestamp;
boost::function<void (const Dictionary::Ptr&)> Callback;
bool IsExpired(void) const;
};
/**
@ -96,6 +98,7 @@ private:
double m_HeartbeatTimeout;
boost::mutex m_DataHandlerMutex;
std::map<String, ApiCallbackInfo> m_ApiCallbacks;
boost::mutex m_ApiCallbacksMutex;
StreamReadContext m_Context;