Improve the performance for ApiListener::SyncRelayMessage (part 2)

refs #11014
This commit is contained in:
Gunnar Beutner 2016-01-27 12:18:16 +01:00
parent 0d5592a615
commit f9efc2ffe7
4 changed files with 104 additions and 70 deletions

View File

@ -607,6 +607,77 @@ void ApiListener::SyncSendMessage(const Endpoint::Ptr& endpoint, const Dictionar
} }
} }
bool ApiListener::RelayMessageOne(const Zone::Ptr& targetZone, const MessageOrigin::Ptr& origin, const Dictionary::Ptr& message)
{
ASSERT(targetZone);
bool is_master = IsMaster();
Endpoint::Ptr master = GetMaster();
Zone::Ptr myZone = Zone::GetLocalZone();
/* only relay the message to a) the same zone, b) the parent zone and c) direct child zones */
if (targetZone != myZone && targetZone != myZone->GetParent() && targetZone->GetParent() != myZone)
return true;
std::vector<Endpoint::Ptr> skippedEndpoints;
bool relayed = false, log_needed = false, log_done = false;
BOOST_FOREACH(const Endpoint::Ptr& endpoint, targetZone->GetEndpoints()) {
/* don't relay messages to ourselves */
if (endpoint == GetLocalEndpoint())
continue;
log_needed = true;
/* don't relay messages to disconnected endpoints */
if (!endpoint->GetConnected()) {
if (targetZone == myZone)
log_done = false;
continue;
}
log_done = true;
/* don't relay the message to the zone through more than one endpoint unless this is our own zone */
if (relayed && targetZone != myZone) {
skippedEndpoints.push_back(endpoint);
continue;
}
/* don't relay messages back to the endpoint which we got the message from */
if (origin && origin->FromClient && endpoint == origin->FromClient->GetEndpoint()) {
skippedEndpoints.push_back(endpoint);
continue;
}
/* don't relay messages back to the zone which we got the message from */
if (origin && origin->FromZone && targetZone == origin->FromZone) {
skippedEndpoints.push_back(endpoint);
continue;
}
/* only relay message to the master if we're not currently the master */
if (!is_master && master != endpoint) {
skippedEndpoints.push_back(endpoint);
continue;
}
relayed = true;
SyncSendMessage(endpoint, message);
}
if (!skippedEndpoints.empty()) {
double ts = message->Get("ts");
BOOST_FOREACH(const Endpoint::Ptr& endpoint, skippedEndpoints)
endpoint->SetLocalLogPosition(ts);
}
return !log_needed || log_done;
}
void ApiListener::SyncRelayMessage(const MessageOrigin::Ptr& origin, void ApiListener::SyncRelayMessage(const MessageOrigin::Ptr& origin,
const ConfigObject::Ptr& secobj, const Dictionary::Ptr& message, bool log) const ConfigObject::Ptr& secobj, const Dictionary::Ptr& message, bool log)
@ -620,80 +691,27 @@ void ApiListener::SyncRelayMessage(const MessageOrigin::Ptr& origin,
if (origin && origin->FromZone) if (origin && origin->FromZone)
message->Set("originZone", origin->FromZone->GetName()); message->Set("originZone", origin->FromZone->GetName());
bool is_master = IsMaster(); Zone::Ptr target_zone;
Endpoint::Ptr master = GetMaster();
Zone::Ptr my_zone = Zone::GetLocalZone();
std::vector<Endpoint::Ptr> skippedEndpoints; if (secobj) {
std::set<Zone::Ptr> allZones; if (secobj->GetReflectionType() == Zone::TypeInstance)
std::set<Zone::Ptr> finishedZones; target_zone = static_pointer_cast<Zone>(secobj);
std::set<Zone::Ptr> finishedLogZones; else
target_zone = static_pointer_cast<Zone>(secobj->GetZone());
BOOST_FOREACH(const Endpoint::Ptr& endpoint, ConfigType::GetObjectsByType<Endpoint>()) {
/* don't relay messages to ourselves */
if (endpoint == GetLocalEndpoint())
continue;
Zone::Ptr target_zone = endpoint->GetZone();
/* only relay messages to zones which have access to the object */
if (!target_zone->CanAccessObject(secobj))
continue;
allZones.insert(target_zone);
/* don't relay messages to disconnected endpoints */
if (!endpoint->GetConnected()) {
if (target_zone == my_zone)
finishedLogZones.erase(target_zone);
continue;
} }
finishedLogZones.insert(target_zone); if (!target_zone)
target_zone = Zone::GetLocalZone();
/* don't relay the message to the zone through more than one endpoint unless this is our own zone */ bool need_log = !RelayMessageOne(target_zone, origin, message);
if (finishedZones.find(target_zone) != finishedZones.end() && target_zone != my_zone) {
skippedEndpoints.push_back(endpoint); BOOST_FOREACH(const Zone::Ptr& zone, target_zone->GetAllParents()) {
continue; if (!RelayMessageOne(zone, origin, message))
need_log = true;
} }
/* don't relay messages back to the endpoint which we got the message from */ if (log && need_log)
if (origin && origin->FromClient && endpoint == origin->FromClient->GetEndpoint()) {
skippedEndpoints.push_back(endpoint);
continue;
}
/* don't relay messages back to the zone which we got the message from */
if (origin && origin->FromZone && target_zone == origin->FromZone) {
skippedEndpoints.push_back(endpoint);
continue;
}
/* only relay message to the master if we're not currently the master */
if (!is_master && master != endpoint) {
skippedEndpoints.push_back(endpoint);
continue;
}
/* only relay the message to a) the same zone, b) the parent zone and c) direct child zones */
if (target_zone != my_zone && target_zone != my_zone->GetParent() &&
secobj->GetZoneName() != target_zone->GetName()) {
skippedEndpoints.push_back(endpoint);
continue;
}
finishedZones.insert(target_zone);
SyncSendMessage(endpoint, message);
}
if (log && allZones.size() != finishedLogZones.size())
PersistMessage(message, secobj); PersistMessage(message, secobj);
BOOST_FOREACH(const Endpoint::Ptr& endpoint, skippedEndpoints)
endpoint->SetLocalLogPosition(ts);
} }
String ApiListener::GetApiDir(void) String ApiListener::GetApiDir(void)

