cluster: Compress log files.

This commit is contained in:
Gunnar Beutner 2013-09-03 14:05:03 +02:00
parent 9c00e560ab
commit ba6be063b6
7 changed files with 160 additions and 6 deletions

View File

@ -24,6 +24,7 @@
#include "base/logger_fwd.h"
#include "base/objectlock.h"
#include "base/networkstream.h"
#include "base/zlibstream.h"
#include "base/application.h"
#include "base/convert.h"
#include <boost/smart_ptr/make_shared.hpp>
@ -260,7 +261,8 @@ void ClusterComponent::OpenLogFile(void)
return;
}
m_LogFile = boost::make_shared<StdioStream>(fp, true);
StdioStream::Ptr logStream = boost::make_shared<StdioStream>(fp, true);
m_LogFile = boost::make_shared<ZlibStream>(logStream);
m_LogMessageCount = 0;
m_LogMessageTimestamp = 0;
}
@ -269,6 +271,9 @@ void ClusterComponent::CloseLogFile(void)
{
ASSERT(OwnsLock());
if (!m_LogFile)
return;
m_LogFile->Close();
m_LogFile.reset();
@ -317,10 +322,19 @@ void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Pt
Log(LogInformation, "cluster", "Replaying log: " + path);
std::fstream *fp = new std::fstream(path.CStr(), std::fstream::in);
StdioStream::Ptr lstream = boost::make_shared<StdioStream>(fp, true);
StdioStream::Ptr logStream = boost::make_shared<StdioStream>(fp, true);
ZlibStream::Ptr lstream = boost::make_shared<ZlibStream>(logStream);
String message;
while (NetString::ReadStringFromStream(lstream, &message)) {
while (true) {
try {
if (!NetString::ReadStringFromStream(lstream, &message))
break;
} catch (std::exception&) {
/* Log files may be incomplete or corrupted. This is perfectly OK. */
break;
}
Dictionary::Ptr pmessage = Value::Deserialize(message);
if (pmessage->Get("timestamp") < endpoint->GetLocalLogPosition())

View File

@ -88,7 +88,7 @@ private:
static void LogGlobHandler(std::vector<int>& files, const String& file);
void ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Ptr& stream);
StdioStream::Ptr m_LogFile;
Stream::Ptr m_LogFile;
double m_LogMessageTimestamp;
size_t m_LogMessageCount;

View File

@ -94,7 +94,9 @@ libbase_la_SOURCES = \
utility.h \
value.cpp \
value.h \
win32.h
win32.h \
zlibstream.cpp \
zlibstream.h
libbase_la_CPPFLAGS = \
-DI2_BASE_BUILD \

View File

@ -22,6 +22,7 @@
#include "base/convert.h"
#include "base/debug.h"
#include "base/utility.h"
#include "base/scriptvariable.h"
#include <sstream>
#include <iostream>
#include <boost/bind.hpp>
@ -229,6 +230,11 @@ void ThreadPool::ManagerThreadProc(void)
double avg_latency, max_latency;
double utilization = 0;
Value adaptive = ScriptVariable::Get("ThreadPoolAdaptive");
if (!adaptive.IsEmpty() && !static_cast<bool>(adaptive))
break;
{
boost::mutex::scoped_lock lock(m_Mutex);

View File

@ -24,9 +24,10 @@
#include "base/object.h"
#include "base/qstring.h"
#include "base/exception.h"
#include <openssl/bio.h>
#include <openssl/ssl.h>
#include <openssl/bio.h>
#include <openssl/err.h>
#include <openssl/comp.h>
namespace icinga
{

80
lib/base/zlibstream.cpp Normal file
View File

@ -0,0 +1,80 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "base/zlibstream.h"
#include "base/objectlock.h"
#include "base/convert.h"
#include "base/logger_fwd.h"
#include <boost/make_shared.hpp>
using namespace icinga;
extern "C" BIO_METHOD *BIO_f_zlib(void);
/**
* Constructor for the ZlibStream class.
*
* @param innerStream The inner stream.
* @param compress Whether we're compressing, false if we're decompressing.
*/
ZlibStream::ZlibStream(const Stream::Ptr& innerStream)
: m_InnerStream(innerStream)
{
BIO *ibio = BIO_new_I2Stream(innerStream);
BIO *zbio = BIO_new(BIO_f_zlib());
m_BIO = BIO_push(zbio, ibio);
}
ZlibStream::~ZlibStream(void)
{
Close();
}
size_t ZlibStream::Read(void *buffer, size_t size)
{
ObjectLock olock(this);
return BIO_read(m_BIO, buffer, size);
}
void ZlibStream::Write(const void *buffer, size_t size)
{
ObjectLock olock(this);
BIO_write(m_BIO, buffer, size);
}
void ZlibStream::Close(void)
{
ObjectLock olock(this);
if (m_BIO) {
BIO_free_all(m_BIO);
m_BIO = NULL;
m_InnerStream->Close();
}
}
bool ZlibStream::IsEof(void) const
{
ObjectLock olock(this);
return BIO_eof(m_BIO);
}

51
lib/base/zlibstream.h Normal file
View File

@ -0,0 +1,51 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#ifndef ZLIBSTREAM_H
#define ZLIBSTREAM_H
#include "base/i2-base.h"
#include "base/stream_bio.h"
#include <iostream>
namespace icinga {
class ZlibStream : public Stream
{
public:
DECLARE_PTR_TYPEDEFS(ZlibStream);
ZlibStream(const Stream::Ptr& innerStream);
~ZlibStream(void);
virtual size_t Read(void *buffer, size_t size);
virtual void Write(const void *buffer, size_t size);
virtual void Close(void);
virtual bool IsEof(void) const;
private:
Stream::Ptr m_InnerStream;
BIO *m_BIO;
};
}
#endif /* ZLIBSTREAM_H */