Implement support for work queues.

This commit is contained in:
Gunnar Beutner 2013-09-18 09:09:16 +02:00
parent 9d0fee73a3
commit e7da4057f9
6 changed files with 468 additions and 297 deletions

View File

@ -238,34 +238,47 @@ void ClusterComponent::AddConnection(const String& node, const String& service)
} }
void ClusterComponent::RelayMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message, bool persistent) void ClusterComponent::RelayMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message, bool persistent)
{
m_RelayQueue.Enqueue(boost::bind(&ClusterComponent::RealRelayMessage, this, source, message, persistent));
}
void ClusterComponent::PersistMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message)
{
double ts = message->Get("ts");
ASSERT(ts != 0);
Dictionary::Ptr pmessage = boost::make_shared<Dictionary>();
pmessage->Set("timestamp", ts);
if (source)
pmessage->Set("source", source->GetName());
pmessage->Set("message", Value(message).Serialize());
pmessage->Set("security", message->Get("security"));
ObjectLock olock(this);
if (m_LogFile) {
String json = Value(pmessage).Serialize();
NetString::WriteStringToStream(m_LogFile, json);
m_LogMessageCount++;
m_LogMessageTimestamp = ts;
if (m_LogMessageCount > 50000) {
CloseLogFile();
RotateLogFile();
OpenLogFile();
}
}
}
void ClusterComponent::RealRelayMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message, bool persistent)
{ {
double ts = Utility::GetTime(); double ts = Utility::GetTime();
message->Set("ts", ts); message->Set("ts", ts);
if (persistent) { if (persistent)
Dictionary::Ptr pmessage = boost::make_shared<Dictionary>(); m_LogQueue.Enqueue(boost::bind(&ClusterComponent::PersistMessage, this, source, message));
pmessage->Set("timestamp", ts);
if (source)
pmessage->Set("source", source->GetName());
pmessage->Set("message", Value(message).Serialize());
pmessage->Set("security", message->Get("security"));
ObjectLock olock(this);
if (m_LogFile) {
String json = Value(pmessage).Serialize();
NetString::WriteStringToStream(m_LogFile, json);
m_LogMessageCount++;
m_LogMessageTimestamp = ts;
if (m_LogMessageCount > 50000) {
CloseLogFile();
RotateLogFile();
OpenLogFile();
}
}
}
Dictionary::Ptr security = message->Get("security"); Dictionary::Ptr security = message->Get("security");
DynamicObject::Ptr secobj; DynamicObject::Ptr secobj;
@ -971,6 +984,11 @@ void ClusterComponent::AcknowledgementClearedHandler(const Service::Ptr& service
} }
void ClusterComponent::MessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message) void ClusterComponent::MessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message)
{
m_MessageQueue.Enqueue(boost::bind(&ClusterComponent::RealMessageHandler, this, sender, message));
}
void ClusterComponent::RealMessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message)
{ {
sender->SetSeen(Utility::GetTime()); sender->SetSeen(Utility::GetTime());

View File

@ -28,6 +28,7 @@
#include "base/utility.h" #include "base/utility.h"
#include "base/tlsutility.h" #include "base/tlsutility.h"
#include "base/stdiostream.h" #include "base/stdiostream.h"
#include "base/workqueue.h"
#include "icinga/service.h" #include "icinga/service.h"
#include "cluster/endpoint.h" #include "cluster/endpoint.h"
@ -70,6 +71,10 @@ private:
shared_ptr<SSL_CTX> m_SSLContext; shared_ptr<SSL_CTX> m_SSLContext;
String m_Identity; String m_Identity;
WorkQueue m_RelayQueue;
WorkQueue m_MessageQueue;
WorkQueue m_LogQueue;
Timer::Ptr m_ClusterTimer; Timer::Ptr m_ClusterTimer;
void ClusterTimerHandler(void); void ClusterTimerHandler(void);
@ -84,6 +89,7 @@ private:
void ListenerThreadProc(const Socket::Ptr& server); void ListenerThreadProc(const Socket::Ptr& server);
void RelayMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message, bool persistent); void RelayMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message, bool persistent);
void RealRelayMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message, bool persistent);
void OpenLogFile(void); void OpenLogFile(void);
void RotateLogFile(void); void RotateLogFile(void);
@ -110,7 +116,9 @@ private:
void DowntimeRemovedHandler(const Service::Ptr& service, const Dictionary::Ptr& downtime, const String& authority); void DowntimeRemovedHandler(const Service::Ptr& service, const Dictionary::Ptr& downtime, const String& authority);
void AcknowledgementSetHandler(const Service::Ptr& service, const String& author, const String& comment, AcknowledgementType type, double expiry, const String& authority); void AcknowledgementSetHandler(const Service::Ptr& service, const String& author, const String& comment, AcknowledgementType type, double expiry, const String& authority);
void AcknowledgementClearedHandler(const Service::Ptr& service, const String& authority); void AcknowledgementClearedHandler(const Service::Ptr& service, const String& authority);
void MessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message); void MessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message);
void RealMessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message);
bool IsAuthority(const DynamicObject::Ptr& object, const String& type); bool IsAuthority(const DynamicObject::Ptr& object, const String& type);
void UpdateAuthority(void); void UpdateAuthority(void);
@ -119,6 +127,8 @@ private:
static bool SupportsNotifications(void); static bool SupportsNotifications(void);
void SetSecurityInfo(const Dictionary::Ptr& message, const DynamicObject::Ptr& object, int privs); void SetSecurityInfo(const Dictionary::Ptr& message, const DynamicObject::Ptr& object, int privs);
void PersistMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message);
}; };
} }

