mirror of
https://github.com/Icinga/icinga2.git
synced 2025-04-08 17:05:25 +02:00
Merge pull request #7076 from Icinga/bugfix/eventqueue-leak
/v1/events: terminate on disconnect
This commit is contained in:
commit
b1042c3689
@ -20,7 +20,7 @@ bool ActionsHandler::HandleRequest(
|
||||
boost::beast::http::response<boost::beast::http::string_body>& response,
|
||||
const Dictionary::Ptr& params,
|
||||
boost::asio::yield_context& yc,
|
||||
bool& hasStartedStreaming
|
||||
HttpServerConnection& server
|
||||
)
|
||||
{
|
||||
namespace http = boost::beast::http;
|
||||
|
@ -21,7 +21,7 @@ public:
|
||||
boost::beast::http::response<boost::beast::http::string_body>& response,
|
||||
const Dictionary::Ptr& params,
|
||||
boost::asio::yield_context& yc,
|
||||
bool& hasStartedStreaming
|
||||
HttpServerConnection& server
|
||||
) override;
|
||||
};
|
||||
|
||||
|
@ -20,7 +20,7 @@ bool ConfigFilesHandler::HandleRequest(
|
||||
boost::beast::http::response<boost::beast::http::string_body>& response,
|
||||
const Dictionary::Ptr& params,
|
||||
boost::asio::yield_context& yc,
|
||||
bool& hasStartedStreaming
|
||||
HttpServerConnection& server
|
||||
)
|
||||
{
|
||||
namespace http = boost::beast::http;
|
||||
|
@ -21,7 +21,7 @@ public:
|
||||
boost::beast::http::response<boost::beast::http::string_body>& response,
|
||||
const Dictionary::Ptr& params,
|
||||
boost::asio::yield_context& yc,
|
||||
bool& hasStartedStreaming
|
||||
HttpServerConnection& server
|
||||
) override;
|
||||
};
|
||||
|
||||
|
@ -18,7 +18,7 @@ bool ConfigPackagesHandler::HandleRequest(
|
||||
boost::beast::http::response<boost::beast::http::string_body>& response,
|
||||
const Dictionary::Ptr& params,
|
||||
boost::asio::yield_context& yc,
|
||||
bool& hasStartedStreaming
|
||||
HttpServerConnection& server
|
||||
)
|
||||
{
|
||||
namespace http = boost::beast::http;
|
||||
|
@ -21,7 +21,7 @@ public:
|
||||
boost::beast::http::response<boost::beast::http::string_body>& response,
|
||||
const Dictionary::Ptr& params,
|
||||
boost::asio::yield_context& yc,
|
||||
bool& hasStartedStreaming
|
||||
HttpServerConnection& server
|
||||
) override;
|
||||
|
||||
private:
|
||||
|
@ -19,7 +19,7 @@ bool ConfigStagesHandler::HandleRequest(
|
||||
boost::beast::http::response<boost::beast::http::string_body>& response,
|
||||
const Dictionary::Ptr& params,
|
||||
boost::asio::yield_context& yc,
|
||||
bool& hasStartedStreaming
|
||||
HttpServerConnection& server
|
||||
)
|
||||
{
|
||||
namespace http = boost::beast::http;
|
||||
|
@ -21,7 +21,7 @@ public:
|
||||
boost::beast::http::response<boost::beast::http::string_body>& response,
|
||||
const Dictionary::Ptr& params,
|
||||
boost::asio::yield_context& yc,
|
||||
bool& hasStartedStreaming
|
||||
HttpServerConnection& server
|
||||
) override;
|
||||
|
||||
private:
|
||||
|
@ -61,7 +61,7 @@ bool ConsoleHandler::HandleRequest(
|
||||
boost::beast::http::response<boost::beast::http::string_body>& response,
|
||||
const Dictionary::Ptr& params,
|
||||
boost::asio::yield_context& yc,
|
||||
bool& hasStartedStreaming
|
||||
HttpServerConnection& server
|
||||
)
|
||||
{
|
||||
namespace http = boost::beast::http;
|
||||
|
@ -30,7 +30,7 @@ public:
|
||||
boost::beast::http::response<boost::beast::http::string_body>& response,
|
||||
const Dictionary::Ptr& params,
|
||||
boost::asio::yield_context& yc,
|
||||
bool& hasStartedStreaming
|
||||
HttpServerConnection& server
|
||||
) override;
|
||||
|
||||
static std::vector<String> GetAutocompletionSuggestions(const String& word, ScriptFrame& frame);
|
||||
|
@ -22,7 +22,7 @@ bool CreateObjectHandler::HandleRequest(
|
||||
boost::beast::http::response<boost::beast::http::string_body>& response,
|
||||
const Dictionary::Ptr& params,
|
||||
boost::asio::yield_context& yc,
|
||||
bool& hasStartedStreaming
|
||||
HttpServerConnection& server
|
||||
)
|
||||
{
|
||||
namespace http = boost::beast::http;
|
||||
|
@ -21,7 +21,7 @@ public:
|
||||
boost::beast::http::response<boost::beast::http::string_body>& response,
|
||||
const Dictionary::Ptr& params,
|
||||
boost::asio::yield_context& yc,
|
||||
bool& hasStartedStreaming
|
||||
HttpServerConnection& server
|
||||
) override;
|
||||
};
|
||||
|
||||
|
@ -22,7 +22,7 @@ bool DeleteObjectHandler::HandleRequest(
|
||||
boost::beast::http::response<boost::beast::http::string_body>& response,
|
||||
const Dictionary::Ptr& params,
|
||||
boost::asio::yield_context& yc,
|
||||
bool& hasStartedStreaming
|
||||
HttpServerConnection& server
|
||||
)
|
||||
{
|
||||
namespace http = boost::beast::http;
|
||||
|
@ -21,7 +21,7 @@ public:
|
||||
boost::beast::http::response<boost::beast::http::string_body>& response,
|
||||
const Dictionary::Ptr& params,
|
||||
boost::asio::yield_context& yc,
|
||||
bool& hasStartedStreaming
|
||||
HttpServerConnection& server
|
||||
) override;
|
||||
};
|
||||
|
||||
|
@ -5,6 +5,7 @@
|
||||
#include "base/io-engine.hpp"
|
||||
#include "base/singleton.hpp"
|
||||
#include "base/logger.hpp"
|
||||
#include "base/utility.hpp"
|
||||
#include <boost/asio/spawn.hpp>
|
||||
|
||||
using namespace icinga;
|
||||
@ -102,19 +103,29 @@ Dictionary::Ptr EventQueue::WaitForEvent(void *client, double timeout)
|
||||
}
|
||||
}
|
||||
|
||||
Dictionary::Ptr EventQueue::WaitForEvent(void *client, boost::asio::yield_context yc)
|
||||
Dictionary::Ptr EventQueue::WaitForEvent(void *client, boost::asio::yield_context yc, double timeout)
|
||||
{
|
||||
double deadline = -1.0;
|
||||
|
||||
for (;;) {
|
||||
{
|
||||
boost::mutex::scoped_lock lock(m_Mutex);
|
||||
boost::mutex::scoped_try_lock lock(m_Mutex);
|
||||
|
||||
auto it = m_Events.find(client);
|
||||
ASSERT(it != m_Events.end());
|
||||
if (lock.owns_lock()) {
|
||||
auto it = m_Events.find(client);
|
||||
ASSERT(it != m_Events.end());
|
||||
|
||||
if (!it->second.empty()) {
|
||||
Dictionary::Ptr result = *it->second.begin();
|
||||
it->second.pop_front();
|
||||
return result;
|
||||
if (it->second.empty()) {
|
||||
if (deadline == -1.0) {
|
||||
deadline = Utility::GetTime() + timeout;
|
||||
} else if (Utility::GetTime() >= deadline) {
|
||||
return nullptr;
|
||||
}
|
||||
} else {
|
||||
Dictionary::Ptr result = *it->second.begin();
|
||||
it->second.pop_front();
|
||||
return result;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -32,7 +32,7 @@ public:
|
||||
void SetFilter(std::unique_ptr<Expression> filter);
|
||||
|
||||
Dictionary::Ptr WaitForEvent(void *client, double timeout = 5);
|
||||
Dictionary::Ptr WaitForEvent(void *client, boost::asio::yield_context yc);
|
||||
Dictionary::Ptr WaitForEvent(void *client, boost::asio::yield_context yc, double timeout = 5);
|
||||
|
||||
static std::vector<EventQueue::Ptr> GetQueuesForType(const String& type);
|
||||
static void UnregisterIfUnused(const String& name, const EventQueue::Ptr& queue);
|
||||
|
@ -25,7 +25,7 @@ bool EventsHandler::HandleRequest(
|
||||
boost::beast::http::response<boost::beast::http::string_body>& response,
|
||||
const Dictionary::Ptr& params,
|
||||
boost::asio::yield_context& yc,
|
||||
bool& hasStartedStreaming
|
||||
HttpServerConnection& server
|
||||
)
|
||||
{
|
||||
namespace asio = boost::asio;
|
||||
@ -88,7 +88,7 @@ bool EventsHandler::HandleRequest(
|
||||
EventQueue::UnregisterIfUnused(queueName, queue);
|
||||
});
|
||||
|
||||
hasStartedStreaming = true;
|
||||
server.StartStreaming();
|
||||
|
||||
response.result(http::status::ok);
|
||||
response.set(http::field::content_type, "application/json");
|
||||
@ -101,19 +101,28 @@ bool EventsHandler::HandleRequest(
|
||||
}
|
||||
|
||||
asio::const_buffer newLine ("\n", 1);
|
||||
AsioConditionVariable dontLockOwnStrand (stream.get_io_service(), true);
|
||||
|
||||
for (;;) {
|
||||
String body = JsonEncode(queue->WaitForEvent(&request, yc));
|
||||
auto event (queue->WaitForEvent(&request, yc));
|
||||
|
||||
boost::algorithm::replace_all(body, "\n", "");
|
||||
if (event) {
|
||||
String body = JsonEncode(event);
|
||||
|
||||
asio::const_buffer payload (body.CStr(), body.GetLength());
|
||||
boost::algorithm::replace_all(body, "\n", "");
|
||||
|
||||
IoBoundWorkSlot dontLockTheIoThreadWhileWriting (yc);
|
||||
asio::const_buffer payload (body.CStr(), body.GetLength());
|
||||
|
||||
asio::async_write(stream, payload, yc);
|
||||
asio::async_write(stream, newLine, yc);
|
||||
stream.async_flush(yc);
|
||||
IoBoundWorkSlot dontLockTheIoThreadWhileWriting (yc);
|
||||
|
||||
asio::async_write(stream, payload, yc);
|
||||
asio::async_write(stream, newLine, yc);
|
||||
stream.async_flush(yc);
|
||||
} else if (server.Disconnected()) {
|
||||
return true;
|
||||
} else {
|
||||
dontLockOwnStrand.Wait(yc);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -22,7 +22,7 @@ public:
|
||||
boost::beast::http::response<boost::beast::http::string_body>& response,
|
||||
const Dictionary::Ptr& params,
|
||||
boost::asio::yield_context& yc,
|
||||
bool& hasStartedStreaming
|
||||
HttpServerConnection& server
|
||||
) override;
|
||||
};
|
||||
|
||||
|
@ -51,7 +51,7 @@ void HttpHandler::ProcessRequest(
|
||||
boost::beast::http::request<boost::beast::http::string_body>& request,
|
||||
boost::beast::http::response<boost::beast::http::string_body>& response,
|
||||
boost::asio::yield_context& yc,
|
||||
bool& hasStartedStreaming
|
||||
HttpServerConnection& server
|
||||
)
|
||||
{
|
||||
Dictionary::Ptr node = m_UrlTree;
|
||||
@ -99,7 +99,7 @@ void HttpHandler::ProcessRequest(
|
||||
|
||||
bool processed = false;
|
||||
for (const HttpHandler::Ptr& handler : handlers) {
|
||||
if (handler->HandleRequest(stream, user, request, url, response, params, yc, hasStartedStreaming)) {
|
||||
if (handler->HandleRequest(stream, user, request, url, response, params, yc, server)) {
|
||||
processed = true;
|
||||
break;
|
||||
}
|
||||
|
@ -6,6 +6,7 @@
|
||||
#include "remote/i2-remote.hpp"
|
||||
#include "remote/url.hpp"
|
||||
#include "remote/httpresponse.hpp"
|
||||
#include "remote/httpserverconnection.hpp"
|
||||
#include "remote/apiuser.hpp"
|
||||
#include "base/registry.hpp"
|
||||
#include "base/tlsstream.hpp"
|
||||
@ -34,7 +35,7 @@ public:
|
||||
boost::beast::http::response<boost::beast::http::string_body>& response,
|
||||
const Dictionary::Ptr& params,
|
||||
boost::asio::yield_context& yc,
|
||||
bool& hasStartedStreaming
|
||||
HttpServerConnection& server
|
||||
) = 0;
|
||||
|
||||
static void Register(const Url::Ptr& url, const HttpHandler::Ptr& handler);
|
||||
@ -44,7 +45,7 @@ public:
|
||||
boost::beast::http::request<boost::beast::http::string_body>& request,
|
||||
boost::beast::http::response<boost::beast::http::string_body>& response,
|
||||
boost::asio::yield_context& yc,
|
||||
bool& hasStartedStreaming
|
||||
HttpServerConnection& server
|
||||
);
|
||||
|
||||
private:
|
||||
|
@ -31,7 +31,7 @@ using namespace icinga;
|
||||
auto const l_ServerHeader ("Icinga/" + Application::GetAppVersion());
|
||||
|
||||
HttpServerConnection::HttpServerConnection(const String& identity, bool authenticated, const std::shared_ptr<AsioTlsStream>& stream)
|
||||
: m_Stream(stream), m_Seen(Utility::GetTime()), m_IoStrand(stream->get_io_service()), m_ShuttingDown(false)
|
||||
: m_Stream(stream), m_Seen(Utility::GetTime()), m_IoStrand(stream->get_io_service()), m_ShuttingDown(false), m_HasStartedStreaming(false)
|
||||
{
|
||||
if (authenticated) {
|
||||
m_ApiUser = ApiUser::GetByClientCN(identity);
|
||||
@ -91,6 +91,34 @@ void HttpServerConnection::Disconnect()
|
||||
});
|
||||
}
|
||||
|
||||
void HttpServerConnection::StartStreaming()
|
||||
{
|
||||
namespace asio = boost::asio;
|
||||
|
||||
m_HasStartedStreaming = true;
|
||||
|
||||
HttpServerConnection::Ptr keepAlive (this);
|
||||
|
||||
asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
|
||||
if (!m_ShuttingDown) {
|
||||
char buf[128];
|
||||
asio::mutable_buffer readBuf (buf, 128);
|
||||
boost::system::error_code ec;
|
||||
|
||||
do {
|
||||
m_Stream->async_read_some(readBuf, yc[ec]);
|
||||
} while (!ec);
|
||||
|
||||
Disconnect();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
bool HttpServerConnection::Disconnected()
|
||||
{
|
||||
return m_ShuttingDown;
|
||||
}
|
||||
|
||||
static inline
|
||||
bool EnsureValidHeaders(
|
||||
AsioTlsStream& stream,
|
||||
@ -375,17 +403,17 @@ bool ProcessRequest(
|
||||
boost::beast::http::request<boost::beast::http::string_body>& request,
|
||||
ApiUser::Ptr& authenticatedUser,
|
||||
boost::beast::http::response<boost::beast::http::string_body>& response,
|
||||
HttpServerConnection& server,
|
||||
bool& hasStartedStreaming,
|
||||
boost::asio::yield_context& yc
|
||||
)
|
||||
{
|
||||
namespace http = boost::beast::http;
|
||||
|
||||
bool hasStartedStreaming = false;
|
||||
|
||||
try {
|
||||
CpuBoundWork handlingRequest (yc);
|
||||
|
||||
HttpHandler::ProcessRequest(stream, authenticatedUser, request, response, yc, hasStartedStreaming);
|
||||
HttpHandler::ProcessRequest(stream, authenticatedUser, request, response, yc, server);
|
||||
} catch (const std::exception& ex) {
|
||||
if (hasStartedStreaming) {
|
||||
return false;
|
||||
@ -481,7 +509,7 @@ void HttpServerConnection::ProcessMessages(boost::asio::yield_context yc)
|
||||
|
||||
m_Seen = std::numeric_limits<decltype(m_Seen)>::max();
|
||||
|
||||
if (!ProcessRequest(*m_Stream, request, authenticatedUser, response, yc)) {
|
||||
if (!ProcessRequest(*m_Stream, request, authenticatedUser, response, *this, m_HasStartedStreaming, yc)) {
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -27,6 +27,9 @@ public:
|
||||
|
||||
void Start();
|
||||
void Disconnect();
|
||||
void StartStreaming();
|
||||
|
||||
bool Disconnected();
|
||||
|
||||
private:
|
||||
ApiUser::Ptr m_ApiUser;
|
||||
@ -35,6 +38,7 @@ private:
|
||||
String m_PeerAddress;
|
||||
boost::asio::io_service::strand m_IoStrand;
|
||||
bool m_ShuttingDown;
|
||||
bool m_HasStartedStreaming;
|
||||
|
||||
void ProcessMessages(boost::asio::yield_context yc);
|
||||
void CheckLiveness(boost::asio::yield_context yc);
|
||||
|
@ -16,7 +16,7 @@ bool InfoHandler::HandleRequest(
|
||||
boost::beast::http::response<boost::beast::http::string_body>& response,
|
||||
const Dictionary::Ptr& params,
|
||||
boost::asio::yield_context& yc,
|
||||
bool& hasStartedStreaming
|
||||
HttpServerConnection& server
|
||||
)
|
||||
{
|
||||
namespace http = boost::beast::http;
|
||||
|
@ -21,7 +21,7 @@ public:
|
||||
boost::beast::http::response<boost::beast::http::string_body>& response,
|
||||
const Dictionary::Ptr& params,
|
||||
boost::asio::yield_context& yc,
|
||||
bool& hasStartedStreaming
|
||||
HttpServerConnection& server
|
||||
) override;
|
||||
};
|
||||
|
||||
|
@ -20,7 +20,7 @@ bool ModifyObjectHandler::HandleRequest(
|
||||
boost::beast::http::response<boost::beast::http::string_body>& response,
|
||||
const Dictionary::Ptr& params,
|
||||
boost::asio::yield_context& yc,
|
||||
bool& hasStartedStreaming
|
||||
HttpServerConnection& server
|
||||
)
|
||||
{
|
||||
namespace http = boost::beast::http;
|
||||
|
@ -21,7 +21,7 @@ public:
|
||||
boost::beast::http::response<boost::beast::http::string_body>& response,
|
||||
const Dictionary::Ptr& params,
|
||||
boost::asio::yield_context& yc,
|
||||
bool& hasStartedStreaming
|
||||
HttpServerConnection& server
|
||||
) override;
|
||||
};
|
||||
|
||||
|
@ -95,7 +95,7 @@ bool ObjectQueryHandler::HandleRequest(
|
||||
boost::beast::http::response<boost::beast::http::string_body>& response,
|
||||
const Dictionary::Ptr& params,
|
||||
boost::asio::yield_context& yc,
|
||||
bool& hasStartedStreaming
|
||||
HttpServerConnection& server
|
||||
)
|
||||
{
|
||||
namespace http = boost::beast::http;
|
||||
|
@ -21,7 +21,7 @@ public:
|
||||
boost::beast::http::response<boost::beast::http::string_body>& response,
|
||||
const Dictionary::Ptr& params,
|
||||
boost::asio::yield_context& yc,
|
||||
bool& hasStartedStreaming
|
||||
HttpServerConnection& server
|
||||
) override;
|
||||
|
||||
private:
|
||||
|
@ -76,7 +76,7 @@ bool StatusHandler::HandleRequest(
|
||||
boost::beast::http::response<boost::beast::http::string_body>& response,
|
||||
const Dictionary::Ptr& params,
|
||||
boost::asio::yield_context& yc,
|
||||
bool& hasStartedStreaming
|
||||
HttpServerConnection& server
|
||||
)
|
||||
{
|
||||
namespace http = boost::beast::http;
|
||||
|
@ -21,7 +21,7 @@ public:
|
||||
boost::beast::http::response<boost::beast::http::string_body>& response,
|
||||
const Dictionary::Ptr& params,
|
||||
boost::asio::yield_context& yc,
|
||||
bool& hasStartedStreaming
|
||||
HttpServerConnection& server
|
||||
) override;
|
||||
};
|
||||
|
||||
|
@ -83,7 +83,7 @@ bool TemplateQueryHandler::HandleRequest(
|
||||
boost::beast::http::response<boost::beast::http::string_body>& response,
|
||||
const Dictionary::Ptr& params,
|
||||
boost::asio::yield_context& yc,
|
||||
bool& hasStartedStreaming
|
||||
HttpServerConnection& server
|
||||
)
|
||||
{
|
||||
namespace http = boost::beast::http;
|
||||
|
@ -21,7 +21,7 @@ public:
|
||||
boost::beast::http::response<boost::beast::http::string_body>& response,
|
||||
const Dictionary::Ptr& params,
|
||||
boost::asio::yield_context& yc,
|
||||
bool& hasStartedStreaming
|
||||
HttpServerConnection& server
|
||||
) override;
|
||||
};
|
||||
|
||||
|
@ -54,7 +54,7 @@ bool TypeQueryHandler::HandleRequest(
|
||||
boost::beast::http::response<boost::beast::http::string_body>& response,
|
||||
const Dictionary::Ptr& params,
|
||||
boost::asio::yield_context& yc,
|
||||
bool& hasStartedStreaming
|
||||
HttpServerConnection& server
|
||||
)
|
||||
{
|
||||
namespace http = boost::beast::http;
|
||||
|
@ -21,7 +21,7 @@ public:
|
||||
boost::beast::http::response<boost::beast::http::string_body>& response,
|
||||
const Dictionary::Ptr& params,
|
||||
boost::asio::yield_context& yc,
|
||||
bool& hasStartedStreaming
|
||||
HttpServerConnection& server
|
||||
) override;
|
||||
};
|
||||
|
||||
|
@ -64,7 +64,7 @@ bool VariableQueryHandler::HandleRequest(
|
||||
boost::beast::http::response<boost::beast::http::string_body>& response,
|
||||
const Dictionary::Ptr& params,
|
||||
boost::asio::yield_context& yc,
|
||||
bool& hasStartedStreaming
|
||||
HttpServerConnection& server
|
||||
)
|
||||
{
|
||||
namespace http = boost::beast::http;
|
||||
|
@ -21,7 +21,7 @@ public:
|
||||
boost::beast::http::response<boost::beast::http::string_body>& response,
|
||||
const Dictionary::Ptr& params,
|
||||
boost::asio::yield_context& yc,
|
||||
bool& hasStartedStreaming
|
||||
HttpServerConnection& server
|
||||
) override;
|
||||
};
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user