View File

@ -128,6 +128,7 @@ private:
Stream::Ptr m_LogFile; Stream::Ptr m_LogFile;
size_t m_LogMessageCount; size_t m_LogMessageCount;
bool RelayMessageOne(const Zone::Ptr& zone, const MessageOrigin::Ptr& origin, const Dictionary::Ptr& message);
void SyncRelayMessage(const MessageOrigin::Ptr& origin, const ConfigObject::Ptr& secobj, const Dictionary::Ptr& message, bool log); void SyncRelayMessage(const MessageOrigin::Ptr& origin, const ConfigObject::Ptr& secobj, const Dictionary::Ptr& message, bool log);
void PersistMessage(const Dictionary::Ptr& message, const ConfigObject::Ptr& secobj); void PersistMessage(const Dictionary::Ptr& message, const ConfigObject::Ptr& secobj);

View File

@ -30,6 +30,14 @@ REGISTER_TYPE(Zone);
void Zone::OnAllConfigLoaded(void) void Zone::OnAllConfigLoaded(void)
{ {
m_Parent = Zone::GetByName(GetParentRaw()); m_Parent = Zone::GetByName(GetParentRaw());
Zone::Ptr zone = m_Parent;
while (zone) {
m_AllParents.push_back(zone);
zone = Zone::GetByName(zone->GetParentRaw());
}
} }
Zone::Ptr Zone::GetParent(void) const Zone::Ptr Zone::GetParent(void) const
@ -59,6 +67,11 @@ std::set<Endpoint::Ptr> Zone::GetEndpoints(void) const
return result; return result;
} }
std::vector<Zone::Ptr> Zone::GetAllParents(void) const
{
return m_AllParents;
}
bool Zone::CanAccessObject(const ConfigObject::Ptr& object) bool Zone::CanAccessObject(const ConfigObject::Ptr& object)
{ {
Zone::Ptr object_zone; Zone::Ptr object_zone;

View File

@ -40,6 +40,7 @@ public:
Zone::Ptr GetParent(void) const; Zone::Ptr GetParent(void) const;
std::set<Endpoint::Ptr> GetEndpoints(void) const; std::set<Endpoint::Ptr> GetEndpoints(void) const;
std::vector<Zone::Ptr> GetAllParents(void) const;
bool CanAccessObject(const ConfigObject::Ptr& object); bool CanAccessObject(const ConfigObject::Ptr& object);
bool IsChildOf(const Zone::Ptr& zone); bool IsChildOf(const Zone::Ptr& zone);
@ -49,6 +50,7 @@ public:
private: private:
Zone::Ptr m_Parent; Zone::Ptr m_Parent;
std::vector<Zone::Ptr> m_AllParents;
}; };
} }