mirror of
				https://github.com/Icinga/icinga2.git
				synced 2025-10-26 01:33:50 +02:00 
			
		
		
		
	
		
			
				
	
	
		
			189 lines
		
	
	
		
			4.1 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			189 lines
		
	
	
		
			4.1 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
 | |
| 
 | |
| #include "base/socketevents.hpp"
 | |
| #include "base/exception.hpp"
 | |
| #include "base/logger.hpp"
 | |
| #include <boost/thread/once.hpp>
 | |
| #include <map>
 | |
| #ifdef __linux__
 | |
| #	include <sys/epoll.h>
 | |
| 
 | |
| using namespace icinga;
 | |
| 
 | |
| void SocketEventEngineEpoll::InitializeThread(int tid)
 | |
| {
 | |
| 	m_PollFDs[tid] = epoll_create(128);
 | |
| 	Utility::SetCloExec(m_PollFDs[tid]);
 | |
| 
 | |
| 	SocketEventDescriptor sed;
 | |
| 
 | |
| 	m_Sockets[tid][m_EventFDs[tid][0]] = sed;
 | |
| 	m_FDChanged[tid] = true;
 | |
| 
 | |
| 	epoll_event event;
 | |
| 	memset(&event, 0, sizeof(event));
 | |
| 	event.data.fd = m_EventFDs[tid][0];
 | |
| 	event.events = EPOLLIN;
 | |
| 	epoll_ctl(m_PollFDs[tid], EPOLL_CTL_ADD, m_EventFDs[tid][0], &event);
 | |
| }
 | |
| 
 | |
| int SocketEventEngineEpoll::PollToEpoll(int events)
 | |
| {
 | |
| 	int result = 0;
 | |
| 
 | |
| 	if (events & POLLIN)
 | |
| 		result |= EPOLLIN;
 | |
| 
 | |
| 	if (events & POLLOUT)
 | |
| 		result |= EPOLLOUT;
 | |
| 
 | |
| 	return events;
 | |
| }
 | |
| 
 | |
| int SocketEventEngineEpoll::EpollToPoll(int events)
 | |
| {
 | |
| 	int result = 0;
 | |
| 
 | |
| 	if (events & EPOLLIN)
 | |
| 		result |= POLLIN;
 | |
| 
 | |
| 	if (events & EPOLLOUT)
 | |
| 		result |= POLLOUT;
 | |
| 
 | |
| 	return events;
 | |
| }
 | |
| 
 | |
| void SocketEventEngineEpoll::ThreadProc(int tid)
 | |
