Fix stalled reconnection attempts for the cluster

refs #8485
This commit is contained in:
Gunnar Beutner 2015-02-20 14:43:13 +01:00
parent 2a9ceffea9
commit e2815de8a6
1 changed files with 34 additions and 15 deletions

View File

@ -41,9 +41,12 @@ struct SocketEventDescriptor
{ }
};
static boost::thread l_SocketIOThread;
static boost::once_flag l_SocketIOOnceFlag = BOOST_ONCE_INIT;
static SOCKET l_SocketIOEventFDs[2];
static boost::mutex l_SocketIOMutex;
static boost::condition_variable l_SocketIOCV;
static bool l_SocketIOFDChanged;
static std::map<SOCKET, SocketEventDescriptor> l_SocketIOSockets;
void SocketEvents::InitializeThread(void)
@ -63,8 +66,7 @@ void SocketEvents::InitializeThread(void)
l_SocketIOSockets[l_SocketIOEventFDs[0]] = sed;
boost::thread thread(&SocketEvents::ThreadProc);
thread.detach();
l_SocketIOThread = boost::thread(&SocketEvents::ThreadProc);
}
void SocketEvents::ThreadProc(void)
@ -100,6 +102,16 @@ void SocketEvents::ThreadProc(void)
(void) poll(pfds, pfdcount, -1);
#endif /* _WIN32 */
{
boost::mutex::scoped_lock lock(l_SocketIOMutex);
if (l_SocketIOFDChanged) {
l_SocketIOFDChanged = false;
l_SocketIOCV.notify_all();
continue;
}
}
for (int i = 0; i < pfdcount; i++) {
if ((pfds[i].revents & (POLLIN | POLLOUT | POLLHUP | POLLERR)) == 0)
continue;
@ -139,7 +151,16 @@ void SocketEvents::ThreadProc(void)
void SocketEvents::WakeUpThread(void)
{
(void) send(l_SocketIOEventFDs[1], "T", 1, 0);
if (boost::this_thread::get_id() != l_SocketIOThread.get_id()) {
boost::mutex::scoped_lock lock(l_SocketIOMutex);
l_SocketIOFDChanged = true;
(void) send(l_SocketIOEventFDs[1], "T", 1, 0);
while (l_SocketIOFDChanged)
l_SocketIOCV.wait(lock);
}
}
/**
@ -160,6 +181,8 @@ SocketEvents::~SocketEvents(void)
void SocketEvents::Register(Object *lifesupportObject)
{
boost::mutex::scoped_lock lock(l_SocketIOMutex);
VERIFY(m_FD != INVALID_SOCKET);
SocketEventDescriptor desc;
@ -167,25 +190,21 @@ void SocketEvents::Register(Object *lifesupportObject)
desc.EventInterface = this;
desc.LifesupportObject = lifesupportObject;
{
boost::mutex::scoped_lock lock(l_SocketIOMutex);
VERIFY(l_SocketIOSockets.find(m_FD) == l_SocketIOSockets.end());
VERIFY(l_SocketIOSockets.find(m_FD) == l_SocketIOSockets.end());
l_SocketIOSockets[m_FD] = desc;
}
l_SocketIOSockets[m_FD] = desc;
/* There's no need to wake up the I/O thread here. */
}
void SocketEvents::Unregister(void)
{
if (m_FD == INVALID_SOCKET)
return;
{
boost::mutex::scoped_lock lock(l_SocketIOMutex);
if (m_FD == INVALID_SOCKET)
return;
l_SocketIOSockets.erase(m_FD);
m_FD = INVALID_SOCKET;
}
@ -195,12 +214,12 @@ void SocketEvents::Unregister(void)
void SocketEvents::ChangeEvents(int events)
{
if (m_FD == INVALID_SOCKET)
BOOST_THROW_EXCEPTION(std::runtime_error("Tried to read/write from a closed socket."));
{
boost::mutex::scoped_lock lock(l_SocketIOMutex);
if (m_FD == INVALID_SOCKET)
BOOST_THROW_EXCEPTION(std::runtime_error("Tried to read/write from a closed socket."));
std::map<SOCKET, SocketEventDescriptor>::iterator it = l_SocketIOSockets.find(m_FD);
if (it == l_SocketIOSockets.end())