Icinga DB: make icinga:history:stream:*#event_id deterministic

... i.e. UUID -> SHA1(env, eventType, x...) given that SHA1(env, x...) = type-specific ID.
Rationale: allow both masters to write the same history concurrently (while not
in split-brain), so that REPLACE INTO deduplicates the same events written twice.

* ack: SHA1(env, "ack_set"|"ack_clear", checkable.name, setTime)
* comment: SHA1(env, "comment_add"|"comment_remove", comment.name)
* downtime: SHA1(env, "downtime_start"|"downtime_end", downtime.name)
* flapping: SHA1(env, "flapping_start"|"flapping_end", checkable.name, startTime)
* notification: SHA1(env, "notification", notification.name, notificationType, sendTime)
* state: SHA1(env, "state_change", checkable.name, changeTime)
This commit is contained in:
Alexander A. Klimov 2021-10-11 17:36:40 +02:00
parent 5c44365c4e
commit b1714a10c2
3 changed files with 43 additions and 15 deletions

View File

@ -1539,10 +1539,11 @@ void IcingaDB::SendStateChange(const ConfigObject::Ptr& object, const CheckResul
hard_state = service ? Convert::ToLong(service->GetLastHardState()) : Convert::ToLong(host->GetLastHardState()); hard_state = service ? Convert::ToLong(service->GetLastHardState()) : Convert::ToLong(host->GetLastHardState());
} }
auto eventTime (TimestampToMilliseconds(cr->GetExecutionEnd())); auto eventTime (cr->GetExecutionEnd());
auto eventTs (TimestampToMilliseconds(eventTime));
Array::Ptr rawId = new Array(Prepend(GetEnvironment(), GetObjectIdentifiersWithoutEnv(object))); Array::Ptr rawId = new Array(Prepend(GetEnvironment(), GetObjectIdentifiersWithoutEnv(object)));
rawId->Add(eventTime); rawId->Add(eventTs);
std::vector<String> xAdd ({ std::vector<String> xAdd ({
"XADD", "icinga:history:stream:state", "*", "XADD", "icinga:history:stream:state", "*",
@ -1556,8 +1557,8 @@ void IcingaDB::SendStateChange(const ConfigObject::Ptr& object, const CheckResul
"previous_soft_state", Convert::ToString(GetPreviousState(checkable, service, StateTypeSoft)), "previous_soft_state", Convert::ToString(GetPreviousState(checkable, service, StateTypeSoft)),
"previous_hard_state", Convert::ToString(GetPreviousState(checkable, service, StateTypeHard)), "previous_hard_state", Convert::ToString(GetPreviousState(checkable, service, StateTypeHard)),
"max_check_attempts", Convert::ToString(checkable->GetMaxCheckAttempts()), "max_check_attempts", Convert::ToString(checkable->GetMaxCheckAttempts()),
"event_time", Convert::ToString(eventTime), "event_time", Convert::ToString(eventTs),
"event_id", Utility::NewUniqueID(), "event_id", CalcEventID("state_change", object, eventTime),
"event_type", "state_change" "event_type", "state_change"
}); });
@ -1619,16 +1620,17 @@ void IcingaDB::SendSentNotification(
} }
auto usersAmount (users.size()); auto usersAmount (users.size());
auto notificationHistoryId = Utility::NewUniqueID();
auto sendTs (TimestampToMilliseconds(sendTime)); auto sendTs (TimestampToMilliseconds(sendTime));
Array::Ptr rawId = new Array(Prepend(GetEnvironment(), GetObjectIdentifiersWithoutEnv(notification))); Array::Ptr rawId = new Array(Prepend(GetEnvironment(), GetObjectIdentifiersWithoutEnv(notification)));
rawId->Add(GetNotificationTypeByEnum(type)); rawId->Add(GetNotificationTypeByEnum(type));
rawId->Add(sendTs); rawId->Add(sendTs);
auto notificationHistoryId (HashValue(rawId));
std::vector<String> xAdd ({ std::vector<String> xAdd ({
"XADD", "icinga:history:stream:notification", "*", "XADD", "icinga:history:stream:notification", "*",
"id", HashValue(rawId), "id", notificationHistoryId,
"environment_id", m_EnvironmentId, "environment_id", m_EnvironmentId,
"notification_id", GetObjectIdentifier(notification), "notification_id", GetObjectIdentifier(notification),
"host_id", GetObjectIdentifier(host), "host_id", GetObjectIdentifier(host),
@ -1639,7 +1641,7 @@ void IcingaDB::SendSentNotification(
"text", Utility::ValidateUTF8(finalText), "text", Utility::ValidateUTF8(finalText),
"users_notified", Convert::ToString(usersAmount), "users_notified", Convert::ToString(usersAmount),
"send_time", Convert::ToString(sendTs), "send_time", Convert::ToString(sendTs),
"event_id", Utility::NewUniqueID(), "event_id", CalcEventID("notification", notification, sendTime, type),
"event_type", "notification" "event_type", "notification"
}); });
@ -1700,7 +1702,7 @@ void IcingaDB::SendStartedDowntime(const Downtime::Ptr& downtime)
"scheduled_end_time", Convert::ToString(TimestampToMilliseconds(downtime->GetEndTime())), "scheduled_end_time", Convert::ToString(TimestampToMilliseconds(downtime->GetEndTime())),
"has_been_cancelled", Convert::ToString((unsigned short)downtime->GetWasCancelled()), "has_been_cancelled", Convert::ToString((unsigned short)downtime->GetWasCancelled()),
"trigger_time", Convert::ToString(TimestampToMilliseconds(downtime->GetTriggerTime())), "trigger_time", Convert::ToString(TimestampToMilliseconds(downtime->GetTriggerTime())),
"event_id", Utility::NewUniqueID(), "event_id", CalcEventID("downtime_start", downtime),
"event_type", "downtime_start" "event_type", "downtime_start"
}); });
@ -1787,7 +1789,7 @@ void IcingaDB::SendRemovedDowntime(const Downtime::Ptr& downtime)
"has_been_cancelled", Convert::ToString((unsigned short)downtime->GetWasCancelled()), "has_been_cancelled", Convert::ToString((unsigned short)downtime->GetWasCancelled()),
"trigger_time", Convert::ToString(TimestampToMilliseconds(downtime->GetTriggerTime())), "trigger_time", Convert::ToString(TimestampToMilliseconds(downtime->GetTriggerTime())),
"cancel_time", Convert::ToString(TimestampToMilliseconds(Utility::GetTime())), "cancel_time", Convert::ToString(TimestampToMilliseconds(Utility::GetTime())),
"event_id", Utility::NewUniqueID(), "event_id", CalcEventID("downtime_end", downtime),
"event_type", "downtime_end" "event_type", "downtime_end"
}); });
@ -1864,7 +1866,7 @@ void IcingaDB::SendAddedComment(const Comment::Ptr& comment)
"entry_type", Convert::ToString(comment->GetEntryType()), "entry_type", Convert::ToString(comment->GetEntryType()),
"is_persistent", Convert::ToString((unsigned short)comment->GetPersistent()), "is_persistent", Convert::ToString((unsigned short)comment->GetPersistent()),
"is_sticky", Convert::ToString((unsigned short)(comment->GetEntryType() == CommentAcknowledgement && comment->GetCheckable()->GetAcknowledgement() == AcknowledgementSticky)), "is_sticky", Convert::ToString((unsigned short)(comment->GetEntryType() == CommentAcknowledgement && comment->GetCheckable()->GetAcknowledgement() == AcknowledgementSticky)),
"event_id", Utility::NewUniqueID(), "event_id", CalcEventID("comment_add", comment),
"event_type", "comment_add" "event_type", "comment_add"
}); });
@ -1921,7 +1923,7 @@ void IcingaDB::SendRemovedComment(const Comment::Ptr& comment)
"entry_type", Convert::ToString(comment->GetEntryType()), "entry_type", Convert::ToString(comment->GetEntryType()),
"is_persistent", Convert::ToString((unsigned short)comment->GetPersistent()), "is_persistent", Convert::ToString((unsigned short)comment->GetPersistent()),
"is_sticky", Convert::ToString((unsigned short)(comment->GetEntryType() == CommentAcknowledgement && comment->GetCheckable()->GetAcknowledgement() == AcknowledgementSticky)), "is_sticky", Convert::ToString((unsigned short)(comment->GetEntryType() == CommentAcknowledgement && comment->GetCheckable()->GetAcknowledgement() == AcknowledgementSticky)),
"event_id", Utility::NewUniqueID(), "event_id", CalcEventID("comment_remove", comment),
"event_type", "comment_remove" "event_type", "comment_remove"
}); });
@ -1982,8 +1984,7 @@ void IcingaDB::SendFlappingChange(const Checkable::Ptr& checkable, double change
"environment_id", m_EnvironmentId, "environment_id", m_EnvironmentId,
"host_id", GetObjectIdentifier(host), "host_id", GetObjectIdentifier(host),
"flapping_threshold_low", Convert::ToString(checkable->GetFlappingThresholdLow()), "flapping_threshold_low", Convert::ToString(checkable->GetFlappingThresholdLow()),
"flapping_threshold_high", Convert::ToString(checkable->GetFlappingThresholdHigh()), "flapping_threshold_high", Convert::ToString(checkable->GetFlappingThresholdHigh())
"event_id", Utility::NewUniqueID()
}); });
if (service) { if (service) {
@ -2025,6 +2026,8 @@ void IcingaDB::SendFlappingChange(const Checkable::Ptr& checkable, double change
xAdd.emplace_back("start_time"); xAdd.emplace_back("start_time");
xAdd.emplace_back(Convert::ToString(startTime)); xAdd.emplace_back(Convert::ToString(startTime));
xAdd.emplace_back("event_id");
xAdd.emplace_back(CalcEventID(checkable->IsFlapping() ? "flapping_start" : "flapping_end", checkable, startTime));
xAdd.emplace_back("id"); xAdd.emplace_back("id");
xAdd.emplace_back(HashValue(new Array({GetEnvironment(), checkable->GetName(), startTime}))); xAdd.emplace_back(HashValue(new Array({GetEnvironment(), checkable->GetName(), startTime})));
@ -2069,7 +2072,6 @@ void IcingaDB::SendAcknowledgementSet(const Checkable::Ptr& checkable, const Str
std::vector<String> xAdd ({ std::vector<String> xAdd ({
"XADD", "icinga:history:stream:acknowledgement", "*", "XADD", "icinga:history:stream:acknowledgement", "*",
"event_id", Utility::NewUniqueID(),
"environment_id", m_EnvironmentId, "environment_id", m_EnvironmentId,
"host_id", GetObjectIdentifier(host), "host_id", GetObjectIdentifier(host),
"event_type", "ack_set", "event_type", "ack_set",
@ -2105,6 +2107,8 @@ void IcingaDB::SendAcknowledgementSet(const Checkable::Ptr& checkable, const Str
xAdd.emplace_back("set_time"); xAdd.emplace_back("set_time");
xAdd.emplace_back(Convert::ToString(setTime)); xAdd.emplace_back(Convert::ToString(setTime));
xAdd.emplace_back("event_id");
xAdd.emplace_back(CalcEventID("ack_set", checkable, setTime));
xAdd.emplace_back("id"); xAdd.emplace_back("id");
xAdd.emplace_back(HashValue(new Array({GetEnvironment(), checkable->GetName(), setTime}))); xAdd.emplace_back(HashValue(new Array({GetEnvironment(), checkable->GetName(), setTime})));
@ -2122,7 +2126,6 @@ void IcingaDB::SendAcknowledgementCleared(const Checkable::Ptr& checkable, const
std::vector<String> xAdd ({ std::vector<String> xAdd ({
"XADD", "icinga:history:stream:acknowledgement", "*", "XADD", "icinga:history:stream:acknowledgement", "*",
"event_id", Utility::NewUniqueID(),
"environment_id", m_EnvironmentId, "environment_id", m_EnvironmentId,
"host_id", GetObjectIdentifier(host), "host_id", GetObjectIdentifier(host),
"clear_time", Convert::ToString(TimestampToMilliseconds(changeTime)), "clear_time", Convert::ToString(TimestampToMilliseconds(changeTime)),
@ -2150,6 +2153,8 @@ void IcingaDB::SendAcknowledgementCleared(const Checkable::Ptr& checkable, const
xAdd.emplace_back("set_time"); xAdd.emplace_back("set_time");
xAdd.emplace_back(Convert::ToString(setTime)); xAdd.emplace_back(Convert::ToString(setTime));
xAdd.emplace_back("event_id");
xAdd.emplace_back(CalcEventID("ack_clear", checkable, setTime));
xAdd.emplace_back("id"); xAdd.emplace_back("id");
xAdd.emplace_back(HashValue(new Array({GetEnvironment(), checkable->GetName(), setTime}))); xAdd.emplace_back(HashValue(new Array({GetEnvironment(), checkable->GetName(), setTime})));

View File

@ -80,6 +80,28 @@ String IcingaDB::GetObjectIdentifier(const ConfigObject::Ptr& object)
return HashValue(new Array(Prepend(GetEnvironment(), GetObjectIdentifiersWithoutEnv(object)))); return HashValue(new Array(Prepend(GetEnvironment(), GetObjectIdentifiersWithoutEnv(object))));
} }
/**
* Calculates a deterministic history event ID like SHA1(env, eventType, x...[, nt][, eventTime])
*
* Where SHA1(env, x...) = GetObjectIdentifier(object)
*/
String IcingaDB::CalcEventID(const char* eventType, const ConfigObject::Ptr& object, double eventTime, NotificationType nt)
{
Array::Ptr rawId = new Array(GetObjectIdentifiersWithoutEnv(object));
rawId->Insert(0, GetEnvironment());
rawId->Insert(1, eventType);
if (nt) {
rawId->Add(GetNotificationTypeByEnum(nt));
}
if (eventTime) {
rawId->Add(TimestampToMilliseconds(eventTime));
}
return HashValue(std::move(rawId));
}
static const std::set<String> metadataWhitelist ({"package", "source_location", "templates"}); static const std::set<String> metadataWhitelist ({"package", "source_location", "templates"});
/** /**

View File

@ -107,6 +107,7 @@ private:
static ArrayData GetObjectIdentifiersWithoutEnv(const ConfigObject::Ptr& object); static ArrayData GetObjectIdentifiersWithoutEnv(const ConfigObject::Ptr& object);
static String GetObjectIdentifier(const ConfigObject::Ptr& object); static String GetObjectIdentifier(const ConfigObject::Ptr& object);
static String CalcEventID(const char* eventType, const ConfigObject::Ptr& object, double eventTime = 0, NotificationType nt = NotificationType(0));
static String GetEnvironment(); static String GetEnvironment();
static Dictionary::Ptr SerializeVars(const CustomVarObject::Ptr& object); static Dictionary::Ptr SerializeVars(const CustomVarObject::Ptr& object);
static const char* GetNotificationTypeByEnum(NotificationType type); static const char* GetNotificationTypeByEnum(NotificationType type);