mirror of https://github.com/Icinga/icinga2.git
commit
94b377ff27
|
@ -52,12 +52,26 @@ void IdoMysqlConnection::Start(void)
|
|||
m_ReconnectTimer->SetInterval(10);
|
||||
m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&IdoMysqlConnection::ReconnectTimerHandler, this));
|
||||
m_ReconnectTimer->Start();
|
||||
m_ReconnectTimer->Reschedule(0);
|
||||
|
||||
ASSERT(mysql_thread_safe());
|
||||
}
|
||||
|
||||
void IdoMysqlConnection::Stop(void)
|
||||
{
|
||||
m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::Disconnect, this));
|
||||
m_QueryQueue.Join();
|
||||
}
|
||||
|
||||
void IdoMysqlConnection::AssertOnWorkQueue(void)
|
||||
{
|
||||
VERIFY(boost::this_thread::get_id() == m_QueryQueue.GetThreadId());
|
||||
}
|
||||
|
||||
void IdoMysqlConnection::Disconnect(void)
|
||||
{
|
||||
AssertOnWorkQueue();
|
||||
|
||||
boost::mutex::scoped_lock lock(m_ConnectionMutex);
|
||||
|
||||
if (!m_Connected)
|
||||
|
@ -70,6 +84,11 @@ void IdoMysqlConnection::Stop(void)
|
|||
}
|
||||
|
||||
void IdoMysqlConnection::TxTimerHandler(void)
|
||||
{
|
||||
m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::NewTransaction, this));
|
||||
}
|
||||
|
||||
void IdoMysqlConnection::NewTransaction(void)
|
||||
{
|
||||
boost::mutex::scoped_lock lock(m_ConnectionMutex);
|
||||
|
||||
|
@ -82,6 +101,13 @@ void IdoMysqlConnection::TxTimerHandler(void)
|
|||
|
||||
void IdoMysqlConnection::ReconnectTimerHandler(void)
|
||||
{
|
||||
m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::Reconnect, this));
|
||||
}
|
||||
|
||||
void IdoMysqlConnection::Reconnect(void)
|
||||
{
|
||||
AssertOnWorkQueue();
|
||||
|
||||
{
|
||||
boost::mutex::scoped_lock lock(m_ConnectionMutex);
|
||||
|
||||
|
@ -219,6 +245,8 @@ void IdoMysqlConnection::ClearConfigTable(const String& table)
|
|||
|
||||
Array::Ptr IdoMysqlConnection::Query(const String& query)
|
||||
{
|
||||
AssertOnWorkQueue();
|
||||
|
||||
Log(LogDebug, "db_ido_mysql", "Query: " + query);
|
||||
|
||||
if (mysql_query(&m_Connection, query.CStr()) != 0)
|
||||
|
@ -251,11 +279,15 @@ Array::Ptr IdoMysqlConnection::Query(const String& query)
|
|||
|
||||
DbReference IdoMysqlConnection::GetLastInsertID(void)
|
||||
{
|
||||
AssertOnWorkQueue();
|
||||
|
||||
return DbReference(mysql_insert_id(&m_Connection));
|
||||
}
|
||||
|
||||
String IdoMysqlConnection::Escape(const String& s)
|
||||
{
|
||||
AssertOnWorkQueue();
|
||||
|
||||
ssize_t length = s.GetLength();
|
||||
char *to = new char[s.GetLength() * 2 + 1];
|
||||
|
||||
|
@ -270,6 +302,8 @@ String IdoMysqlConnection::Escape(const String& s)
|
|||
|
||||
Dictionary::Ptr IdoMysqlConnection::FetchRow(MYSQL_RES *result)
|
||||
{
|
||||
AssertOnWorkQueue();
|
||||
|
||||
MYSQL_ROW row;
|
||||
MYSQL_FIELD *field;
|
||||
unsigned long *lengths, i;
|
||||
|
@ -402,6 +436,11 @@ bool IdoMysqlConnection::FieldToEscapedString(const String& key, const Value& va
|
|||
}
|
||||
|
||||
void IdoMysqlConnection::ExecuteQuery(const DbQuery& query)
|
||||
{
|
||||
m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query));
|
||||
}
|
||||
|
||||
void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query)
|
||||
{
|
||||
boost::mutex::scoped_lock lock(m_ConnectionMutex);
|
||||
|
||||
|
@ -532,6 +571,11 @@ void IdoMysqlConnection::ExecuteQuery(const DbQuery& query)
|
|||
}
|
||||
|
||||
void IdoMysqlConnection::CleanUpExecuteQuery(const String& table, const String& time_key, double time_value)
|
||||
{
|
||||
m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalCleanUpExecuteQuery, this, table, time_key, time_value));
|
||||
}
|
||||
|
||||
void IdoMysqlConnection::InternalCleanUpExecuteQuery(const String& table, const String& time_key, double time_value)
|
||||
{
|
||||
boost::mutex::scoped_lock lock(m_ConnectionMutex);
|
||||
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
#include "db_ido_mysql/idomysqlconnection.th"
|
||||
#include "base/array.h"
|
||||
#include "base/timer.h"
|
||||
#include "base/workqueue.h"
|
||||
#include <mysql/mysql.h>
|
||||
|
||||
namespace icinga
|
||||
|
@ -53,6 +54,8 @@ private:
|
|||
DbReference m_InstanceID;
|
||||
DbReference m_LastNotificationID;
|
||||
|
||||
WorkQueue m_QueryQueue;
|
||||
|
||||
boost::mutex m_ConnectionMutex;
|
||||
bool m_Connected;
|
||||
MYSQL m_Connection;
|
||||
|
@ -68,9 +71,18 @@ private:
|
|||
bool FieldToEscapedString(const String& key, const Value& value, Value *result);
|
||||
void InternalActivateObject(const DbObject::Ptr& dbobj);
|
||||
|
||||
void Disconnect(void);
|
||||
void NewTransaction(void);
|
||||
void Reconnect(void);
|
||||
|
||||
void AssertOnWorkQueue(void);
|
||||
|
||||
void TxTimerHandler(void);
|
||||
void ReconnectTimerHandler(void);
|
||||
|
||||
void InternalExecuteQuery(const DbQuery& query);
|
||||
void InternalCleanUpExecuteQuery(const String& table, const String& time_key, double time_value);
|
||||
|
||||
void ClearConfigTables(void);
|
||||
void ClearConfigTable(const String& table);
|
||||
};
|
||||
|
|
|
@ -68,6 +68,11 @@ void WorkQueue::Join(void)
|
|||
m_CV.wait(lock);
|
||||
}
|
||||
|
||||
boost::thread::id WorkQueue::GetThreadId(void) const
|
||||
{
|
||||
return m_Thread.get_id();
|
||||
}
|
||||
|
||||
void WorkQueue::WorkerThreadProc(void)
|
||||
{
|
||||
boost::mutex::scoped_lock lock(m_Mutex);
|
||||
|
|
|
@ -46,6 +46,8 @@ public:
|
|||
void Enqueue(const WorkCallback& item);
|
||||
void Join(void);
|
||||
|
||||
boost::thread::id GetThreadId(void) const;
|
||||
|
||||
private:
|
||||
int m_ID;
|
||||
static int m_NextID;
|
||||
|
|
Loading…
Reference in New Issue