icinga2/lib/db_ido/dbconnection.cpp
Alexander A. Klimov ba7102cae3 Explicitly stop started timers and wait for them
before permitting their parent objects' destruction.
For the cases where the handlers have raw pointers to these objects.
2023-04-14 14:52:04 +02:00

584 lines
16 KiB
C++

/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
#include "db_ido/dbconnection.hpp"
#include "db_ido/dbconnection-ti.cpp"
#include "db_ido/dbvalue.hpp"
#include "icinga/icingaapplication.hpp"
#include "icinga/host.hpp"
#include "icinga/service.hpp"
#include "base/configtype.hpp"
#include "base/convert.hpp"
#include "base/objectlock.hpp"
#include "base/utility.hpp"
#include "base/logger.hpp"
#include "base/exception.hpp"
using namespace icinga;
REGISTER_TYPE(DbConnection);
Timer::Ptr DbConnection::m_ProgramStatusTimer;
boost::once_flag DbConnection::m_OnceFlag = BOOST_ONCE_INIT;
void DbConnection::OnConfigLoaded()
{
ConfigObject::OnConfigLoaded();
Value categories = GetCategories();
SetCategoryFilter(FilterArrayToInt(categories, DbQuery::GetCategoryFilterMap(), DbCatEverything));
if (!GetEnableHa()) {
Log(LogDebug, "DbConnection")
<< "HA functionality disabled. Won't pause IDO connection: " << GetName();
SetHAMode(HARunEverywhere);
}
boost::call_once(m_OnceFlag, InitializeDbTimer);
}
void DbConnection::Start(bool runtimeCreated)
{
ObjectImpl<DbConnection>::Start(runtimeCreated);
Log(LogInformation, "DbConnection")
<< "'" << GetName() << "' started.";
auto onQuery = [this](const DbQuery& query) { ExecuteQuery(query); };
DbObject::OnQuery.connect(onQuery);
auto onMultipleQueries = [this](const std::vector<DbQuery>& multiQueries) { ExecuteMultipleQueries(multiQueries); };
DbObject::OnMultipleQueries.connect(onMultipleQueries);
DbObject::QueryCallbacks queryCallbacks;
queryCallbacks.Query = onQuery;
queryCallbacks.MultipleQueries = onMultipleQueries;
DbObject::OnMakeQueries.connect([queryCallbacks](const std::function<void (const DbObject::QueryCallbacks&)>& queryFunc) {
queryFunc(queryCallbacks);
});
}
void DbConnection::Stop(bool runtimeRemoved)
{
Log(LogInformation, "DbConnection")
<< "'" << GetName() << "' stopped.";
ObjectImpl<DbConnection>::Stop(runtimeRemoved);
}
void DbConnection::EnableActiveChangedHandler()
{
if (!m_ActiveChangedHandler) {
ConfigObject::OnActiveChanged.connect([this](const ConfigObject::Ptr& object, const Value&) { UpdateObject(object); });
m_ActiveChangedHandler = true;
}
}
void DbConnection::Resume()
{
ConfigObject::Resume();
Log(LogInformation, "DbConnection")
<< "Resuming IDO connection: " << GetName();
m_CleanUpTimer = Timer::Create();
m_CleanUpTimer->SetInterval(60);
m_CleanUpTimer->OnTimerExpired.connect([this](const Timer * const&) { CleanUpHandler(); });
m_CleanUpTimer->Start();
m_LogStatsTimeout = 0;
m_LogStatsTimer = Timer::Create();
m_LogStatsTimer->SetInterval(10);
m_LogStatsTimer->OnTimerExpired.connect([this](const Timer * const&) { LogStatsHandler(); });
m_LogStatsTimer->Start();
}
void DbConnection::Pause()
{
Log(LogInformation, "DbConnection")
<< "Pausing IDO connection: " << GetName();
m_LogStatsTimer->Stop(true);
m_CleanUpTimer->Stop(true);
DbQuery query1;
query1.Table = "programstatus";
query1.IdColumn = "programstatus_id";
query1.Type = DbQueryUpdate;
query1.Category = DbCatProgramStatus;
query1.WhereCriteria = new Dictionary({
{ "instance_id", 0 } /* DbConnection class fills in real ID */
});
query1.Fields = new Dictionary({
{ "instance_id", 0 }, /* DbConnection class fills in real ID */
{ "program_end_time", DbValue::FromTimestamp(Utility::GetTime()) },
{ "is_currently_running", 0 },
{ "process_id", Empty }
});
query1.Priority = PriorityHigh;
ExecuteQuery(query1);
NewTransaction();
m_QueryQueue.Enqueue([this]() { Disconnect(); }, PriorityLow);
/* Work on remaining tasks but never delete the threads, for HA resuming later. */
m_QueryQueue.Join();
ConfigObject::Pause();
}
void DbConnection::InitializeDbTimer()
{
m_ProgramStatusTimer = Timer::Create();
m_ProgramStatusTimer->SetInterval(10);
m_ProgramStatusTimer->OnTimerExpired.connect([](const Timer * const&) { UpdateProgramStatus(); });
m_ProgramStatusTimer->Start();
}
void DbConnection::InsertRuntimeVariable(const String& key, const Value& value)
{
DbQuery query;
query.Table = "runtimevariables";
query.Type = DbQueryInsert;
query.Category = DbCatProgramStatus;
query.Fields = new Dictionary({
{ "instance_id", 0 }, /* DbConnection class fills in real ID */
{ "varname", key },
{ "varvalue", value }
});
DbObject::OnQuery(query);
}
void DbConnection::UpdateProgramStatus()
{
IcingaApplication::Ptr icingaApplication = IcingaApplication::GetInstance();
if (!icingaApplication)
return;
Log(LogNotice, "DbConnection")
<< "Updating programstatus table.";
std::vector<DbQuery> queries;
DbQuery query1;
query1.Type = DbQueryNewTransaction;
query1.Priority = PriorityImmediate;
queries.emplace_back(std::move(query1));
DbQuery query2;
query2.Table = "programstatus";
query2.IdColumn = "programstatus_id";
query2.Type = DbQueryInsert | DbQueryDelete;
query2.Category = DbCatProgramStatus;
query2.Fields = new Dictionary({
{ "instance_id", 0 }, /* DbConnection class fills in real ID */
{ "program_version", Application::GetAppVersion() },
{ "status_update_time", DbValue::FromTimestamp(Utility::GetTime()) },
{ "program_start_time", DbValue::FromTimestamp(Application::GetStartTime()) },
{ "is_currently_running", 1 },
{ "endpoint_name", icingaApplication->GetNodeName() },
{ "process_id", Utility::GetPid() },
{ "daemon_mode", 1 },
{ "last_command_check", DbValue::FromTimestamp(Utility::GetTime()) },
{ "notifications_enabled", (icingaApplication->GetEnableNotifications() ? 1 : 0) },
{ "active_host_checks_enabled", (icingaApplication->GetEnableHostChecks() ? 1 : 0) },
{ "passive_host_checks_enabled", 1 },
{ "active_service_checks_enabled", (icingaApplication->GetEnableServiceChecks() ? 1 : 0) },
{ "passive_service_checks_enabled", 1 },
{ "event_handlers_enabled", (icingaApplication->GetEnableEventHandlers() ? 1 : 0) },
{ "flap_detection_enabled", (icingaApplication->GetEnableFlapping() ? 1 : 0) },
{ "process_performance_data", (icingaApplication->GetEnablePerfdata() ? 1 : 0) }
});
query2.WhereCriteria = new Dictionary({
{ "instance_id", 0 } /* DbConnection class fills in real ID */
});
queries.emplace_back(std::move(query2));
DbQuery query3;
query3.Type = DbQueryNewTransaction;
queries.emplace_back(std::move(query3));
DbObject::OnMultipleQueries(queries);
DbQuery query4;
query4.Table = "runtimevariables";
query4.Type = DbQueryDelete;
query4.Category = DbCatProgramStatus;
query4.WhereCriteria = new Dictionary({
{ "instance_id", 0 } /* DbConnection class fills in real ID */
});
DbObject::OnQuery(query4);
InsertRuntimeVariable("total_services", ConfigType::Get<Service>()->GetObjectCount());
InsertRuntimeVariable("total_scheduled_services", ConfigType::Get<Service>()->GetObjectCount());
InsertRuntimeVariable("total_hosts", ConfigType::Get<Host>()->GetObjectCount());
InsertRuntimeVariable("total_scheduled_hosts", ConfigType::Get<Host>()->GetObjectCount());
}
void DbConnection::CleanUpHandler()
{
auto now = static_cast<long>(Utility::GetTime());
struct {
String name;
String time_column;
} tables[] = {
{ "acknowledgements", "entry_time" },
{ "commenthistory", "entry_time" },
{ "contactnotifications", "start_time" },
{ "contactnotificationmethods", "start_time" },
{ "downtimehistory", "entry_time" },
{ "eventhandlers", "start_time" },
{ "externalcommands", "entry_time" },
{ "flappinghistory", "event_time" },
{ "hostchecks", "start_time" },
{ "logentries", "logentry_time" },
{ "notifications", "start_time" },
{ "processevents", "event_time" },
{ "statehistory", "state_time" },
{ "servicechecks", "start_time" },
{ "systemcommands", "start_time" }
};
for (auto& table : tables) {
double max_age = GetCleanup()->Get(table.name + "_age");
if (max_age == 0)
continue;
CleanUpExecuteQuery(table.name, table.time_column, now - max_age);
Log(LogNotice, "DbConnection")
<< "Cleanup (" << table.name << "): " << max_age
<< " now: " << now
<< " old: " << now - max_age;
}
}
void DbConnection::LogStatsHandler()
{
if (!GetConnected() || IsPaused())
return;
auto pending = m_PendingQueries.load();
auto now = Utility::GetTime();
bool timeoutReached = m_LogStatsTimeout < now;
if (pending == 0u && !timeoutReached) {
return;
}
auto output = round(m_OutputQueries.CalculateRate(now, 10));
if (pending < output * 5 && !timeoutReached) {
return;
}
auto input = round(m_InputQueries.CalculateRate(now, 10));
Log(LogInformation, GetReflectionType()->GetName())
<< "Pending queries: " << pending << " (Input: " << input
<< "/s; Output: " << output << "/s)";
/* Reschedule next log entry in 5 minutes. */
if (timeoutReached) {
m_LogStatsTimeout = now + 60 * 5;
}
}
void DbConnection::CleanUpExecuteQuery(const String&, const String&, double)
{
/* Default handler does nothing. */
}
void DbConnection::SetConfigHash(const DbObject::Ptr& dbobj, const String& hash)
{
SetConfigHash(dbobj->GetType(), GetObjectID(dbobj), hash);
}
void DbConnection::SetConfigHash(const DbType::Ptr& type, const DbReference& objid, const String& hash)
{
if (!objid.IsValid())
return;
if (!hash.IsEmpty())
m_ConfigHashes[std::make_pair(type, objid)] = hash;
else
m_ConfigHashes.erase(std::make_pair(type, objid));
}
String DbConnection::GetConfigHash(const DbObject::Ptr& dbobj) const
{
return GetConfigHash(dbobj->GetType(), GetObjectID(dbobj));
}
String DbConnection::GetConfigHash(const DbType::Ptr& type, const DbReference& objid) const
{
if (!objid.IsValid())
return String();
auto it = m_ConfigHashes.find(std::make_pair(type, objid));
if (it == m_ConfigHashes.end())
return String();
return it->second;
}
void DbConnection::SetObjectID(const DbObject::Ptr& dbobj, const DbReference& dbref)
{
if (dbref.IsValid())
m_ObjectIDs[dbobj] = dbref;
else
m_ObjectIDs.erase(dbobj);
}
DbReference DbConnection::GetObjectID(const DbObject::Ptr& dbobj) const
{
auto it = m_ObjectIDs.find(dbobj);
if (it == m_ObjectIDs.end())
return {};
return it->second;
}
void DbConnection::SetInsertID(const DbObject::Ptr& dbobj, const DbReference& dbref)
{
SetInsertID(dbobj->GetType(), GetObjectID(dbobj), dbref);
}
void DbConnection::SetInsertID(const DbType::Ptr& type, const DbReference& objid, const DbReference& dbref)
{
if (!objid.IsValid())
return;
if (dbref.IsValid())
m_InsertIDs[std::make_pair(type, objid)] = dbref;
else
m_InsertIDs.erase(std::make_pair(type, objid));
}
DbReference DbConnection::GetInsertID(const DbObject::Ptr& dbobj) const
{
return GetInsertID(dbobj->GetType(), GetObjectID(dbobj));
}
DbReference DbConnection::GetInsertID(const DbType::Ptr& type, const DbReference& objid) const
{
if (!objid.IsValid())
return {};
auto it = m_InsertIDs.find(std::make_pair(type, objid));
if (it == m_InsertIDs.end())
return DbReference();
return it->second;
}
void DbConnection::SetObjectActive(const DbObject::Ptr& dbobj, bool active)
{
if (active)
m_ActiveObjects.insert(dbobj);
else
m_ActiveObjects.erase(dbobj);
}
bool DbConnection::GetObjectActive(const DbObject::Ptr& dbobj) const
{
return (m_ActiveObjects.find(dbobj) != m_ActiveObjects.end());
}
void DbConnection::ClearIDCache()
{
SetIDCacheValid(false);
m_ObjectIDs.clear();
m_InsertIDs.clear();
m_ActiveObjects.clear();
m_ConfigUpdates.clear();
m_StatusUpdates.clear();
m_ConfigHashes.clear();
}
void DbConnection::SetConfigUpdate(const DbObject::Ptr& dbobj, bool hasupdate)
{
if (hasupdate)
m_ConfigUpdates.insert(dbobj);
else
m_ConfigUpdates.erase(dbobj);
}
bool DbConnection::GetConfigUpdate(const DbObject::Ptr& dbobj) const
{
return (m_ConfigUpdates.find(dbobj) != m_ConfigUpdates.end());
}
void DbConnection::SetStatusUpdate(const DbObject::Ptr& dbobj, bool hasupdate)
{
if (hasupdate)
m_StatusUpdates.insert(dbobj);
else
m_StatusUpdates.erase(dbobj);
}
bool DbConnection::GetStatusUpdate(const DbObject::Ptr& dbobj) const
{
return (m_StatusUpdates.find(dbobj) != m_StatusUpdates.end());
}
void DbConnection::UpdateObject(const ConfigObject::Ptr& object)
{
bool isShuttingDown = Application::IsShuttingDown();
bool isRestarting = Application::IsRestarting();
#ifdef I2_DEBUG
if (isShuttingDown || isRestarting) {
//Log(LogDebug, "DbConnection")
// << "Updating object '" << object->GetName() << "' \t\t active '" << Convert::ToLong(object->IsActive())
// << "' shutting down '" << Convert::ToLong(isShuttingDown) << "' restarting '" << Convert::ToLong(isRestarting) << "'.";
}
#endif /* I2_DEBUG */
/* Wait until a database connection is established on reconnect. */
if (!GetConnected())
return;
/* Don't update inactive objects during shutdown/reload/restart.
* They would be marked as deleted. This gets triggered with ConfigObject::StopObjects().
* During startup/reconnect this is fine, the handler is not active there.
*/
if (isShuttingDown || isRestarting)
return;
DbObject::Ptr dbobj = DbObject::GetOrCreateByObject(object);
if (dbobj) {
bool dbActive = GetObjectActive(dbobj);
bool active = object->IsActive();
if (active) {
if (!dbActive)
ActivateObject(dbobj);
Dictionary::Ptr configFields = dbobj->GetConfigFields();
String configHash = dbobj->CalculateConfigHash(configFields);
ASSERT(configHash.GetLength() <= 64);
configFields->Set("config_hash", configHash);
String cachedHash = GetConfigHash(dbobj);
if (cachedHash != configHash) {
dbobj->SendConfigUpdateHeavy(configFields);
dbobj->SendStatusUpdate();
} else {
dbobj->SendConfigUpdateLight();
}
} else if (!active) {
/* This may happen on reload/restart actions too
* and is blocked above already.
*
* Deactivate the deleted object no matter
* which state it had in the database.
*/
DeactivateObject(dbobj);
}
}
}
void DbConnection::UpdateAllObjects()
{
for (const Type::Ptr& type : Type::GetAllTypes()) {
auto *dtype = dynamic_cast<ConfigType *>(type.get());
if (!dtype)
continue;
for (const ConfigObject::Ptr& object : dtype->GetObjects()) {
m_QueryQueue.Enqueue([this, object](){ UpdateObject(object); }, PriorityHigh);
}
}
}
void DbConnection::PrepareDatabase()
{
for (const DbType::Ptr& type : DbType::GetAllTypes()) {
FillIDCache(type);
}
}
void DbConnection::ValidateFailoverTimeout(const Lazy<double>& lvalue, const ValidationUtils& utils)
{
ObjectImpl<DbConnection>::ValidateFailoverTimeout(lvalue, utils);
if (lvalue() < 30)
BOOST_THROW_EXCEPTION(ValidationError(this, { "failover_timeout" }, "Failover timeout minimum is 30s."));
}
void DbConnection::ValidateCategories(const Lazy<Array::Ptr>& lvalue, const ValidationUtils& utils)
{
ObjectImpl<DbConnection>::ValidateCategories(lvalue, utils);
int filter = FilterArrayToInt(lvalue(), DbQuery::GetCategoryFilterMap(), 0);
if (filter != DbCatEverything && (filter & ~(DbCatInvalid | DbCatEverything | DbCatConfig | DbCatState |
DbCatAcknowledgement | DbCatComment | DbCatDowntime | DbCatEventHandler | DbCatExternalCommand |
DbCatFlapping | DbCatLog | DbCatNotification | DbCatProgramStatus | DbCatRetention |
DbCatStateHistory)) != 0)
BOOST_THROW_EXCEPTION(ValidationError(this, { "categories" }, "categories filter is invalid."));
}
void DbConnection::IncreaseQueryCount()
{
double now = Utility::GetTime();
std::unique_lock<std::mutex> lock(m_StatsMutex);
m_QueryStats.InsertValue(now, 1);
}
int DbConnection::GetQueryCount(RingBuffer::SizeType span)
{
std::unique_lock<std::mutex> lock(m_StatsMutex);
return m_QueryStats.UpdateAndGetValues(Utility::GetTime(), span);
}
bool DbConnection::IsIDCacheValid() const
{
return m_IDCacheValid;
}
void DbConnection::SetIDCacheValid(bool valid)
{
m_IDCacheValid = valid;
}
int DbConnection::GetSessionToken()
{
return Application::GetStartTime();
}
void DbConnection::IncreasePendingQueries(int count)
{
m_PendingQueries.fetch_add(count);
m_InputQueries.InsertValue(Utility::GetTime(), count);
}
void DbConnection::DecreasePendingQueries(int count)
{
m_PendingQueries.fetch_sub(count);
m_OutputQueries.InsertValue(Utility::GetTime(), count);
}