Ensure that SetCorked() works properly

This commit is contained in:
Gunnar Beutner 2018-03-06 08:49:43 +01:00
parent 0b9869917f
commit 6670024f62
4 changed files with 23 additions and 11 deletions

View File

@ -57,11 +57,17 @@ void SocketEventEnginePoll::ThreadProc(int tid)
if (desc.second.Events == 0) if (desc.second.Events == 0)
continue; continue;
if (desc.second.EventInterface) int events = desc.second.Events;
if (desc.second.EventInterface) {
desc.second.EventInterface->m_EnginePrivate = &pfds[i]; desc.second.EventInterface->m_EnginePrivate = &pfds[i];
if (!desc.second.EventInterface->m_Events)
events = 0;
}
pfds[i].fd = desc.first; pfds[i].fd = desc.first;
pfds[i].events = desc.second.Events; pfds[i].events = events;
descriptors[i] = desc.second; descriptors[i] = desc.second;
i++; i++;

View File

@ -174,6 +174,8 @@ void TlsStream::OnEvent(int revents)
*/ */
ERR_clear_error(); ERR_clear_error();
size_t readTotal = 0;
switch (m_CurrentAction) { switch (m_CurrentAction) {
case TlsActionRead: case TlsActionRead:
do { do {
@ -182,8 +184,10 @@ void TlsStream::OnEvent(int revents)
if (rc > 0) { if (rc > 0) {
m_RecvQ->Write(buffer, rc); m_RecvQ->Write(buffer, rc);
success = true; success = true;
readTotal += rc;
} }
} while (rc > 0); } while (rc > 0 && readTotal < 64 * 1024);
if (success) if (success)
m_CV.notify_all(); m_CV.notify_all();
@ -265,7 +269,7 @@ void TlsStream::OnEvent(int revents)
lock.unlock(); lock.unlock();
while (m_RecvQ->IsDataAvailable() && IsHandlingEvents()) while (!IsCorked() && m_RecvQ->IsDataAvailable() && IsHandlingEvents())
SignalDataAvailable(); SignalDataAvailable();
} }

View File

@ -174,9 +174,7 @@ bool HttpServerConnection::ProcessMessage(void)
return res; return res;
} }
m_Stream->SetCorked(true); m_RequestQueue.Enqueue(boost::bind(&HttpServerConnection::ProcessMessageAsync,
m_RequestQueue.Enqueue(std::bind(&HttpServerConnection::ProcessMessageAsync,
HttpServerConnection::Ptr(this), m_CurrentRequest, response, m_AuthenticatedUser)); HttpServerConnection::Ptr(this), m_CurrentRequest, response, m_AuthenticatedUser));
m_Seen = Utility::GetTime(); m_Seen = Utility::GetTime();
@ -348,6 +346,8 @@ void HttpServerConnection::DataAvailableHandler(void)
if (!m_Stream->IsEof()) { if (!m_Stream->IsEof()) {
boost::mutex::scoped_lock lock(m_DataHandlerMutex); boost::mutex::scoped_lock lock(m_DataHandlerMutex);
m_Stream->SetCorked(true);
try { try {
while (ProcessMessage()) while (ProcessMessage())
; /* empty loop body */ ; /* empty loop body */
@ -357,6 +357,8 @@ void HttpServerConnection::DataAvailableHandler(void)
close = true; close = true;
} }
m_RequestQueue.Enqueue(boost::bind(&Stream::SetCorked, m_Stream, false));
} else } else
close = true; close = true;

View File

@ -144,8 +144,6 @@ void JsonRpcConnection::MessageHandlerWrapper(const String& jsonString)
try { try {
MessageHandler(jsonString); MessageHandler(jsonString);
m_Stream->SetCorked(false);
} catch (const std::exception& ex) { } catch (const std::exception& ex) {
Log(LogWarning, "JsonRpcConnection") Log(LogWarning, "JsonRpcConnection")
<< "Error while reading JSON-RPC message for identity '" << m_Identity << "Error while reading JSON-RPC message for identity '" << m_Identity
@ -249,8 +247,6 @@ bool JsonRpcConnection::ProcessMessage(void)
if (srs != StatusNewItem) if (srs != StatusNewItem)
return false; return false;
m_Stream->SetCorked(true);
l_JsonRpcConnectionWorkQueues[m_ID % l_JsonRpcConnectionWorkQueueCount].Enqueue(boost::bind(&JsonRpcConnection::MessageHandlerWrapper, JsonRpcConnection::Ptr(this), message)); l_JsonRpcConnectionWorkQueues[m_ID % l_JsonRpcConnectionWorkQueueCount].Enqueue(boost::bind(&JsonRpcConnection::MessageHandlerWrapper, JsonRpcConnection::Ptr(this), message));
return true; return true;
@ -266,6 +262,8 @@ void JsonRpcConnection::DataAvailableHandler(void)
if (!m_Stream->IsEof()) { if (!m_Stream->IsEof()) {
boost::mutex::scoped_lock lock(m_DataHandlerMutex); boost::mutex::scoped_lock lock(m_DataHandlerMutex);
m_Stream->SetCorked(true);
try { try {
while (ProcessMessage()) while (ProcessMessage())
; /* empty loop body */ ; /* empty loop body */
@ -278,6 +276,8 @@ void JsonRpcConnection::DataAvailableHandler(void)
return; return;
} }
l_JsonRpcConnectionWorkQueues[m_ID % l_JsonRpcConnectionWorkQueueCount].Enqueue(boost::bind(&Stream::SetCorked, m_Stream, false));
} else } else
close = true; close = true;