View File

@ -95,6 +95,8 @@ libbase_la_SOURCES = \
value.cpp \ value.cpp \
value.h \ value.h \
win32.h \ win32.h \
workqueue.cpp \
workqueue.h \
zlibstream.cpp \ zlibstream.cpp \
zlibstream.h zlibstream.h

80
lib/base/workqueue.cpp Normal file
View File

@ -0,0 +1,80 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "base/workqueue.h"
#include "base/utility.h"
#include <boost/bind.hpp>
using namespace icinga;
WorkQueue::~WorkQueue(void)
{
Join();
}
/**
* Enqueues a work item. Work items are guaranteed to be executed in the order
* they were enqueued in.
*/
void WorkQueue::Enqueue(const WorkCallback& item)
{
boost::mutex::scoped_lock lock(m_Mutex);
m_Items.push_back(item);
m_CV.notify_all();
if (!m_Executing)
Utility::QueueAsyncCallback(boost::bind(&WorkQueue::ExecuteItem, this));
}
void WorkQueue::Join(void)
{
boost::mutex::scoped_lock lock(m_Mutex);
while (m_Executing || !m_Items.empty())
m_CV.wait(lock);
}
void WorkQueue::Clear(void)
{
boost::mutex::scoped_lock lock(m_Mutex);
m_Items.clear();
m_CV.notify_all();
}
void WorkQueue::ExecuteItem(void)
{
boost::mutex::scoped_lock lock(m_Mutex);
m_Executing = true;
while (!m_Items.empty()) {
try {
WorkCallback wi = m_Items.front();
m_Items.pop_front();
lock.unlock();
wi();
lock.lock();
} catch (...) {
lock.lock();
m_Executing = false;
throw;
}
}
m_Executing = false;
}

59
lib/base/workqueue.h Normal file
View File

@ -0,0 +1,59 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#ifndef WORKQUEUE_H
#define WORKQUEUE_H
#include "base/i2-base.h"
#include <deque>
#include <boost/function.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
namespace icinga
{
/**
* A workqueue.
*
* @ingroup base
*/
class I2_BASE_API WorkQueue
{
public:
typedef boost::function<void (void)> WorkCallback;
~WorkQueue(void);
void Enqueue(const WorkCallback& item);
void Join(void);
void Clear(void);
private:
boost::mutex m_Mutex;
boost::condition_variable m_CV;
bool m_Executing;
std::deque<WorkCallback> m_Items;
void ExecuteItem(void);
};
}
#endif /* WORKQUEUE_H */

File diff suppressed because it is too large Load Diff