icinga2/lib/base/workqueue.hpp
Michael Friedrich 149f640fd8 Improve DB IDO HA failover behaviour
- Decrease Object Authority updates to 10s (was 30s)
- Decrease failover timeout to 30s (was 60s)
- Decrease cold startup (after (re)start) with no OA updates to 30s (was 60s)
- Immediately connect on Resume()
- Fix query priority which got broken with #6970
- Add more logging when a failover is in progress

```
[2019-03-29 16:13:53 +0100] information/IdoMysqlConnection: Last update by endpoint 'master1' was 8.33246s ago (< failover timeout of 30s). Retrying.

[2019-03-29 16:14:23 +0100] information/IdoMysqlConnection: Last update by endpoint 'master1' was 38.3288s ago. Taking over 'ido-mysql' in HA zone 'master'.
```

- Add more logging for reconnect and disconnect handling
- Add 'last_failover' attribute to IDO*Connection objects

refs #6970
2019-04-01 08:50:00 +02:00

146 lines
3.3 KiB
C++

/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
#ifndef WORKQUEUE_H
#define WORKQUEUE_H
#include "base/i2-base.hpp"
#include "base/timer.hpp"
#include "base/ringbuffer.hpp"
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/exception_ptr.hpp>
#include <queue>
#include <deque>
#include <atomic>
namespace icinga
{
/**
 * Priority levels that can be attached to a task when it is enqueued.
 *
 * The numeric values are spelled out explicitly. Note that the value 3 is
 * not assigned: PriorityImmediate is 4.
 */
enum WorkQueuePriority
{
	PriorityLow       = 0,
	PriorityNormal    = 1, /* default used by Task and the Enqueue*() overloads */
	PriorityHigh      = 2,
	PriorityImmediate = 4
};
/* Callable taking no arguments and returning nothing; this is the unit of work stored in a Task. */
using TaskFunction = std::function<void ()>;
struct Task
{
Task() = default;
Task(TaskFunction function, WorkQueuePriority priority, int id)
: Function(std::move(function)), Priority(priority), ID(id)
{ }
TaskFunction Function;
WorkQueuePriority Priority{PriorityNormal};
int ID{-1};
};
/*
 * Ordering for Task objects. WorkQueue stores its tasks in a
 * std::priority_queue<Task> with the default comparator, which relies on
 * this operator. Only declared here; the definition is not part of this header.
 * NOTE(review): presumably compares Priority first and ID second - confirm
 * against the implementation file.
 */
bool operator<(const Task& a, const Task& b);
/**
* A workqueue.
*
* @ingroup base
*/
class WorkQueue
{
public:
typedef std::function<void (boost::exception_ptr)> ExceptionCallback;
WorkQueue(size_t maxItems = 0, int threadCount = 1);
~WorkQueue();
void SetName(const String& name);
String GetName() const;
boost::mutex::scoped_lock AcquireLock();
void EnqueueUnlocked(boost::mutex::scoped_lock& lock, TaskFunction&& function, WorkQueuePriority priority = PriorityNormal);
void Enqueue(TaskFunction&& function, WorkQueuePriority priority = PriorityNormal,
bool allowInterleaved = false);
void Join(bool stop = false);
template<typename VectorType, typename FuncType>
void ParallelFor(const VectorType& items, const FuncType& func)
{
using SizeType = decltype(items.size());
SizeType totalCount = items.size();
auto lock = AcquireLock();
SizeType offset = 0;
for (int i = 0; i < m_ThreadCount; i++) {
SizeType count = totalCount / static_cast<SizeType>(m_ThreadCount);
if (static_cast<SizeType>(i) < totalCount % static_cast<SizeType>(m_ThreadCount))
count++;
EnqueueUnlocked(lock, [&items, func, offset, count, this]() {
for (SizeType j = offset; j < offset + count; j++) {
RunTaskFunction([&func, &items, j]() {
func(items[j]);
});
}
});
offset += count;
}
ASSERT(offset == items.size());
}
bool IsWorkerThread() const;
size_t GetLength() const;
size_t GetTaskCount(RingBuffer::SizeType span);
void SetExceptionCallback(const ExceptionCallback& callback);
bool HasExceptions() const;
std::vector<boost::exception_ptr> GetExceptions() const;
void ReportExceptions(const String& facility) const;
protected:
void IncreaseTaskCount();
private:
int m_ID;
String m_Name;
static std::atomic<int> m_NextID;
int m_ThreadCount;
bool m_Spawned{false};
mutable boost::mutex m_Mutex;
boost::condition_variable m_CVEmpty;
boost::condition_variable m_CVFull;
boost::condition_variable m_CVStarved;
boost::thread_group m_Threads;
size_t m_MaxItems;
bool m_Stopped{false};
int m_Processing{0};
std::priority_queue<Task, std::deque<Task> > m_Tasks;
int m_NextTaskID{0};
ExceptionCallback m_ExceptionCallback;
std::vector<boost::exception_ptr> m_Exceptions;
Timer::Ptr m_StatusTimer;
double m_StatusTimerTimeout;
RingBuffer m_TaskStats;
size_t m_PendingTasks{0};
double m_PendingTasksTimestamp{0};
void WorkerThreadProc();
void StatusTimerHandler();
void RunTaskFunction(const TaskFunction& func);
};
}
#endif /* WORKQUEUE_H */