cluster: Send heartbeats.

This commit is contained in:
Gunnar Beutner 2013-08-30 09:34:58 +02:00
parent 8729a809b3
commit f3638877eb
4 changed files with 41 additions and 7 deletions

View File

@ -47,10 +47,10 @@ void ClusterComponent::Start(void)
if (!GetBindPort().IsEmpty())
AddListener(GetBindPort());
m_ReconnectTimer = boost::make_shared<Timer>();
m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&ClusterComponent::ReconnectTimerHandler, this));
m_ReconnectTimer->SetInterval(5);
m_ReconnectTimer->Start();
m_ClusterTimer = boost::make_shared<Timer>();
m_ClusterTimer->OnTimerExpired.connect(boost::bind(&ClusterComponent::ClusterTimerHandler, this));
m_ClusterTimer->SetInterval(5);
m_ClusterTimer->Start();
Service::OnNewCheckResult.connect(bind(&ClusterComponent::CheckResultHandler, this, _1, _2, _3));
Service::OnNextCheckChanged.connect(bind(&ClusterComponent::NextCheckChangedHandler, this, _1, _2, _3));
@ -226,8 +226,17 @@ void ClusterComponent::NewClientHandler(const Socket::Ptr& client, TlsRole role)
endpoint->SetClient(tlsStream);
}
void ClusterComponent::ReconnectTimerHandler(void)
void ClusterComponent::ClusterTimerHandler(void)
{
/* broadcast a heartbeat message */
Dictionary::Ptr message = boost::make_shared<Dictionary>();
message->Set("jsonrpc", "2.0");
message->Set("method", "cluster::HeartBeat");
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
endpoint->SendMessage(message);
}
Array::Ptr peers = GetPeers();
if (!peers)
@ -553,6 +562,11 @@ void ClusterComponent::MessageHandler(const Endpoint::Ptr& sender, const Diction
endpoint->SendMessage(message);
}
if (message->Get("method") == "cluster::HeartBeat") {
sender->SetSeen(Utility::GetTime());
return;
}
Dictionary::Ptr params = message->Get("params");
if (!params)

View File

@ -68,8 +68,8 @@ private:
shared_ptr<SSL_CTX> m_SSLContext;
String m_Identity;
Timer::Ptr m_ReconnectTimer;
void ReconnectTimerHandler(void);
Timer::Ptr m_ClusterTimer;
void ClusterTimerHandler(void);
std::set<TcpSocket::Ptr> m_Servers;

View File

@ -126,6 +126,16 @@ String Endpoint::GetPort(void) const
return m_Port;
}
double Endpoint::GetSeen(void) const
{
return m_Seen;
}
void Endpoint::SetSeen(double ts)
{
m_Seen = ts;
}
void Endpoint::InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes) const
{
DynamicObject::InternalSerialize(bag, attributeTypes);
@ -134,6 +144,9 @@ void Endpoint::InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes)
bag->Set("host", m_Host);
bag->Set("port", m_Port);
}
if (attributeTypes & Attribute_State)
bag->Set("seen", m_Seen);
}
void Endpoint::InternalDeserialize(const Dictionary::Ptr& bag, int attributeTypes)
@ -144,4 +157,7 @@ void Endpoint::InternalDeserialize(const Dictionary::Ptr& bag, int attributeType
m_Host = bag->Get("host");
m_Port = bag->Get("port");
}
if (attributeTypes & Attribute_State)
m_Seen = bag->Get("seen");
}

View File

@ -53,6 +53,9 @@ public:
String GetHost(void) const;
String GetPort(void) const;
double GetSeen(void) const;
void SetSeen(double ts);
protected:
virtual void InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes) const;
virtual void InternalDeserialize(const Dictionary::Ptr& bag, int attributeTypes);
@ -63,6 +66,7 @@ private:
String m_Port;
Stream::Ptr m_Client;
double m_Seen;
void MessageThreadProc(const Stream::Ptr& stream);
};