From 9a8d388734781d6770be763a31722c295c61b0b5 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Wed, 19 Jan 2022 16:28:07 +0100 Subject: [PATCH] Introduce Bulker --- lib/base/CMakeLists.txt | 1 + lib/base/bulker.hpp | 105 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 106 insertions(+) create mode 100644 lib/base/bulker.hpp diff --git a/lib/base/CMakeLists.txt b/lib/base/CMakeLists.txt index 4ffadb7a0..f99201822 100644 --- a/lib/base/CMakeLists.txt +++ b/lib/base/CMakeLists.txt @@ -19,6 +19,7 @@ set(base_SOURCES atomic.hpp base64.cpp base64.hpp boolean.cpp boolean.hpp boolean-script.cpp + bulker.hpp configobject.cpp configobject.hpp configobject-ti.hpp configobject-script.cpp configtype.cpp configtype.hpp configuration.cpp configuration.hpp configuration-ti.hpp diff --git a/lib/base/bulker.hpp b/lib/base/bulker.hpp new file mode 100644 index 000000000..d9deaf661 --- /dev/null +++ b/lib/base/bulker.hpp @@ -0,0 +1,105 @@ +/* Icinga 2 | (c) 2022 Icinga GmbH | GPLv2+ */ + +#ifndef BULKER_H +#define BULKER_H + +#include +#include +#include +#include +#include +#include +#include + +namespace icinga +{ + +/** + * A queue which outputs the input as bulks of a defined size + * or after a defined time, whichever is reached first + * + * @ingroup base + */ +template +class Bulker +{ +private: + typedef std::chrono::steady_clock Clock; + +public: + typedef std::vector Container; + typedef typename Container::size_type SizeType; + typedef typename Clock::duration Duration; + + Bulker(SizeType bulkSize, Duration threshold) + : m_BulkSize(bulkSize), m_Threshold(threshold), m_NextConsumption(NullTimePoint()) { } + + void ProduceOne(T needle); + Container ConsumeMany(); + +private: + typedef std::chrono::time_point TimePoint; + + static inline + TimePoint NullTimePoint() + { + return TimePoint::min(); + } + + inline void UpdateNextConsumption() + { + m_NextConsumption = Clock::now() + m_Threshold; + } + + const SizeType m_BulkSize; + const Duration m_Threshold; + + std::mutex m_Mutex; + std::condition_variable m_CV; + std::queue m_Bulks; + TimePoint m_NextConsumption; +}; + +template +void Bulker::ProduceOne(T needle) +{ + std::unique_lock lock (m_Mutex); + + if (m_Bulks.empty() || m_Bulks.back().size() == m_BulkSize) { + m_Bulks.emplace(); + } + + m_Bulks.back().emplace_back(std::move(needle)); + + if (m_Bulks.size() == 1u && m_Bulks.back().size() == m_BulkSize) { + m_CV.notify_one(); + } +} + +template +typename Bulker::Container Bulker::ConsumeMany() +{ + std::unique_lock lock (m_Mutex); + + if (BOOST_UNLIKELY(m_NextConsumption == NullTimePoint())) { + UpdateNextConsumption(); + } + + auto deadline (m_NextConsumption); + + m_CV.wait_until(lock, deadline, [this]() { return !m_Bulks.empty() && m_Bulks.front().size() == m_BulkSize; }); + UpdateNextConsumption(); + + if (m_Bulks.empty()) { + return Container(); + } + + auto haystack (std::move(m_Bulks.front())); + + m_Bulks.pop(); + return haystack; +} + +} + +#endif /* BULKER_H */