mirror of https://github.com/Icinga/icinga2.git
Improve the performance for ApiListener::SyncRelayMessage (part 2)
refs #11014
This commit is contained in:
parent
9ae1f1abee
commit
8c05003101
|
@ -613,6 +613,77 @@ void ApiListener::SyncSendMessage(const Endpoint::Ptr& endpoint, const Dictionar
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool ApiListener::RelayMessageOne(const Zone::Ptr& targetZone, const MessageOrigin::Ptr& origin, const Dictionary::Ptr& message)
|
||||||
|
{
|
||||||
|
ASSERT(targetZone);
|
||||||
|
|
||||||
|
bool is_master = IsMaster();
|
||||||
|
Endpoint::Ptr master = GetMaster();
|
||||||
|
Zone::Ptr myZone = Zone::GetLocalZone();
|
||||||
|
|
||||||
|
/* only relay the message to a) the same zone, b) the parent zone and c) direct child zones */
|
||||||
|
if (targetZone != myZone && targetZone != myZone->GetParent() && targetZone->GetParent() != myZone)
|
||||||
|
return true;
|
||||||
|
|
||||||
|
std::vector<Endpoint::Ptr> skippedEndpoints;
|
||||||
|
|
||||||
|
bool relayed = false, log_needed = false, log_done = false;
|
||||||
|
|
||||||
|
BOOST_FOREACH(const Endpoint::Ptr& endpoint, targetZone->GetEndpoints()) {
|
||||||
|
/* don't relay messages to ourselves */
|
||||||
|
if (endpoint == GetLocalEndpoint())
|
||||||
|
continue;
|
||||||
|
|
||||||
|
log_needed = true;
|
||||||
|
|
||||||
|
/* don't relay messages to disconnected endpoints */
|
||||||
|
if (!endpoint->GetConnected()) {
|
||||||
|
if (targetZone == myZone)
|
||||||
|
log_done = false;
|
||||||
|
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
log_done = true;
|
||||||
|
|
||||||
|
/* don't relay the message to the zone through more than one endpoint unless this is our own zone */
|
||||||
|
if (relayed && targetZone != myZone) {
|
||||||
|
skippedEndpoints.push_back(endpoint);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* don't relay messages back to the endpoint which we got the message from */
|
||||||
|
if (origin && origin->FromClient && endpoint == origin->FromClient->GetEndpoint()) {
|
||||||
|
skippedEndpoints.push_back(endpoint);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* don't relay messages back to the zone which we got the message from */
|
||||||
|
if (origin && origin->FromZone && targetZone == origin->FromZone) {
|
||||||
|
skippedEndpoints.push_back(endpoint);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* only relay message to the master if we're not currently the master */
|
||||||
|
if (!is_master && master != endpoint) {
|
||||||
|
skippedEndpoints.push_back(endpoint);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
relayed = true;
|
||||||
|
|
||||||
|
SyncSendMessage(endpoint, message);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!skippedEndpoints.empty()) {
|
||||||
|
double ts = message->Get("ts");
|
||||||
|
|
||||||
|
BOOST_FOREACH(const Endpoint::Ptr& endpoint, skippedEndpoints)
|
||||||
|
endpoint->SetLocalLogPosition(ts);
|
||||||
|
}
|
||||||
|
|
||||||
|
return !log_needed || log_done;
|
||||||
|
}
|
||||||
|
|
||||||
void ApiListener::SyncRelayMessage(const MessageOrigin::Ptr& origin,
|
void ApiListener::SyncRelayMessage(const MessageOrigin::Ptr& origin,
|
||||||
const ConfigObject::Ptr& secobj, const Dictionary::Ptr& message, bool log)
|
const ConfigObject::Ptr& secobj, const Dictionary::Ptr& message, bool log)
|
||||||
|
@ -626,80 +697,27 @@ void ApiListener::SyncRelayMessage(const MessageOrigin::Ptr& origin,
|
||||||
if (origin && origin->FromZone)
|
if (origin && origin->FromZone)
|
||||||
message->Set("originZone", origin->FromZone->GetName());
|
message->Set("originZone", origin->FromZone->GetName());
|
||||||
|
|
||||||
bool is_master = IsMaster();
|
Zone::Ptr target_zone;
|
||||||
Endpoint::Ptr master = GetMaster();
|
|
||||||
Zone::Ptr my_zone = Zone::GetLocalZone();
|
|
||||||
|
|
||||||
std::vector<Endpoint::Ptr> skippedEndpoints;
|
if (secobj) {
|
||||||
std::set<Zone::Ptr> allZones;
|
if (secobj->GetReflectionType() == Zone::TypeInstance)
|
||||||
std::set<Zone::Ptr> finishedZones;
|
target_zone = static_pointer_cast<Zone>(secobj);
|
||||||
std::set<Zone::Ptr> finishedLogZones;
|
else
|
||||||
|
target_zone = static_pointer_cast<Zone>(secobj->GetZone());
|
||||||
BOOST_FOREACH(const Endpoint::Ptr& endpoint, ConfigType::GetObjectsByType<Endpoint>()) {
|
|
||||||
/* don't relay messages to ourselves */
|
|
||||||
if (endpoint == GetLocalEndpoint())
|
|
||||||
continue;
|
|
||||||
|
|
||||||
Zone::Ptr target_zone = endpoint->GetZone();
|
|
||||||
|
|
||||||
|
|
||||||
/* only relay messages to zones which have access to the object */
|
|
||||||
if (!target_zone->CanAccessObject(secobj))
|
|
||||||
continue;
|
|
||||||
|
|
||||||
allZones.insert(target_zone);
|
|
||||||
|
|
||||||
/* don't relay messages to disconnected endpoints */
|
|
||||||
if (!endpoint->GetConnected()) {
|
|
||||||
if (target_zone == my_zone)
|
|
||||||
finishedLogZones.erase(target_zone);
|
|
||||||
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
finishedLogZones.insert(target_zone);
|
|
||||||
|
|
||||||
/* don't relay the message to the zone through more than one endpoint unless this is our own zone */
|
|
||||||
if (finishedZones.find(target_zone) != finishedZones.end() && target_zone != my_zone) {
|
|
||||||
skippedEndpoints.push_back(endpoint);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* don't relay messages back to the endpoint which we got the message from */
|
|
||||||
if (origin && origin->FromClient && endpoint == origin->FromClient->GetEndpoint()) {
|
|
||||||
skippedEndpoints.push_back(endpoint);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* don't relay messages back to the zone which we got the message from */
|
|
||||||
if (origin && origin->FromZone && target_zone == origin->FromZone) {
|
|
||||||
skippedEndpoints.push_back(endpoint);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* only relay message to the master if we're not currently the master */
|
|
||||||
if (!is_master && master != endpoint) {
|
|
||||||
skippedEndpoints.push_back(endpoint);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* only relay the message to a) the same zone, b) the parent zone and c) direct child zones */
|
|
||||||
if (target_zone != my_zone && target_zone != my_zone->GetParent() &&
|
|
||||||
secobj->GetZoneName() != target_zone->GetName()) {
|
|
||||||
skippedEndpoints.push_back(endpoint);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
finishedZones.insert(target_zone);
|
|
||||||
|
|
||||||
SyncSendMessage(endpoint, message);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (log && allZones.size() != finishedLogZones.size())
|
if (!target_zone)
|
||||||
PersistMessage(message, secobj);
|
target_zone = Zone::GetLocalZone();
|
||||||
|
|
||||||
BOOST_FOREACH(const Endpoint::Ptr& endpoint, skippedEndpoints)
|
bool need_log = !RelayMessageOne(target_zone, origin, message);
|
||||||
endpoint->SetLocalLogPosition(ts);
|
|
||||||
|
BOOST_FOREACH(const Zone::Ptr& zone, target_zone->GetAllParents()) {
|
||||||
|
if (!RelayMessageOne(zone, origin, message))
|
||||||
|
need_log = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (log && need_log)
|
||||||
|
PersistMessage(message, secobj);
|
||||||
}
|
}
|
||||||
|
|
||||||
String ApiListener::GetApiDir(void)
|
String ApiListener::GetApiDir(void)
|
||||||
|
|
|
@ -128,6 +128,7 @@ private:
|
||||||
Stream::Ptr m_LogFile;
|
Stream::Ptr m_LogFile;
|
||||||
size_t m_LogMessageCount;
|
size_t m_LogMessageCount;
|
||||||
|
|
||||||
|
bool RelayMessageOne(const Zone::Ptr& zone, const MessageOrigin::Ptr& origin, const Dictionary::Ptr& message);
|
||||||
void SyncRelayMessage(const MessageOrigin::Ptr& origin, const ConfigObject::Ptr& secobj, const Dictionary::Ptr& message, bool log);
|
void SyncRelayMessage(const MessageOrigin::Ptr& origin, const ConfigObject::Ptr& secobj, const Dictionary::Ptr& message, bool log);
|
||||||
void PersistMessage(const Dictionary::Ptr& message, const ConfigObject::Ptr& secobj);
|
void PersistMessage(const Dictionary::Ptr& message, const ConfigObject::Ptr& secobj);
|
||||||
|
|
||||||
|
|
|
@ -30,6 +30,14 @@ REGISTER_TYPE(Zone);
|
||||||
void Zone::OnAllConfigLoaded(void)
|
void Zone::OnAllConfigLoaded(void)
|
||||||
{
|
{
|
||||||
m_Parent = Zone::GetByName(GetParentRaw());
|
m_Parent = Zone::GetByName(GetParentRaw());
|
||||||
|
|
||||||
|
Zone::Ptr zone = m_Parent;
|
||||||
|
|
||||||
|
while (zone) {
|
||||||
|
m_AllParents.push_back(zone);
|
||||||
|
|
||||||
|
zone = Zone::GetByName(zone->GetParentRaw());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Zone::Ptr Zone::GetParent(void) const
|
Zone::Ptr Zone::GetParent(void) const
|
||||||
|
@ -59,6 +67,11 @@ std::set<Endpoint::Ptr> Zone::GetEndpoints(void) const
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::vector<Zone::Ptr> Zone::GetAllParents(void) const
|
||||||
|
{
|
||||||
|
return m_AllParents;
|
||||||
|
}
|
||||||
|
|
||||||
bool Zone::CanAccessObject(const ConfigObject::Ptr& object)
|
bool Zone::CanAccessObject(const ConfigObject::Ptr& object)
|
||||||
{
|
{
|
||||||
Zone::Ptr object_zone;
|
Zone::Ptr object_zone;
|
||||||
|
|
|
@ -40,6 +40,7 @@ public:
|
||||||
|
|
||||||
Zone::Ptr GetParent(void) const;
|
Zone::Ptr GetParent(void) const;
|
||||||
std::set<Endpoint::Ptr> GetEndpoints(void) const;
|
std::set<Endpoint::Ptr> GetEndpoints(void) const;
|
||||||
|
std::vector<Zone::Ptr> GetAllParents(void) const;
|
||||||
|
|
||||||
bool CanAccessObject(const ConfigObject::Ptr& object);
|
bool CanAccessObject(const ConfigObject::Ptr& object);
|
||||||
bool IsChildOf(const Zone::Ptr& zone);
|
bool IsChildOf(const Zone::Ptr& zone);
|
||||||
|
@ -49,6 +50,7 @@ public:
|
||||||
|
|
||||||
private:
|
private:
|
||||||
Zone::Ptr m_Parent;
|
Zone::Ptr m_Parent;
|
||||||
|
std::vector<Zone::Ptr> m_AllParents;
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue