mirror of
https://github.com/Icinga/icinga2.git
synced 2025-07-31 01:24:19 +02:00
Fix deadlocks in the Socket/Stream classes.
This commit is contained in:
parent
5a861b0de0
commit
375746d710
@ -48,8 +48,6 @@ Socket::~Socket(void)
|
|||||||
*/
|
*/
|
||||||
void Socket::Start(void)
|
void Socket::Start(void)
|
||||||
{
|
{
|
||||||
ObjectLock olock(this);
|
|
||||||
|
|
||||||
ASSERT(!m_ReadThread.joinable() && !m_WriteThread.joinable());
|
ASSERT(!m_ReadThread.joinable() && !m_WriteThread.joinable());
|
||||||
ASSERT(GetFD() != INVALID_SOCKET);
|
ASSERT(GetFD() != INVALID_SOCKET);
|
||||||
|
|
||||||
@ -100,13 +98,15 @@ SOCKET Socket::GetFD(void) const
|
|||||||
*/
|
*/
|
||||||
void Socket::Close(void)
|
void Socket::Close(void)
|
||||||
{
|
{
|
||||||
ObjectLock olock(this);
|
{
|
||||||
|
ObjectLock olock(this);
|
||||||
|
|
||||||
if (m_FD == INVALID_SOCKET)
|
if (m_FD == INVALID_SOCKET)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
closesocket(m_FD);
|
closesocket(m_FD);
|
||||||
m_FD = INVALID_SOCKET;
|
m_FD = INVALID_SOCKET;
|
||||||
|
}
|
||||||
|
|
||||||
Stream::Close();
|
Stream::Close();
|
||||||
}
|
}
|
||||||
@ -385,19 +385,17 @@ size_t Socket::GetAvailableBytes(void) const
|
|||||||
*/
|
*/
|
||||||
size_t Socket::Read(void *buffer, size_t size)
|
size_t Socket::Read(void *buffer, size_t size)
|
||||||
{
|
{
|
||||||
ObjectLock olock(this);
|
|
||||||
|
|
||||||
if (m_Listening)
|
|
||||||
throw new logic_error("Socket does not support Read().");
|
|
||||||
|
|
||||||
{
|
{
|
||||||
ObjectLock olock(m_RecvQueue);
|
ObjectLock olock(this);
|
||||||
|
|
||||||
if (m_RecvQueue->GetAvailableBytes() == 0)
|
if (m_Listening)
|
||||||
CheckException();
|
throw new logic_error("Socket does not support Read().");
|
||||||
|
|
||||||
return m_RecvQueue->Read(buffer, size);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (m_RecvQueue->GetAvailableBytes() == 0)
|
||||||
|
CheckException();
|
||||||
|
|
||||||
|
return m_RecvQueue->Read(buffer, size);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -416,14 +414,10 @@ size_t Socket::Peek(void *buffer, size_t size)
|
|||||||
throw new logic_error("Socket does not support Peek().");
|
throw new logic_error("Socket does not support Peek().");
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
if (m_RecvQueue->GetAvailableBytes() == 0)
|
||||||
ObjectLock olock(m_RecvQueue);
|
CheckException();
|
||||||
|
|
||||||
if (m_RecvQueue->GetAvailableBytes() == 0)
|
return m_RecvQueue->Peek(buffer, size);
|
||||||
CheckException();
|
|
||||||
|
|
||||||
return m_RecvQueue->Peek(buffer, size);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -449,12 +443,13 @@ void Socket::Write(const void *buffer, size_t size)
|
|||||||
*/
|
*/
|
||||||
void Socket::Listen(void)
|
void Socket::Listen(void)
|
||||||
{
|
{
|
||||||
ObjectLock olock(this);
|
|
||||||
|
|
||||||
if (listen(GetFD(), SOMAXCONN) < 0)
|
if (listen(GetFD(), SOMAXCONN) < 0)
|
||||||
BOOST_THROW_EXCEPTION(SocketException("listen() failed", GetError()));
|
BOOST_THROW_EXCEPTION(SocketException("listen() failed", GetError()));
|
||||||
|
|
||||||
m_Listening = true;
|
{
|
||||||
|
ObjectLock olock(this);
|
||||||
|
m_Listening = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void Socket::HandleWritable(void)
|
void Socket::HandleWritable(void)
|
||||||
@ -553,7 +548,7 @@ void Socket::HandleReadableServer(void)
|
|||||||
if (fd < 0)
|
if (fd < 0)
|
||||||
BOOST_THROW_EXCEPTION(SocketException("accept() failed", GetError()));
|
BOOST_THROW_EXCEPTION(SocketException("accept() failed", GetError()));
|
||||||
|
|
||||||
TcpSocket::Ptr client = boost::make_shared<TcpSocket>();
|
Socket::Ptr client = boost::make_shared<Socket>();
|
||||||
client->SetFD(fd);
|
client->SetFD(fd);
|
||||||
OnNewClient(GetSelf(), client);
|
OnNewClient(GetSelf(), client);
|
||||||
}
|
}
|
||||||
|
@ -33,6 +33,7 @@ public:
|
|||||||
typedef shared_ptr<Socket> Ptr;
|
typedef shared_ptr<Socket> Ptr;
|
||||||
typedef weak_ptr<Socket> WeakPtr;
|
typedef weak_ptr<Socket> WeakPtr;
|
||||||
|
|
||||||
|
Socket(void);
|
||||||
~Socket(void);
|
~Socket(void);
|
||||||
|
|
||||||
virtual void Start(void);
|
virtual void Start(void);
|
||||||
@ -54,8 +55,6 @@ public:
|
|||||||
signals2::signal<void (const Socket::Ptr&, const Socket::Ptr&)> OnNewClient;
|
signals2::signal<void (const Socket::Ptr&, const Socket::Ptr&)> OnNewClient;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
Socket(void);
|
|
||||||
|
|
||||||
void SetFD(SOCKET fd);
|
void SetFD(SOCKET fd);
|
||||||
SOCKET GetFD(void) const;
|
SOCKET GetFD(void) const;
|
||||||
|
|
||||||
|
@ -112,3 +112,22 @@ void Stream::Close(void)
|
|||||||
|
|
||||||
SetConnected(false);
|
SetConnected(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool Stream::ReadLine(String *line, size_t maxLength)
|
||||||
|
{
|
||||||
|
char buffer[maxLength];
|
||||||
|
|
||||||
|
size_t rc = Peek(buffer, maxLength);
|
||||||
|
|
||||||
|
for (int i = 0; i < rc; i++) {
|
||||||
|
if (buffer[i] == '\n') {
|
||||||
|
*line = String(buffer, &(buffer[i]));
|
||||||
|
|
||||||
|
Read(NULL, rc);
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
@ -82,6 +82,8 @@ public:
|
|||||||
|
|
||||||
bool IsConnected(void) const;
|
bool IsConnected(void) const;
|
||||||
|
|
||||||
|
bool ReadLine(String *line, size_t maxLength = 4096);
|
||||||
|
|
||||||
boost::exception_ptr GetException(void);
|
boost::exception_ptr GetException(void);
|
||||||
void CheckException(void);
|
void CheckException(void);
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user