Implement per-thread event queues.

This commit is contained in:
Gunnar Beutner 2013-02-15 06:47:26 +01:00
parent 2faca52744
commit 023d17c675
13 changed files with 88 additions and 104 deletions

View File

@ -154,7 +154,7 @@ void CompatComponent::CommandPipeThread(const String& commandPath)
line[strlen(line) - 1] = '\0'; line[strlen(line) - 1] = '\0';
String command = line; String command = line;
Event::Post(boost::bind(&CompatComponent::ProcessCommand, this, command)); Application::GetEQ().Post(boost::bind(&CompatComponent::ProcessCommand, this, command));
} }
fclose(fp); fclose(fp);

View File

@ -20,8 +20,8 @@ libbase_la_SOURCES = \
dynamicobject.h \ dynamicobject.h \
dynamictype.cpp \ dynamictype.cpp \
dynamictype.h \ dynamictype.h \
event.cpp \ eventqueue.cpp \
event.h \ eventqueue.h \
exception.cpp \ exception.cpp \
exception.h \ exception.h \
fifo.cpp \ fifo.cpp \

View File

@ -32,6 +32,7 @@ String Application::m_PkgLibDir;
String Application::m_PkgDataDir; String Application::m_PkgDataDir;
int Application::m_ArgC; int Application::m_ArgC;
char **Application::m_ArgV; char **Application::m_ArgV;
EventQueue Application::m_EQ;
/** /**
* Constructor for the Application class. * Constructor for the Application class.
@ -123,7 +124,7 @@ bool Application::ProcessEvents(void)
if (m_ShuttingDown) if (m_ShuttingDown)
return false; return false;
Event::ProcessEvents(boost::posix_time::milliseconds(sleep * 1000)); GetEQ().ProcessEvents(boost::posix_time::milliseconds(sleep * 1000));
DynamicObject::FlushTx(); DynamicObject::FlushTx();
@ -138,6 +139,8 @@ void Application::RunEventLoop(void) const
{ {
boost::mutex::scoped_lock lock(m_Mutex); boost::mutex::scoped_lock lock(m_Mutex);
GetEQ().SetOwner(boost::this_thread::get_id());
#ifdef _DEBUG #ifdef _DEBUG
double nextProfile = 0; double nextProfile = 0;
#endif /* _DEBUG */ #endif /* _DEBUG */
@ -189,7 +192,7 @@ void Application::TimeWatchThreadProc(void)
* causes the event loop to wake up thereby * causes the event loop to wake up thereby
* solving the problem that timed_wait() * solving the problem that timed_wait()
* uses an absolute timestamp for the timeout */ * uses an absolute timestamp for the timeout */
Event::Post(boost::bind(&Timer::AdjustTimers, GetEQ().Post(boost::bind(&Timer::AdjustTimers,
-timeDiff)); -timeDiff));
} }
@ -600,3 +603,13 @@ boost::mutex& Application::GetMutex(void)
{ {
return m_Mutex; return m_Mutex;
} }
/**
* Returns the main thread's event queue.
*
* @returns The event queue.
*/
EventQueue& Application::GetEQ(void)
{
return m_EQ;
}

View File

@ -86,6 +86,8 @@ public:
static boost::mutex& GetMutex(void); static boost::mutex& GetMutex(void);
static EventQueue& GetEQ(void);
protected: protected:
void RunEventLoop(void) const; void RunEventLoop(void) const;
@ -104,6 +106,7 @@ private:
static String m_LocalStateDir; /**< The local state dir. */ static String m_LocalStateDir; /**< The local state dir. */
static String m_PkgLibDir; /**< The package lib dir. */ static String m_PkgLibDir; /**< The package lib dir. */
static String m_PkgDataDir; /**< The package data dir. */ static String m_PkgDataDir; /**< The package data dir. */
static EventQueue m_EQ; /**< The main thread's event queue. */
#ifndef _WIN32 #ifndef _WIN32
static void SigIntHandler(int signum); static void SigIntHandler(int signum);

View File

