Merge pull request #6133 from Icinga/fix/cork-socket

Limit the number of HTTP/JSON-RPC requests we read in parallel
This commit is contained in:
Gunnar Beutner 2018-02-28 12:48:06 +01:00 committed by GitHub
commit a3bf8cd26e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 43 additions and 3 deletions

View File

@ -91,6 +91,16 @@ bool Stream::WaitForData(int timeout)
return IsDataAvailable() || IsEof();
}
void Stream::SetCorked(bool corked)
{
m_Corked = corked;
}
bool Stream::IsCorked() const
{
return m_Corked;
}
static void StreamDummyCallback()
{ }

View File

@ -127,6 +127,9 @@ public:
bool WaitForData();
bool WaitForData(int timeout);
virtual void SetCorked(bool corked);
bool IsCorked() const;
virtual bool SupportsWaiting() const;
virtual bool IsDataAvailable() const;
@ -143,6 +146,8 @@ private:
boost::mutex m_Mutex;
boost::condition_variable m_CV;
bool m_Corked{false};
};
}

View File

@ -151,12 +151,17 @@ void TlsStream::OnEvent(int revents)
char buffer[64 * 1024];
if (m_CurrentAction == TlsActionNone) {
if (revents & (POLLIN | POLLERR | POLLHUP))
bool corked = IsCorked();
if (!corked && (revents & (POLLIN | POLLERR | POLLHUP)))
m_CurrentAction = TlsActionRead;
else if (m_SendQ->GetAvailableBytes() > 0 && (revents & POLLOUT))
m_CurrentAction = TlsActionWrite;
else {
ChangeEvents(POLLIN);
if (corked)
ChangeEvents(0);
else
ChangeEvents(POLLIN);
return;
}
}
@ -399,6 +404,18 @@ bool TlsStream::IsDataAvailable() const
return m_RecvQ->GetAvailableBytes() > 0;
}
void TlsStream::SetCorked(bool corked)
{
Stream::SetCorked(corked);
boost::mutex::scoped_lock lock(m_Mutex);
if (corked)
m_CurrentAction = TlsActionNone;
else
ChangeEvents(POLLIN | POLLOUT);
}
Socket::Ptr TlsStream::GetSocket() const
{
return m_Socket;

View File

@ -70,6 +70,8 @@ public:
bool SupportsWaiting() const override;
bool IsDataAvailable() const override;
void SetCorked(bool corked) override;
bool IsVerifyOK() const;
String GetVerifyError() const;

View File

@ -96,7 +96,6 @@ void HttpServerConnection::Disconnect()
bool HttpServerConnection::ProcessMessage()
{
bool res;
HttpResponse response(m_Stream, m_CurrentRequest);
@ -174,6 +173,8 @@ bool HttpServerConnection::ProcessMessage()
return res;
}
m_Stream->SetCorked(true);
m_RequestQueue.Enqueue(std::bind(&HttpServerConnection::ProcessMessageAsync,
HttpServerConnection::Ptr(this), m_CurrentRequest, response, m_AuthenticatedUser));
@ -328,6 +329,7 @@ void HttpServerConnection::ProcessMessageAsync(HttpRequest& request, HttpRespons
response.Finish();
m_PendingRequests--;
m_Stream->SetCorked(false);
}
void HttpServerConnection::DataAvailableHandler()

View File

@ -155,6 +155,8 @@ void JsonRpcConnection::MessageHandlerWrapper(const String& jsonString)
try {
MessageHandler(jsonString);
m_Stream->SetCorked(false);
} catch (const std::exception& ex) {
Log(LogWarning, "JsonRpcConnection")
<< "Error while reading JSON-RPC message for identity '" << m_Identity
@ -255,6 +257,8 @@ bool JsonRpcConnection::ProcessMessage()
if (srs != StatusNewItem)
return false;
m_Stream->SetCorked(true);
l_JsonRpcConnectionWorkQueues[m_ID % l_JsonRpcConnectionWorkQueueCount].Enqueue(std::bind(&JsonRpcConnection::MessageHandlerWrapper, JsonRpcConnection::Ptr(this), message));
return true;