Ensure that API/JSON-RPC messages in the same session are processed and not stalled

This basically drops the "corked" implementation which just stalled the
TLS IO polling after some requests. If you need sort of rate limiting
for these events, use an external TLS proxy which terminates that in front
of Icinga.

fixes #6635
This commit is contained in:
Michael Friedrich 2018-10-29 12:57:24 +01:00
parent 13e54d960d
commit 5406ce6540
6 changed files with 3 additions and 45 deletions

View File

@ -91,16 +91,6 @@ bool Stream::WaitForData(int timeout)
return IsDataAvailable() || IsEof();
}
void Stream::SetCorked(bool corked)
{
m_Corked = corked;
}
bool Stream::IsCorked() const
{
return m_Corked;
}
static void StreamDummyCallback()
{ }

View File

@ -127,9 +127,6 @@ public:
bool WaitForData();
bool WaitForData(int timeout);
virtual void SetCorked(bool corked);
bool IsCorked() const;
virtual bool SupportsWaiting() const;
virtual bool IsDataAvailable() const;
@ -146,8 +143,6 @@ private:
boost::mutex m_Mutex;
boost::condition_variable m_CV;
bool m_Corked{false};
};
}

View File

@ -153,16 +153,12 @@ void TlsStream::OnEvent(int revents)
char buffer[64 * 1024];
if (m_CurrentAction == TlsActionNone) {
bool corked = IsCorked();
if (!corked && (revents & (POLLIN | POLLERR | POLLHUP)))
if (revents & (POLLIN | POLLERR | POLLHUP))
m_CurrentAction = TlsActionRead;
else if (m_SendQ->GetAvailableBytes() > 0 && (revents & POLLOUT))
m_CurrentAction = TlsActionWrite;
else {
if (corked)
ChangeEvents(0);
else
ChangeEvents(POLLIN);
ChangeEvents(POLLIN);
return;
}
@ -289,7 +285,7 @@ void TlsStream::OnEvent(int revents)
lock.unlock();
while (!IsCorked() && m_RecvQ->IsDataAvailable() && IsHandlingEvents())
while (m_RecvQ->IsDataAvailable() && IsHandlingEvents())
SignalDataAvailable();
}
@ -441,18 +437,6 @@ bool TlsStream::IsDataAvailable() const
return m_RecvQ->GetAvailableBytes() > 0;
}
void TlsStream::SetCorked(bool corked)
{
Stream::SetCorked(corked);
boost::mutex::scoped_lock lock(m_Mutex);
if (corked)
m_CurrentAction = TlsActionNone;
else
ChangeEvents(POLLIN | POLLOUT);
}
Socket::Ptr TlsStream::GetSocket() const
{
return m_Socket;

View File

@ -70,8 +70,6 @@ public:
bool SupportsWaiting() const override;
bool IsDataAvailable() const override;
void SetCorked(bool corked) override;
bool IsVerifyOK() const;
String GetVerifyError() const;

View File

@ -344,7 +344,6 @@ void HttpServerConnection::ProcessMessageAsync(HttpRequest& request, HttpRespons
response.Finish();
m_PendingRequests--;
m_Stream->SetCorked(false);
}
void HttpServerConnection::DataAvailableHandler()
@ -354,8 +353,6 @@ void HttpServerConnection::DataAvailableHandler()
if (!m_Stream->IsEof()) {
boost::recursive_mutex::scoped_lock lock(m_DataHandlerMutex);
m_Stream->SetCorked(true);
try {
while (ProcessMessage())
; /* empty loop body */
@ -366,8 +363,6 @@ void HttpServerConnection::DataAvailableHandler()
close = true;
}
m_RequestQueue.Enqueue(std::bind(&Stream::SetCorked, m_Stream, false));
/* Request finished, decide whether to explicitly close the connection. */
if (m_CurrentRequest.ProtocolVersion == HttpVersion10 ||
m_CurrentRequest.Headers->Get("connection") == "close") {

View File

@ -276,8 +276,6 @@ void JsonRpcConnection::DataAvailableHandler()
if (!m_Stream->IsEof()) {
boost::mutex::scoped_lock lock(m_DataHandlerMutex);
m_Stream->SetCorked(true);
try {
while (ProcessMessage())
; /* empty loop body */
@ -290,8 +288,6 @@ void JsonRpcConnection::DataAvailableHandler()
return;
}
l_JsonRpcConnectionWorkQueues[m_ID % l_JsonRpcConnectionWorkQueueCount].Enqueue(std::bind(&Stream::SetCorked, m_Stream, false));
} else
close = true;