| {
 | |
| 	Utility::SetThreadName("SocketIO");
 | |
| 
 | |
| 	for (;;) {
 | |
| 		{
 | |
| 			boost::mutex::scoped_lock lock(m_EventMutex[tid]);
 | |
| 
 | |
| 			if (m_FDChanged[tid]) {
 | |
| 				m_FDChanged[tid] = false;
 | |
| 				m_CV[tid].notify_all();
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		epoll_event pevents[64];
 | |
| 		int ready = epoll_wait(m_PollFDs[tid], pevents, sizeof(pevents) / sizeof(pevents[0]), -1);
 | |
| 
 | |
| 		std::vector<EventDescription> events;
 | |
| 
 | |
| 		{
 | |
| 			boost::mutex::scoped_lock lock(m_EventMutex[tid]);
 | |
| 
 | |
| 			if (m_FDChanged[tid]) {
 | |
| 				m_FDChanged[tid] = false;
 | |
| 
 | |
| 				continue;
 | |
| 			}
 | |
| 
 | |
| 			for (int i = 0; i < ready; i++) {
 | |
| 				if (pevents[i].data.fd == m_EventFDs[tid][0]) {
 | |
| 					char buffer[512];
 | |
| 					if (recv(m_EventFDs[tid][0], buffer, sizeof(buffer), 0) < 0)
 | |
| 						Log(LogCritical, "SocketEvents", "Read from event FD failed.");
 | |
| 
 | |
| 					continue;
 | |
| 				}
 | |
| 
 | |
| 				if ((pevents[i].events & (EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLERR)) == 0)
 | |
| 					continue;
 | |
| 
 | |
| 				EventDescription event;
 | |
| 				event.REvents = SocketEventEngineEpoll::EpollToPoll(pevents[i].events);
 | |
| 				event.Descriptor = m_Sockets[tid][pevents[i].data.fd];
 | |
| 
 | |
| 				events.emplace_back(std::move(event));
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		for (const EventDescription& event : events) {
 | |
| 			try {
 | |
| 				event.Descriptor.EventInterface->OnEvent(event.REvents);
 | |
| 			} catch (const std::exception& ex) {
 | |
| 				Log(LogCritical, "SocketEvents")
 | |
| 					<< "Exception thrown in socket I/O handler:\n"
 | |
| 					<< DiagnosticInformation(ex);
 | |
| 			} catch (...) {
 | |
| 				Log(LogCritical, "SocketEvents", "Exception of unknown type thrown in socket I/O handler.");
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| void SocketEventEngineEpoll::Register(SocketEvents *se)
 | |
| {
 | |
| 	int tid = se->m_ID % SOCKET_IOTHREADS;
 | |
| 
 | |
| 	{
 | |
| 		boost::mutex::scoped_lock lock(m_EventMutex[tid]);
 | |
| 
 | |
| 		VERIFY(se->m_FD != INVALID_SOCKET);
 | |
| 
 | |
| 		SocketEventDescriptor desc;
 | |
| 		desc.EventInterface = se;
 | |
| 
 | |
| 		VERIFY(m_Sockets[tid].find(se->m_FD) == m_Sockets[tid].end());
 | |
| 
 | |
| 		m_Sockets[tid][se->m_FD] = desc;
 | |
| 
 | |
| 		epoll_event event;
 | |
| 		memset(&event, 0, sizeof(event));
 | |
| 		event.data.fd = se->m_FD;
 | |
| 		event.events = 0;
 | |
| 		epoll_ctl(m_PollFDs[tid], EPOLL_CTL_ADD, se->m_FD, &event);
 | |
| 
 | |
| 		se->m_Events = true;
 | |
| 	}
 | |
| }
 | |
| 
 | |
| void SocketEventEngineEpoll::Unregister(SocketEvents *se)
 | |
| {
 | |
| 	int tid = se->m_ID % SOCKET_IOTHREADS;
 | |
| 
 | |
| 	{
 | |
| 		boost::mutex::scoped_lock lock(m_EventMutex[tid]);
 | |
| 
 | |
| 		if (se->m_FD == INVALID_SOCKET)
 | |
| 			return;
 | |
| 
 | |
| 		m_Sockets[tid].erase(se->m_FD);
 | |
| 		m_FDChanged[tid] = true;
 | |
| 
 | |
| 		epoll_ctl(m_PollFDs[tid], EPOLL_CTL_DEL, se->m_FD, nullptr);
 | |
| 
 | |
| 		se->m_FD = INVALID_SOCKET;
 | |
| 		se->m_Events = false;
 | |
| 	}
 | |
| 
 | |
| 	WakeUpThread(tid, true);
 | |
| }
 | |
| 
 | |
| void SocketEventEngineEpoll::ChangeEvents(SocketEvents *se, int events)
 | |
| {
 | |
| 	if (se->m_FD == INVALID_SOCKET)
 | |
| 		BOOST_THROW_EXCEPTION(std::runtime_error("Tried to read/write from a closed socket."));
 | |
| 
 | |
| 	int tid = se->m_ID % SOCKET_IOTHREADS;
 | |
| 
 | |
| 	{
 | |
| 		boost::mutex::scoped_lock lock(m_EventMutex[tid]);
 | |
| 
 | |
| 		auto it = m_Sockets[tid].find(se->m_FD);
 | |
| 
 | |
| 		if (it == m_Sockets[tid].end())
 | |
| 			return;
 | |
| 
 | |
| 		epoll_event event;
 | |
| 		memset(&event, 0, sizeof(event));
 | |
| 		event.data.fd = se->m_FD;
 | |
| 		event.events = SocketEventEngineEpoll::PollToEpoll(events);
 | |
| 		epoll_ctl(m_PollFDs[tid], EPOLL_CTL_MOD, se->m_FD, &event);
 | |
| 	}
 | |
| }
 | |
| #endif /* __linux__ */
 |