mirror of https://github.com/Icinga/icinga2.git
Merge pull request #8440 from Icinga/bugfix/message-routing-for-global-zones
Fix cluster message routing for global zones
This commit is contained in:
commit
39bc1590f6
|
@ -1032,6 +1032,17 @@ void ApiListener::SyncSendMessage(const Endpoint::Ptr& endpoint, const Dictionar
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Relay a message to a directly connected zone or to a global zone.
|
||||||
|
* If some other zone is passed as the target zone, it is not relayed.
|
||||||
|
*
|
||||||
|
* @param targetZone The zone to relay to
|
||||||
|
* @param origin Information about where this message is relayed from (if it was not generated locally)
|
||||||
|
* @param message The message to relay
|
||||||
|
* @param currentZoneMaster The current master node of the local zone
|
||||||
|
* @return true if the message has been relayed to all relevant endpoints,
|
||||||
|
* false if it hasn't and must be persisted in the replay log
|
||||||
|
*/
|
||||||
bool ApiListener::RelayMessageOne(const Zone::Ptr& targetZone, const MessageOrigin::Ptr& origin, const Dictionary::Ptr& message, const Endpoint::Ptr& currentZoneMaster)
|
bool ApiListener::RelayMessageOne(const Zone::Ptr& targetZone, const MessageOrigin::Ptr& origin, const Dictionary::Ptr& message, const Endpoint::Ptr& currentZoneMaster)
|
||||||
{
|
{
|
||||||
ASSERT(targetZone);
|
ASSERT(targetZone);
|
||||||
|
@ -1050,25 +1061,26 @@ bool ApiListener::RelayMessageOne(const Zone::Ptr& targetZone, const MessageOrig
|
||||||
|
|
||||||
std::vector<Endpoint::Ptr> skippedEndpoints;
|
std::vector<Endpoint::Ptr> skippedEndpoints;
|
||||||
|
|
||||||
bool relayed = false, log_needed = false, log_done = false;
|
std::set<Zone::Ptr> allTargetZones;
|
||||||
|
|
||||||
std::set<Endpoint::Ptr> targetEndpoints;
|
|
||||||
|
|
||||||
if (targetZone->GetGlobal()) {
|
if (targetZone->GetGlobal()) {
|
||||||
targetEndpoints = localZone->GetEndpoints();
|
/* if the zone is global, the message has to be relayed to our local zone and direct children */
|
||||||
|
allTargetZones.insert(localZone);
|
||||||
for (const Zone::Ptr& zone : ConfigType::GetObjectsByType<Zone>()) {
|
for (const Zone::Ptr& zone : ConfigType::GetObjectsByType<Zone>()) {
|
||||||
/* Fetch immediate child zone members */
|
|
||||||
if (zone->GetParent() == localZone) {
|
if (zone->GetParent() == localZone) {
|
||||||
std::set<Endpoint::Ptr> endpoints = zone->GetEndpoints();
|
allTargetZones.insert(zone);
|
||||||
targetEndpoints.insert(endpoints.begin(), endpoints.end());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
targetEndpoints = targetZone->GetEndpoints();
|
/* whereas if it's not global, the message is just relayed to the zone itself */
|
||||||
|
allTargetZones.insert(targetZone);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (const Endpoint::Ptr& targetEndpoint : targetEndpoints) {
|
bool needsReplay = false;
|
||||||
|
|
||||||
|
for (const Zone::Ptr& currentTargetZone : allTargetZones) {
|
||||||
|
bool relayed = false, log_needed = false, log_done = false;
|
||||||
|
|
||||||
|
for (const Endpoint::Ptr& targetEndpoint : currentTargetZone->GetEndpoints()) {
|
||||||
/* Don't relay messages to ourselves. */
|
/* Don't relay messages to ourselves. */
|
||||||
if (targetEndpoint == localEndpoint)
|
if (targetEndpoint == localEndpoint)
|
||||||
continue;
|
continue;
|
||||||
|
@ -1077,7 +1089,7 @@ bool ApiListener::RelayMessageOne(const Zone::Ptr& targetZone, const MessageOrig
|
||||||
|
|
||||||
/* Don't relay messages to disconnected endpoints. */
|
/* Don't relay messages to disconnected endpoints. */
|
||||||
if (!targetEndpoint->GetConnected()) {
|
if (!targetEndpoint->GetConnected()) {
|
||||||
if (targetZone == localZone)
|
if (currentTargetZone == localZone)
|
||||||
log_done = false;
|
log_done = false;
|
||||||
|
|
||||||
continue;
|
continue;
|
||||||
|
@ -1088,7 +1100,7 @@ bool ApiListener::RelayMessageOne(const Zone::Ptr& targetZone, const MessageOrig
|
||||||
/* Don't relay the message to the zone through more than one endpoint unless this is our own zone.
|
/* Don't relay the message to the zone through more than one endpoint unless this is our own zone.
|
||||||
* 'relayed' is set to true on success below, enabling the checks in the second iteration.
|
* 'relayed' is set to true on success below, enabling the checks in the second iteration.
|
||||||
*/
|
*/
|
||||||
if (relayed && targetZone != localZone) {
|
if (relayed && currentTargetZone != localZone) {
|
||||||
skippedEndpoints.push_back(targetEndpoint);
|
skippedEndpoints.push_back(targetEndpoint);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -1100,7 +1112,7 @@ bool ApiListener::RelayMessageOne(const Zone::Ptr& targetZone, const MessageOrig
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Don't relay messages back to the zone which we got the message from. */
|
/* Don't relay messages back to the zone which we got the message from. */
|
||||||
if (origin && origin->FromZone && targetZone == origin->FromZone) {
|
if (origin && origin->FromZone && currentTargetZone == origin->FromZone) {
|
||||||
skippedEndpoints.push_back(targetEndpoint);
|
skippedEndpoints.push_back(targetEndpoint);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -1128,6 +1140,11 @@ bool ApiListener::RelayMessageOne(const Zone::Ptr& targetZone, const MessageOrig
|
||||||
SyncSendMessage(targetEndpoint, message);
|
SyncSendMessage(targetEndpoint, message);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (log_needed && !log_done) {
|
||||||
|
needsReplay = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (!skippedEndpoints.empty()) {
|
if (!skippedEndpoints.empty()) {
|
||||||
double ts = message->Get("ts");
|
double ts = message->Get("ts");
|
||||||
|
|
||||||
|
@ -1135,7 +1152,7 @@ bool ApiListener::RelayMessageOne(const Zone::Ptr& targetZone, const MessageOrig
|
||||||
skippedEndpoint->SetLocalLogPosition(ts);
|
skippedEndpoint->SetLocalLogPosition(ts);
|
||||||
}
|
}
|
||||||
|
|
||||||
return !log_needed || log_done;
|
return !needsReplay;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ApiListener::SyncRelayMessage(const MessageOrigin::Ptr& origin,
|
void ApiListener::SyncRelayMessage(const MessageOrigin::Ptr& origin,
|
||||||
|
|
Loading…
Reference in New Issue