From ba6be063b6d90ea231ed2de187bd4d4c96a81fe4 Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Tue, 3 Sep 2013 14:05:03 +0200 Subject: [PATCH] cluster: Compress log files. --- components/cluster/clustercomponent.cpp | 20 ++++++- components/cluster/clustercomponent.h | 2 +- lib/base/Makefile.am | 4 +- lib/base/threadpool.cpp | 6 ++ lib/base/tlsutility.h | 3 +- lib/base/zlibstream.cpp | 80 +++++++++++++++++++++++++ lib/base/zlibstream.h | 51 ++++++++++++++++ 7 files changed, 160 insertions(+), 6 deletions(-) create mode 100644 lib/base/zlibstream.cpp create mode 100644 lib/base/zlibstream.h diff --git a/components/cluster/clustercomponent.cpp b/components/cluster/clustercomponent.cpp index 71f46ae48..0cc8e87d1 100644 --- a/components/cluster/clustercomponent.cpp +++ b/components/cluster/clustercomponent.cpp @@ -24,6 +24,7 @@ #include "base/logger_fwd.h" #include "base/objectlock.h" #include "base/networkstream.h" +#include "base/zlibstream.h" #include "base/application.h" #include "base/convert.h" #include @@ -260,7 +261,8 @@ void ClusterComponent::OpenLogFile(void) return; } - m_LogFile = boost::make_shared(fp, true); + StdioStream::Ptr logStream = boost::make_shared(fp, true); + m_LogFile = boost::make_shared(logStream); m_LogMessageCount = 0; m_LogMessageTimestamp = 0; } @@ -269,6 +271,9 @@ void ClusterComponent::CloseLogFile(void) { ASSERT(OwnsLock()); + if (!m_LogFile) + return; + m_LogFile->Close(); m_LogFile.reset(); @@ -317,10 +322,19 @@ void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Pt Log(LogInformation, "cluster", "Replaying log: " + path); std::fstream *fp = new std::fstream(path.CStr(), std::fstream::in); - StdioStream::Ptr lstream = boost::make_shared(fp, true); + StdioStream::Ptr logStream = boost::make_shared(fp, true); + ZlibStream::Ptr lstream = boost::make_shared(logStream); String message; - while (NetString::ReadStringFromStream(lstream, &message)) { + while (true) { + try { + if (!NetString::ReadStringFromStream(lstream, &message)) + break; + } catch (std::exception&) { + /* Log files may be incomplete or corrupted. This is perfectly OK. */ + break; + } + Dictionary::Ptr pmessage = Value::Deserialize(message); if (pmessage->Get("timestamp") < endpoint->GetLocalLogPosition()) diff --git a/components/cluster/clustercomponent.h b/components/cluster/clustercomponent.h index 052920893..a61991145 100644 --- a/components/cluster/clustercomponent.h +++ b/components/cluster/clustercomponent.h @@ -88,7 +88,7 @@ private: static void LogGlobHandler(std::vector& files, const String& file); void ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Ptr& stream); - StdioStream::Ptr m_LogFile; + Stream::Ptr m_LogFile; double m_LogMessageTimestamp; size_t m_LogMessageCount; diff --git a/lib/base/Makefile.am b/lib/base/Makefile.am index e5faae376..8c8f938d8 100644 --- a/lib/base/Makefile.am +++ b/lib/base/Makefile.am @@ -94,7 +94,9 @@ libbase_la_SOURCES = \ utility.h \ value.cpp \ value.h \ - win32.h + win32.h \ + zlibstream.cpp \ + zlibstream.h libbase_la_CPPFLAGS = \ -DI2_BASE_BUILD \ diff --git a/lib/base/threadpool.cpp b/lib/base/threadpool.cpp index f0b795c3f..51217b14d 100644 --- a/lib/base/threadpool.cpp +++ b/lib/base/threadpool.cpp @@ -22,6 +22,7 @@ #include "base/convert.h" #include "base/debug.h" #include "base/utility.h" +#include "base/scriptvariable.h" #include #include #include @@ -229,6 +230,11 @@ void ThreadPool::ManagerThreadProc(void) double avg_latency, max_latency; double utilization = 0; + Value adaptive = ScriptVariable::Get("ThreadPoolAdaptive"); + + if (!adaptive.IsEmpty() && !static_cast(adaptive)) + break; + { boost::mutex::scoped_lock lock(m_Mutex); diff --git a/lib/base/tlsutility.h b/lib/base/tlsutility.h index 18ff18058..e2789bc11 100644 --- a/lib/base/tlsutility.h +++ b/lib/base/tlsutility.h @@ -24,9 +24,10 @@ #include "base/object.h" #include "base/qstring.h" #include "base/exception.h" -#include #include +#include #include +#include namespace icinga { diff --git a/lib/base/zlibstream.cpp b/lib/base/zlibstream.cpp new file mode 100644 index 000000000..d7dbc3567 --- /dev/null +++ b/lib/base/zlibstream.cpp @@ -0,0 +1,80 @@ +/****************************************************************************** + * Icinga 2 * + * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) * + * * + * This program is free software; you can redistribute it and/or * + * modify it under the terms of the GNU General Public License * + * as published by the Free Software Foundation; either version 2 * + * of the License, or (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the Free Software Foundation * + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * + ******************************************************************************/ + +#include "base/zlibstream.h" +#include "base/objectlock.h" +#include "base/convert.h" +#include "base/logger_fwd.h" +#include + +using namespace icinga; + +extern "C" BIO_METHOD *BIO_f_zlib(void); + +/** + * Constructor for the ZlibStream class. + * + * @param innerStream The inner stream. + * @param compress Whether we're compressing, false if we're decompressing. + */ +ZlibStream::ZlibStream(const Stream::Ptr& innerStream) + : m_InnerStream(innerStream) +{ + BIO *ibio = BIO_new_I2Stream(innerStream); + BIO *zbio = BIO_new(BIO_f_zlib()); + m_BIO = BIO_push(zbio, ibio); +} + +ZlibStream::~ZlibStream(void) +{ + Close(); +} + +size_t ZlibStream::Read(void *buffer, size_t size) +{ + ObjectLock olock(this); + + return BIO_read(m_BIO, buffer, size); +} + +void ZlibStream::Write(const void *buffer, size_t size) +{ + ObjectLock olock(this); + + BIO_write(m_BIO, buffer, size); +} + +void ZlibStream::Close(void) +{ + ObjectLock olock(this); + + if (m_BIO) { + BIO_free_all(m_BIO); + m_BIO = NULL; + + m_InnerStream->Close(); + } +} + +bool ZlibStream::IsEof(void) const +{ + ObjectLock olock(this); + + return BIO_eof(m_BIO); +} diff --git a/lib/base/zlibstream.h b/lib/base/zlibstream.h new file mode 100644 index 000000000..dbd6ab643 --- /dev/null +++ b/lib/base/zlibstream.h @@ -0,0 +1,51 @@ +/****************************************************************************** + * Icinga 2 * + * Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) * + * * + * This program is free software; you can redistribute it and/or * + * modify it under the terms of the GNU General Public License * + * as published by the Free Software Foundation; either version 2 * + * of the License, or (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the Free Software Foundation * + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * + ******************************************************************************/ + +#ifndef ZLIBSTREAM_H +#define ZLIBSTREAM_H + +#include "base/i2-base.h" +#include "base/stream_bio.h" +#include + +namespace icinga { + +class ZlibStream : public Stream +{ +public: + DECLARE_PTR_TYPEDEFS(ZlibStream); + + ZlibStream(const Stream::Ptr& innerStream); + ~ZlibStream(void); + + virtual size_t Read(void *buffer, size_t size); + virtual void Write(const void *buffer, size_t size); + + virtual void Close(void); + + virtual bool IsEof(void) const; + +private: + Stream::Ptr m_InnerStream; + BIO *m_BIO; +}; + +} + +#endif /* ZLIBSTREAM_H */