mirror of https://github.com/Icinga/icinga2.git
Merge pull request #6738 from Icinga/bugfix/stalled-api-connections
Ensure that API/JSON-RPC messages in the same session are processed and not stalled
This commit is contained in:
commit
10fe38f815
|
@ -91,16 +91,6 @@ bool Stream::WaitForData(int timeout)
|
||||||
return IsDataAvailable() || IsEof();
|
return IsDataAvailable() || IsEof();
|
||||||
}
|
}
|
||||||
|
|
||||||
void Stream::SetCorked(bool corked)
|
|
||||||
{
|
|
||||||
m_Corked = corked;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool Stream::IsCorked() const
|
|
||||||
{
|
|
||||||
return m_Corked;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void StreamDummyCallback()
|
static void StreamDummyCallback()
|
||||||
{ }
|
{ }
|
||||||
|
|
||||||
|
|
|
@ -127,9 +127,6 @@ public:
|
||||||
bool WaitForData();
|
bool WaitForData();
|
||||||
bool WaitForData(int timeout);
|
bool WaitForData(int timeout);
|
||||||
|
|
||||||
virtual void SetCorked(bool corked);
|
|
||||||
bool IsCorked() const;
|
|
||||||
|
|
||||||
virtual bool SupportsWaiting() const;
|
virtual bool SupportsWaiting() const;
|
||||||
|
|
||||||
virtual bool IsDataAvailable() const;
|
virtual bool IsDataAvailable() const;
|
||||||
|
@ -146,8 +143,6 @@ private:
|
||||||
|
|
||||||
boost::mutex m_Mutex;
|
boost::mutex m_Mutex;
|
||||||
boost::condition_variable m_CV;
|
boost::condition_variable m_CV;
|
||||||
|
|
||||||
bool m_Corked{false};
|
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -153,16 +153,12 @@ void TlsStream::OnEvent(int revents)
|
||||||
char buffer[64 * 1024];
|
char buffer[64 * 1024];
|
||||||
|
|
||||||
if (m_CurrentAction == TlsActionNone) {
|
if (m_CurrentAction == TlsActionNone) {
|
||||||
bool corked = IsCorked();
|
if (revents & (POLLIN | POLLERR | POLLHUP))
|
||||||
if (!corked && (revents & (POLLIN | POLLERR | POLLHUP)))
|
|
||||||
m_CurrentAction = TlsActionRead;
|
m_CurrentAction = TlsActionRead;
|
||||||
else if (m_SendQ->GetAvailableBytes() > 0 && (revents & POLLOUT))
|
else if (m_SendQ->GetAvailableBytes() > 0 && (revents & POLLOUT))
|
||||||
m_CurrentAction = TlsActionWrite;
|
m_CurrentAction = TlsActionWrite;
|
||||||
else {
|
else {
|
||||||
if (corked)
|
ChangeEvents(POLLIN);
|
||||||
ChangeEvents(0);
|
|
||||||
else
|
|
||||||
ChangeEvents(POLLIN);
|
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -289,7 +285,7 @@ void TlsStream::OnEvent(int revents)
|
||||||
|
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
|
|
||||||
while (!IsCorked() && m_RecvQ->IsDataAvailable() && IsHandlingEvents())
|
while (m_RecvQ->IsDataAvailable() && IsHandlingEvents())
|
||||||
SignalDataAvailable();
|
SignalDataAvailable();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -441,18 +437,6 @@ bool TlsStream::IsDataAvailable() const
|
||||||
return m_RecvQ->GetAvailableBytes() > 0;
|
return m_RecvQ->GetAvailableBytes() > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void TlsStream::SetCorked(bool corked)
|
|
||||||
{
|
|
||||||
Stream::SetCorked(corked);
|
|
||||||
|
|
||||||
boost::mutex::scoped_lock lock(m_Mutex);
|
|
||||||
|
|
||||||
if (corked)
|
|
||||||
m_CurrentAction = TlsActionNone;
|
|
||||||
else
|
|
||||||
ChangeEvents(POLLIN | POLLOUT);
|
|
||||||
}
|
|
||||||
|
|
||||||
Socket::Ptr TlsStream::GetSocket() const
|
Socket::Ptr TlsStream::GetSocket() const
|
||||||
{
|
{
|
||||||
return m_Socket;
|
return m_Socket;
|
||||||
|
|
|
@ -70,8 +70,6 @@ public:
|
||||||
bool SupportsWaiting() const override;
|
bool SupportsWaiting() const override;
|
||||||
bool IsDataAvailable() const override;
|
bool IsDataAvailable() const override;
|
||||||
|
|
||||||
void SetCorked(bool corked) override;
|
|
||||||
|
|
||||||
bool IsVerifyOK() const;
|
bool IsVerifyOK() const;
|
||||||
String GetVerifyError() const;
|
String GetVerifyError() const;
|
||||||
|
|
||||||
|
|
|
@ -344,7 +344,6 @@ void HttpServerConnection::ProcessMessageAsync(HttpRequest& request, HttpRespons
|
||||||
|
|
||||||
response.Finish();
|
response.Finish();
|
||||||
m_PendingRequests--;
|
m_PendingRequests--;
|
||||||
m_Stream->SetCorked(false);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void HttpServerConnection::DataAvailableHandler()
|
void HttpServerConnection::DataAvailableHandler()
|
||||||
|
@ -354,8 +353,6 @@ void HttpServerConnection::DataAvailableHandler()
|
||||||
if (!m_Stream->IsEof()) {
|
if (!m_Stream->IsEof()) {
|
||||||
boost::recursive_mutex::scoped_lock lock(m_DataHandlerMutex);
|
boost::recursive_mutex::scoped_lock lock(m_DataHandlerMutex);
|
||||||
|
|
||||||
m_Stream->SetCorked(true);
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
while (ProcessMessage())
|
while (ProcessMessage())
|
||||||
; /* empty loop body */
|
; /* empty loop body */
|
||||||
|
@ -366,8 +363,6 @@ void HttpServerConnection::DataAvailableHandler()
|
||||||
close = true;
|
close = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
m_RequestQueue.Enqueue(std::bind(&Stream::SetCorked, m_Stream, false));
|
|
||||||
|
|
||||||
/* Request finished, decide whether to explicitly close the connection. */
|
/* Request finished, decide whether to explicitly close the connection. */
|
||||||
if (m_CurrentRequest.ProtocolVersion == HttpVersion10 ||
|
if (m_CurrentRequest.ProtocolVersion == HttpVersion10 ||
|
||||||
m_CurrentRequest.Headers->Get("connection") == "close") {
|
m_CurrentRequest.Headers->Get("connection") == "close") {
|
||||||
|
|
|
@ -276,8 +276,6 @@ void JsonRpcConnection::DataAvailableHandler()
|
||||||
if (!m_Stream->IsEof()) {
|
if (!m_Stream->IsEof()) {
|
||||||
boost::mutex::scoped_lock lock(m_DataHandlerMutex);
|
boost::mutex::scoped_lock lock(m_DataHandlerMutex);
|
||||||
|
|
||||||
m_Stream->SetCorked(true);
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
while (ProcessMessage())
|
while (ProcessMessage())
|
||||||
; /* empty loop body */
|
; /* empty loop body */
|
||||||
|
@ -290,8 +288,6 @@ void JsonRpcConnection::DataAvailableHandler()
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
l_JsonRpcConnectionWorkQueues[m_ID % l_JsonRpcConnectionWorkQueueCount].Enqueue(std::bind(&Stream::SetCorked, m_Stream, false));
|
|
||||||
} else
|
} else
|
||||||
close = true;
|
close = true;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue