cluster: Don't replay logs unless they're relevant.

This commit is contained in:
Gunnar Beutner 2013-09-03 10:30:28 +02:00
parent 235042bbac
commit 8098329a33
2 changed files with 24 additions and 5 deletions

View File

@ -40,7 +40,10 @@ void ClusterComponent::Start(void)
{
DynamicObject::Start();
OpenLogFile();
{
ObjectLock olock(this);
OpenLogFile();
}
/* set up SSL context */
shared_ptr<X509> cert = GetX509Certificate(GetCertificateFile());
@ -82,6 +85,7 @@ void ClusterComponent::Start(void)
*/
void ClusterComponent::Stop(void)
{
ObjectLock olock(this);
CloseLogFile();
}
@ -202,13 +206,15 @@ void ClusterComponent::RelayMessage(const Endpoint::Ptr& except, const Dictionar
if (persistent) {
Dictionary::Ptr pmessage = boost::make_shared<Dictionary>();
pmessage->Set("timestamp", Utility::GetTime());
double ts = Utility::GetTime();
pmessage->Set("timestamp", ts);
pmessage->Set("message", message);
ObjectLock olock(this);
String json = Value(pmessage).Serialize();
NetString::WriteStringToStream(m_LogFile, json);
m_LogMessageCount++;
m_LogMessageTimestamp = ts;
if (m_LogMessageCount > 250000) {
CloseLogFile();
@ -237,9 +243,9 @@ String ClusterComponent::GetClusterDir(void) const
void ClusterComponent::OpenLogFile(void)
{
std::ostringstream msgbuf;
msgbuf << GetClusterDir() << static_cast<long>(Utility::GetTime());
String path = msgbuf.str();
ASSERT(OwnsLock());
String path = GetClusterDir() + "current";
std::fstream *fp = new std::fstream(path.CStr(), std::fstream::out | std::ofstream::app);
@ -250,12 +256,21 @@ void ClusterComponent::OpenLogFile(void)
m_LogFile = boost::make_shared<StdioStream>(fp, true);
m_LogMessageCount = 0;
m_LogMessageTimestamp = 0;
}
void ClusterComponent::CloseLogFile(void)
{
ASSERT(OwnsLock());
m_LogFile->Close();
m_LogFile.reset();
if (m_LogMessageTimestamp != 0) {
String oldpath = GetClusterDir() + "current";
String newpath = GetClusterDir() + Convert::ToString(static_cast<int>(m_LogMessageTimestamp) + 1);
(void) rename(oldpath.CStr(), newpath.CStr());
}
}
void ClusterComponent::LogGlobHandler(std::vector<int>& files, const String& file)
@ -282,6 +297,9 @@ void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Pt
msgbuf << GetClusterDir() << ts;
String path = msgbuf.str();
if (ts < endpoint->GetLocalLogPosition())
continue;
Log(LogInformation, "cluster", "Replaying log: " + path);
std::fstream *fp = new std::fstream(path.CStr(), std::fstream::in);

View File

@ -89,6 +89,7 @@ private:
void ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Ptr& stream);
StdioStream::Ptr m_LogFile;
double m_LogMessageTimestamp;
size_t m_LogMessageCount;
void CheckResultHandler(const Service::Ptr& service, const Dictionary::Ptr& cr, const String& authority);