cluster: Make log replays non-blocking.

This commit is contained in:
Gunnar Beutner 2013-09-16 09:30:31 +02:00
parent 21c35aabf4
commit e1b8e1180c
3 changed files with 95 additions and 66 deletions

View File

@ -248,7 +248,12 @@ void ClusterComponent::RelayMessage(const Endpoint::Ptr& except, const Dictionar
if (endpoint->GetName() == GetIdentity()) if (endpoint->GetName() == GetIdentity())
continue; continue;
endpoint->SendMessage(message); {
ObjectLock olock(endpoint);
if (!endpoint->IsSyncing())
endpoint->SendMessage(message);
}
} }
} }
@ -321,55 +326,69 @@ void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Pt
{ {
int count = 0; int count = 0;
ASSERT(OwnsLock()); ASSERT(!OwnsLock());
CloseLogFile(); for (;;) {
RotateLogFile(); ObjectLock olock(this);
std::vector<int> files; CloseLogFile();
Utility::Glob(GetClusterDir() + "log/*", boost::bind(&ClusterComponent::LogGlobHandler, boost::ref(files), _1)); RotateLogFile();
std::sort(files.begin(), files.end());
BOOST_FOREACH(int ts, files) { std::vector<int> files;
String path = GetClusterDir() + "log/" + Convert::ToString(ts); Utility::Glob(GetClusterDir() + "log/*", boost::bind(&ClusterComponent::LogGlobHandler, boost::ref(files), _1));
std::sort(files.begin(), files.end());
if (ts < endpoint->GetLocalLogPosition()) if (files.size() > 1) {
continue; OpenLogFile();
olock.Unlock();
Log(LogInformation, "cluster", "Replaying log: " + path);
std::fstream *fp = new std::fstream(path.CStr(), std::fstream::in);
StdioStream::Ptr logStream = boost::make_shared<StdioStream>(fp, true);
ZlibStream::Ptr lstream = boost::make_shared<ZlibStream>(logStream);
String message;
while (true) {
try {
if (!NetString::ReadStringFromStream(lstream, &message))
break;
} catch (std::exception&) {
/* Log files may be incomplete or corrupted. This is perfectly OK. */
break;
}
Dictionary::Ptr pmessage = Value::Deserialize(message);
if (pmessage->Get("timestamp") < endpoint->GetLocalLogPosition())
continue;
if (pmessage->Get("except") == endpoint->GetName())
continue;
NetString::WriteStringToStream(stream, pmessage->Get("message"));
count++;
} }
lstream->Close(); BOOST_FOREACH(int ts, files) {
String path = GetClusterDir() + "log/" + Convert::ToString(ts);
if (ts < endpoint->GetLocalLogPosition())
continue;
Log(LogInformation, "cluster", "Replaying log: " + path);
std::fstream *fp = new std::fstream(path.CStr(), std::fstream::in);
StdioStream::Ptr logStream = boost::make_shared<StdioStream>(fp, true);
ZlibStream::Ptr lstream = boost::make_shared<ZlibStream>(logStream);
String message;
while (true) {
try {
if (!NetString::ReadStringFromStream(lstream, &message))
break;
} catch (std::exception&) {
/* Log files may be incomplete or corrupted. This is perfectly OK. */
break;
}
Dictionary::Ptr pmessage = Value::Deserialize(message);
if (pmessage->Get("timestamp") < endpoint->GetLocalLogPosition())
continue;
if (pmessage->Get("except") == endpoint->GetName())
continue;
NetString::WriteStringToStream(stream, pmessage->Get("message"));
count++;
}
lstream->Close();
}
Log(LogInformation, "cluster", "Replayed " + Convert::ToString(count) + " messages.");
if (files.size() == 1) {
ObjectLock olock2(endpoint);
endpoint->SetSyncing(false);
break;
}
} }
Log(LogInformation, "cluster", "Replayed " + Convert::ToString(count) + " messages.");
OpenLogFile();
} }
void ClusterComponent::ConfigGlobHandler(const Dictionary::Ptr& config, const String& file, bool basename) void ClusterComponent::ConfigGlobHandler(const Dictionary::Ptr& config, const String& file, bool basename)
@ -411,7 +430,17 @@ void ClusterComponent::NewClientHandler(const Socket::Ptr& client, TlsRole role)
return; return;
} }
endpoint->SetSeen(Utility::GetTime()); {
ObjectLock olock(endpoint);
Stream::Ptr oldClient = endpoint->GetClient();
if (oldClient)
oldClient->Close();
endpoint->SetSyncing(true);
endpoint->SetSeen(Utility::GetTime());
endpoint->SetClient(tlsStream);
}
Dictionary::Ptr config = boost::make_shared<Dictionary>(); Dictionary::Ptr config = boost::make_shared<Dictionary>();
Array::Ptr configFiles = endpoint->GetConfigFiles(); Array::Ptr configFiles = endpoint->GetConfigFiles();
@ -435,17 +464,7 @@ void ClusterComponent::NewClientHandler(const Socket::Ptr& client, TlsRole role)
String json = Value(message).Serialize(); String json = Value(message).Serialize();
NetString::WriteStringToStream(tlsStream, json); NetString::WriteStringToStream(tlsStream, json);
{ ReplayLog(endpoint, tlsStream);
ObjectLock olock(this);
Stream::Ptr oldClient = endpoint->GetClient();
if (oldClient)
oldClient->Close();
endpoint->SetClient(tlsStream);
ReplayLog(endpoint, tlsStream);
}
} }
void ClusterComponent::ClusterTimerHandler(void) void ClusterComponent::ClusterTimerHandler(void)

