Implement HA for IDO connections.

Refs #6107
This commit is contained in:
Gunnar Beutner 2014-05-09 13:02:30 +02:00
parent 4c022199f4
commit b367187c0b
9 changed files with 49 additions and 18 deletions

View File

@ -60,16 +60,16 @@ Value IdoMysqlConnection::StatsFunc(Dictionary::Ptr& status, Dictionary::Ptr& pe
return 0; return 0;
} }
void IdoMysqlConnection::Start(void) void IdoMysqlConnection::Resume(void)
{ {
DbConnection::Start(); DbConnection::Resume();
m_Connected = false; m_Connected = false;
m_QueryQueue.SetExceptionCallback(boost::bind(&IdoMysqlConnection::ExceptionHandler, this, _1)); m_QueryQueue.SetExceptionCallback(boost::bind(&IdoMysqlConnection::ExceptionHandler, this, _1));
m_TxTimer = make_shared<Timer>(); m_TxTimer = make_shared<Timer>();
m_TxTimer->SetInterval(5); m_TxTimer->SetInterval(1);
m_TxTimer->OnTimerExpired.connect(boost::bind(&IdoMysqlConnection::TxTimerHandler, this)); m_TxTimer->OnTimerExpired.connect(boost::bind(&IdoMysqlConnection::TxTimerHandler, this));
m_TxTimer->Start(); m_TxTimer->Start();
@ -82,8 +82,12 @@ void IdoMysqlConnection::Start(void)
ASSERT(mysql_thread_safe()); ASSERT(mysql_thread_safe());
} }
void IdoMysqlConnection::Stop(void) void IdoMysqlConnection::Pause(void)
{ {
DbConnection::Pause();
m_ReconnectTimer.reset();
m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::Disconnect, this)); m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::Disconnect, this));
m_QueryQueue.Join(); m_QueryQueue.Join();
} }

View File

@ -45,8 +45,8 @@ public:
static Value StatsFunc(Dictionary::Ptr& status, Dictionary::Ptr& perfdata); static Value StatsFunc(Dictionary::Ptr& status, Dictionary::Ptr& perfdata);
protected: protected:
virtual void Start(void); virtual void Resume(void);
virtual void Stop(void); virtual void Pause(void);
virtual void ActivateObject(const DbObject::Ptr& dbobj); virtual void ActivateObject(const DbObject::Ptr& dbobj);
virtual void DeactivateObject(const DbObject::Ptr& dbobj); virtual void DeactivateObject(const DbObject::Ptr& dbobj);

View File

@ -62,16 +62,16 @@ Value IdoPgsqlConnection::StatsFunc(Dictionary::Ptr& status, Dictionary::Ptr& pe
return 0; return 0;
} }
void IdoPgsqlConnection::Start(void) void IdoPgsqlConnection::Resume(void)
{ {
DbConnection::Start(); DbConnection::Resume();
m_Connection = NULL; m_Connection = NULL;
m_QueryQueue.SetExceptionCallback(boost::bind(&IdoPgsqlConnection::ExceptionHandler, this, _1)); m_QueryQueue.SetExceptionCallback(boost::bind(&IdoPgsqlConnection::ExceptionHandler, this, _1));
m_TxTimer = make_shared<Timer>(); m_TxTimer = make_shared<Timer>();
m_TxTimer->SetInterval(5); m_TxTimer->SetInterval(1);
m_TxTimer->OnTimerExpired.connect(boost::bind(&IdoPgsqlConnection::TxTimerHandler, this)); m_TxTimer->OnTimerExpired.connect(boost::bind(&IdoPgsqlConnection::TxTimerHandler, this));
m_TxTimer->Start(); m_TxTimer->Start();
@ -84,8 +84,12 @@ void IdoPgsqlConnection::Start(void)
ASSERT(PQisthreadsafe()); ASSERT(PQisthreadsafe());
} }
void IdoPgsqlConnection::Stop(void) void IdoPgsqlConnection::Pause(void)
{ {
DbConnection::Pause();
m_ReconnectTimer.reset();
m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::Disconnect, this)); m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::Disconnect, this));
m_QueryQueue.Join(); m_QueryQueue.Join();
} }

View File

