mirror of https://github.com/Icinga/icinga2.git
Merge pull request #8243 from Icinga/feature/work-queue-log-level
Improve IDO queue stats logging
This commit is contained in:
commit
5ead5b1f67
|
@ -14,9 +14,9 @@ using namespace icinga;
|
||||||
std::atomic<int> WorkQueue::m_NextID(1);
|
std::atomic<int> WorkQueue::m_NextID(1);
|
||||||
boost::thread_specific_ptr<WorkQueue *> l_ThreadWorkQueue;
|
boost::thread_specific_ptr<WorkQueue *> l_ThreadWorkQueue;
|
||||||
|
|
||||||
WorkQueue::WorkQueue(size_t maxItems, int threadCount)
|
WorkQueue::WorkQueue(size_t maxItems, int threadCount, LogSeverity statsLogLevel)
|
||||||
: m_ID(m_NextID++), m_ThreadCount(threadCount), m_MaxItems(maxItems),
|
: m_ID(m_NextID++), m_ThreadCount(threadCount), m_MaxItems(maxItems),
|
||||||
m_TaskStats(15 * 60)
|
m_TaskStats(15 * 60), m_StatsLogLevel(statsLogLevel)
|
||||||
{
|
{
|
||||||
/* Initialize logger. */
|
/* Initialize logger. */
|
||||||
m_StatusTimerTimeout = Utility::GetTime();
|
m_StatusTimerTimeout = Utility::GetTime();
|
||||||
|
@ -216,7 +216,7 @@ void WorkQueue::StatusTimerHandler()
|
||||||
|
|
||||||
/* Log if there are pending items, or 5 minute timeout is reached. */
|
/* Log if there are pending items, or 5 minute timeout is reached. */
|
||||||
if (pending > 0 || m_StatusTimerTimeout < now) {
|
if (pending > 0 || m_StatusTimerTimeout < now) {
|
||||||
Log(LogInformation, "WorkQueue")
|
Log(m_StatsLogLevel, "WorkQueue")
|
||||||
<< "#" << m_ID << " (" << m_Name << ") "
|
<< "#" << m_ID << " (" << m_Name << ") "
|
||||||
<< "items: " << pending << ", "
|
<< "items: " << pending << ", "
|
||||||
<< "rate: " << std::setw(2) << GetTaskCount(60) / 60.0 << "/s "
|
<< "rate: " << std::setw(2) << GetTaskCount(60) / 60.0 << "/s "
|
||||||
|
|
|
@ -6,6 +6,7 @@
|
||||||
#include "base/i2-base.hpp"
|
#include "base/i2-base.hpp"
|
||||||
#include "base/timer.hpp"
|
#include "base/timer.hpp"
|
||||||
#include "base/ringbuffer.hpp"
|
#include "base/ringbuffer.hpp"
|
||||||
|
#include "base/logger.hpp"
|
||||||
#include <boost/thread/thread.hpp>
|
#include <boost/thread/thread.hpp>
|
||||||
#include <boost/thread/mutex.hpp>
|
#include <boost/thread/mutex.hpp>
|
||||||
#include <boost/thread/condition_variable.hpp>
|
#include <boost/thread/condition_variable.hpp>
|
||||||
|
@ -52,7 +53,7 @@ class WorkQueue
|
||||||
public:
|
public:
|
||||||
typedef std::function<void (boost::exception_ptr)> ExceptionCallback;
|
typedef std::function<void (boost::exception_ptr)> ExceptionCallback;
|
||||||
|
|
||||||
WorkQueue(size_t maxItems = 0, int threadCount = 1);
|
WorkQueue(size_t maxItems = 0, int threadCount = 1, LogSeverity statsLogLevel = LogInformation);
|
||||||
~WorkQueue();
|
~WorkQueue();
|
||||||
|
|
||||||
void SetName(const String& name);
|
void SetName(const String& name);
|
||||||
|
@ -129,6 +130,7 @@ private:
|
||||||
std::vector<boost::exception_ptr> m_Exceptions;
|
std::vector<boost::exception_ptr> m_Exceptions;
|
||||||
Timer::Ptr m_StatusTimer;
|
Timer::Ptr m_StatusTimer;
|
||||||
double m_StatusTimerTimeout;
|
double m_StatusTimerTimeout;
|
||||||
|
LogSeverity m_StatsLogLevel;
|
||||||
|
|
||||||
RingBuffer m_TaskStats;
|
RingBuffer m_TaskStats;
|
||||||
size_t m_PendingTasks{0};
|
size_t m_PendingTasks{0};
|
||||||
|
|
|
@ -76,6 +76,13 @@ void DbConnection::Resume()
|
||||||
m_CleanUpTimer->SetInterval(60);
|
m_CleanUpTimer->SetInterval(60);
|
||||||
m_CleanUpTimer->OnTimerExpired.connect(std::bind(&DbConnection::CleanUpHandler, this));
|
m_CleanUpTimer->OnTimerExpired.connect(std::bind(&DbConnection::CleanUpHandler, this));
|
||||||
m_CleanUpTimer->Start();
|
m_CleanUpTimer->Start();
|
||||||
|
|
||||||
|
m_LogStatsTimeout = 0;
|
||||||
|
|
||||||
|
m_LogStatsTimer = new Timer();
|
||||||
|
m_LogStatsTimer->SetInterval(10);
|
||||||
|
m_LogStatsTimer->OnTimerExpired.connect([this](const Timer * const&) { LogStatsHandler(); });
|
||||||
|
m_LogStatsTimer->Start();
|
||||||
}
|
}
|
||||||
|
|
||||||
void DbConnection::Pause()
|
void DbConnection::Pause()
|
||||||
|
@ -236,6 +243,38 @@ void DbConnection::CleanUpHandler()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void DbConnection::LogStatsHandler()
|
||||||
|
{
|
||||||
|
if (!GetConnected())
|
||||||
|
return;
|
||||||
|
|
||||||
|
auto pending = m_PendingQueries.load();
|
||||||
|
|
||||||
|
auto now = Utility::GetTime();
|
||||||
|
bool timeoutReached = m_LogStatsTimeout < now;
|
||||||
|
|
||||||
|
if (pending == 0u && !timeoutReached) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto output = round(m_OutputQueries.CalculateRate(now, 10));
|
||||||
|
|
||||||
|
if (pending < output * 5 && !timeoutReached) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto input = round(m_InputQueries.CalculateRate(now, 10));
|
||||||
|
|
||||||
|
Log(LogInformation, GetReflectionType()->GetName())
|
||||||
|
<< "Pending queries: " << pending << " (Input: " << input
|
||||||
|
<< "/s; Output: " << output << "/s)";
|
||||||
|
|
||||||
|
/* Reschedule next log entry in 5 minutes. */
|
||||||
|
if (timeoutReached) {
|
||||||
|
m_LogStatsTimeout = now + 60 * 5;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void DbConnection::CleanUpExecuteQuery(const String&, const String&, double)
|
void DbConnection::CleanUpExecuteQuery(const String&, const String&, double)
|
||||||
{
|
{
|
||||||
/* Default handler does nothing. */
|
/* Default handler does nothing. */
|
||||||
|
@ -507,3 +546,15 @@ int DbConnection::GetSessionToken()
|
||||||
{
|
{
|
||||||
return Application::GetStartTime();
|
return Application::GetStartTime();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void DbConnection::IncreasePendingQueries(int count)
|
||||||
|
{
|
||||||
|
m_PendingQueries.fetch_add(count);
|
||||||
|
m_InputQueries.InsertValue(Utility::GetTime(), count);
|
||||||
|
}
|
||||||
|
|
||||||
|
void DbConnection::DecreasePendingQueries(int count)
|
||||||
|
{
|
||||||
|
m_PendingQueries.fetch_sub(count);
|
||||||
|
m_OutputQueries.InsertValue(Utility::GetTime(), count);
|
||||||
|
}
|
||||||
|
|
|
@ -92,6 +92,9 @@ protected:
|
||||||
|
|
||||||
static int GetSessionToken();
|
static int GetSessionToken();
|
||||||
|
|
||||||
|
void IncreasePendingQueries(int count);
|
||||||
|
void DecreasePendingQueries(int count);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
bool m_IDCacheValid{false};
|
bool m_IDCacheValid{false};
|
||||||
std::map<std::pair<DbType::Ptr, DbReference>, String> m_ConfigHashes;
|
std::map<std::pair<DbType::Ptr, DbReference>, String> m_ConfigHashes;
|
||||||
|
@ -101,8 +104,12 @@ private:
|
||||||
std::set<DbObject::Ptr> m_ConfigUpdates;
|
std::set<DbObject::Ptr> m_ConfigUpdates;
|
||||||
std::set<DbObject::Ptr> m_StatusUpdates;
|
std::set<DbObject::Ptr> m_StatusUpdates;
|
||||||
Timer::Ptr m_CleanUpTimer;
|
Timer::Ptr m_CleanUpTimer;
|
||||||
|
Timer::Ptr m_LogStatsTimer;
|
||||||
|
|
||||||
|
double m_LogStatsTimeout;
|
||||||
|
|
||||||
void CleanUpHandler();
|
void CleanUpHandler();
|
||||||
|
void LogStatsHandler();
|
||||||
|
|
||||||
static Timer::Ptr m_ProgramStatusTimer;
|
static Timer::Ptr m_ProgramStatusTimer;
|
||||||
static boost::once_flag m_OnceFlag;
|
static boost::once_flag m_OnceFlag;
|
||||||
|
@ -112,6 +119,10 @@ private:
|
||||||
mutable boost::mutex m_StatsMutex;
|
mutable boost::mutex m_StatsMutex;
|
||||||
RingBuffer m_QueryStats{15 * 60};
|
RingBuffer m_QueryStats{15 * 60};
|
||||||
bool m_ActiveChangedHandler{false};
|
bool m_ActiveChangedHandler{false};
|
||||||
|
|
||||||
|
RingBuffer m_InputQueries{10};
|
||||||
|
RingBuffer m_OutputQueries{10};
|
||||||
|
Atomic<uint_fast64_t> m_PendingQueries{0};
|
||||||
};
|
};
|
||||||
|
|
||||||
struct database_error : virtual std::exception, virtual boost::exception { };
|
struct database_error : virtual std::exception, virtual boost::exception { };
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
#include "base/configtype.hpp"
|
#include "base/configtype.hpp"
|
||||||
#include "base/exception.hpp"
|
#include "base/exception.hpp"
|
||||||
#include "base/statsfunction.hpp"
|
#include "base/statsfunction.hpp"
|
||||||
|
#include "base/defer.hpp"
|
||||||
#include <utility>
|
#include <utility>
|
||||||
|
|
||||||
using namespace icinga;
|
using namespace icinga;
|
||||||
|
@ -175,6 +176,8 @@ void IdoMysqlConnection::InternalNewTransaction()
|
||||||
if (!GetConnected())
|
if (!GetConnected())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
IncreasePendingQueries(2);
|
||||||
|
|
||||||
AsyncQuery("COMMIT");
|
AsyncQuery("COMMIT");
|
||||||
AsyncQuery("BEGIN");
|
AsyncQuery("BEGIN");
|
||||||
}
|
}
|
||||||
|
@ -524,12 +527,28 @@ void IdoMysqlConnection::FinishAsyncQueries()
|
||||||
|
|
||||||
std::vector<IdoAsyncQuery>::size_type offset = 0;
|
std::vector<IdoAsyncQuery>::size_type offset = 0;
|
||||||
|
|
||||||
|
// This will be executed if there is a problem with executing the queries,
|
||||||
|
// at which point this function throws an exception and the queries should
|
||||||
|
// not be listed as still pending in the queue.
|
||||||
|
Defer decreaseQueries ([this, &offset, &queries]() {
|
||||||
|
auto lostQueries = queries.size() - offset;
|
||||||
|
|
||||||
|
if (lostQueries > 0) {
|
||||||
|
DecreasePendingQueries(lostQueries);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
while (offset < queries.size()) {
|
while (offset < queries.size()) {
|
||||||
std::ostringstream querybuf;
|
std::ostringstream querybuf;
|
||||||
|
|
||||||
std::vector<IdoAsyncQuery>::size_type count = 0;
|
std::vector<IdoAsyncQuery>::size_type count = 0;
|
||||||
size_t num_bytes = 0;
|
size_t num_bytes = 0;
|
||||||
|
|
||||||
|
Defer decreaseQueries ([this, &offset, &count]() {
|
||||||
|
offset += count;
|
||||||
|
DecreasePendingQueries(count);
|
||||||
|
});
|
||||||
|
|
||||||
for (std::vector<IdoAsyncQuery>::size_type i = offset; i < queries.size(); i++) {
|
for (std::vector<IdoAsyncQuery>::size_type i = offset; i < queries.size(); i++) {
|
||||||
const IdoAsyncQuery& aq = queries[i];
|
const IdoAsyncQuery& aq = queries[i];
|
||||||
|
|
||||||
|
@ -608,8 +627,6 @@ void IdoMysqlConnection::FinishAsyncQueries()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
offset += count;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -617,6 +634,9 @@ IdoMysqlResult IdoMysqlConnection::Query(const String& query)
|
||||||
{
|
{
|
||||||
AssertOnWorkQueue();
|
AssertOnWorkQueue();
|
||||||
|
|
||||||
|
IncreasePendingQueries(1);
|
||||||
|
Defer decreaseQueries ([this]() { DecreasePendingQueries(1); });
|
||||||
|
|
||||||
/* finish all async queries to maintain the right order for queries */
|
/* finish all async queries to maintain the right order for queries */
|
||||||
FinishAsyncQueries();
|
FinishAsyncQueries();
|
||||||
|
|
||||||
|
@ -770,6 +790,7 @@ void IdoMysqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj)
|
||||||
SetObjectID(dbobj, GetLastInsertID());
|
SetObjectID(dbobj, GetLastInsertID());
|
||||||
} else {
|
} else {
|
||||||
qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 1 WHERE object_id = " << static_cast<long>(dbref);
|
qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 1 WHERE object_id = " << static_cast<long>(dbref);
|
||||||
|
IncreasePendingQueries(1);
|
||||||
AsyncQuery(qbuf.str());
|
AsyncQuery(qbuf.str());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -804,6 +825,7 @@ void IdoMysqlConnection::InternalDeactivateObject(const DbObject::Ptr& dbobj)
|
||||||
|
|
||||||
std::ostringstream qbuf;
|
std::ostringstream qbuf;
|
||||||
qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 0 WHERE object_id = " << static_cast<long>(dbref);
|
qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 0 WHERE object_id = " << static_cast<long>(dbref);
|
||||||
|
IncreasePendingQueries(1);
|
||||||
AsyncQuery(qbuf.str());
|
AsyncQuery(qbuf.str());
|
||||||
|
|
||||||
/* Note that we're _NOT_ clearing the db refs via SetReference/SetConfigUpdate/SetStatusUpdate
|
/* Note that we're _NOT_ clearing the db refs via SetReference/SetConfigUpdate/SetStatusUpdate
|
||||||
|
@ -893,6 +915,7 @@ void IdoMysqlConnection::ExecuteQuery(const DbQuery& query)
|
||||||
<< "Scheduling execute query task, type " << query.Type << ", table '" << query.Table << "'.";
|
<< "Scheduling execute query task, type " << query.Type << ", table '" << query.Table << "'.";
|
||||||
#endif /* I2_DEBUG */
|
#endif /* I2_DEBUG */
|
||||||
|
|
||||||
|
IncreasePendingQueries(1);
|
||||||
m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, -1), query.Priority, true);
|
m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, -1), query.Priority, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -909,6 +932,7 @@ void IdoMysqlConnection::ExecuteMultipleQueries(const std::vector<DbQuery>& quer
|
||||||
<< "Scheduling multiple execute query task, type " << queries[0].Type << ", table '" << queries[0].Table << "'.";
|
<< "Scheduling multiple execute query task, type " << queries[0].Type << ", table '" << queries[0].Table << "'.";
|
||||||
#endif /* I2_DEBUG */
|
#endif /* I2_DEBUG */
|
||||||
|
|
||||||
|
IncreasePendingQueries(queries.size());
|
||||||
m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalExecuteMultipleQueries, this, queries), queries[0].Priority, true);
|
m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalExecuteMultipleQueries, this, queries), queries[0].Priority, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -948,11 +972,16 @@ void IdoMysqlConnection::InternalExecuteMultipleQueries(const std::vector<DbQuer
|
||||||
{
|
{
|
||||||
AssertOnWorkQueue();
|
AssertOnWorkQueue();
|
||||||
|
|
||||||
if (IsPaused())
|
if (IsPaused()) {
|
||||||
|
DecreasePendingQueries(queries.size());
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (!GetConnected())
|
if (!GetConnected()) {
|
||||||
|
DecreasePendingQueries(queries.size());
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
for (const DbQuery& query : queries) {
|
for (const DbQuery& query : queries) {
|
||||||
ASSERT(query.Type == DbQueryNewTransaction || query.Category != DbCatInvalid);
|
ASSERT(query.Type == DbQueryNewTransaction || query.Category != DbCatInvalid);
|
||||||
|
@ -979,23 +1008,32 @@ void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query, int typeOver
|
||||||
{
|
{
|
||||||
AssertOnWorkQueue();
|
AssertOnWorkQueue();
|
||||||
|
|
||||||
if (IsPaused())
|
if (IsPaused()) {
|
||||||
|
DecreasePendingQueries(1);
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (!GetConnected())
|
if (!GetConnected()) {
|
||||||
|
DecreasePendingQueries(1);
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (query.Type == DbQueryNewTransaction) {
|
if (query.Type == DbQueryNewTransaction) {
|
||||||
|
DecreasePendingQueries(1);
|
||||||
InternalNewTransaction();
|
InternalNewTransaction();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* check whether we're allowed to execute the query first */
|
/* check whether we're allowed to execute the query first */
|
||||||
if (GetCategoryFilter() != DbCatEverything && (query.Category & GetCategoryFilter()) == 0)
|
if (GetCategoryFilter() != DbCatEverything && (query.Category & GetCategoryFilter()) == 0) {
|
||||||
|
DecreasePendingQueries(1);
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (query.Object && query.Object->GetObject()->GetExtension("agent_check").ToBool())
|
if (query.Object && query.Object->GetObject()->GetExtension("agent_check").ToBool()) {
|
||||||
|
DecreasePendingQueries(1);
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
/* check if there are missing object/insert ids and re-enqueue the query */
|
/* check if there are missing object/insert ids and re-enqueue the query */
|
||||||
if (!CanExecuteQuery(query)) {
|
if (!CanExecuteQuery(query)) {
|
||||||
|
@ -1066,6 +1104,7 @@ void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query, int typeOver
|
||||||
if ((type & DbQueryInsert) && (type & DbQueryDelete)) {
|
if ((type & DbQueryInsert) && (type & DbQueryDelete)) {
|
||||||
std::ostringstream qdel;
|
std::ostringstream qdel;
|
||||||
qdel << "DELETE FROM " << GetTablePrefix() << query.Table << where.str();
|
qdel << "DELETE FROM " << GetTablePrefix() << query.Table << where.str();
|
||||||
|
IncreasePendingQueries(1);
|
||||||
AsyncQuery(qdel.str());
|
AsyncQuery(qdel.str());
|
||||||
|
|
||||||
type = DbQueryInsert;
|
type = DbQueryInsert;
|
||||||
|
@ -1150,6 +1189,7 @@ void IdoMysqlConnection::FinishExecuteQuery(const DbQuery& query, int type, bool
|
||||||
<< "Rescheduling DELETE/INSERT query: Upsert UPDATE did not affect rows, type " << type << ", table '" << query.Table << "'.";
|
<< "Rescheduling DELETE/INSERT query: Upsert UPDATE did not affect rows, type " << type << ", table '" << query.Table << "'.";
|
||||||
#endif /* I2_DEBUG */
|
#endif /* I2_DEBUG */
|
||||||
|
|
||||||
|
IncreasePendingQueries(1);
|
||||||
m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, DbQueryDelete | DbQueryInsert), query.Priority);
|
m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, DbQueryDelete | DbQueryInsert), query.Priority);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
|
@ -1178,6 +1218,7 @@ void IdoMysqlConnection::CleanUpExecuteQuery(const String& table, const String&
|
||||||
<< time_column << "'. max_age is set to '" << max_age << "'.";
|
<< time_column << "'. max_age is set to '" << max_age << "'.";
|
||||||
#endif /* I2_DEBUG */
|
#endif /* I2_DEBUG */
|
||||||
|
|
||||||
|
IncreasePendingQueries(1);
|
||||||
m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age), PriorityLow, true);
|
m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age), PriorityLow, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1185,11 +1226,15 @@ void IdoMysqlConnection::InternalCleanUpExecuteQuery(const String& table, const
|
||||||
{
|
{
|
||||||
AssertOnWorkQueue();
|
AssertOnWorkQueue();
|
||||||
|
|
||||||
if (IsPaused())
|
if (IsPaused()) {
|
||||||
|
DecreasePendingQueries(1);
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (!GetConnected())
|
if (!GetConnected()) {
|
||||||
|
DecreasePendingQueries(1);
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
AsyncQuery("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " +
|
AsyncQuery("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " +
|
||||||
Convert::ToString(static_cast<long>(m_InstanceID)) + " AND " + time_column +
|
Convert::ToString(static_cast<long>(m_InstanceID)) + " AND " + time_column +
|
||||||
|
|
|
@ -54,7 +54,7 @@ protected:
|
||||||
private:
|
private:
|
||||||
DbReference m_InstanceID;
|
DbReference m_InstanceID;
|
||||||
|
|
||||||
WorkQueue m_QueryQueue{10000000};
|
WorkQueue m_QueryQueue{10000000, 1, LogNotice};
|
||||||
|
|
||||||
Library m_Library;
|
Library m_Library;
|
||||||
std::unique_ptr<MysqlInterface, MysqlInterfaceDeleter> m_Mysql;
|
std::unique_ptr<MysqlInterface, MysqlInterfaceDeleter> m_Mysql;
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
#include "base/exception.hpp"
|
#include "base/exception.hpp"
|
||||||
#include "base/context.hpp"
|
#include "base/context.hpp"
|
||||||
#include "base/statsfunction.hpp"
|
#include "base/statsfunction.hpp"
|
||||||
|
#include "base/defer.hpp"
|
||||||
#include <utility>
|
#include <utility>
|
||||||
|
|
||||||
using namespace icinga;
|
using namespace icinga;
|
||||||
|
@ -137,6 +138,7 @@ void IdoPgsqlConnection::Disconnect()
|
||||||
if (!GetConnected())
|
if (!GetConnected())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
IncreasePendingQueries(1);
|
||||||
Query("COMMIT");
|
Query("COMMIT");
|
||||||
|
|
||||||
m_Pgsql->finish(m_Connection);
|
m_Pgsql->finish(m_Connection);
|
||||||
|
@ -166,6 +168,7 @@ void IdoPgsqlConnection::InternalNewTransaction()
|
||||||
if (!GetConnected())
|
if (!GetConnected())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
IncreasePendingQueries(2);
|
||||||
Query("COMMIT");
|
Query("COMMIT");
|
||||||
Query("BEGIN");
|
Query("BEGIN");
|
||||||
}
|
}
|
||||||
|
@ -191,6 +194,7 @@ void IdoPgsqlConnection::Reconnect()
|
||||||
if (GetConnected()) {
|
if (GetConnected()) {
|
||||||
/* Check if we're really still connected */
|
/* Check if we're really still connected */
|
||||||
try {
|
try {
|
||||||
|
IncreasePendingQueries(1);
|
||||||
Query("SELECT 1");
|
Query("SELECT 1");
|
||||||
return;
|
return;
|
||||||
} catch (const std::exception&) {
|
} catch (const std::exception&) {
|
||||||
|
@ -260,10 +264,13 @@ void IdoPgsqlConnection::Reconnect()
|
||||||
/* explicitely require legacy mode for string escaping in PostgreSQL >= 9.1
|
/* explicitely require legacy mode for string escaping in PostgreSQL >= 9.1
|
||||||
* changing standard_conforming_strings to on by default
|
* changing standard_conforming_strings to on by default
|
||||||
*/
|
*/
|
||||||
if (m_Pgsql->serverVersion(m_Connection) >= 90100)
|
if (m_Pgsql->serverVersion(m_Connection) >= 90100) {
|
||||||
|
IncreasePendingQueries(1);
|
||||||
result = Query("SET standard_conforming_strings TO off");
|
result = Query("SET standard_conforming_strings TO off");
|
||||||
|
}
|
||||||
|
|
||||||
String dbVersionName = "idoutils";
|
String dbVersionName = "idoutils";
|
||||||
|
IncreasePendingQueries(1);
|
||||||
result = Query("SELECT version FROM " + GetTablePrefix() + "dbversion WHERE name=E'" + Escape(dbVersionName) + "'");
|
result = Query("SELECT version FROM " + GetTablePrefix() + "dbversion WHERE name=E'" + Escape(dbVersionName) + "'");
|
||||||
|
|
||||||
Dictionary::Ptr row = FetchRow(result, 0);
|
Dictionary::Ptr row = FetchRow(result, 0);
|
||||||
|
@ -295,10 +302,12 @@ void IdoPgsqlConnection::Reconnect()
|
||||||
|
|
||||||
String instanceName = GetInstanceName();
|
String instanceName = GetInstanceName();
|
||||||
|
|
||||||
|
IncreasePendingQueries(1);
|
||||||
result = Query("SELECT instance_id FROM " + GetTablePrefix() + "instances WHERE instance_name = E'" + Escape(instanceName) + "'");
|
result = Query("SELECT instance_id FROM " + GetTablePrefix() + "instances WHERE instance_name = E'" + Escape(instanceName) + "'");
|
||||||
row = FetchRow(result, 0);
|
row = FetchRow(result, 0);
|
||||||
|
|
||||||
if (!row) {
|
if (!row) {
|
||||||
|
IncreasePendingQueries(1);
|
||||||
Query("INSERT INTO " + GetTablePrefix() + "instances (instance_name, instance_description) VALUES (E'" + Escape(instanceName) + "', E'" + Escape(GetInstanceDescription()) + "')");
|
Query("INSERT INTO " + GetTablePrefix() + "instances (instance_name, instance_description) VALUES (E'" + Escape(instanceName) + "', E'" + Escape(GetInstanceDescription()) + "')");
|
||||||
m_InstanceID = GetSequenceValue(GetTablePrefix() + "instances", "instance_id");
|
m_InstanceID = GetSequenceValue(GetTablePrefix() + "instances", "instance_id");
|
||||||
} else {
|
} else {
|
||||||
|
@ -310,6 +319,7 @@ void IdoPgsqlConnection::Reconnect()
|
||||||
/* we have an endpoint in a cluster setup, so decide if we can proceed here */
|
/* we have an endpoint in a cluster setup, so decide if we can proceed here */
|
||||||
if (my_endpoint && GetHAMode() == HARunOnce) {
|
if (my_endpoint && GetHAMode() == HARunOnce) {
|
||||||
/* get the current endpoint writing to programstatus table */
|
/* get the current endpoint writing to programstatus table */
|
||||||
|
IncreasePendingQueries(1);
|
||||||
result = Query("SELECT UNIX_TIMESTAMP(status_update_time) AS status_update_time, endpoint_name FROM " +
|
result = Query("SELECT UNIX_TIMESTAMP(status_update_time) AS status_update_time, endpoint_name FROM " +
|
||||||
GetTablePrefix() + "programstatus WHERE instance_id = " + Convert::ToString(m_InstanceID));
|
GetTablePrefix() + "programstatus WHERE instance_id = " + Convert::ToString(m_InstanceID));
|
||||||
row = FetchRow(result, 0);
|
row = FetchRow(result, 0);
|
||||||
|
@ -372,12 +382,14 @@ void IdoPgsqlConnection::Reconnect()
|
||||||
<< "PGSQL IDO instance id: " << static_cast<long>(m_InstanceID) << " (schema version: '" + version + "')"
|
<< "PGSQL IDO instance id: " << static_cast<long>(m_InstanceID) << " (schema version: '" + version + "')"
|
||||||
<< (!sslMode.IsEmpty() ? ", sslmode='" + sslMode + "'" : "");
|
<< (!sslMode.IsEmpty() ? ", sslmode='" + sslMode + "'" : "");
|
||||||
|
|
||||||
|
IncreasePendingQueries(1);
|
||||||
Query("BEGIN");
|
Query("BEGIN");
|
||||||
|
|
||||||
/* update programstatus table */
|
/* update programstatus table */
|
||||||
UpdateProgramStatus();
|
UpdateProgramStatus();
|
||||||
|
|
||||||
/* record connection */
|
/* record connection */
|
||||||
|
IncreasePendingQueries(1);
|
||||||
Query("INSERT INTO " + GetTablePrefix() + "conninfo " +
|
Query("INSERT INTO " + GetTablePrefix() + "conninfo " +
|
||||||
"(instance_id, connect_time, last_checkin_time, agent_name, agent_version, connect_type, data_start_time) VALUES ("
|
"(instance_id, connect_time, last_checkin_time, agent_name, agent_version, connect_type, data_start_time) VALUES ("
|
||||||
+ Convert::ToString(static_cast<long>(m_InstanceID)) + ", NOW(), NOW(), E'icinga2 db_ido_pgsql', E'" + Escape(Application::GetAppVersion())
|
+ Convert::ToString(static_cast<long>(m_InstanceID)) + ", NOW(), NOW(), E'icinga2 db_ido_pgsql', E'" + Escape(Application::GetAppVersion())
|
||||||
|
@ -388,6 +400,7 @@ void IdoPgsqlConnection::Reconnect()
|
||||||
|
|
||||||
std::ostringstream q1buf;
|
std::ostringstream q1buf;
|
||||||
q1buf << "SELECT object_id, objecttype_id, name1, name2, is_active FROM " + GetTablePrefix() + "objects WHERE instance_id = " << static_cast<long>(m_InstanceID);
|
q1buf << "SELECT object_id, objecttype_id, name1, name2, is_active FROM " + GetTablePrefix() + "objects WHERE instance_id = " << static_cast<long>(m_InstanceID);
|
||||||
|
IncreasePendingQueries(1);
|
||||||
result = Query(q1buf.str());
|
result = Query(q1buf.str());
|
||||||
|
|
||||||
std::vector<DbObject::Ptr> activeDbObjs;
|
std::vector<DbObject::Ptr> activeDbObjs;
|
||||||
|
@ -442,6 +455,7 @@ void IdoPgsqlConnection::FinishConnect(double startTime)
|
||||||
<< "Finished reconnecting to '" << GetName() << "' database '" << GetDatabase() << "' in "
|
<< "Finished reconnecting to '" << GetName() << "' database '" << GetDatabase() << "' in "
|
||||||
<< std::setw(2) << Utility::GetTime() - startTime << " second(s).";
|
<< std::setw(2) << Utility::GetTime() - startTime << " second(s).";
|
||||||
|
|
||||||
|
IncreasePendingQueries(2);
|
||||||
Query("COMMIT");
|
Query("COMMIT");
|
||||||
Query("BEGIN");
|
Query("BEGIN");
|
||||||
}
|
}
|
||||||
|
@ -455,6 +469,7 @@ void IdoPgsqlConnection::ClearTablesBySession()
|
||||||
|
|
||||||
void IdoPgsqlConnection::ClearTableBySession(const String& table)
|
void IdoPgsqlConnection::ClearTableBySession(const String& table)
|
||||||
{
|
{
|
||||||
|
IncreasePendingQueries(1);
|
||||||
Query("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " +
|
Query("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " +
|
||||||
Convert::ToString(static_cast<long>(m_InstanceID)) + " AND session_token <> " +
|
Convert::ToString(static_cast<long>(m_InstanceID)) + " AND session_token <> " +
|
||||||
Convert::ToString(GetSessionToken()));
|
Convert::ToString(GetSessionToken()));
|
||||||
|
@ -464,6 +479,8 @@ IdoPgsqlResult IdoPgsqlConnection::Query(const String& query)
|
||||||
{
|
{
|
||||||
AssertOnWorkQueue();
|
AssertOnWorkQueue();
|
||||||
|
|
||||||
|
Defer decreaseQueries ([this]() { DecreasePendingQueries(1); });
|
||||||
|
|
||||||
Log(LogDebug, "IdoPgsqlConnection")
|
Log(LogDebug, "IdoPgsqlConnection")
|
||||||
<< "Query: " << query;
|
<< "Query: " << query;
|
||||||
|
|
||||||
|
@ -512,6 +529,7 @@ DbReference IdoPgsqlConnection::GetSequenceValue(const String& table, const Stri
|
||||||
{
|
{
|
||||||
AssertOnWorkQueue();
|
AssertOnWorkQueue();
|
||||||
|
|
||||||
|
IncreasePendingQueries(1);
|
||||||
IdoPgsqlResult result = Query("SELECT CURRVAL(pg_get_serial_sequence(E'" + Escape(table) + "', E'" + Escape(column) + "')) AS id");
|
IdoPgsqlResult result = Query("SELECT CURRVAL(pg_get_serial_sequence(E'" + Escape(table) + "', E'" + Escape(column) + "')) AS id");
|
||||||
|
|
||||||
Dictionary::Ptr row = FetchRow(result, 0);
|
Dictionary::Ptr row = FetchRow(result, 0);
|
||||||
|
@ -601,10 +619,12 @@ void IdoPgsqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj)
|
||||||
<< "E'" << Escape(dbobj->GetName1()) << "', 1)";
|
<< "E'" << Escape(dbobj->GetName1()) << "', 1)";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
IncreasePendingQueries(1);
|
||||||
Query(qbuf.str());
|
Query(qbuf.str());
|
||||||
SetObjectID(dbobj, GetSequenceValue(GetTablePrefix() + "objects", "object_id"));
|
SetObjectID(dbobj, GetSequenceValue(GetTablePrefix() + "objects", "object_id"));
|
||||||
} else {
|
} else {
|
||||||
qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 1 WHERE object_id = " << static_cast<long>(dbref);
|
qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 1 WHERE object_id = " << static_cast<long>(dbref);
|
||||||
|
IncreasePendingQueries(1);
|
||||||
Query(qbuf.str());
|
Query(qbuf.str());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -631,6 +651,7 @@ void IdoPgsqlConnection::InternalDeactivateObject(const DbObject::Ptr& dbobj)
|
||||||
|
|
||||||
std::ostringstream qbuf;
|
std::ostringstream qbuf;
|
||||||
qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 0 WHERE object_id = " << static_cast<long>(dbref);
|
qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 0 WHERE object_id = " << static_cast<long>(dbref);
|
||||||
|
IncreasePendingQueries(1);
|
||||||
Query(qbuf.str());
|
Query(qbuf.str());
|
||||||
|
|
||||||
/* Note that we're _NOT_ clearing the db refs via SetReference/SetConfigUpdate/SetStatusUpdate
|
/* Note that we're _NOT_ clearing the db refs via SetReference/SetConfigUpdate/SetStatusUpdate
|
||||||
|
@ -715,6 +736,7 @@ void IdoPgsqlConnection::ExecuteQuery(const DbQuery& query)
|
||||||
|
|
||||||
ASSERT(query.Category != DbCatInvalid);
|
ASSERT(query.Category != DbCatInvalid);
|
||||||
|
|
||||||
|
IncreasePendingQueries(1);
|
||||||
m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query, -1), query.Priority, true);
|
m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query, -1), query.Priority, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -726,6 +748,7 @@ void IdoPgsqlConnection::ExecuteMultipleQueries(const std::vector<DbQuery>& quer
|
||||||
if (queries.empty())
|
if (queries.empty())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
IncreasePendingQueries(queries.size());
|
||||||
m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::InternalExecuteMultipleQueries, this, queries), queries[0].Priority, true);
|
m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::InternalExecuteMultipleQueries, this, queries), queries[0].Priority, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -765,11 +788,15 @@ void IdoPgsqlConnection::InternalExecuteMultipleQueries(const std::vector<DbQuer
|
||||||
{
|
{
|
||||||
AssertOnWorkQueue();
|
AssertOnWorkQueue();
|
||||||
|
|
||||||
if (IsPaused())
|
if (IsPaused()) {
|
||||||
|
DecreasePendingQueries(queries.size());
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (!GetConnected())
|
if (!GetConnected()) {
|
||||||
|
DecreasePendingQueries(queries.size());
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
for (const DbQuery& query : queries) {
|
for (const DbQuery& query : queries) {
|
||||||
ASSERT(query.Type == DbQueryNewTransaction || query.Category != DbCatInvalid);
|
ASSERT(query.Type == DbQueryNewTransaction || query.Category != DbCatInvalid);
|
||||||
|
@ -789,23 +816,32 @@ void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query, int typeOver
|
||||||
{
|
{
|
||||||
AssertOnWorkQueue();
|
AssertOnWorkQueue();
|
||||||
|
|
||||||
if (IsPaused())
|
if (IsPaused()) {
|
||||||
|
DecreasePendingQueries(1);
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (!GetConnected())
|
if (!GetConnected()) {
|
||||||
|
DecreasePendingQueries(1);
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (query.Type == DbQueryNewTransaction) {
|
if (query.Type == DbQueryNewTransaction) {
|
||||||
|
DecreasePendingQueries(1);
|
||||||
InternalNewTransaction();
|
InternalNewTransaction();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* check whether we're allowed to execute the query first */
|
/* check whether we're allowed to execute the query first */
|
||||||
if (GetCategoryFilter() != DbCatEverything && (query.Category & GetCategoryFilter()) == 0)
|
if (GetCategoryFilter() != DbCatEverything && (query.Category & GetCategoryFilter()) == 0) {
|
||||||
|
DecreasePendingQueries(1);
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (query.Object && query.Object->GetObject()->GetExtension("agent_check").ToBool())
|
if (query.Object && query.Object->GetObject()->GetExtension("agent_check").ToBool()) {
|
||||||
|
DecreasePendingQueries(1);
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
/* check if there are missing object/insert ids and re-enqueue the query */
|
/* check if there are missing object/insert ids and re-enqueue the query */
|
||||||
if (!CanExecuteQuery(query)) {
|
if (!CanExecuteQuery(query)) {
|
||||||
|
@ -862,6 +898,7 @@ void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query, int typeOver
|
||||||
if ((type & DbQueryInsert) && (type & DbQueryDelete)) {
|
if ((type & DbQueryInsert) && (type & DbQueryDelete)) {
|
||||||
std::ostringstream qdel;
|
std::ostringstream qdel;
|
||||||
qdel << "DELETE FROM " << GetTablePrefix() << query.Table << where.str();
|
qdel << "DELETE FROM " << GetTablePrefix() << query.Table << where.str();
|
||||||
|
IncreasePendingQueries(1);
|
||||||
Query(qdel.str());
|
Query(qdel.str());
|
||||||
|
|
||||||
type = DbQueryInsert;
|
type = DbQueryInsert;
|
||||||
|
@ -929,6 +966,7 @@ void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query, int typeOver
|
||||||
Query(qbuf.str());
|
Query(qbuf.str());
|
||||||
|
|
||||||
if (upsert && GetAffectedRows() == 0) {
|
if (upsert && GetAffectedRows() == 0) {
|
||||||
|
IncreasePendingQueries(1);
|
||||||
InternalExecuteQuery(query, DbQueryDelete | DbQueryInsert);
|
InternalExecuteQuery(query, DbQueryDelete | DbQueryInsert);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
|
@ -959,6 +997,7 @@ void IdoPgsqlConnection::CleanUpExecuteQuery(const String& table, const String&
|
||||||
if (IsPaused())
|
if (IsPaused())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
IncreasePendingQueries(1);
|
||||||
m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age), PriorityLow, true);
|
m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age), PriorityLow, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -966,8 +1005,10 @@ void IdoPgsqlConnection::InternalCleanUpExecuteQuery(const String& table, const
|
||||||
{
|
{
|
||||||
AssertOnWorkQueue();
|
AssertOnWorkQueue();
|
||||||
|
|
||||||
if (!GetConnected())
|
if (!GetConnected()) {
|
||||||
|
DecreasePendingQueries(1);
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
Query("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " +
|
Query("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " +
|
||||||
Convert::ToString(static_cast<long>(m_InstanceID)) + " AND " + time_column +
|
Convert::ToString(static_cast<long>(m_InstanceID)) + " AND " + time_column +
|
||||||
|
@ -977,6 +1018,7 @@ void IdoPgsqlConnection::InternalCleanUpExecuteQuery(const String& table, const
|
||||||
void IdoPgsqlConnection::FillIDCache(const DbType::Ptr& type)
|
void IdoPgsqlConnection::FillIDCache(const DbType::Ptr& type)
|
||||||
{
|
{
|
||||||
String query = "SELECT " + type->GetIDColumn() + " AS object_id, " + type->GetTable() + "_id, config_hash FROM " + GetTablePrefix() + type->GetTable() + "s";
|
String query = "SELECT " + type->GetIDColumn() + " AS object_id, " + type->GetTable() + "_id, config_hash FROM " + GetTablePrefix() + type->GetTable() + "s";
|
||||||
|
IncreasePendingQueries(1);
|
||||||
IdoPgsqlResult result = Query(query);
|
IdoPgsqlResult result = Query(query);
|
||||||
|
|
||||||
Dictionary::Ptr row;
|
Dictionary::Ptr row;
|
||||||
|
|
|
@ -48,7 +48,7 @@ protected:
|
||||||
private:
|
private:
|
||||||
DbReference m_InstanceID;
|
DbReference m_InstanceID;
|
||||||
|
|
||||||
WorkQueue m_QueryQueue{1000000};
|
WorkQueue m_QueryQueue{1000000, 1, LogNotice};
|
||||||
|
|
||||||
Library m_Library;
|
Library m_Library;
|
||||||
std::unique_ptr<PgsqlInterface, PgsqlInterfaceDeleter> m_Pgsql;
|
std::unique_ptr<PgsqlInterface, PgsqlInterfaceDeleter> m_Pgsql;
|
||||||
|
|
Loading…
Reference in New Issue