View File

@ -34,6 +34,10 @@ REGISTER_TYPE(Endpoint);
boost::signals2::signal<void (const Endpoint::Ptr&)> Endpoint::OnConnected; boost::signals2::signal<void (const Endpoint::Ptr&)> Endpoint::OnConnected;
boost::signals2::signal<void (const Endpoint::Ptr&, const Dictionary::Ptr&)> Endpoint::OnMessageReceived; boost::signals2::signal<void (const Endpoint::Ptr&, const Dictionary::Ptr&)> Endpoint::OnMessageReceived;
Endpoint::Endpoint(void)
: m_Syncing(false)
{ }
/** /**
* Checks whether this endpoint is connected. * Checks whether this endpoint is connected.
* *
@ -46,18 +50,12 @@ bool Endpoint::IsConnected(void) const
Stream::Ptr Endpoint::GetClient(void) const Stream::Ptr Endpoint::GetClient(void) const
{ {
ObjectLock olock(this);
return m_Client; return m_Client;
} }
void Endpoint::SetClient(const Stream::Ptr& client) void Endpoint::SetClient(const Stream::Ptr& client)
{ {
{ m_Client = client;
ObjectLock olock(this);
m_Client = client;
}
if (client) { if (client) {
boost::thread thread(boost::bind(&Endpoint::MessageThreadProc, this, client)); boost::thread thread(boost::bind(&Endpoint::MessageThreadProc, this, client));
@ -111,8 +109,6 @@ void Endpoint::MessageThreadProc(const Stream::Ptr& stream)
*/ */
String Endpoint::GetHost(void) const String Endpoint::GetHost(void) const
{ {
ObjectLock olock(this);
return m_Host; return m_Host;
} }
@ -123,8 +119,6 @@ String Endpoint::GetHost(void) const
*/ */
String Endpoint::GetPort(void) const String Endpoint::GetPort(void) const
{ {
ObjectLock olock(this);
return m_Port; return m_Port;
} }
@ -168,6 +162,16 @@ void Endpoint::SetRemoteLogPosition(double ts)
m_RemoteLogPosition = ts; m_RemoteLogPosition = ts;
} }
bool Endpoint::IsSyncing(void) const
{
return m_Syncing;
}
void Endpoint::SetSyncing(bool syncing)
{
m_Syncing = syncing;
}
Dictionary::Ptr Endpoint::GetFeatures(void) const Dictionary::Ptr Endpoint::GetFeatures(void) const
{ {
return m_Features; return m_Features;

View File

@ -41,6 +41,8 @@ public:
DECLARE_PTR_TYPEDEFS(Endpoint); DECLARE_PTR_TYPEDEFS(Endpoint);
DECLARE_TYPENAME(Endpoint); DECLARE_TYPENAME(Endpoint);
Endpoint(void);
static boost::signals2::signal<void (const Endpoint::Ptr&)> OnConnected; static boost::signals2::signal<void (const Endpoint::Ptr&)> OnConnected;
static boost::signals2::signal<void (const Endpoint::Ptr&, const Dictionary::Ptr&)> OnMessageReceived; static boost::signals2::signal<void (const Endpoint::Ptr&, const Dictionary::Ptr&)> OnMessageReceived;
@ -70,6 +72,9 @@ public:
bool HasFeature(const String& type) const; bool HasFeature(const String& type) const;
bool IsSyncing(void) const;
void SetSyncing(bool syncing);
protected: protected:
virtual void InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes) const; virtual void InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes) const;
virtual void InternalDeserialize(const Dictionary::Ptr& bag, int attributeTypes); virtual void InternalDeserialize(const Dictionary::Ptr& bag, int attributeTypes);
@ -86,6 +91,7 @@ private:
double m_LocalLogPosition; double m_LocalLogPosition;
double m_RemoteLogPosition; double m_RemoteLogPosition;
Dictionary::Ptr m_Features; Dictionary::Ptr m_Features;
bool m_Syncing;
void MessageThreadProc(const Stream::Ptr& stream); void MessageThreadProc(const Stream::Ptr& stream);
}; };