Bugfixes.

This commit is contained in:
Gunnar Beutner 2012-07-02 14:38:37 +02:00
parent 5e4a3eb181
commit 1db53b84e6
5 changed files with 91 additions and 24 deletions

View File

@ -87,6 +87,18 @@ bool ConfigObject::IsAbstract(void) const
return value; return value;
} }
void ConfigObject::SetSource(const string& value)
{
GetProperties()->SetProperty("__source", value);
}
string ConfigObject::GetSource(void) const
{
string value;
GetProperties()->GetProperty("__source", &value);
return value;
}
void ConfigObject::Commit(void) void ConfigObject::Commit(void)
{ {
ConfigObject::Ptr dobj = GetObject(GetType(), GetName()); ConfigObject::Ptr dobj = GetObject(GetType(), GetName());

View File

@ -79,6 +79,9 @@ public:
void SetAbstract(bool value); void SetAbstract(bool value);
bool IsAbstract(void) const; bool IsAbstract(void) const;
void SetSource(const string& value);
string GetSource(void) const;
void Commit(void); void Commit(void);
void Unregister(void); void Unregister(void);

View File

@ -28,12 +28,12 @@ string ConfigRpcComponent::GetName(void) const
void ConfigRpcComponent::Start(void) void ConfigRpcComponent::Start(void)
{ {
m_Syncing = false;
EndpointManager::Ptr endpointManager = EndpointManager::GetInstance(); EndpointManager::Ptr endpointManager = EndpointManager::GetInstance();
m_Endpoint = boost::make_shared<VirtualEndpoint>(); m_Endpoint = boost::make_shared<VirtualEndpoint>();
long configSource;
if (GetConfig()->GetProperty("configSource", &configSource) && configSource != 0) {
m_Endpoint->RegisterTopicHandler("config::FetchObjects", m_Endpoint->RegisterTopicHandler("config::FetchObjects",
boost::bind(&ConfigRpcComponent::FetchObjectsHandler, this, _2)); boost::bind(&ConfigRpcComponent::FetchObjectsHandler, this, _2));
@ -43,13 +43,12 @@ void ConfigRpcComponent::Start(void)
m_Endpoint->RegisterPublication("config::ObjectCommitted"); m_Endpoint->RegisterPublication("config::ObjectCommitted");
m_Endpoint->RegisterPublication("config::ObjectRemoved"); m_Endpoint->RegisterPublication("config::ObjectRemoved");
}
endpointManager->OnNewEndpoint.connect(boost::bind(&ConfigRpcComponent::NewEndpointHandler, this, _2)); endpointManager->OnNewEndpoint.connect(boost::bind(&ConfigRpcComponent::NewEndpointHandler, this, _2));
m_Endpoint->RegisterPublication("config::FetchObjects"); m_Endpoint->RegisterPublication("config::FetchObjects");
m_Endpoint->RegisterTopicHandler("config::ObjectCommitted", m_Endpoint->RegisterTopicHandler("config::ObjectCommitted",
boost::bind(&ConfigRpcComponent::RemoteObjectCommittedHandler, this, _3)); boost::bind(&ConfigRpcComponent::RemoteObjectCommittedHandler, this, _2, _3));
m_Endpoint->RegisterTopicHandler("config::ObjectRemoved", m_Endpoint->RegisterTopicHandler("config::ObjectRemoved",
boost::bind(&ConfigRpcComponent::RemoteObjectRemovedHandler, this, _3)); boost::bind(&ConfigRpcComponent::RemoteObjectRemovedHandler, this, _3));
@ -121,6 +120,10 @@ void ConfigRpcComponent::FetchObjectsHandler(const Endpoint::Ptr& sender)
void ConfigRpcComponent::LocalObjectCommittedHandler(const ConfigObject::Ptr& object) void ConfigRpcComponent::LocalObjectCommittedHandler(const ConfigObject::Ptr& object)
{ {
/* don't send messages when we're currently processing a remote update */
if (m_Syncing)
return;
if (!ShouldReplicateObject(object)) if (!ShouldReplicateObject(object))
return; return;
@ -130,6 +133,10 @@ void ConfigRpcComponent::LocalObjectCommittedHandler(const ConfigObject::Ptr& ob
void ConfigRpcComponent::LocalObjectRemovedHandler(const ConfigObject::Ptr& object) void ConfigRpcComponent::LocalObjectRemovedHandler(const ConfigObject::Ptr& object)
{ {
/* don't send messages when we're currently processing a remote update */
if (m_Syncing)
return;
if (!ShouldReplicateObject(object)) if (!ShouldReplicateObject(object))
return; return;
@ -137,7 +144,7 @@ void ConfigRpcComponent::LocalObjectRemovedHandler(const ConfigObject::Ptr& obje
MakeObjectMessage(object, "config::ObjectRemoved", false)); MakeObjectMessage(object, "config::ObjectRemoved", false));
} }
void ConfigRpcComponent::RemoteObjectCommittedHandler(const RequestMessage& request) void ConfigRpcComponent::RemoteObjectCommittedHandler(const Endpoint::Ptr& sender, const RequestMessage& request)
{ {
MessagePart params; MessagePart params;
if (!request.GetParams(&params)) if (!request.GetParams(&params))
@ -157,15 +164,41 @@ void ConfigRpcComponent::RemoteObjectCommittedHandler(const RequestMessage& requ
ConfigObject::Ptr object = ConfigObject::GetObject(type, name); ConfigObject::Ptr object = ConfigObject::GetObject(type, name);
if (!object) if (!object) {
object = boost::make_shared<ConfigObject>(properties.GetDictionary()); object = boost::make_shared<ConfigObject>(properties.GetDictionary());
else
if (object->GetSource() == EndpointManager::GetInstance()->GetIdentity()) {
/* the peer sent us an object that was originally created by us -
* however if was deleted locally so we have to tell the peer to destroy
* its copy of the object. */
EndpointManager::GetInstance()->SendMulticastMessage(m_Endpoint,
MakeObjectMessage(object, "config::ObjectRemoved", false));
return;
}
} else {
/* TODO: compare transaction timestamps and reject the update if our local object is newer */
object->SetProperties(properties.GetDictionary()); object->SetProperties(properties.GetDictionary());
}
if (object->IsLocal()) if (object->IsLocal())
throw invalid_argument("Replicated remote object is marked as local."); throw invalid_argument("Replicated remote object is marked as local.");
if (object->GetSource().empty())
object->SetSource(sender->GetIdentity());
try {
/* TODO: only ignore updates for _this_ object rather than all objects
* this might be relevant if the commit handler for this object
* creates other objects. */
m_Syncing = true;
object->Commit(); object->Commit();
m_Syncing = false;
} catch (const std::exception& ex) {
m_Syncing = false;
throw;
}
} }
void ConfigRpcComponent::RemoteObjectRemovedHandler(const RequestMessage& request) void ConfigRpcComponent::RemoteObjectRemovedHandler(const RequestMessage& request)
@ -187,8 +220,16 @@ void ConfigRpcComponent::RemoteObjectRemovedHandler(const RequestMessage& reques
if (!object) if (!object)
return; return;
if (!object->IsLocal()) if (!object->IsLocal()) {
try {
m_Syncing = true;
object->Unregister(); object->Unregister();
m_Syncing = false;
} catch (const std::exception& ex) {
m_Syncing = false;
throw;
}
}
} }
EXPORT_COMPONENT(configrpc, ConfigRpcComponent); EXPORT_COMPONENT(configrpc, ConfigRpcComponent);

View File

@ -35,6 +35,7 @@ public:
private: private:
VirtualEndpoint::Ptr m_Endpoint; VirtualEndpoint::Ptr m_Endpoint;
bool m_Syncing;
void NewEndpointHandler(const Endpoint::Ptr& endpoint); void NewEndpointHandler(const Endpoint::Ptr& endpoint);
void SessionEstablishedHandler(const Endpoint::Ptr& endpoint); void SessionEstablishedHandler(const Endpoint::Ptr& endpoint);
@ -43,7 +44,7 @@ private:
void LocalObjectRemovedHandler(const ConfigObject::Ptr& object); void LocalObjectRemovedHandler(const ConfigObject::Ptr& object);
void FetchObjectsHandler(const Endpoint::Ptr& sender); void FetchObjectsHandler(const Endpoint::Ptr& sender);
void RemoteObjectCommittedHandler(const RequestMessage& request); void RemoteObjectCommittedHandler(const Endpoint::Ptr& sender, const RequestMessage& request);
void RemoteObjectRemovedHandler(const RequestMessage& request); void RemoteObjectRemovedHandler(const RequestMessage& request);
static RequestMessage MakeObjectMessage(const ConfigObject::Ptr& object, static RequestMessage MakeObjectMessage(const ConfigObject::Ptr& object,

View File

@ -64,8 +64,18 @@ void DelegationComponent::ObjectCommittedHandler(const ConfigObject::Ptr& object
{ {
Service service(object); Service service(object);
string checker = service.GetChecker();
if (!checker.empty()) {
/* object was updated, clear its checker to make sure it's re-delegated by the delegation timer */ /* object was updated, clear its checker to make sure it's re-delegated by the delegation timer */
service.SetChecker(""); service.SetChecker("");
/* TODO: figure out a better way to clear individual services */
Endpoint::Ptr endpoint = EndpointManager::GetInstance()->GetEndpointByIdentity(checker);
if (endpoint)
ClearServices(endpoint);
}
} }
void DelegationComponent::AssignService(const Endpoint::Ptr& checker, const Service& service) void DelegationComponent::AssignService(const Endpoint::Ptr& checker, const Service& service)
@ -84,6 +94,10 @@ void DelegationComponent::AssignService(const Endpoint::Ptr& checker, const Serv
void DelegationComponent::ClearServices(const Endpoint::Ptr& checker) void DelegationComponent::ClearServices(const Endpoint::Ptr& checker)
{ {
stringstream msgbuf;
msgbuf << "Clearing assigned services for endpoint '" << checker->GetIdentity() << "'";
Application::Log(LogInformation, "delegation", msgbuf.str());
RequestMessage request; RequestMessage request;
request.SetMethod("checker::ClearServices"); request.SetMethod("checker::ClearServices");
@ -134,10 +148,6 @@ void DelegationComponent::SessionEstablishedHandler(const Endpoint::Ptr& endpoin
if (!IsEndpointChecker(endpoint)) if (!IsEndpointChecker(endpoint))
return; return;
stringstream msgbuf;
msgbuf << "Clearing assigned services for endpoint '" << endpoint->GetIdentity() << "'";
Application::Log(LogInformation, "delegation", msgbuf.str());
/* locally clear checker for all services that previously belonged to this endpoint */ /* locally clear checker for all services that previously belonged to this endpoint */
ConfigObject::Set::Iterator it; ConfigObject::Set::Iterator it;
for (it = m_AllServices->Begin(); it != m_AllServices->End(); it++) { for (it = m_AllServices->Begin(); it != m_AllServices->End(); it++) {