diff --git a/lib/base/socketevents.cpp b/lib/base/socketevents.cpp index bca7abae6..fbcca5d6a 100644 --- a/lib/base/socketevents.cpp +++ b/lib/base/socketevents.cpp @@ -23,17 +23,26 @@ #include #include #include +#ifdef __linux__ +# include +#endif /* __linux__ */ using namespace icinga; struct SocketEventDescriptor { +#ifndef __linux__ int Events; +#endif /* __linux__ */ SocketEvents *EventInterface; Object *LifesupportObject; SocketEventDescriptor(void) - : Events(0), EventInterface(NULL), LifesupportObject(NULL) + : +#ifndef __linux__ + Events(POLLIN), +#endif /* __linux__ */ + EventInterface(NULL), LifesupportObject(NULL) { } }; @@ -48,19 +57,24 @@ struct EventDescription static boost::once_flag l_SocketIOOnceFlag = BOOST_ONCE_INIT; static boost::thread l_SocketIOThreads[IOTHREADS]; +#ifdef __linux__ +static SOCKET l_SocketIOPollFDs[IOTHREADS]; +#endif /* __linux__ */ static SOCKET l_SocketIOEventFDs[IOTHREADS][2]; +static bool l_SocketIOFDChanged[IOTHREADS]; static boost::mutex l_SocketIOMutex[IOTHREADS]; static boost::condition_variable l_SocketIOCV[IOTHREADS]; -static bool l_SocketIOFDChanged[IOTHREADS]; static std::map l_SocketIOSockets[IOTHREADS]; -static std::map l_SocketIOEventChanges[IOTHREADS]; int SocketEvents::m_NextID = 0; void SocketEvents::InitializeThread(void) { - for (int i = 0; i < IOTHREADS; i++) { +#ifdef __linux__ + l_SocketIOPollFDs[i] = epoll_create1(EPOLL_CLOEXEC); +#endif /* __linux__ */ + Socket::SocketPair(l_SocketIOEventFDs[i]); Utility::SetNonBlockingSocket(l_SocketIOEventFDs[i][0]); @@ -72,27 +86,68 @@ void SocketEvents::InitializeThread(void) #endif /* _WIN32 */ SocketEventDescriptor sed; +#ifndef __linux__ sed.Events = POLLIN; +#endif /* __linux__ */ l_SocketIOSockets[i][l_SocketIOEventFDs[i][0]] = sed; l_SocketIOFDChanged[i] = true; +#ifdef __linux__ + epoll_event event; + memset(&event, 0, sizeof(event)); + event.data.fd = l_SocketIOEventFDs[i][0]; + event.events = EPOLLIN; + epoll_ctl(l_SocketIOPollFDs[i], EPOLL_CTL_ADD, l_SocketIOEventFDs[i][0], &event); +#endif /* __linux__ */ + l_SocketIOThreads[i] = boost::thread(&SocketEvents::ThreadProc, i); } } +#ifdef __linux__ +int SocketEvents::PollToEpoll(int events) +{ + int result = 0; + + if (events & POLLIN) + result |= EPOLLIN; + + if (events & POLLOUT) + result |= EPOLLOUT; + + return events; +} + +int SocketEvents::EpollToPoll(int events) +{ + int result = 0; + + if (events & EPOLLIN) + result |= POLLIN; + + if (events & EPOLLOUT) + result |= POLLOUT; + + return events; +} +#endif /* __linux__ */ + void SocketEvents::ThreadProc(int tid) { Utility::SetThreadName("SocketIO"); +#ifndef __linux__ std::vector pfds; std::vector descriptors; +#endif /* __linux__ */ for (;;) { { boost::mutex::scoped_lock lock(l_SocketIOMutex[tid]); if (l_SocketIOFDChanged[tid]) { +#ifndef __linux__ pfds.resize(l_SocketIOSockets[tid].size()); descriptors.resize(l_SocketIOSockets[tid].size()); @@ -110,15 +165,21 @@ void SocketEvents::ThreadProc(int tid) i++; } +#endif /* __linux__ */ l_SocketIOFDChanged[tid] = false; l_SocketIOCV[tid].notify_all(); } } +#ifndef __linux__ ASSERT(!pfds.empty()); +#endif /* __linux__ */ -#ifdef _WIN32 +#ifdef __linux__ + epoll_event pevents[64]; + int ready = epoll_wait(l_SocketIOPollFDs[tid], pevents, sizeof(pevents) / sizeof(pevents[0]), -1); +#elif _WIN32 (void) WSAPoll(&pfds[0], pfds.size(), -1); #else /* _WIN32 */ (void) poll(&pfds[0], pfds.size(), -1); @@ -129,9 +190,36 @@ void SocketEvents::ThreadProc(int tid) { boost::mutex::scoped_lock lock(l_SocketIOMutex[tid]); - if (l_SocketIOFDChanged[tid]) - continue; + if (l_SocketIOFDChanged[tid]) { +#ifdef __linux__ + l_SocketIOFDChanged[tid] = false; +#endif /* __linux__ */ + continue; + } + +#ifdef __linux__ + for (int i = 0; i < ready; i++) { + if (pevents[i].data.fd == l_SocketIOEventFDs[tid][0]) { + char buffer[512]; + if (recv(l_SocketIOEventFDs[tid][0], buffer, sizeof(buffer), 0) < 0) + Log(LogCritical, "SocketEvents", "Read from event FD failed."); + + continue; + } + + if ((pevents[i].events & (EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLERR)) == 0) + continue; + + EventDescription event; + event.REvents = EpollToPoll(pevents[i].events); + event.Descriptor = l_SocketIOSockets[tid][pevents[i].data.fd]; + event.LifesupportReference = event.Descriptor.LifesupportObject; + VERIFY(event.LifesupportReference); + + events.push_back(event); + } +#else /* __linux__ */ for (int i = 0; i < pfds.size(); i++) { if ((pfds[i].revents & (POLLIN | POLLOUT | POLLHUP | POLLERR)) == 0) continue; @@ -152,6 +240,7 @@ void SocketEvents::ThreadProc(int tid) events.push_back(event); } +#endif /* __linux__ */ } BOOST_FOREACH(const EventDescription& event, events) { @@ -195,7 +284,10 @@ void SocketEvents::WakeUpThread(bool wait) * Constructor for the SocketEvents class. */ SocketEvents::SocketEvents(const Socket::Ptr& socket, Object *lifesupportObject) - : m_ID(m_NextID++), m_FD(socket->GetFD()), m_PFD(NULL) + : m_ID(m_NextID++), m_FD(socket->GetFD()) +#ifndef __linux__ + , m_PFD(NULL) +#endif /* __linux__ */ { boost::call_once(l_SocketIOOnceFlag, &SocketEvents::InitializeThread); @@ -217,19 +309,32 @@ void SocketEvents::Register(Object *lifesupportObject) VERIFY(m_FD != INVALID_SOCKET); SocketEventDescriptor desc; +#ifndef __linux__ desc.Events = 0; +#endif /* __linux__ */ desc.EventInterface = this; desc.LifesupportObject = lifesupportObject; VERIFY(l_SocketIOSockets[tid].find(m_FD) == l_SocketIOSockets[tid].end()); l_SocketIOSockets[tid][m_FD] = desc; + +#ifdef __linux__ + epoll_event event; + memset(&event, 0, sizeof(event)); + event.data.fd = m_FD; + event.events = 0; + epoll_ctl(l_SocketIOPollFDs[tid], EPOLL_CTL_ADD, m_FD, &event); +#else /* __linux__ */ l_SocketIOFDChanged[tid] = true; +#endif /* __linux__ */ m_Events = true; } +#ifndef __linux__ WakeUpThread(true); +#endif /* __linux__ */ } void SocketEvents::Unregister(void) @@ -245,9 +350,12 @@ void SocketEvents::Unregister(void) l_SocketIOSockets[tid].erase(m_FD); l_SocketIOFDChanged[tid] = true; +#ifdef __linux__ + epoll_ctl(l_SocketIOPollFDs[tid], EPOLL_CTL_DEL, m_FD, NULL); +#endif /* __linux__ */ + m_FD = INVALID_SOCKET; m_Events = false; - } WakeUpThread(true); @@ -255,14 +363,21 @@ void SocketEvents::Unregister(void) void SocketEvents::ChangeEvents(int events) { + if (m_FD == INVALID_SOCKET) + BOOST_THROW_EXCEPTION(std::runtime_error("Tried to read/write from a closed socket.")); + int tid = m_ID % IOTHREADS; +#ifdef __linux__ + epoll_event event; + memset(&event, 0, sizeof(event)); + event.data.fd = m_FD; + event.events = PollToEpoll(events); + epoll_ctl(l_SocketIOPollFDs[tid], EPOLL_CTL_MOD, m_FD, &event); +#else /* __linux__ */ { boost::mutex::scoped_lock lock(l_SocketIOMutex[tid]); - if (m_FD == INVALID_SOCKET) - BOOST_THROW_EXCEPTION(std::runtime_error("Tried to read/write from a closed socket.")); - std::map::iterator it = l_SocketIOSockets[tid].find(m_FD); if (it == l_SocketIOSockets[tid].end()) @@ -280,6 +395,7 @@ void SocketEvents::ChangeEvents(int events) } WakeUpThread(); +#endif /* __linux__ */ } bool SocketEvents::IsHandlingEvents(void) const diff --git a/lib/base/socketevents.hpp b/lib/base/socketevents.hpp index b4b324bff..d1a50ef8c 100644 --- a/lib/base/socketevents.hpp +++ b/lib/base/socketevents.hpp @@ -55,7 +55,9 @@ private: int m_ID; SOCKET m_FD; bool m_Events; +#ifndef __linux__ pollfd *m_PFD; +#endif /* __linux__ */ static int m_NextID; @@ -67,6 +69,9 @@ private: int GetPollEvents(void) const; void Register(Object *lifesupportObject); + + static int PollToEpoll(int events); + static int EpollToPoll(int events); }; }