@ -21,53 +21,53 @@
using namespace icinga; using namespace icinga;
vector<Event> Event::m_Events; EventQueue::EventQueue(void)
condition_variable Event::m_EventAvailable; : m_Stopped(false)
boost::mutex Event::m_Mutex;
/**
* Constructor for the Event class
*
* @param callback The callback function for the new event object.
*/
Event::Event(const Event::Callback& callback)
: m_Callback(callback)
{ } { }
boost::thread::id EventQueue::GetOwner(void) const
{
return m_Owner;
}
void EventQueue::SetOwner(boost::thread::id owner)
{
m_Owner = owner;
}
void EventQueue::Stop(void)
{
boost::mutex::scoped_lock lock(m_Mutex);
m_Stopped = true;
m_EventAvailable.notify_all();
}
/** /**
* Waits for events using the specified timeout value and processes * Waits for events using the specified timeout value and processes
* them. * them.
* *
* @param timeout The wait timeout. * @param timeout The wait timeout.
* @returns false if the queue has been stopped, true otherwise.
*/ */
void Event::ProcessEvents(millisec timeout) bool EventQueue::ProcessEvents(millisec timeout)
{ {
vector<Event> events; vector<Callback> events;
assert(Application::IsMainThread());
Application::GetMutex().unlock();
{ {
boost::mutex::scoped_lock lock(m_Mutex); boost::mutex::scoped_lock lock(m_Mutex);
while (m_Events.empty()) { while (m_Events.empty() && !m_Stopped) {
if (!m_EventAvailable.timed_wait(lock, timeout)) { if (!m_EventAvailable.timed_wait(lock, timeout))
Application::GetMutex().lock(); return !m_Stopped;
return;
}
} }
events.swap(m_Events); events.swap(m_Events);
} }
Application::GetMutex().lock(); BOOST_FOREACH(const Callback& ev, events) {
BOOST_FOREACH(const Event& ev, events) {
double st = Utility::GetTime(); double st = Utility::GetTime();
ev.m_Callback(); ev();
double et = Utility::GetTime(); double et = Utility::GetTime();
@ -77,6 +77,8 @@ void Event::ProcessEvents(millisec timeout)
Logger::Write(LogWarning, "base", msgbuf.str()); Logger::Write(LogWarning, "base", msgbuf.str());
} }
} }
return !m_Stopped;
} }
/** /**
@ -85,18 +87,16 @@ void Event::ProcessEvents(millisec timeout)
* *
* @param callback The callback function for the event. * @param callback The callback function for the event.
*/ */
void Event::Post(const Event::Callback& callback) void EventQueue::Post(const EventQueue::Callback& callback)
{ {
if (Application::IsMainThread()) { if (boost::this_thread::get_id() == m_Owner) {
callback(); callback();
return; return;
} }
Event ev(callback);
{ {
boost::mutex::scoped_lock lock(m_Mutex); boost::mutex::scoped_lock lock(m_Mutex);
m_Events.push_back(ev); m_Events.push_back(callback);
m_EventAvailable.notify_all(); m_EventAvailable.notify_all();
} }
} }

View File

@ -17,35 +17,41 @@
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/ ******************************************************************************/
#ifndef EVENT_H #ifndef EVENTQUEUE_H
#define EVENT_H #define EVENTQUEUE_H
namespace icinga namespace icinga
{ {
/** /**
* A thread-safe event that can be posted to the main thread's event queue. * An event queue.
* *
* @ingroup base * @ingroup base
*/ */
class I2_BASE_API Event class I2_BASE_API EventQueue
{ {
public: public:
typedef function<void ()> Callback; typedef function<void ()> Callback;
static void ProcessEvents(millisec timeout); EventQueue(void);
static void Post(const Callback& callback);
bool ProcessEvents(millisec timeout = boost::posix_time::milliseconds(30000));
void Post(const Callback& callback);
void Stop(void);
boost::thread::id GetOwner(void) const;
void SetOwner(boost::thread::id owner);
private: private:
Event(const Callback& callback); boost::thread::id m_Owner;
Callback m_Callback; boost::mutex m_Mutex;
bool m_Stopped;
static vector<Event> m_Events; vector<Callback> m_Events;
static condition_variable m_EventAvailable; condition_variable m_EventAvailable;
static boost::mutex m_Mutex;
}; };
} }
#endif /* EVENT_H */ #endif /* EVENTQUEUE_H */

View File

@ -180,7 +180,7 @@ namespace tuples = boost::tuples;
#include "utility.h" #include "utility.h"
#include "object.h" #include "object.h"
#include "exception.h" #include "exception.h"
#include "event.h" #include "eventqueue.h"
#include "value.h" #include "value.h"
#include "convert.h" #include "convert.h"
#include "dictionary.h" #include "dictionary.h"

View File

