Improved TcpClient lock usage.

This commit is contained in:
Gunnar Beutner 2012-08-06 10:01:21 +02:00
parent 378894d24f
commit caf08b3069
5 changed files with 60 additions and 37 deletions

View File

@ -33,7 +33,7 @@ Socket::Socket(void)
*/ */
Socket::~Socket(void) Socket::~Socket(void)
{ {
mutex::scoped_lock lock(m_Mutex); mutex::scoped_lock lock(m_SocketMutex);
CloseInternal(true); CloseInternal(true);
} }
@ -90,7 +90,7 @@ SOCKET Socket::GetFD(void) const
*/ */
void Socket::Close(void) void Socket::Close(void)
{ {
mutex::scoped_lock lock(m_Mutex); mutex::scoped_lock lock(m_SocketMutex);
CloseInternal(false); CloseInternal(false);
} }
@ -209,7 +209,7 @@ String Socket::GetAddressFromSockaddr(sockaddr *address, socklen_t len)
*/ */
String Socket::GetClientAddress(void) String Socket::GetClientAddress(void)
{ {
mutex::scoped_lock lock(m_Mutex); mutex::scoped_lock lock(m_SocketMutex);
sockaddr_storage sin; sockaddr_storage sin;
socklen_t len = sizeof(sin); socklen_t len = sizeof(sin);
@ -227,7 +227,7 @@ String Socket::GetClientAddress(void)
*/ */
String Socket::GetPeerAddress(void) String Socket::GetPeerAddress(void)
{ {
mutex::scoped_lock lock(m_Mutex); mutex::scoped_lock lock(m_SocketMutex);
sockaddr_storage sin; sockaddr_storage sin;
socklen_t len = sizeof(sin); socklen_t len = sizeof(sin);
@ -258,7 +258,7 @@ SocketException::SocketException(const String& message, int errorCode)
void Socket::ReadThreadProc(void) void Socket::ReadThreadProc(void)
{ {
mutex::scoped_lock lock(m_Mutex); mutex::scoped_lock lock(m_SocketMutex);
for (;;) { for (;;) {
fd_set readfds, exceptfds; fd_set readfds, exceptfds;
@ -312,7 +312,7 @@ void Socket::ReadThreadProc(void)
void Socket::WriteThreadProc(void) void Socket::WriteThreadProc(void)
{ {
mutex::scoped_lock lock(m_Mutex); mutex::scoped_lock lock(m_SocketMutex);
for (;;) { for (;;) {
fd_set writefds; fd_set writefds;
@ -358,11 +358,6 @@ void Socket::WriteThreadProc(void)
} }
} }
mutex& Socket::GetMutex(void) const
{
return m_Mutex;
}
void Socket::SetConnected(bool connected) void Socket::SetConnected(bool connected)
{ {
m_Connected = connected; m_Connected = connected;

View File

@ -44,8 +44,6 @@ public:
String GetClientAddress(void); String GetClientAddress(void);
String GetPeerAddress(void); String GetPeerAddress(void);
mutex& GetMutex(void) const;
bool IsConnected(void) const; bool IsConnected(void) const;
void CheckException(void); void CheckException(void);
@ -70,6 +68,8 @@ protected:
virtual void CloseInternal(bool from_dtor); virtual void CloseInternal(bool from_dtor);
mutable mutex m_SocketMutex;
private: private:
SOCKET m_FD; /**< The socket descriptor. */ SOCKET m_FD; /**< The socket descriptor. */
bool m_Connected; bool m_Connected;
@ -79,7 +79,6 @@ private:
condition_variable m_WriteCV; condition_variable m_WriteCV;
mutable mutex m_Mutex;
boost::exception_ptr m_Exception; boost::exception_ptr m_Exception;
void ReadThreadProc(void); void ReadThreadProc(void);

View File

@ -113,22 +113,29 @@ void TcpClient::HandleWritable(void)
} }
for (;;) { for (;;) {
count = m_SendQueue->GetAvailableBytes(); {
mutex::scoped_lock lock(m_QueueMutex);
if (count == 0) count = m_SendQueue->GetAvailableBytes();
break;
if (count > sizeof(data)) if (count == 0)
count = sizeof(data); break;
m_SendQueue->Peek(data, count); if (count > sizeof(data))
count = sizeof(data);
m_SendQueue->Peek(data, count);
}
rc = send(GetFD(), (const char *)data, count, 0); rc = send(GetFD(), (const char *)data, count, 0);
if (rc <= 0) if (rc <= 0)
throw_exception(SocketException("send() failed", GetError())); throw_exception(SocketException("send() failed", GetError()));
m_SendQueue->Read(NULL, rc); {
mutex::scoped_lock lock(m_QueueMutex);
m_SendQueue->Read(NULL, rc);
}
} }
} }
@ -137,7 +144,7 @@ void TcpClient::HandleWritable(void)
*/ */
size_t TcpClient::GetAvailableBytes(void) const size_t TcpClient::GetAvailableBytes(void) const
{ {
mutex::scoped_lock lock(GetMutex()); mutex::scoped_lock lock(m_QueueMutex);
return m_RecvQueue->GetAvailableBytes(); return m_RecvQueue->GetAvailableBytes();
} }
@ -147,7 +154,7 @@ size_t TcpClient::GetAvailableBytes(void) const
*/ */
void TcpClient::Peek(void *buffer, size_t count) void TcpClient::Peek(void *buffer, size_t count)
{ {
mutex::scoped_lock lock(GetMutex()); mutex::scoped_lock lock(m_QueueMutex);
m_RecvQueue->Peek(buffer, count); m_RecvQueue->Peek(buffer, count);
} }
@ -157,7 +164,7 @@ void TcpClient::Peek(void *buffer, size_t count)
*/ */
void TcpClient::Read(void *buffer, size_t count) void TcpClient::Read(void *buffer, size_t count)
{ {
mutex::scoped_lock lock(GetMutex()); mutex::scoped_lock lock(m_QueueMutex);
m_RecvQueue->Read(buffer, count); m_RecvQueue->Read(buffer, count);
} }
@ -167,7 +174,7 @@ void TcpClient::Read(void *buffer, size_t count)
*/ */
void TcpClient::Write(const void *buffer, size_t count) void TcpClient::Write(const void *buffer, size_t count)
{ {
mutex::scoped_lock lock(GetMutex()); mutex::scoped_lock lock(m_QueueMutex);
m_SendQueue->Write(buffer, count); m_SendQueue->Write(buffer, count);
} }
@ -193,7 +200,11 @@ void TcpClient::HandleReadable(void)
if (rc <= 0) if (rc <= 0)
throw_exception(SocketException("recv() failed", GetError())); throw_exception(SocketException("recv() failed", GetError()));
m_RecvQueue->Write(data, rc); {
mutex::scoped_lock lock(m_QueueMutex);
m_RecvQueue->Write(data, rc);
}
} }
Event::Post(boost::bind(boost::ref(OnDataAvailable), GetSelf())); Event::Post(boost::bind(boost::ref(OnDataAvailable), GetSelf()));
@ -216,7 +227,14 @@ bool TcpClient::WantsToRead(void) const
*/ */
bool TcpClient::WantsToWrite(void) const bool TcpClient::WantsToWrite(void) const
{ {
return (m_SendQueue->GetAvailableBytes() > 0 || !IsConnected()); {
mutex::scoped_lock lock(m_QueueMutex);
if (m_SendQueue->GetAvailableBytes() > 0)
return true;
}
return (!IsConnected());
} }
/** /**

View File

@ -68,6 +68,7 @@ protected:
virtual void HandleReadable(void); virtual void HandleReadable(void);
virtual void HandleWritable(void); virtual void HandleWritable(void);
mutable mutex m_QueueMutex;
FIFO::Ptr m_SendQueue; FIFO::Ptr m_SendQueue;
FIFO::Ptr m_RecvQueue; FIFO::Ptr m_RecvQueue;

View File

@ -90,7 +90,7 @@ void TlsClient::NullCertificateDeleter(X509 *certificate)
*/ */
shared_ptr<X509> TlsClient::GetClientCertificate(void) const shared_ptr<X509> TlsClient::GetClientCertificate(void) const
{ {
mutex::scoped_lock lock(GetMutex()); mutex::scoped_lock lock(m_SocketMutex);
return shared_ptr<X509>(SSL_get_certificate(m_SSL.get()), &TlsClient::NullCertificateDeleter); return shared_ptr<X509>(SSL_get_certificate(m_SSL.get()), &TlsClient::NullCertificateDeleter);
} }
@ -102,7 +102,7 @@ shared_ptr<X509> TlsClient::GetClientCertificate(void) const
*/ */
shared_ptr<X509> TlsClient::GetPeerCertificate(void) const shared_ptr<X509> TlsClient::GetPeerCertificate(void) const
{ {
mutex::scoped_lock lock(GetMutex()); mutex::scoped_lock lock(m_SocketMutex);
return shared_ptr<X509>(SSL_get_peer_certificate(m_SSL.get()), X509_free); return shared_ptr<X509>(SSL_get_peer_certificate(m_SSL.get()), X509_free);
} }
@ -146,8 +146,11 @@ void TlsClient::HandleReadable(void)
} }
} }
if (IsConnected()) if (IsConnected()) {
mutex::scoped_lock lock(m_QueueMutex);
m_RecvQueue->Write(data, rc); m_RecvQueue->Write(data, rc);
}
} }
post_event: post_event:
@ -169,15 +172,19 @@ void TlsClient::HandleWritable(void)
int rc; int rc;
if (IsConnected()) { if (IsConnected()) {
count = m_SendQueue->GetAvailableBytes(); {
mutex::scoped_lock lock(m_QueueMutex);
if (count == 0) count = m_SendQueue->GetAvailableBytes();
break;
if (count > sizeof(data)) if (count == 0)
count = sizeof(data); break;
m_SendQueue->Peek(data, count); if (count > sizeof(data))
count = sizeof(data);
m_SendQueue->Peek(data, count);
}
rc = SSL_write(m_SSL.get(), (const char *)data, count); rc = SSL_write(m_SSL.get(), (const char *)data, count);
} else { } else {
@ -205,8 +212,11 @@ void TlsClient::HandleWritable(void)
} }
} }
if (IsConnected()) if (IsConnected()) {
mutex::scoped_lock lock(m_QueueMutex);
m_SendQueue->Read(NULL, rc); m_SendQueue->Read(NULL, rc);
}
} }
} }