Make sure IDO reconnect attempts don't recurse.

Refs #5235
This commit is contained in:
Gunnar Beutner 2013-11-28 10:36:43 +01:00
parent b67cf1f0cb
commit 6d53bd1c8f
4 changed files with 32 additions and 16 deletions

View File

@ -99,7 +99,7 @@ void IdoMysqlConnection::Disconnect(void)
void IdoMysqlConnection::TxTimerHandler(void)
{
m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::NewTransaction, this));
m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::NewTransaction, this), true);
}
void IdoMysqlConnection::NewTransaction(void)
@ -465,7 +465,7 @@ void IdoMysqlConnection::ExecuteQuery(const DbQuery& query)
{
ASSERT(query.Category != DbCatInvalid);
m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query));
m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query), true);
}
void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query)
@ -595,7 +595,7 @@ void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query)
void IdoMysqlConnection::CleanUpExecuteQuery(const String& table, const String& time_column, double max_age)
{
m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age));
m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age), true);
}
void IdoMysqlConnection::InternalCleanUpExecuteQuery(const String& table, const String& time_column, double max_age)

View File

@ -99,7 +99,7 @@ void IdoPgsqlConnection::Disconnect(void)
void IdoPgsqlConnection::TxTimerHandler(void)
{
m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::NewTransaction, this));
m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::NewTransaction, this), true);
}
void IdoPgsqlConnection::NewTransaction(void)
@ -481,7 +481,7 @@ void IdoPgsqlConnection::ExecuteQuery(const DbQuery& query)
{
ASSERT(query.Category != DbCatInvalid);
m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query));
m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query), true);
}
void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query)
@ -616,7 +616,7 @@ void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query)
void IdoPgsqlConnection::CleanUpExecuteQuery(const String& table, const String& time_column, double max_age)
{
m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age));
m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age), true);
}
void IdoPgsqlConnection::InternalCleanUpExecuteQuery(const String& table, const String& time_column, double max_age)

View File

@ -45,7 +45,7 @@ WorkQueue::~WorkQueue(void)
* Enqueues a work item. Work items are guaranteed to be executed in the order
* they were enqueued in.
*/
void WorkQueue::Enqueue(const WorkCallback& item)
void WorkQueue::Enqueue(const WorkCallback& callback, bool allowInterleaved)
{
boost::mutex::scoped_lock lock(m_Mutex);
@ -58,10 +58,14 @@ void WorkQueue::Enqueue(const WorkCallback& item)
m_CV.wait(lock);
}
WorkItem item;
item.Callback = callback;
item.AllowInterleaved = allowInterleaved;
m_Items.push_back(item);
if (wq_thread)
ProcessItems(lock);
ProcessItems(lock, true);
else
m_CV.notify_all();
}
@ -93,16 +97,20 @@ void WorkQueue::DefaultExceptionCallback(boost::exception_ptr exp)
throw;
}
void WorkQueue::ProcessItems(boost::mutex::scoped_lock& lock)
void WorkQueue::ProcessItems(boost::mutex::scoped_lock& lock, bool interleaved)
{
while (!m_Items.empty()) {
try {
WorkCallback wi = m_Items.front();
WorkItem wi = m_Items.front();
if (interleaved && !wi.AllowInterleaved)
return;
m_Items.pop_front();
m_CV.notify_all();
lock.unlock();
wi();
wi.Callback();
} catch (const std::exception& ex) {
lock.lock();
@ -132,7 +140,7 @@ void WorkQueue::WorkerThreadProc(void)
if (m_Joined)
break;
ProcessItems(lock);
ProcessItems(lock, false);
}
m_Stopped = true;

View File

@ -31,6 +31,15 @@
namespace icinga
{
typedef boost::function<void (void)> WorkCallback;
struct WorkItem
{
WorkCallback Callback;
bool AllowInterleaved;
};
/**
* A workqueue.
*
@ -39,13 +48,12 @@ namespace icinga
class I2_BASE_API WorkQueue
{
public:
typedef boost::function<void (void)> WorkCallback;
typedef boost::function<void (boost::exception_ptr)> ExceptionCallback;
WorkQueue(size_t maxItems = 25000);
~WorkQueue(void);
void Enqueue(const WorkCallback& item);
void Enqueue(const WorkCallback& callback, bool allowInterleaved = false);
void Join(void);
boost::thread::id GetThreadId(void) const;
@ -62,10 +70,10 @@ private:
size_t m_MaxItems;
bool m_Joined;
bool m_Stopped;
std::deque<WorkCallback> m_Items;
std::deque<WorkItem> m_Items;
ExceptionCallback m_ExceptionCallback;
void ProcessItems(boost::mutex::scoped_lock& lock);
void ProcessItems(boost::mutex::scoped_lock& lock, bool interleaved);
void WorkerThreadProc(void);
static void DefaultExceptionCallback(boost::exception_ptr exp);