Improved Event::Post performance.

This commit is contained in:
Gunnar Beutner 2012-07-13 23:33:30 +02:00
parent a73f41fb6c
commit b176963c93
10 changed files with 47 additions and 66 deletions

View File

@ -99,14 +99,7 @@ void Application::RunEventLoop(void)
if (m_ShuttingDown) if (m_ShuttingDown)
break; break;
vector<Event::Ptr> events; Event::ProcessEvents(boost::get_system_time() + boost::posix_time::seconds(sleep));
Event::Wait(&events, boost::get_system_time() + boost::posix_time::seconds(sleep));
for (vector<Event::Ptr>::iterator it = events.begin(); it != events.end(); it++) {
Event::Ptr ev = *it;
ev->OnEventDelivered();
}
} }
} }

View File

@ -53,18 +53,10 @@ protected:
void Finish(void) void Finish(void)
{ {
Event::Ptr ev = boost::make_shared<Event>(); Event::Post(boost::bind(boost::cref(OnTaskCompleted), static_cast<shared_ptr<T> >(GetSelf())));
ev->OnEventDelivered.connect(boost::bind(&T::FinishForwarder, static_cast<shared_ptr<T> >(GetSelf())));
Event::Post(ev);
} }
bool m_Finished; bool m_Finished;
private:
static void FinishForwarder(const shared_ptr<T>& task)
{
task->OnTaskCompleted(task);
}
}; };
} }

View File