@ -81,7 +81,7 @@ void Logger::Write(LogSeverity severity, const String& facility,
entry.Facility = facility; entry.Facility = facility;
entry.Message = message; entry.Message = message;
Event::Post(boost::bind(&Logger::ForwardLogEntry, entry)); Application::GetEQ().Post(boost::bind(&Logger::ForwardLogEntry, entry));
} }
/** /**

View File

@ -133,7 +133,7 @@ void Process::WorkerThreadProc(int taskFd)
if (fd >= 0) if (fd >= 0)
tasks[fd] = task; tasks[fd] = task;
} catch (...) { } catch (...) {
Event::Post(boost::bind(&Process::FinishException, task, boost::current_exception())); Application::GetEQ().Post(boost::bind(&Process::FinishException, task, boost::current_exception()));
} }
} }
@ -148,7 +148,7 @@ void Process::WorkerThreadProc(int taskFd)
prev = it; prev = it;
tasks.erase(prev); tasks.erase(prev);
Event::Post(boost::bind(&Process::FinishResult, task, task->m_Result)); Application::GetEQ().Post(boost::bind(&Process::FinishResult, task, task->m_Result));
} }
} }
} }

View File

@ -42,15 +42,7 @@ void ScriptInterpreter::Stop(void)
{ {
assert(Application::IsMainThread()); assert(Application::IsMainThread());
{ m_EQ.Stop();
boost::mutex::scoped_lock lock(m_Mutex);
if (m_Shutdown)
return;
m_Shutdown = true;
m_CallAvailable.notify_all();
}
BOOST_FOREACH(const String& function, m_SubscribedFunctions) { BOOST_FOREACH(const String& function, m_SubscribedFunctions) {
ScriptFunction::Unregister(function); ScriptFunction::Unregister(function);
@ -61,35 +53,17 @@ void ScriptInterpreter::Stop(void)
void ScriptInterpreter::ThreadWorkerProc(void) void ScriptInterpreter::ThreadWorkerProc(void)
{ {
boost::mutex::scoped_lock lock(m_Mutex); m_EQ.SetOwner(boost::this_thread::get_id());
for (;;) { while (m_EQ.ProcessEvents())
while (m_Calls.empty() && !m_Shutdown) ; /* empty loop */
m_CallAvailable.wait(lock);
if (m_Shutdown)
break;
ScriptCall call = m_Calls.front();
m_Calls.pop_front();
ProcessCall(call.Task, call.Function, call.Arguments);
}
} }
void ScriptInterpreter::ScriptFunctionThunk(const ScriptTask::Ptr& task, void ScriptInterpreter::ScriptFunctionThunk(const ScriptTask::Ptr& task,
const String& function, const vector<Value>& arguments) const String& function, const vector<Value>& arguments)
{ {
ScriptCall call; m_EQ.Post(boost::bind(&ScriptInterpreter::ProcessCall, this,
call.Task = task; task, function, arguments));
call.Function = function;
call.Arguments = arguments;
{
boost::mutex::scoped_lock lock(m_Mutex);
m_Calls.push_back(call);
m_CallAvailable.notify_all();
}
} }
void ScriptInterpreter::SubscribeFunction(const String& name) void ScriptInterpreter::SubscribeFunction(const String& name)

View File

@ -23,13 +23,6 @@
namespace icinga namespace icinga
{ {
struct ScriptCall
{
ScriptTask::Ptr Task;
String Function;
vector<Value> Arguments;
};
/** /**
* A script interpreter. * A script interpreter.
* *
@ -56,13 +49,8 @@ protected:
void UnsubscribeFunction(const String& name); void UnsubscribeFunction(const String& name);
private: private:
boost::mutex m_Mutex; EventQueue m_EQ;
bool m_Shutdown; set<String> m_SubscribedFunctions;
deque<ScriptCall> m_Calls;
condition_variable m_CallAvailable;
set<String> m_SubscribedFunctions; /* Not protected by the mutex. */
boost::thread m_Thread; boost::thread m_Thread;
void ThreadWorkerProc(void); void ThreadWorkerProc(void);

View File

@ -532,7 +532,7 @@ void Socket::HandleReadableClient(void)
} }
if (new_data) if (new_data)
Event::Post(boost::bind(boost::ref(OnDataAvailable), GetSelf())); Application::GetEQ().Post(boost::bind(boost::ref(OnDataAvailable), GetSelf()));
} }
void Socket::HandleWritableServer(void) void Socket::HandleWritableServer(void)
@ -557,7 +557,7 @@ void Socket::HandleReadableServer(void)
TcpSocket::Ptr client = boost::make_shared<TcpSocket>(); TcpSocket::Ptr client = boost::make_shared<TcpSocket>();
client->SetFD(fd); client->SetFD(fd);
Event::Post(boost::bind(boost::ref(OnNewClient), GetSelf(), client)); Application::GetEQ().Post(boost::bind(boost::ref(OnNewClient), GetSelf(), client));
} }
/** /**

View File

@ -40,9 +40,9 @@ void Stream::SetConnected(bool connected)
m_Connected = connected; m_Connected = connected;
if (m_Connected) if (m_Connected)
Event::Post(boost::bind(boost::ref(OnConnected), GetSelf())); Application::GetEQ().Post(boost::bind(boost::ref(OnConnected), GetSelf()));
else else
Event::Post(boost::bind(boost::ref(OnClosed), GetSelf())); Application::GetEQ().Post(boost::bind(boost::ref(OnClosed), GetSelf()));
} }
/** /**