DB IDO: Delay connection in HA RunOnce mode

refs #6827
refs #6203
This commit is contained in:
Michael Friedrich 2014-08-13 20:30:28 +02:00
parent d326678e76
commit f6c24f8964
6 changed files with 135 additions and 29 deletions

View File

@ -62,9 +62,6 @@ Value IdoMysqlConnection::StatsFunc(Dictionary::Ptr& status, Dictionary::Ptr& pe
void IdoMysqlConnection::Resume(void) void IdoMysqlConnection::Resume(void)
{ {
if (!IsPaused())
return;
DbConnection::Resume(); DbConnection::Resume();
m_Connected = false; m_Connected = false;
@ -87,11 +84,6 @@ void IdoMysqlConnection::Resume(void)
void IdoMysqlConnection::Pause(void) void IdoMysqlConnection::Pause(void)
{ {
if (!GetEnableHa()) {
Log(LogInformation, "IdoMysqlConnection", "HA functionality disabled. Won't pause IDO connection: " + GetName());
return;
}
m_ReconnectTimer.reset(); m_ReconnectTimer.reset();
DbConnection::Pause(); DbConnection::Pause();
@ -196,6 +188,7 @@ void IdoMysqlConnection::Reconnect(void)
passwd = (!ipasswd.IsEmpty()) ? ipasswd.CStr() : NULL; passwd = (!ipasswd.IsEmpty()) ? ipasswd.CStr() : NULL;
db = (!idb.IsEmpty()) ? idb.CStr() : NULL; db = (!idb.IsEmpty()) ? idb.CStr() : NULL;
/* connection */
if (!mysql_init(&m_Connection)) { if (!mysql_init(&m_Connection)) {
std::ostringstream msgbuf; std::ostringstream msgbuf;
msgbuf << "mysql_init() failed: \"" << mysql_error(&m_Connection) << "\""; msgbuf << "mysql_init() failed: \"" << mysql_error(&m_Connection) << "\"";
@ -218,9 +211,9 @@ void IdoMysqlConnection::Reconnect(void)
String dbVersionName = "idoutils"; String dbVersionName = "idoutils";
IdoMysqlResult result = Query("SELECT version FROM " + GetTablePrefix() + "dbversion WHERE name='" + Escape(dbVersionName) + "'"); IdoMysqlResult result = Query("SELECT version FROM " + GetTablePrefix() + "dbversion WHERE name='" + Escape(dbVersionName) + "'");
Dictionary::Ptr version_row = FetchRow(result); Dictionary::Ptr row = FetchRow(result);
if (!version_row) { if (!row) {
Log(LogCritical, "IdoMysqlConnection", "Schema does not provide any valid version! Verify your schema installation."); Log(LogCritical, "IdoMysqlConnection", "Schema does not provide any valid version! Verify your schema installation.");
Application::Exit(EXIT_FAILURE); Application::Exit(EXIT_FAILURE);
@ -228,7 +221,7 @@ void IdoMysqlConnection::Reconnect(void)
DiscardRows(result); DiscardRows(result);
String version = version_row->Get("version"); String version = row->Get("version");
if (Utility::CompareVersion(SCHEMA_VERSION, version) < 0) { if (Utility::CompareVersion(SCHEMA_VERSION, version) < 0) {
Log(LogCritical, "IdoMysqlConnection", "Schema version '" + version + "' does not match the required version '" + Log(LogCritical, "IdoMysqlConnection", "Schema version '" + version + "' does not match the required version '" +
@ -240,16 +233,68 @@ void IdoMysqlConnection::Reconnect(void)
String instanceName = GetInstanceName(); String instanceName = GetInstanceName();
result = Query("SELECT instance_id FROM " + GetTablePrefix() + "instances WHERE instance_name = '" + Escape(instanceName) + "'"); result = Query("SELECT instance_id FROM " + GetTablePrefix() + "instances WHERE instance_name = '" + Escape(instanceName) + "'");
row = FetchRow(result);
Dictionary::Ptr row = FetchRow(result);
if (!row) { if (!row) {
Query("INSERT INTO " + GetTablePrefix() + "instances (instance_name, instance_description) VALUES ('" + Escape(instanceName) + "', '" + Escape(GetInstanceDescription()) + "')"); Query("INSERT INTO " + GetTablePrefix() + "instances (instance_name, instance_description) VALUES ('" + Escape(instanceName) + "', '" + Escape(GetInstanceDescription()) + "')");
m_InstanceID = GetLastInsertID(); m_InstanceID = GetLastInsertID();
} else { } else {
m_InstanceID = DbReference(row->Get("instance_id"));
}
DiscardRows(result);
Endpoint::Ptr my_endpoint = Endpoint::GetLocalEndpoint();
/* we have an endpoint in a cluster setup, so decide if we can proceed here */
if (my_endpoint && GetHAMode() == HARunOnce) {
/* get the current endpoint writing to programstatus table */
result = Query("SELECT UNIX_TIMESTAMP(status_update_time) AS status_update_time, endpoint_name FROM " +
GetTablePrefix() + "programstatus WHERE instance_id = " + Convert::ToString(m_InstanceID));
row = FetchRow(result);
DiscardRows(result); DiscardRows(result);
m_InstanceID = DbReference(row->Get("instance_id")); String endpoint_name;
if (row)
endpoint_name = row->Get("endpoint_name");
else
Log(LogNotice, "IdoMysqlConnection", "Empty program status table");
/* if we did not write into the database earlier, another instance is active */
if (endpoint_name != my_endpoint->GetName()) {
double status_update_time;
if (row)
status_update_time = row->Get("status_update_time");
else
status_update_time = 0;
double status_update_age = Utility::GetTime() - status_update_time;
Log(LogNotice, "IdoMysqlConnection", "Last update by '" +
endpoint_name + "' was " + Convert::ToString(status_update_age) + "s ago.");
if (status_update_age < 60) {
mysql_close(&m_Connection);
m_Connected = false;
return;
}
/* activate the IDO only, if we're authoritative in this zone */
if (IsPaused()) {
Log(LogNotice, "IdoMysqlConnection", "Local endpoint '" +
my_endpoint->GetName() + "' is not authoritative, bailing out.");
mysql_close(&m_Connection);
m_Connected = false;
return;
}
}
Log(LogNotice, "IdoMysqlConnection", "Enabling IDO connection.");
} }
std::ostringstream msgbuf; std::ostringstream msgbuf;

View File

@ -64,9 +64,6 @@ Value IdoPgsqlConnection::StatsFunc(Dictionary::Ptr& status, Dictionary::Ptr& pe
void IdoPgsqlConnection::Resume(void) void IdoPgsqlConnection::Resume(void)
{ {
if (!IsPaused())
return;
DbConnection::Resume(); DbConnection::Resume();
m_Connection = NULL; m_Connection = NULL;
@ -89,11 +86,6 @@ void IdoPgsqlConnection::Resume(void)
void IdoPgsqlConnection::Pause(void) void IdoPgsqlConnection::Pause(void)
{ {
if (!GetEnableHa()) {
Log(LogInformation, "IdoMysqlConnection", "HA functionality disabled. Won't pause IDO connection: " + GetName());
return;
}
m_ReconnectTimer.reset(); m_ReconnectTimer.reset();
DbConnection::Pause(); DbConnection::Pause();
@ -220,15 +212,15 @@ void IdoPgsqlConnection::Reconnect(void)
String dbVersionName = "idoutils"; String dbVersionName = "idoutils";
IdoPgsqlResult result = Query("SELECT version FROM " + GetTablePrefix() + "dbversion WHERE name=E'" + Escape(dbVersionName) + "'"); IdoPgsqlResult result = Query("SELECT version FROM " + GetTablePrefix() + "dbversion WHERE name=E'" + Escape(dbVersionName) + "'");
Dictionary::Ptr version_row = FetchRow(result, 0); Dictionary::Ptr row = FetchRow(result, 0);
if (!version_row) { if (!row) {
Log(LogCritical, "IdoPgsqlConnection", "Schema does not provide any valid version! Verify your schema installation."); Log(LogCritical, "IdoPgsqlConnection", "Schema does not provide any valid version! Verify your schema installation.");
Application::Exit(EXIT_FAILURE); Application::Exit(EXIT_FAILURE);
} }
String version = version_row->Get("version"); String version = row->Get("version");
if (Utility::CompareVersion(SCHEMA_VERSION, version) < 0) { if (Utility::CompareVersion(SCHEMA_VERSION, version) < 0) {
Log(LogCritical, "IdoPgsqlConnection", "Schema version '" + version + "' does not match the required version '" + Log(LogCritical, "IdoPgsqlConnection", "Schema version '" + version + "' does not match the required version '" +
@ -240,8 +232,7 @@ void IdoPgsqlConnection::Reconnect(void)
String instanceName = GetInstanceName(); String instanceName = GetInstanceName();
result = Query("SELECT instance_id FROM " + GetTablePrefix() + "instances WHERE instance_name = E'" + Escape(instanceName) + "'"); result = Query("SELECT instance_id FROM " + GetTablePrefix() + "instances WHERE instance_name = E'" + Escape(instanceName) + "'");
row = FetchRow(result, 0);
Dictionary::Ptr row = FetchRow(result, 0);
if (!row) { if (!row) {
Query("INSERT INTO " + GetTablePrefix() + "instances (instance_name, instance_description) VALUES (E'" + Escape(instanceName) + "', E'" + Escape(GetInstanceDescription()) + "')"); Query("INSERT INTO " + GetTablePrefix() + "instances (instance_name, instance_description) VALUES (E'" + Escape(instanceName) + "', E'" + Escape(GetInstanceDescription()) + "')");
@ -250,6 +241,58 @@ void IdoPgsqlConnection::Reconnect(void)
m_InstanceID = DbReference(row->Get("instance_id")); m_InstanceID = DbReference(row->Get("instance_id"));
} }
Endpoint::Ptr my_endpoint = Endpoint::GetLocalEndpoint();
/* we have an endpoint in a cluster setup, so decide if we can proceed here */
if (my_endpoint && GetHAMode() == HARunOnce) {
/* get the current endpoint writing to programstatus table */
result = Query("SELECT UNIX_TIMESTAMP(status_update_time) AS status_update_time, endpoint_name FROM " +
GetTablePrefix() + "programstatus WHERE instance_id = " + Convert::ToString(m_InstanceID));
row = FetchRow(result, 0);
String endpoint_name;
if (row)
endpoint_name = row->Get("endpoint_name");
else
Log(LogNotice, "IdoPgsqlConnection", "Empty program status table");
/* if we did not write into the database earlier, another instance is active */
if (endpoint_name != my_endpoint->GetName()) {
double status_update_time;
if (row)
status_update_time = row->Get("status_update_time");
else
status_update_time = 0;
double status_update_age = Utility::GetTime() - status_update_time;
Log(LogNotice, "IdoPgsqlConnection", "Last update by '" +
endpoint_name + "' was " + Convert::ToString(status_update_age) + "s ago.");
if (status_update_age < 60) {
PQfinish(m_Connection);
m_Connection = NULL;
return;
}
/* activate the IDO only, if we're authoritative in this zone */
if (IsPaused()) {
Log(LogNotice, "IdoPgsqlConnection", "Local endpoint '" +
my_endpoint->GetName() + "' is not authoritative, bailing out.");
PQfinish(m_Connection);
m_Connection = NULL;
return;
}
}
Log(LogNotice, "IdoPgsqlConnection", "Enabling IDO connection.");
}
std::ostringstream msgbuf; std::ostringstream msgbuf;
msgbuf << "pgSQL IDO instance id: " << static_cast<long>(m_InstanceID) << " (schema version: '" + version + "')"; msgbuf << "pgSQL IDO instance id: " << static_cast<long>(m_InstanceID) << " (schema version: '" + version + "')";
Log(LogInformation, "IdoPgsqlConnection", msgbuf.str()); Log(LogInformation, "IdoPgsqlConnection", msgbuf.str());

View File

@ -2,6 +2,12 @@ namespace icinga
{ {
code {{{ code {{{
enum HAMode
{
HARunOnce,
HARunEverywhere
};
class NameComposer { class NameComposer {
public: public:
virtual String MakeName(const String& shortName, const Dictionary::Ptr props) const = 0; virtual String MakeName(const String& shortName, const Dictionary::Ptr props) const = 0;
@ -32,6 +38,7 @@ abstract class DynamicObject
[get_protected] bool stop_called; [get_protected] bool stop_called;
[get_protected] bool pause_called; [get_protected] bool pause_called;
[get_protected] bool resume_called; [get_protected] bool resume_called;
[enum] HAMode ha_mode (HAMode);
Dictionary::Ptr authority_info; Dictionary::Ptr authority_info;
[protected] Dictionary::Ptr extensions; [protected] Dictionary::Ptr extensions;

View File

@ -38,6 +38,16 @@ Timer::Ptr DbConnection::m_ProgramStatusTimer;
INITIALIZE_ONCE(&DbConnection::StaticInitialize); INITIALIZE_ONCE(&DbConnection::StaticInitialize);
void DbConnection::OnConfigLoaded(void)
{
DynamicObject::OnConfigLoaded();
if (!GetEnableHa()) {
Log(LogDebug, "DbConnection", "HA functionality disabled. Won't pause IDO connection: " + GetName());
SetHAMode(HARunEverywhere);
}
}
void DbConnection::Start(void) void DbConnection::Start(void)
{ {
DynamicObject::Start(); DynamicObject::Start();
@ -385,7 +395,6 @@ void DbConnection::PrepareDatabase(void)
//ClearConfigTable("hostgroups"); //ClearConfigTable("hostgroups");
//ClearConfigTable("hosts"); //ClearConfigTable("hosts");
//ClearConfigTable("hoststatus"); //ClearConfigTable("hoststatus");
ClearConfigTable("programstatus");
ClearConfigTable("scheduleddowntime"); ClearConfigTable("scheduleddowntime");
ClearConfigTable("service_contactgroups"); ClearConfigTable("service_contactgroups");
ClearConfigTable("service_contacts"); ClearConfigTable("service_contacts");

View File

@ -64,6 +64,7 @@ public:
bool GetStatusUpdate(const DbObject::Ptr& dbobj) const; bool GetStatusUpdate(const DbObject::Ptr& dbobj) const;
protected: protected:
virtual void OnConfigLoaded(void);
virtual void Start(void); virtual void Start(void);
virtual void Resume(void); virtual void Resume(void);
virtual void Pause(void); virtual void Pause(void);

View File

@ -61,7 +61,8 @@ static void AuthorityTimerHandler(void)
BOOST_FOREACH(const DynamicObject::Ptr& object, type->GetObjects()) { BOOST_FOREACH(const DynamicObject::Ptr& object, type->GetObjects()) {
Endpoint::Ptr endpoint = endpoints[Utility::SDBM(object->GetName()) % endpoints.size()]; Endpoint::Ptr endpoint = endpoints[Utility::SDBM(object->GetName()) % endpoints.size()];
object->SetAuthority(endpoint == my_endpoint); if (object->GetHAMode() == HARunOnce)
object->SetAuthority(endpoint == my_endpoint);
} }
} }
} }