@ -21,33 +21,43 @@
using namespace icinga; using namespace icinga;
deque<Event::Ptr> Event::m_Events; vector<Event> Event::m_Events;
condition_variable Event::m_EventAvailable; condition_variable Event::m_EventAvailable;
mutex Event::m_Mutex; mutex Event::m_Mutex;
bool Event::Wait(vector<Event::Ptr> *events, const system_time& wait_until) Event::Event(const function<void ()>& callback)
: m_Callback(callback)
{ }
void Event::ProcessEvents(const system_time& wait_until)
{ {
mutex::scoped_lock lock(m_Mutex); vector<Event> events;
while (m_Events.empty()) { {
if (!m_EventAvailable.timed_wait(lock, wait_until)) mutex::scoped_lock lock(m_Mutex);
return false;
while (m_Events.empty()) {
if (!m_EventAvailable.timed_wait(lock, wait_until))
return;
}
events.swap(m_Events);
} }
vector<Event::Ptr> result;
std::copy(m_Events.begin(), m_Events.end(), back_inserter(*events));
m_Events.clear();
return true; vector<Event>::iterator it;
for (it = events.begin(); it != events.end(); it++)
it->m_Callback();
} }
void Event::Post(const Event::Ptr& ev) void Event::Post(const function<void ()>& callback)
{ {
if (Application::IsMainThread()) { if (Application::IsMainThread()) {
ev->OnEventDelivered(); callback();
return; return;
} }
Event ev(callback);
{ {
mutex::scoped_lock lock(m_Mutex); mutex::scoped_lock lock(m_Mutex);
m_Events.push_back(ev); m_Events.push_back(ev);

View File

@ -23,19 +23,18 @@
namespace icinga namespace icinga
{ {
class I2_BASE_API Event : public Object class I2_BASE_API Event
{ {
public: public:
typedef shared_ptr<Event> Ptr; static void ProcessEvents(const system_time& wait_until);
typedef weak_ptr<Event> WeakPtr; static void Post(const function<void ()>& callback);
static bool Wait(vector<Event::Ptr> *events, const system_time& wait_until);
static void Post(const Event::Ptr& ev);
boost::signal<void ()> OnEventDelivered;
private: private:
static deque<Event::Ptr> m_Events; Event(const function<void ()>& callback);
function<void ()> m_Callback;
static vector<Event> m_Events;
static condition_variable m_EventAvailable; static condition_variable m_EventAvailable;
static mutex m_Mutex; static mutex m_Mutex;
}; };

View File

@ -49,9 +49,7 @@ void Logger::Write(LogSeverity severity, const string& facility,
entry.Facility = facility; entry.Facility = facility;
entry.Message = message; entry.Message = message;
Event::Ptr ev = boost::make_shared<Event>(); Event::Post(boost::bind(&Logger::ForwardLogEntry, entry));
ev->OnEventDelivered.connect(boost::bind(&Logger::ForwardLogEntry, entry));
Event::Post(ev);
} }
/** /**

View File

@ -111,11 +111,8 @@ void Socket::CloseInternal(bool from_dtor)
/* nobody can possibly have a valid event subscription when the /* nobody can possibly have a valid event subscription when the
destructor has been called */ destructor has been called */
if (!from_dtor) { if (!from_dtor)
Event::Ptr ev = boost::make_shared<Event>(); Event::Post(boost::bind(boost::ref(OnClosed), GetSelf()));
ev->OnEventDelivered.connect(boost::bind(boost::ref(OnClosed), GetSelf()));
Event::Post(ev);
}
} }
/** /**
@ -159,9 +156,7 @@ int Socket::GetLastSocketError(void)
void Socket::HandleSocketError(const exception& ex) void Socket::HandleSocketError(const exception& ex)
{ {
if (!OnError.empty()) { if (!OnError.empty()) {
Event::Ptr ev = boost::make_shared<Event>(); Event::Post(boost::bind(boost::ref(OnError), GetSelf(), runtime_error(ex.what())));
ev->OnEventDelivered.connect(boost::bind(boost::ref(OnError), GetSelf(), runtime_error(ex.what())));
Event::Post(ev);
CloseInternal(false); CloseInternal(false);
} else { } else {

View File

@ -159,9 +159,7 @@ void TcpClient::HandleReadable(void)
m_RecvQueue->Write(NULL, rc); m_RecvQueue->Write(NULL, rc);
} }
Event::Ptr ev = boost::make_shared<Event>(); Event::Post(boost::bind(boost::ref(OnDataAvailable), GetSelf()));
ev->OnEventDelivered.connect(boost::bind(boost::ref(OnDataAvailable), GetSelf()));
Event::Post(ev);
} }
/** /**

View File

@ -91,7 +91,5 @@ void TcpServer::HandleReadable(void)
TcpClient::Ptr client = m_ClientFactory(fd); TcpClient::Ptr client = m_ClientFactory(fd);
Event::Ptr ev = boost::make_shared<Event>(); Event::Post(boost::bind(boost::ref(OnNewClient), GetSelf(), client));
ev->OnEventDelivered.connect(boost::bind(boost::ref(OnNewClient), GetSelf(), client));
Event::Post(ev);
} }

View File

@ -142,9 +142,7 @@ void TlsClient::HandleReadable(void)
} }
post_event: post_event:
Event::Ptr ev = boost::make_shared<Event>(); Event::Post(boost::bind(boost::ref(OnDataAvailable), GetSelf()));
ev->OnEventDelivered.connect(boost::bind(boost::ref(OnDataAvailable), GetSelf()));
Event::Post(ev);
} }
/** /**
@ -257,11 +255,8 @@ int TlsClient::ValidateCertificateInternal(int ok, X509_STORE_CTX *x509Context)
shared_ptr<X509> x509Certificate = shared_ptr<X509>(x509Context->cert, &TlsClient::NullCertificateDeleter); shared_ptr<X509> x509Certificate = shared_ptr<X509>(x509Context->cert, &TlsClient::NullCertificateDeleter);
bool valid = ValidateCertificate((ok != 0), x509Context, x509Certificate); bool valid = ValidateCertificate((ok != 0), x509Context, x509Certificate);
if (valid) { if (valid)
Event::Ptr ev = boost::make_shared<Event>(); Event::Post(boost::bind(boost::ref(OnCertificateValidated), GetSelf()));
ev->OnEventDelivered.connect(boost::bind(boost::ref(OnCertificateValidated), GetSelf()));
Event::Post(ev);
}
return valid ? 1 : 0; return valid ? 1 : 0;
} }

View File

@ -46,6 +46,10 @@ void NagiosCheckTask::Run(void)
void NagiosCheckTask::ProcessFinishedHandler(void) void NagiosCheckTask::ProcessFinishedHandler(void)
{ {
time_t now;
time(&now);
GetResult().SetExecutionEnd(now);
string output = m_Process->GetOutput(); string output = m_Process->GetOutput();
boost::algorithm::trim(output); boost::algorithm::trim(output);
ProcessCheckOutput(output); ProcessCheckOutput(output);
@ -71,9 +75,8 @@ void NagiosCheckTask::ProcessFinishedHandler(void)
GetResult().SetState(state); GetResult().SetState(state);
time_t now;
time(&now); time(&now);
GetResult().SetExecutionEnd(now); GetResult().SetScheduleEnd(now);
Finish(); Finish();
} }