diff --git a/lib/remote/CMakeLists.txt b/lib/remote/CMakeLists.txt index 2271abff6..d8d3298c5 100644 --- a/lib/remote/CMakeLists.txt +++ b/lib/remote/CMakeLists.txt @@ -27,6 +27,7 @@ set(remote_SOURCES eventshandler.cpp eventshandler.hpp filterutility.cpp filterutility.hpp httphandler.cpp httphandler.hpp + httpmessage.cpp httpmessage.hpp httpserverconnection.cpp httpserverconnection.hpp httputility.cpp httputility.hpp infohandler.cpp infohandler.hpp diff --git a/lib/remote/httpmessage.cpp b/lib/remote/httpmessage.cpp new file mode 100644 index 000000000..18e5a3016 --- /dev/null +++ b/lib/remote/httpmessage.cpp @@ -0,0 +1,196 @@ +/* Icinga 2 | (c) 2025 Icinga GmbH | GPLv2+ */ + +#include "remote/httpmessage.hpp" +#include "base/io-engine.hpp" +#include "base/json.hpp" +#include "remote/httputility.hpp" +#include "remote/url.hpp" +#include +#include +#include + +using namespace icinga; + +/** + * This is the buffer size threshold above which to flush to the connection. + * + * This value was determined with a series of measurements in + * [PR #10516](https://github.com/Icinga/icinga2/pull/10516#issuecomment-3232642284). + */ +constexpr std::size_t l_FlushThreshold = 128UL * 1024UL; + +/** + * Adapter class for Boost Beast HTTP messages body to be used with the @c JsonEncoder. + * + * This class implements the @c nlohmann::detail::output_adapter_protocol<> interface and provides + * a way to write JSON data directly into the body of a @c HttpResponse. + * + * @ingroup base + */ +class HttpResponseJsonWriter : public AsyncJsonWriter +{ +public: + explicit HttpResponseJsonWriter(HttpResponse& msg) : m_Message{msg} + { + m_Message.body().Start(); +#if BOOST_VERSION >= 107000 + // We pre-allocate more than the threshold because we always go above the threshold + // at least once. + m_Message.body().Buffer().reserve(l_FlushThreshold + (l_FlushThreshold / 4)); +#endif /* BOOST_VERSION */ + } + + ~HttpResponseJsonWriter() override { m_Message.body().Finish(); } + + void write_character(char c) override { write_characters(&c, 1); } + + void write_characters(const char* s, std::size_t length) override + { + auto buf = m_Message.body().Buffer().prepare(length); + boost::asio::buffer_copy(buf, boost::asio::const_buffer{s, length}); + m_Message.body().Buffer().commit(length); + } + + void MayFlush(boost::asio::yield_context& yield) override + { + if (m_Message.body().Size() >= l_FlushThreshold) { + m_Message.Flush(yield); + } + } + +private: + HttpResponse& m_Message; +}; + +HttpRequest::HttpRequest(Shared::Ptr stream) : m_Stream(std::move(stream)) +{ +} + +void HttpRequest::ParseHeader(boost::beast::flat_buffer& buf, boost::asio::yield_context yc) +{ + boost::beast::http::async_read_header(*m_Stream, buf, m_Parser, yc); + base() = m_Parser.get().base(); +} + +void HttpRequest::ParseBody(boost::beast::flat_buffer& buf, boost::asio::yield_context yc) +{ + boost::beast::http::async_read(*m_Stream, buf, m_Parser, yc); + body() = std::move(m_Parser.release().body()); +} + +ApiUser::Ptr HttpRequest::User() const +{ + return m_User; +} + +void HttpRequest::User(const ApiUser::Ptr& user) +{ + m_User = user; +} + +Url::Ptr HttpRequest::Url() const +{ + return m_Url; +} + +void HttpRequest::DecodeUrl() +{ + m_Url = new icinga::Url(std::string(target())); +} + +Dictionary::Ptr HttpRequest::Params() const +{ + return m_Params; +} + +void HttpRequest::DecodeParams() +{ + if (!m_Url) { + DecodeUrl(); + } + m_Params = HttpUtility::FetchRequestParameters(m_Url, body()); +} + +HttpResponse::HttpResponse(Shared::Ptr stream, HttpServerConnection::Ptr server) + : m_Server(std::move(server)), m_Stream(std::move(stream)) +{ +} + +void HttpResponse::Clear() +{ + ASSERT(!m_SerializationStarted); + boost::beast::http::response::operator=({}); +} + +void HttpResponse::Flush(boost::asio::yield_context yc) +{ + if (!chunked() && !has_content_length()) { + ASSERT(!m_SerializationStarted); + prepare_payload(); + } + + m_SerializationStarted = true; + + if (!m_Serializer.is_header_done()) { + boost::beast::http::write_header(*m_Stream, m_Serializer); + } + + boost::system::error_code ec; + boost::beast::http::async_write(*m_Stream, m_Serializer, yc[ec]); + if (ec && ec != boost::beast::http::error::need_buffer) { + if (yc.ec_) { + *yc.ec_ = ec; + return; + } + BOOST_THROW_EXCEPTION(boost::system::system_error{ec}); + } + m_Stream->async_flush(yc); + + ASSERT(m_Serializer.is_done() || !body().Finished()); +} + +void HttpResponse::StartStreaming(bool checkForDisconnect) +{ + ASSERT(body().Size() == 0 && !m_SerializationStarted); + body().Start(); + chunked(true); + + if (checkForDisconnect) { + ASSERT(m_Server); + m_Server->StartDetectClientSideShutdown(); + } +} + +bool HttpResponse::IsClientDisconnected() const +{ + ASSERT(m_Server); + return m_Server->Disconnected(); +} + +void HttpResponse::SendFile(const String& path, const boost::asio::yield_context& yc) +{ + std::ifstream fp(path.CStr(), std::ifstream::in | std::ifstream::binary | std::ifstream::ate); + fp.exceptions(std::ifstream::badbit | std::ifstream::eofbit); + + std::uint64_t remaining = fp.tellg(); + fp.seekg(0); + + content_length(remaining); + body().Start(); + + while (remaining) { + auto maxTransfer = std::min(remaining, static_cast(l_FlushThreshold)); + + auto buf = *body().Buffer().prepare(maxTransfer).begin(); + fp.read(static_cast(buf.data()), buf.size()); + body().Buffer().commit(buf.size()); + + remaining -= buf.size(); + Flush(yc); + } +} + +JsonEncoder HttpResponse::GetJsonEncoder(bool pretty) +{ + return JsonEncoder{std::make_shared(*this), pretty}; +} diff --git a/lib/remote/httpmessage.hpp b/lib/remote/httpmessage.hpp new file mode 100644 index 000000000..10d00fd49 --- /dev/null +++ b/lib/remote/httpmessage.hpp @@ -0,0 +1,281 @@ +/* Icinga 2 | (c) 2025 Icinga GmbH | GPLv2+ */ + +#pragma once + +#include "base/dictionary.hpp" +#include "base/json.hpp" +#include "base/tlsstream.hpp" +#include "remote/apiuser.hpp" +#include "remote/httpserverconnection.hpp" +#include "remote/url.hpp" +#include +#include + +namespace icinga { + +/** + * A custom body_type for a @c boost::beast::http::message + * + * It combines the memory management of @c boost::beast::http::dynamic_body, + * which uses a multi_buffer, with the ability to continue serialization when + * new data arrives of the @c boost::beast::http::buffer_body. + * + * @tparam DynamicBuffer A buffer conforming to the boost::beast interface of the same name + * + * @ingroup remote + */ +template +struct SerializableBody +{ + class writer; + + class value_type + { + public: + template + value_type& operator<<(T&& right) + { + /* Preferably, we would return an ostream object here instead. However + * there seems to be a bug in boost::beast where if the ostream, or rather its + * streambuf object is moved into the return value, the chunked encoding gets + * mangled, leading to the client disconnecting. + * + * A workaround would have been to construct the boost::beast::detail::ostream_helper + * with the last parameter set to false, indicating that the streambuf object is not + * movable, but that is an implementation detail we'd rather not use directly in our + * code. + * + * This version has a certain overhead of the ostream being constructed on every call + * to the operator, which leads to an individual append for each time, whereas if the + * object could be kept until the entire chain of output operators is finished, only + * a single call to prepare()/commit() would have been needed. + * + * However, since this operator is mostly used for small error messages and the big + * responses are handled via a reader instance, this shouldn't be too much of a + * problem. + */ + boost::beast::ostream(m_Buffer) << std::forward(right); + return *this; + } + + [[nodiscard]] std::size_t Size() const { return m_Buffer.size(); } + + void Finish() { m_More = false; } + bool Finished() { return !m_More; } + void Start() { m_More = true; } + DynamicBuffer& Buffer() { return m_Buffer; } + + friend class writer; + + private: + /* This defaults to false so the body does not require any special handling + * for simple messages and can still be written with http::async_write(). + */ + bool m_More = false; + DynamicBuffer m_Buffer; + }; + + static std::uint64_t size(const value_type& body) { return body.Size(); } + + /** + * Implement the boost::beast BodyWriter interface for this body type + * + * This is used (for example) by the @c boost::beast::http::serializer to write out the + * message over the TLS stream. The logic is similar to the writer of the + * @c boost::beast::http::buffer_body. + * + * On the every call, it will free up the buffer range that has previously been written, + * then return a buffer containing data the has become available in the meantime. Otherwise, + * if there is more data expected in the future, for example because a corresponding reader + * has not yet finished filling the body, a `need_buffer` error is returned, to inform the + * serializer to abort writing for now, which in turn leads to the outer call to + * `http::async_write` to call their completion handlers with a `need_buffer` error, to + * notify that more data is required for another call to `http::async_write`. + */ + class writer + { + public: + using const_buffers_type = typename DynamicBuffer::const_buffers_type; + +#if BOOST_VERSION > 106600 + template + explicit writer(const boost::beast::http::header&, value_type& b) : m_Body(b) + { + } +#else + /** + * This constructor is needed specifically for boost-1.66, which was the first version + * the beast library was introduced and is still used on older (supported) distros. + */ + template + explicit writer(const boost::beast::http::message& msg) + : m_Body(const_cast(msg.body())) + { + } +#endif + void init(boost::beast::error_code& ec) { ec = {}; } + + boost::optional> get(boost::beast::error_code& ec) + { + using namespace boost::beast::http; + + if (m_SizeWritten > 0) { + m_Body.m_Buffer.consume(std::exchange(m_SizeWritten, 0)); + } + + if (m_Body.m_Buffer.size()) { + ec = {}; + m_SizeWritten = m_Body.m_Buffer.size(); + return {{m_Body.m_Buffer.data(), m_Body.m_More}}; + } + + if (m_Body.m_More) { + ec = {make_error_code(error::need_buffer)}; + } else { + ec = {}; + } + return boost::none; + } + + private: + value_type& m_Body; + std::size_t m_SizeWritten = 0; + }; +}; + +/** + * A wrapper class for a boost::beast HTTP request + * + * @ingroup remote + */ +class HttpRequest : public boost::beast::http::request +{ +public: + using ParserType = boost::beast::http::request_parser; + + explicit HttpRequest(Shared::Ptr stream); + + /** + * Parse the header of the response using the internal parser object. + * + * This first performs an @f async_read_header() into the parser, then copies + * the parsed header into this object. + */ + void ParseHeader(boost::beast::flat_buffer& buf, boost::asio::yield_context yc); + + /** + * Parse the body of the response using the internal parser object. + * + * This first performs an async_read() into the parser, then moves the parsed body + * into this object. + * + * @param buf The buffer used to track the state of the connection + * @param yc The yield_context for this operation + */ + void ParseBody(boost::beast::flat_buffer& buf, boost::asio::yield_context yc); + + ParserType& Parser() { return m_Parser; } + + [[nodiscard]] ApiUser::Ptr User() const; + void User(const ApiUser::Ptr& user); + + [[nodiscard]] icinga::Url::Ptr Url() const; + void DecodeUrl(); + + [[nodiscard]] Dictionary::Ptr Params() const; + void DecodeParams(); + +private: + ApiUser::Ptr m_User; + Url::Ptr m_Url; + Dictionary::Ptr m_Params; + + ParserType m_Parser; + + Shared::Ptr m_Stream; +}; + +/** + * A wrapper class for a boost::beast HTTP response + * + * @ingroup remote + */ +class HttpResponse : public boost::beast::http::response> +{ +public: + explicit HttpResponse(Shared::Ptr stream, HttpServerConnection::Ptr server = nullptr); + + /* Delete the base class clear() which is inherited from the fields<> class and doesn't + * clear things like the body or obviously our own members. + */ + void clear() = delete; + + /** + * Clear the header and body of the message. + * + * @note This can only be used when nothing has been written to the stream yet. + */ + void Clear(); + + /** + * Writes as much of the response as is currently available. + * + * Uses chunk-encoding if the content_length has not been set by the time this is called + * for the first time. + * + * The caller needs to ensure that the header is finished before calling this for the + * first time as changes to the header afterwards will not have any effect. + * + * @param yc The yield_context for this operation + */ + void Flush(boost::asio::yield_context yc); + + [[nodiscard]] bool HasSerializationStarted() const { return m_SerializationStarted; } + + /** + * Enables chunked encoding. + * + * Optionally starts a coroutine that reads from the stream and checks for client-side + * disconnects. In this case, the stream can not be reused after the response has been + * sent and any further requests sent over the connections will be discarded, even if + * no client-side disconnect occurs. This requires that this object has been constructed + * with a valid HttpServerConnection::Ptr. + * + * @param checkForDisconnect Whether to start a coroutine to detect disconnects + */ + void StartStreaming(bool checkForDisconnect = false); + + /** + * Check if the server has initiated a disconnect. + * + * @note This requires that the message has been constructed with a pointer to the + * @c HttpServerConnection. + */ + [[nodiscard]] bool IsClientDisconnected() const; + + /** + * Sends the contents of a file. + * + * This does not use chunked encoding because the file size is expected to be fixed. + * The message will be flushed to the stream after a certain amount has been loaded into + * the buffer. + * + * @todo Switch the implementation to @c boost::asio::stream_file when we require >=boost-1.78. + * + * @param path A path to the file + * @param yc The yield context for flushing the message. + */ + void SendFile(const String& path, const boost::asio::yield_context& yc); + + JsonEncoder GetJsonEncoder(bool pretty = false); + +private: + using Serializer = boost::beast::http::response_serializer; + Serializer m_Serializer{*this}; + bool m_SerializationStarted = false; + + HttpServerConnection::Ptr m_Server; + Shared::Ptr m_Stream; +}; + +} // namespace icinga