@ -45,8 +45,8 @@ public:
static Value StatsFunc(Dictionary::Ptr& status, Dictionary::Ptr& perfdata); static Value StatsFunc(Dictionary::Ptr& status, Dictionary::Ptr& perfdata);
protected: protected:
virtual void Start(void); virtual void Resume(void);
virtual void Stop(void); virtual void Pause(void);
virtual void ActivateObject(const DbObject::Ptr& dbobj); virtual void ActivateObject(const DbObject::Ptr& dbobj);
virtual void DeactivateObject(const DbObject::Ptr& dbobj); virtual void DeactivateObject(const DbObject::Ptr& dbobj);

View File

@ -140,6 +140,8 @@ void DynamicObject::Activate(void)
} }
OnStarted(GetSelf()); OnStarted(GetSelf());
SetAuthority(true);
} }
void DynamicObject::Stop(void) void DynamicObject::Stop(void)
@ -154,6 +156,8 @@ void DynamicObject::Deactivate(void)
{ {
ASSERT(!OwnsLock()); ASSERT(!OwnsLock());
SetAuthority(false);
{ {
ObjectLock olock(this); ObjectLock olock(this);
@ -200,7 +204,7 @@ void DynamicObject::SetAuthority(bool authority)
OnResumed(GetSelf()); OnResumed(GetSelf());
} else if (!authority && !GetPaused()) { } else if (!authority && !GetPaused()) {
SetPauseCalled(false); SetPauseCalled(false);
Resume(); Pause();
ASSERT(GetPauseCalled()); ASSERT(GetPauseCalled());
SetPaused(true); SetPaused(true);
OnPaused(GetSelf()); OnPaused(GetSelf());

View File

@ -25,7 +25,9 @@ abstract class DynamicObject
[config] Dictionary::Ptr methods; [config] Dictionary::Ptr methods;
[config] Dictionary::Ptr vars (VarsRaw); [config] Dictionary::Ptr vars (VarsRaw);
[get_protected] bool active; [get_protected] bool active;
[get_protected] bool paused; [get_protected] bool paused {
default {{{ return true; }}}
};
[get_protected] bool start_called; [get_protected] bool start_called;
[get_protected] bool stop_called; [get_protected] bool stop_called;
[get_protected] bool pause_called; [get_protected] bool pause_called;

View File

@ -43,6 +43,13 @@ void DbConnection::Start(void)
DynamicObject::Start(); DynamicObject::Start();
DbObject::OnQuery.connect(boost::bind(&DbConnection::ExecuteQuery, this, _1)); DbObject::OnQuery.connect(boost::bind(&DbConnection::ExecuteQuery, this, _1));
}
void DbConnection::Resume(void)
{
DynamicObject::Resume();
Log(LogInformation, "db_ido", "Resuming IDO connection: " + GetName());
m_CleanUpTimer = make_shared<Timer>(); m_CleanUpTimer = make_shared<Timer>();
m_CleanUpTimer->SetInterval(60); m_CleanUpTimer->SetInterval(60);
@ -50,6 +57,17 @@ void DbConnection::Start(void)
m_CleanUpTimer->Start(); m_CleanUpTimer->Start();
} }
void DbConnection::Pause(void)
{
DynamicObject::Pause();
Log(LogInformation, "db_ido", "Pausing IDO connection: " + GetName());
m_CleanUpTimer.reset();
ClearIDCache();
}
void DbConnection::StaticInitialize(void) void DbConnection::StaticInitialize(void)
{ {
m_ProgramStatusTimer = make_shared<Timer>(); m_ProgramStatusTimer = make_shared<Timer>();

View File

@ -65,6 +65,8 @@ public:
protected: protected:
virtual void Start(void); virtual void Start(void);
virtual void Resume(void);
virtual void Pause(void);
virtual void ExecuteQuery(const DbQuery& query) = 0; virtual void ExecuteQuery(const DbQuery& query) = 0;
virtual void ActivateObject(const DbObject::Ptr& dbobj) = 0; virtual void ActivateObject(const DbObject::Ptr& dbobj) = 0;

View File

@ -61,10 +61,7 @@ static void AuthorityTimerHandler(void)
BOOST_FOREACH(const DynamicObject::Ptr& object, type->GetObjects()) { BOOST_FOREACH(const DynamicObject::Ptr& object, type->GetObjects()) {
Endpoint::Ptr endpoint = endpoints[Utility::SDBM(object->GetName()) % endpoints.size()]; Endpoint::Ptr endpoint = endpoints[Utility::SDBM(object->GetName()) % endpoints.size()];
if (endpoint == my_endpoint) object->SetAuthority(endpoint == my_endpoint);
object->Resume();
else
object->Pause();
} }
} }
} }