From feb4b97df75caa31f291336629339f2aab42f0e2 Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Mon, 18 Apr 2016 14:25:52 +0200 Subject: [PATCH] Make the socket event engine configurable fixes #11522 --- doc/18-language-reference.md | 1 + lib/base/CMakeLists.txt | 2 +- lib/base/socketevents-epoll.cpp | 209 ++++++++++++++++++ lib/base/socketevents-poll.cpp | 199 +++++++++++++++++ lib/base/socketevents.cpp | 380 ++++++-------------------------- lib/base/socketevents.hpp | 91 +++++++- 6 files changed, 556 insertions(+), 326 deletions(-) create mode 100644 lib/base/socketevents-epoll.cpp create mode 100644 lib/base/socketevents-poll.cpp diff --git a/doc/18-language-reference.md b/doc/18-language-reference.md index 4d2b88d06..5dbbb2039 100644 --- a/doc/18-language-reference.md +++ b/doc/18-language-reference.md @@ -369,6 +369,7 @@ PidPath |**Read-write.** Contains the path of the Icinga 2 PID file. Vars |**Read-write.** Contains a dictionary with global custom attributes. Not set by default. NodeName |**Read-write.** Contains the cluster node name. Set to the local hostname by default. UseVfork |**Read-write.** Whether to use vfork(). Only available on *NIX. Defaults to true. +EventEngine |**Read-write.** The name of the socket event engine, can be "poll" or "epoll". The epoll interface is only supported on Linux. AttachDebugger |**Read-write.** Whether to attach a debugger when Icinga 2 crashes. Defaults to false. RunAsUser |**Read-write.** Defines the user the Icinga 2 daemon is running as. Used in the `init.conf` configuration file. RunAsGroup |**Read-write.** Defines the group the Icinga 2 daemon is running as. Used in the `init.conf` configuration file. diff --git a/lib/base/CMakeLists.txt b/lib/base/CMakeLists.txt index 1ccc4d1c8..5c9246348 100644 --- a/lib/base/CMakeLists.txt +++ b/lib/base/CMakeLists.txt @@ -32,7 +32,7 @@ set(base_SOURCES netstring.cpp networkstream.cpp number.cpp number-script.cpp object.cpp object-script.cpp objecttype.cpp primitivetype.cpp process.cpp ringbuffer.cpp scriptframe.cpp function.cpp function-script.cpp functionwrapper.cpp scriptglobal.cpp - scriptutils.cpp serializer.cpp socket.cpp socketevents.cpp stacktrace.cpp + scriptutils.cpp serializer.cpp socket.cpp socketevents.cpp socketevents-epoll.cpp socketevents-poll.cpp stacktrace.cpp statsfunction.cpp stdiostream.cpp stream.cpp streamlogger.cpp streamlogger.thpp string.cpp string-script.cpp sysloglogger.cpp sysloglogger.thpp tcpsocket.cpp threadpool.cpp timer.cpp tlsstream.cpp tlsutility.cpp type.cpp typetype-script.cpp unixsocket.cpp utility.cpp value.cpp diff --git a/lib/base/socketevents-epoll.cpp b/lib/base/socketevents-epoll.cpp new file mode 100644 index 000000000..a6476ac61 --- /dev/null +++ b/lib/base/socketevents-epoll.cpp @@ -0,0 +1,209 @@ +/****************************************************************************** + * Icinga 2 * + * Copyright (C) 2012-2016 Icinga Development Team (https://www.icinga.org/) * + * * + * This program is free software; you can redistribute it and/or * + * modify it under the terms of the GNU General Public License * + * as published by the Free Software Foundation; either version 2 * + * of the License, or (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the Free Software Foundation * + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * + ******************************************************************************/ + +#include "base/socketevents.hpp" +#include "base/exception.hpp" +#include "base/logger.hpp" +#include +#include +#include +#ifdef __linux__ +# include + +using namespace icinga; + +void SocketEventEngineEpoll::InitializeThread(int tid) +{ + m_PollFDs[tid] = epoll_create(128); + Utility::SetCloExec(m_PollFDs[tid]); + + SocketEventDescriptor sed; + + m_Sockets[tid][m_EventFDs[tid][0]] = sed; + m_FDChanged[tid] = true; + + epoll_event event; + memset(&event, 0, sizeof(event)); + event.data.fd = m_EventFDs[tid][0]; + event.events = EPOLLIN; + epoll_ctl(m_PollFDs[tid], EPOLL_CTL_ADD, m_EventFDs[tid][0], &event); +} + +int SocketEventEngineEpoll::PollToEpoll(int events) +{ + int result = 0; + + if (events & POLLIN) + result |= EPOLLIN; + + if (events & POLLOUT) + result |= EPOLLOUT; + + return events; +} + +int SocketEventEngineEpoll::EpollToPoll(int events) +{ + int result = 0; + + if (events & EPOLLIN) + result |= POLLIN; + + if (events & EPOLLOUT) + result |= POLLOUT; + + return events; +} + +void SocketEventEngineEpoll::ThreadProc(int tid) +{ + Utility::SetThreadName("SocketIO"); + + for (;;) { + { + boost::mutex::scoped_lock lock(m_EventMutex[tid]); + + if (m_FDChanged[tid]) { + m_FDChanged[tid] = false; + m_CV[tid].notify_all(); + } + } + + epoll_event pevents[64]; + int ready = epoll_wait(m_PollFDs[tid], pevents, sizeof(pevents) / sizeof(pevents[0]), -1); + + std::vector events; + + { + boost::mutex::scoped_lock lock(m_EventMutex[tid]); + + if (m_FDChanged[tid]) { + m_FDChanged[tid] = false; + + continue; + } + + for (int i = 0; i < ready; i++) { + if (pevents[i].data.fd == m_EventFDs[tid][0]) { + char buffer[512]; + if (recv(m_EventFDs[tid][0], buffer, sizeof(buffer), 0) < 0) + Log(LogCritical, "SocketEvents", "Read from event FD failed."); + + continue; + } + + if ((pevents[i].events & (EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLERR)) == 0) + continue; + + EventDescription event; + event.REvents = SocketEventEngineEpoll::EpollToPoll(pevents[i].events); + event.Descriptor = m_Sockets[tid][pevents[i].data.fd]; + event.LifesupportReference = event.Descriptor.LifesupportObject; + VERIFY(event.LifesupportReference); + + events.push_back(event); + } + } + + BOOST_FOREACH(const EventDescription& event, events) { + try { + event.Descriptor.EventInterface->OnEvent(event.REvents); + } catch (const std::exception& ex) { + Log(LogCritical, "SocketEvents") + << "Exception thrown in socket I/O handler:\n" + << DiagnosticInformation(ex); + } catch (...) { + Log(LogCritical, "SocketEvents", "Exception of unknown type thrown in socket I/O handler."); + } + } + } +} + +void SocketEventEngineEpoll::Register(SocketEvents *se, Object *lifesupportObject) +{ + int tid = se->m_ID % SOCKET_IOTHREADS; + + { + boost::mutex::scoped_lock lock(m_EventMutex[tid]); + + VERIFY(se->m_FD != INVALID_SOCKET); + + SocketEventDescriptor desc; + desc.EventInterface = se; + desc.LifesupportObject = lifesupportObject; + + VERIFY(m_Sockets[tid].find(se->m_FD) == m_Sockets[tid].end()); + + m_Sockets[tid][se->m_FD] = desc; + + epoll_event event; + memset(&event, 0, sizeof(event)); + event.data.fd = se->m_FD; + event.events = 0; + epoll_ctl(m_PollFDs[tid], EPOLL_CTL_ADD, se->m_FD, &event); + + se->m_Events = true; + } +} + +void SocketEventEngineEpoll::Unregister(SocketEvents *se) +{ + int tid = se->m_ID % SOCKET_IOTHREADS; + + { + boost::mutex::scoped_lock lock(m_EventMutex[tid]); + + if (se->m_FD == INVALID_SOCKET) + return; + + m_Sockets[tid].erase(se->m_FD); + m_FDChanged[tid] = true; + + epoll_ctl(m_PollFDs[tid], EPOLL_CTL_DEL, se->m_FD, NULL); + + se->m_FD = INVALID_SOCKET; + se->m_Events = false; + } + + WakeUpThread(tid, true); +} + +void SocketEventEngineEpoll::ChangeEvents(SocketEvents *se, int events) +{ + if (se->m_FD == INVALID_SOCKET) + BOOST_THROW_EXCEPTION(std::runtime_error("Tried to read/write from a closed socket.")); + + int tid = se->m_ID % SOCKET_IOTHREADS; + + { + boost::mutex::scoped_lock lock(m_EventMutex[tid]); + + std::map::iterator it = m_Sockets[tid].find(se->m_FD); + + if (it == m_Sockets[tid].end()) + return; + + epoll_event event; + memset(&event, 0, sizeof(event)); + event.data.fd = se->m_FD; + event.events = SocketEventEngineEpoll::PollToEpoll(events); + epoll_ctl(m_PollFDs[tid], EPOLL_CTL_MOD, se->m_FD, &event); + } +} +#endif /* __linux__ */ diff --git a/lib/base/socketevents-poll.cpp b/lib/base/socketevents-poll.cpp new file mode 100644 index 000000000..784212a16 --- /dev/null +++ b/lib/base/socketevents-poll.cpp @@ -0,0 +1,199 @@ +/****************************************************************************** + * Icinga 2 * + * Copyright (C) 2012-2016 Icinga Development Team (https://www.icinga.org/) * + * * + * This program is free software; you can redistribute it and/or * + * modify it under the terms of the GNU General Public License * + * as published by the Free Software Foundation; either version 2 * + * of the License, or (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the Free Software Foundation * + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * + ******************************************************************************/ + +#include "base/socketevents.hpp" +#include "base/exception.hpp" +#include "base/logger.hpp" +#include +#include +#include + +using namespace icinga; + +void SocketEventEnginePoll::InitializeThread(int tid) +{ + SocketEventDescriptor sed; + sed.Events = POLLIN; + + m_Sockets[tid][m_EventFDs[tid][0]] = sed; + m_FDChanged[tid] = true; +} + +void SocketEventEnginePoll::ThreadProc(int tid) +{ + Utility::SetThreadName("SocketIO"); + + std::vector pfds; + std::vector descriptors; + + for (;;) { + { + boost::mutex::scoped_lock lock(m_EventMutex[tid]); + + if (m_FDChanged[tid]) { + pfds.resize(m_Sockets[tid].size()); + descriptors.resize(m_Sockets[tid].size()); + + int i = 0; + + typedef std::map::value_type kv_pair; + + BOOST_FOREACH(const kv_pair& desc, m_Sockets[tid]) { + if (desc.second.EventInterface) + desc.second.EventInterface->m_EnginePrivate = &pfds[i]; + + pfds[i].fd = desc.first; + pfds[i].events = desc.second.Events; + descriptors[i] = desc.second; + + i++; + } + + m_FDChanged[tid] = false; + m_CV[tid].notify_all(); + } + } + + ASSERT(!pfds.empty()); + +#ifdef _WIN32 + (void) WSAPoll(&pfds[0], pfds.size(), -1); +#else /* _WIN32 */ + (void) poll(&pfds[0], pfds.size(), -1); +#endif /* _WIN32 */ + + std::vector events; + + { + boost::mutex::scoped_lock lock(m_EventMutex[tid]); + + if (m_FDChanged[tid]) + continue; + + for (int i = 0; i < pfds.size(); i++) { + if ((pfds[i].revents & (POLLIN | POLLOUT | POLLHUP | POLLERR)) == 0) + continue; + + if (pfds[i].fd == m_EventFDs[tid][0]) { + char buffer[512]; + if (recv(m_EventFDs[tid][0], buffer, sizeof(buffer), 0) < 0) + Log(LogCritical, "SocketEvents", "Read from event FD failed."); + + continue; + } + + EventDescription event; + event.REvents = pfds[i].revents; + event.Descriptor = descriptors[i]; + event.LifesupportReference = event.Descriptor.LifesupportObject; + VERIFY(event.LifesupportReference); + + events.push_back(event); + } + } + + BOOST_FOREACH(const EventDescription& event, events) { + try { + event.Descriptor.EventInterface->OnEvent(event.REvents); + } catch (const std::exception& ex) { + Log(LogCritical, "SocketEvents") + << "Exception thrown in socket I/O handler:\n" + << DiagnosticInformation(ex); + } catch (...) { + Log(LogCritical, "SocketEvents", "Exception of unknown type thrown in socket I/O handler."); + } + } + } +} + +void SocketEventEnginePoll::Register(SocketEvents *se, Object *lifesupportObject) +{ + int tid = se->m_ID % SOCKET_IOTHREADS; + + { + boost::mutex::scoped_lock lock(m_EventMutex[tid]); + + VERIFY(se->m_FD != INVALID_SOCKET); + + SocketEventDescriptor desc; + desc.Events = 0; + desc.EventInterface = se; + desc.LifesupportObject = lifesupportObject; + + VERIFY(m_Sockets[tid].find(se->m_FD) == m_Sockets[tid].end()); + + m_Sockets[tid][se->m_FD] = desc; + + m_FDChanged[tid] = true; + + se->m_Events = true; + } + + WakeUpThread(tid, true); +} + +void SocketEventEnginePoll::Unregister(SocketEvents *se) +{ + int tid = se->m_ID % SOCKET_IOTHREADS; + + { + boost::mutex::scoped_lock lock(m_EventMutex[tid]); + + if (se->m_FD == INVALID_SOCKET) + return; + + m_Sockets[tid].erase(se->m_FD); + m_FDChanged[tid] = true; + + se->m_FD = INVALID_SOCKET; + se->m_Events = false; + } + + WakeUpThread(tid, true); +} + +void SocketEventEnginePoll::ChangeEvents(SocketEvents *se, int events) +{ + if (se->m_FD == INVALID_SOCKET) + BOOST_THROW_EXCEPTION(std::runtime_error("Tried to read/write from a closed socket.")); + + int tid = se->m_ID % SOCKET_IOTHREADS; + + { + boost::mutex::scoped_lock lock(m_EventMutex[tid]); + + std::map::iterator it = m_Sockets[tid].find(se->m_FD); + + if (it == m_Sockets[tid].end()) + return; + + if (it->second.Events == events) + return; + + it->second.Events = events; + + if (se->m_EnginePrivate && boost::this_thread::get_id() == m_Threads[tid].get_id()) + ((pollfd *)se->m_EnginePrivate)->events = events; + else + m_FDChanged[tid] = true; + } + + WakeUpThread(tid, false); +} + diff --git a/lib/base/socketevents.cpp b/lib/base/socketevents.cpp index 40234e60c..76b288d03 100644 --- a/lib/base/socketevents.cpp +++ b/lib/base/socketevents.cpp @@ -20,6 +20,8 @@ #include "base/socketevents.hpp" #include "base/exception.hpp" #include "base/logger.hpp" +#include "base/application.hpp" +#include "base/scriptglobal.hpp" #include #include #include @@ -29,268 +31,91 @@ using namespace icinga; -struct SocketEventDescriptor -{ -#ifndef __linux__ - int Events; -#endif /* __linux__ */ - SocketEvents *EventInterface; - Object *LifesupportObject; - - SocketEventDescriptor(void) - : -#ifndef __linux__ - Events(POLLIN), -#endif /* __linux__ */ - EventInterface(NULL), LifesupportObject(NULL) - { } -}; - -struct EventDescription -{ - int REvents; - SocketEventDescriptor Descriptor; - Object::Ptr LifesupportReference; -}; - -#define SOCKET_IOTHREADS 8 - static boost::once_flag l_SocketIOOnceFlag = BOOST_ONCE_INIT; -static boost::thread l_SocketIOThreads[SOCKET_IOTHREADS]; -#ifdef __linux__ -static SOCKET l_SocketIOPollFDs[SOCKET_IOTHREADS]; -#endif /* __linux__ */ -static SOCKET l_SocketIOEventFDs[SOCKET_IOTHREADS][2]; -static bool l_SocketIOFDChanged[SOCKET_IOTHREADS]; -static boost::mutex l_SocketIOMutex[SOCKET_IOTHREADS]; -static boost::condition_variable l_SocketIOCV[SOCKET_IOTHREADS]; -static std::map l_SocketIOSockets[SOCKET_IOTHREADS]; +static SocketEventEngine *l_SocketIOEngine; int SocketEvents::m_NextID = 0; -void SocketEvents::InitializeThread(void) +void SocketEventEngine::Start(void) { - for (int i = 0; i < SOCKET_IOTHREADS; i++) { -#ifdef __linux__ - l_SocketIOPollFDs[i] = epoll_create(128); - Utility::SetCloExec(l_SocketIOPollFDs[i]); -#endif /* __linux__ */ + for (int tid = 0; tid < SOCKET_IOTHREADS; tid++) { + Socket::SocketPair(m_EventFDs[tid]); - Socket::SocketPair(l_SocketIOEventFDs[i]); - - Utility::SetNonBlockingSocket(l_SocketIOEventFDs[i][0]); - Utility::SetNonBlockingSocket(l_SocketIOEventFDs[i][1]); + Utility::SetNonBlockingSocket(m_EventFDs[tid][0]); + Utility::SetNonBlockingSocket(m_EventFDs[tid][1]); #ifndef _WIN32 - Utility::SetCloExec(l_SocketIOEventFDs[i][0]); - Utility::SetCloExec(l_SocketIOEventFDs[i][1]); + Utility::SetCloExec(m_EventFDs[tid][0]); + Utility::SetCloExec(m_EventFDs[tid][1]); #endif /* _WIN32 */ - SocketEventDescriptor sed; -#ifndef __linux__ - sed.Events = POLLIN; -#endif /* __linux__ */ + InitializeThread(tid); - l_SocketIOSockets[i][l_SocketIOEventFDs[i][0]] = sed; - l_SocketIOFDChanged[i] = true; - -#ifdef __linux__ - epoll_event event; - memset(&event, 0, sizeof(event)); - event.data.fd = l_SocketIOEventFDs[i][0]; - event.events = EPOLLIN; - epoll_ctl(l_SocketIOPollFDs[i], EPOLL_CTL_ADD, l_SocketIOEventFDs[i][0], &event); -#endif /* __linux__ */ - - l_SocketIOThreads[i] = boost::thread(&SocketEvents::ThreadProc, i); + m_Threads[tid] = boost::thread(boost::bind(&SocketEventEngine::ThreadProc, this, tid)); } } -#ifdef __linux__ -int SocketEvents::PollToEpoll(int events) +void SocketEventEngine::WakeUpThread(int sid, bool wait) { - int result = 0; + int tid = sid % SOCKET_IOTHREADS; - if (events & POLLIN) - result |= EPOLLIN; - - if (events & POLLOUT) - result |= EPOLLOUT; - - return events; -} - -int SocketEvents::EpollToPoll(int events) -{ - int result = 0; - - if (events & EPOLLIN) - result |= POLLIN; - - if (events & EPOLLOUT) - result |= POLLOUT; - - return events; -} -#endif /* __linux__ */ - -void SocketEvents::ThreadProc(int tid) -{ - Utility::SetThreadName("SocketIO"); - -#ifndef __linux__ - std::vector pfds; - std::vector descriptors; -#endif /* __linux__ */ - - for (;;) { - { - boost::mutex::scoped_lock lock(l_SocketIOMutex[tid]); - - if (l_SocketIOFDChanged[tid]) { -#ifndef __linux__ - pfds.resize(l_SocketIOSockets[tid].size()); - descriptors.resize(l_SocketIOSockets[tid].size()); - - int i = 0; - - typedef std::map::value_type kv_pair; - - BOOST_FOREACH(const kv_pair& desc, l_SocketIOSockets[tid]) { - if (desc.second.EventInterface) - desc.second.EventInterface->m_PFD = &pfds[i]; - - pfds[i].fd = desc.first; - pfds[i].events = desc.second.Events; - descriptors[i] = desc.second; - - i++; - } -#endif /* __linux__ */ - - l_SocketIOFDChanged[tid] = false; - l_SocketIOCV[tid].notify_all(); - } - } - -#ifndef __linux__ - ASSERT(!pfds.empty()); -#endif /* __linux__ */ - -#ifdef __linux__ - epoll_event pevents[64]; - int ready = epoll_wait(l_SocketIOPollFDs[tid], pevents, sizeof(pevents) / sizeof(pevents[0]), -1); -#elif _WIN32 - (void) WSAPoll(&pfds[0], pfds.size(), -1); -#else /* _WIN32 */ - (void) poll(&pfds[0], pfds.size(), -1); -#endif /* _WIN32 */ - - std::vector events; - - { - boost::mutex::scoped_lock lock(l_SocketIOMutex[tid]); - - if (l_SocketIOFDChanged[tid]) { -#ifdef __linux__ - l_SocketIOFDChanged[tid] = false; -#endif /* __linux__ */ - - continue; - } - -#ifdef __linux__ - for (int i = 0; i < ready; i++) { - if (pevents[i].data.fd == l_SocketIOEventFDs[tid][0]) { - char buffer[512]; - if (recv(l_SocketIOEventFDs[tid][0], buffer, sizeof(buffer), 0) < 0) - Log(LogCritical, "SocketEvents", "Read from event FD failed."); - - continue; - } - - if ((pevents[i].events & (EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLERR)) == 0) - continue; - - EventDescription event; - event.REvents = EpollToPoll(pevents[i].events); - event.Descriptor = l_SocketIOSockets[tid][pevents[i].data.fd]; - event.LifesupportReference = event.Descriptor.LifesupportObject; - VERIFY(event.LifesupportReference); - - events.push_back(event); - } -#else /* __linux__ */ - for (int i = 0; i < pfds.size(); i++) { - if ((pfds[i].revents & (POLLIN | POLLOUT | POLLHUP | POLLERR)) == 0) - continue; - - if (pfds[i].fd == l_SocketIOEventFDs[tid][0]) { - char buffer[512]; - if (recv(l_SocketIOEventFDs[tid][0], buffer, sizeof(buffer), 0) < 0) - Log(LogCritical, "SocketEvents", "Read from event FD failed."); - - continue; - } - - EventDescription event; - event.REvents = pfds[i].revents; - event.Descriptor = descriptors[i]; - event.LifesupportReference = event.Descriptor.LifesupportObject; - VERIFY(event.LifesupportReference); - - events.push_back(event); - } -#endif /* __linux__ */ - } - - BOOST_FOREACH(const EventDescription& event, events) { - try { - event.Descriptor.EventInterface->OnEvent(event.REvents); - } catch (const std::exception& ex) { - Log(LogCritical, "SocketEvents") - << "Exception thrown in socket I/O handler:\n" - << DiagnosticInformation(ex); - } catch (...) { - Log(LogCritical, "SocketEvents", "Exception of unknown type thrown in socket I/O handler."); - } - } - } -} - -void SocketEvents::WakeUpThread(bool wait) -{ - int tid = m_ID % SOCKET_IOTHREADS; - - if (boost::this_thread::get_id() == l_SocketIOThreads[tid].get_id()) + if (boost::this_thread::get_id() == m_Threads[tid].get_id()) return; if (wait) { - boost::mutex::scoped_lock lock(l_SocketIOMutex[tid]); + boost::mutex::scoped_lock lock(m_EventMutex[tid]); - l_SocketIOFDChanged[tid] = true; + m_FDChanged[tid] = true; - while (l_SocketIOFDChanged[tid]) { - (void) send(l_SocketIOEventFDs[tid][1], "T", 1, 0); + while (m_FDChanged[tid]) { + (void) send(m_EventFDs[tid][1], "T", 1, 0); boost::system_time const timeout = boost::get_system_time() + boost::posix_time::milliseconds(50); - l_SocketIOCV[tid].timed_wait(lock, timeout); + m_CV[tid].timed_wait(lock, timeout); } } else { - (void) send(l_SocketIOEventFDs[tid][1], "T", 1, 0); + (void) send(m_EventFDs[tid][1], "T", 1, 0); } } +void SocketEvents::InitializeEngine(void) +{ + String eventEngine = ScriptGlobal::Get("EventEngine", &Empty); + + if (eventEngine.IsEmpty()) +#ifdef __linux__ + eventEngine = "epoll"; +#else /* __linux__ */ + eventEngine = "poll"; +#endif /* __linux__ */ + + if (eventEngine == "poll") + l_SocketIOEngine = new SocketEventEnginePoll(); +#ifdef __linux__ + else if (eventEngine == "epoll") + l_SocketIOEngine = new SocketEventEngineEpoll(); +#endif /* __linux__ */ + else { + eventEngine = "poll"; + + Log(LogWarning, "SocketEvents") + << "Invalid event engine selected: " << eventEngine << " - Falling back to 'poll'"; + + l_SocketIOEngine = new SocketEventEnginePoll(); + } + + l_SocketIOEngine->Start(); + + ScriptGlobal::Set("EventEngine", eventEngine); +} + /** * Constructor for the SocketEvents class. */ SocketEvents::SocketEvents(const Socket::Ptr& socket, Object *lifesupportObject) - : m_ID(m_NextID++), m_FD(socket->GetFD()) -#ifndef __linux__ - , m_PFD(NULL) -#endif /* __linux__ */ + : m_ID(m_NextID++), m_FD(socket->GetFD()), m_EnginePrivate(NULL) { - boost::call_once(l_SocketIOOnceFlag, &SocketEvents::InitializeThread); + boost::call_once(l_SocketIOOnceFlag, &SocketEvents::InitializeEngine); Register(lifesupportObject); } @@ -302,109 +127,28 @@ SocketEvents::~SocketEvents(void) void SocketEvents::Register(Object *lifesupportObject) { - int tid = m_ID % SOCKET_IOTHREADS; - - { - boost::mutex::scoped_lock lock(l_SocketIOMutex[tid]); - - VERIFY(m_FD != INVALID_SOCKET); - - SocketEventDescriptor desc; -#ifndef __linux__ - desc.Events = 0; -#endif /* __linux__ */ - desc.EventInterface = this; - desc.LifesupportObject = lifesupportObject; - - VERIFY(l_SocketIOSockets[tid].find(m_FD) == l_SocketIOSockets[tid].end()); - - l_SocketIOSockets[tid][m_FD] = desc; - -#ifdef __linux__ - epoll_event event; - memset(&event, 0, sizeof(event)); - event.data.fd = m_FD; - event.events = 0; - epoll_ctl(l_SocketIOPollFDs[tid], EPOLL_CTL_ADD, m_FD, &event); -#else /* __linux__ */ - l_SocketIOFDChanged[tid] = true; -#endif /* __linux__ */ - - m_Events = true; - } - -#ifndef __linux__ - WakeUpThread(true); -#endif /* __linux__ */ + l_SocketIOEngine->Register(this, lifesupportObject); } void SocketEvents::Unregister(void) { - int tid = m_ID % SOCKET_IOTHREADS; - - { - boost::mutex::scoped_lock lock(l_SocketIOMutex[tid]); - - if (m_FD == INVALID_SOCKET) - return; - - l_SocketIOSockets[tid].erase(m_FD); - l_SocketIOFDChanged[tid] = true; - -#ifdef __linux__ - epoll_ctl(l_SocketIOPollFDs[tid], EPOLL_CTL_DEL, m_FD, NULL); -#endif /* __linux__ */ - - m_FD = INVALID_SOCKET; - m_Events = false; - } - - WakeUpThread(true); + l_SocketIOEngine->Unregister(this); } void SocketEvents::ChangeEvents(int events) { - if (m_FD == INVALID_SOCKET) - BOOST_THROW_EXCEPTION(std::runtime_error("Tried to read/write from a closed socket.")); + l_SocketIOEngine->ChangeEvents(this, events); +} - int tid = m_ID % SOCKET_IOTHREADS; - - { - boost::mutex::scoped_lock lock(l_SocketIOMutex[tid]); - - std::map::iterator it = l_SocketIOSockets[tid].find(m_FD); - - if (it == l_SocketIOSockets[tid].end()) - return; - -#ifdef __linux__ - epoll_event event; - memset(&event, 0, sizeof(event)); - event.data.fd = m_FD; - event.events = PollToEpoll(events); - epoll_ctl(l_SocketIOPollFDs[tid], EPOLL_CTL_MOD, m_FD, &event); -#else /* __linux__ */ - if (it->second.Events == events) - return; - - it->second.Events = events; - - if (m_PFD && boost::this_thread::get_id() == l_SocketIOThreads[tid].get_id()) - m_PFD->events = events; - else - l_SocketIOFDChanged[tid] = true; -#endif /* __linux__ */ - } - -#ifndef __linux__ - WakeUpThread(); -#endif /* __linux__ */ +boost::mutex& SocketEventEngine::GetMutex(int tid) +{ + return m_EventMutex[tid]; } bool SocketEvents::IsHandlingEvents(void) const { int tid = m_ID % SOCKET_IOTHREADS; - boost::mutex::scoped_lock lock(l_SocketIOMutex[tid]); + boost::mutex::scoped_lock lock(l_SocketIOEngine->GetMutex(tid)); return m_Events; } diff --git a/lib/base/socketevents.hpp b/lib/base/socketevents.hpp index d1a50ef8c..8243555a2 100644 --- a/lib/base/socketevents.hpp +++ b/lib/base/socketevents.hpp @@ -22,6 +22,7 @@ #include "base/i2-base.hpp" #include "base/socket.hpp" +#include #ifndef _WIN32 # include @@ -48,6 +49,9 @@ public: bool IsHandlingEvents(void) const; + void *GetEnginePrivate(void) const; + void SetEnginePrivate(void *priv); + protected: SocketEvents(const Socket::Ptr& socket, Object *lifesupportObject); @@ -55,24 +59,97 @@ private: int m_ID; SOCKET m_FD; bool m_Events; -#ifndef __linux__ - pollfd *m_PFD; -#endif /* __linux__ */ + void *m_EnginePrivate; static int m_NextID; - static void InitializeThread(void); - static void ThreadProc(int tid); + static void InitializeEngine(void); void WakeUpThread(bool wait = false); - int GetPollEvents(void) const; - void Register(Object *lifesupportObject); + friend class SocketEventEnginePoll; + friend class SocketEventEngineEpoll; +}; + +#define SOCKET_IOTHREADS 8 + +struct SocketEventDescriptor +{ + int Events; + SocketEvents *EventInterface; + Object *LifesupportObject; + + SocketEventDescriptor(void) + : Events(POLLIN), EventInterface(NULL), LifesupportObject(NULL) + { } +}; + +struct EventDescription +{ + int REvents; + SocketEventDescriptor Descriptor; + Object::Ptr LifesupportReference; +}; + +class I2_BASE_API SocketEventEngine +{ +public: + void Start(void); + + void WakeUpThread(int sid, bool wait); + + boost::mutex& GetMutex(int tid); + +protected: + virtual void InitializeThread(int tid) = 0; + virtual void ThreadProc(int tid) = 0; + virtual void Register(SocketEvents *se, Object *lifesupportObject) = 0; + virtual void Unregister(SocketEvents *se) = 0; + virtual void ChangeEvents(SocketEvents *se, int events) = 0; + + boost::thread m_Threads[SOCKET_IOTHREADS]; + SOCKET m_EventFDs[SOCKET_IOTHREADS][2]; + bool m_FDChanged[SOCKET_IOTHREADS]; + boost::mutex m_EventMutex[SOCKET_IOTHREADS]; + boost::condition_variable m_CV[SOCKET_IOTHREADS]; + std::map m_Sockets[SOCKET_IOTHREADS]; + + friend class SocketEvents; +}; + +class I2_BASE_API SocketEventEnginePoll : public SocketEventEngine +{ +public: + virtual void Register(SocketEvents *se, Object *lifesupportObject); + virtual void Unregister(SocketEvents *se); + virtual void ChangeEvents(SocketEvents *se, int events); + +protected: + virtual void InitializeThread(int tid); + virtual void ThreadProc(int tid); +}; + +#ifdef __linux__ +class I2_BASE_API SocketEventEngineEpoll : public SocketEventEngine +{ +public: + virtual void Register(SocketEvents *se, Object *lifesupportObject); + virtual void Unregister(SocketEvents *se); + virtual void ChangeEvents(SocketEvents *se, int events); + +protected: + virtual void InitializeThread(int tid); + virtual void ThreadProc(int tid); + +private: + SOCKET m_PollFDs[SOCKET_IOTHREADS]; + static int PollToEpoll(int events); static int EpollToPoll(int events); }; +#endif /* __linux__ */ }