Implement work queue support for db_ido_mysql.

Refs #4758
This commit is contained in:
Gunnar Beutner 2013-10-30 13:32:01 +01:00
parent 4ce0f1e7bb
commit ce06aa3b56
2 changed files with 53 additions and 0 deletions

View File

@ -52,12 +52,26 @@ void IdoMysqlConnection::Start(void)
m_ReconnectTimer->SetInterval(10);
m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&IdoMysqlConnection::ReconnectTimerHandler, this));
m_ReconnectTimer->Start();
m_ReconnectTimer->Reschedule(0);
ASSERT(mysql_thread_safe());
}
void IdoMysqlConnection::Stop(void)
{
m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::Disconnect, this));
m_QueryQueue.Join();
}
void IdoMysqlConnection::AssertOnWorkQueue(void)
{
VERIFY(boost::this_thread::get_id() == m_QueryQueue.GetThreadId());
}
void IdoMysqlConnection::Disconnect(void)
{
AssertOnWorkQueue();
boost::mutex::scoped_lock lock(m_ConnectionMutex);
if (!m_Connected)
@ -70,6 +84,11 @@ void IdoMysqlConnection::Stop(void)
}
void IdoMysqlConnection::TxTimerHandler(void)
{
m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::NewTransaction, this));
}
void IdoMysqlConnection::NewTransaction(void)
{
boost::mutex::scoped_lock lock(m_ConnectionMutex);
@ -82,6 +101,13 @@ void IdoMysqlConnection::TxTimerHandler(void)
void IdoMysqlConnection::ReconnectTimerHandler(void)
{
m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::Reconnect, this));
}
void IdoMysqlConnection::Reconnect(void)
{
AssertOnWorkQueue();
{
boost::mutex::scoped_lock lock(m_ConnectionMutex);
@ -219,6 +245,8 @@ void IdoMysqlConnection::ClearConfigTable(const String& table)
Array::Ptr IdoMysqlConnection::Query(const String& query)
{
AssertOnWorkQueue();
Log(LogDebug, "db_ido_mysql", "Query: " + query);
if (mysql_query(&m_Connection, query.CStr()) != 0)
@ -251,11 +279,15 @@ Array::Ptr IdoMysqlConnection::Query(const String& query)
DbReference IdoMysqlConnection::GetLastInsertID(void)
{
AssertOnWorkQueue();
return DbReference(mysql_insert_id(&m_Connection));
}
String IdoMysqlConnection::Escape(const String& s)
{
AssertOnWorkQueue();
ssize_t length = s.GetLength();
char *to = new char[s.GetLength() * 2 + 1];
@ -270,6 +302,8 @@ String IdoMysqlConnection::Escape(const String& s)
Dictionary::Ptr IdoMysqlConnection::FetchRow(MYSQL_RES *result)
{
AssertOnWorkQueue();
MYSQL_ROW row;
MYSQL_FIELD *field;
unsigned long *lengths, i;
@ -402,6 +436,11 @@ bool IdoMysqlConnection::FieldToEscapedString(const String& key, const Value& va
}
void IdoMysqlConnection::ExecuteQuery(const DbQuery& query)
{
m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query));
}
void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query)
{
boost::mutex::scoped_lock lock(m_ConnectionMutex);
@ -532,6 +571,11 @@ void IdoMysqlConnection::ExecuteQuery(const DbQuery& query)
}
void IdoMysqlConnection::CleanUpExecuteQuery(const String& table, const String& time_key, double time_value)
{
m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalCleanUpExecuteQuery, this, table, time_key, time_value));
}
void IdoMysqlConnection::InternalCleanUpExecuteQuery(const String& table, const String& time_key, double time_value)
{
boost::mutex::scoped_lock lock(m_ConnectionMutex);

View File

@ -71,9 +71,18 @@ private:
bool FieldToEscapedString(const String& key, const Value& value, Value *result);
void InternalActivateObject(const DbObject::Ptr& dbobj);
void Disconnect(void);
void NewTransaction(void);
void Reconnect(void);
void AssertOnWorkQueue(void);
void TxTimerHandler(void);
void ReconnectTimerHandler(void);
void InternalExecuteQuery(const DbQuery& query);
void InternalCleanUpExecuteQuery(const String& table, const String& time_key, double time_value);
void ClearConfigTables(void);
void ClearConfigTable(const String& table);
};