From 9cd5298d8b2981d7a92be4a0f90b1ee5f9ddf89e Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Tue, 6 Mar 2018 08:49:43 +0100 Subject: [PATCH] Ensure that SetCorked() works properly --- lib/base/socketevents-poll.cpp | 10 ++++++++-- lib/base/tlsstream.cpp | 8 ++++++-- lib/remote/httpserverconnection.cpp | 6 ++++-- lib/remote/jsonrpcconnection.cpp | 8 ++++---- 4 files changed, 22 insertions(+), 10 deletions(-) diff --git a/lib/base/socketevents-poll.cpp b/lib/base/socketevents-poll.cpp index 5f3b103d4..19748434e 100644 --- a/lib/base/socketevents-poll.cpp +++ b/lib/base/socketevents-poll.cpp @@ -57,11 +57,17 @@ void SocketEventEnginePoll::ThreadProc(int tid) if (desc.second.Events == 0) continue; - if (desc.second.EventInterface) + int events = desc.second.Events; + + if (desc.second.EventInterface) { desc.second.EventInterface->m_EnginePrivate = &pfds[i]; + if (!desc.second.EventInterface->m_Events) + events = 0; + } + pfds[i].fd = desc.first; - pfds[i].events = desc.second.Events; + pfds[i].events = events; descriptors[i] = desc.second; i++; diff --git a/lib/base/tlsstream.cpp b/lib/base/tlsstream.cpp index e93f119d8..a05a3d7d5 100644 --- a/lib/base/tlsstream.cpp +++ b/lib/base/tlsstream.cpp @@ -173,6 +173,8 @@ void TlsStream::OnEvent(int revents) */ ERR_clear_error(); + size_t readTotal = 0; + switch (m_CurrentAction) { case TlsActionRead: do { @@ -181,8 +183,10 @@ void TlsStream::OnEvent(int revents) if (rc > 0) { m_RecvQ->Write(buffer, rc); success = true; + + readTotal += rc; } - } while (rc > 0); + } while (rc > 0 && readTotal < 64 * 1024); if (success) m_CV.notify_all(); @@ -264,7 +268,7 @@ void TlsStream::OnEvent(int revents) lock.unlock(); - while (m_RecvQ->IsDataAvailable() && IsHandlingEvents()) + while (!IsCorked() && m_RecvQ->IsDataAvailable() && IsHandlingEvents()) SignalDataAvailable(); } diff --git a/lib/remote/httpserverconnection.cpp b/lib/remote/httpserverconnection.cpp index 53f287a87..d409c78bf 100644 --- a/lib/remote/httpserverconnection.cpp +++ b/lib/remote/httpserverconnection.cpp @@ -173,8 +173,6 @@ bool HttpServerConnection::ProcessMessage() return res; } - m_Stream->SetCorked(true); - m_RequestQueue.Enqueue(std::bind(&HttpServerConnection::ProcessMessageAsync, HttpServerConnection::Ptr(this), m_CurrentRequest, response, m_AuthenticatedUser)); @@ -347,6 +345,8 @@ void HttpServerConnection::DataAvailableHandler() if (!m_Stream->IsEof()) { boost::mutex::scoped_lock lock(m_DataHandlerMutex); + m_Stream->SetCorked(true); + try { while (ProcessMessage()) ; /* empty loop body */ @@ -356,6 +356,8 @@ void HttpServerConnection::DataAvailableHandler() close = true; } + + m_RequestQueue.Enqueue(std::bind(&Stream::SetCorked, m_Stream, false)); } else close = true; diff --git a/lib/remote/jsonrpcconnection.cpp b/lib/remote/jsonrpcconnection.cpp index bbb6fe180..259679edc 100644 --- a/lib/remote/jsonrpcconnection.cpp +++ b/lib/remote/jsonrpcconnection.cpp @@ -155,8 +155,6 @@ void JsonRpcConnection::MessageHandlerWrapper(const String& jsonString) try { MessageHandler(jsonString); - - m_Stream->SetCorked(false); } catch (const std::exception& ex) { Log(LogWarning, "JsonRpcConnection") << "Error while reading JSON-RPC message for identity '" << m_Identity @@ -262,8 +260,6 @@ bool JsonRpcConnection::ProcessMessage() if (srs != StatusNewItem) return false; - m_Stream->SetCorked(true); - l_JsonRpcConnectionWorkQueues[m_ID % l_JsonRpcConnectionWorkQueueCount].Enqueue(std::bind(&JsonRpcConnection::MessageHandlerWrapper, JsonRpcConnection::Ptr(this), message)); return true; @@ -279,6 +275,8 @@ void JsonRpcConnection::DataAvailableHandler() if (!m_Stream->IsEof()) { boost::mutex::scoped_lock lock(m_DataHandlerMutex); + m_Stream->SetCorked(true); + try { while (ProcessMessage()) ; /* empty loop body */ @@ -291,6 +289,8 @@ void JsonRpcConnection::DataAvailableHandler() return; } + + l_JsonRpcConnectionWorkQueues[m_ID % l_JsonRpcConnectionWorkQueueCount].Enqueue(std::bind(&Stream::SetCorked, m_Stream, false)); } else close = true;