diff --git a/base/configobject.cpp b/base/configobject.cpp index 49b7e529f..57a062f96 100644 --- a/base/configobject.cpp +++ b/base/configobject.cpp @@ -87,6 +87,18 @@ bool ConfigObject::IsAbstract(void) const return value; } +void ConfigObject::SetSource(const string& value) +{ + GetProperties()->SetProperty("__source", value); +} + +string ConfigObject::GetSource(void) const +{ + string value; + GetProperties()->GetProperty("__source", &value); + return value; +} + void ConfigObject::Commit(void) { ConfigObject::Ptr dobj = GetObject(GetType(), GetName()); diff --git a/base/configobject.h b/base/configobject.h index 8c376a720..84b7b6db4 100644 --- a/base/configobject.h +++ b/base/configobject.h @@ -79,6 +79,9 @@ public: void SetAbstract(bool value); bool IsAbstract(void) const; + void SetSource(const string& value); + string GetSource(void) const; + void Commit(void); void Unregister(void); diff --git a/components/configrpc/configrpccomponent.cpp b/components/configrpc/configrpccomponent.cpp index 7ac576676..931281ced 100644 --- a/components/configrpc/configrpccomponent.cpp +++ b/components/configrpc/configrpccomponent.cpp @@ -28,28 +28,27 @@ string ConfigRpcComponent::GetName(void) const void ConfigRpcComponent::Start(void) { + m_Syncing = false; + EndpointManager::Ptr endpointManager = EndpointManager::GetInstance(); m_Endpoint = boost::make_shared(); - long configSource; - if (GetConfig()->GetProperty("configSource", &configSource) && configSource != 0) { - m_Endpoint->RegisterTopicHandler("config::FetchObjects", - boost::bind(&ConfigRpcComponent::FetchObjectsHandler, this, _2)); + m_Endpoint->RegisterTopicHandler("config::FetchObjects", + boost::bind(&ConfigRpcComponent::FetchObjectsHandler, this, _2)); - ConfigObject::GetAllObjects()->OnObjectAdded.connect(boost::bind(&ConfigRpcComponent::LocalObjectCommittedHandler, this, _2)); - ConfigObject::GetAllObjects()->OnObjectCommitted.connect(boost::bind(&ConfigRpcComponent::LocalObjectCommittedHandler, this, _2)); - ConfigObject::GetAllObjects()->OnObjectRemoved.connect(boost::bind(&ConfigRpcComponent::LocalObjectRemovedHandler, this, _2)); + ConfigObject::GetAllObjects()->OnObjectAdded.connect(boost::bind(&ConfigRpcComponent::LocalObjectCommittedHandler, this, _2)); + ConfigObject::GetAllObjects()->OnObjectCommitted.connect(boost::bind(&ConfigRpcComponent::LocalObjectCommittedHandler, this, _2)); + ConfigObject::GetAllObjects()->OnObjectRemoved.connect(boost::bind(&ConfigRpcComponent::LocalObjectRemovedHandler, this, _2)); - m_Endpoint->RegisterPublication("config::ObjectCommitted"); - m_Endpoint->RegisterPublication("config::ObjectRemoved"); - } + m_Endpoint->RegisterPublication("config::ObjectCommitted"); + m_Endpoint->RegisterPublication("config::ObjectRemoved"); endpointManager->OnNewEndpoint.connect(boost::bind(&ConfigRpcComponent::NewEndpointHandler, this, _2)); m_Endpoint->RegisterPublication("config::FetchObjects"); m_Endpoint->RegisterTopicHandler("config::ObjectCommitted", - boost::bind(&ConfigRpcComponent::RemoteObjectCommittedHandler, this, _3)); + boost::bind(&ConfigRpcComponent::RemoteObjectCommittedHandler, this, _2, _3)); m_Endpoint->RegisterTopicHandler("config::ObjectRemoved", boost::bind(&ConfigRpcComponent::RemoteObjectRemovedHandler, this, _3)); @@ -121,6 +120,10 @@ void ConfigRpcComponent::FetchObjectsHandler(const Endpoint::Ptr& sender) void ConfigRpcComponent::LocalObjectCommittedHandler(const ConfigObject::Ptr& object) { + /* don't send messages when we're currently processing a remote update */ + if (m_Syncing) + return; + if (!ShouldReplicateObject(object)) return; @@ -130,6 +133,10 @@ void ConfigRpcComponent::LocalObjectCommittedHandler(const ConfigObject::Ptr& ob void ConfigRpcComponent::LocalObjectRemovedHandler(const ConfigObject::Ptr& object) { + /* don't send messages when we're currently processing a remote update */ + if (m_Syncing) + return; + if (!ShouldReplicateObject(object)) return; @@ -137,7 +144,7 @@ void ConfigRpcComponent::LocalObjectRemovedHandler(const ConfigObject::Ptr& obje MakeObjectMessage(object, "config::ObjectRemoved", false)); } -void ConfigRpcComponent::RemoteObjectCommittedHandler(const RequestMessage& request) +void ConfigRpcComponent::RemoteObjectCommittedHandler(const Endpoint::Ptr& sender, const RequestMessage& request) { MessagePart params; if (!request.GetParams(¶ms)) @@ -157,15 +164,41 @@ void ConfigRpcComponent::RemoteObjectCommittedHandler(const RequestMessage& requ ConfigObject::Ptr object = ConfigObject::GetObject(type, name); - if (!object) + if (!object) { object = boost::make_shared(properties.GetDictionary()); - else + + if (object->GetSource() == EndpointManager::GetInstance()->GetIdentity()) { + /* the peer sent us an object that was originally created by us - + * however if was deleted locally so we have to tell the peer to destroy + * its copy of the object. */ + EndpointManager::GetInstance()->SendMulticastMessage(m_Endpoint, + MakeObjectMessage(object, "config::ObjectRemoved", false)); + + return; + } + } else { + /* TODO: compare transaction timestamps and reject the update if our local object is newer */ + object->SetProperties(properties.GetDictionary()); + } if (object->IsLocal()) throw invalid_argument("Replicated remote object is marked as local."); - object->Commit(); + if (object->GetSource().empty()) + object->SetSource(sender->GetIdentity()); + + try { + /* TODO: only ignore updates for _this_ object rather than all objects + * this might be relevant if the commit handler for this object + * creates other objects. */ + m_Syncing = true; + object->Commit(); + m_Syncing = false; + } catch (const std::exception& ex) { + m_Syncing = false; + throw; + } } void ConfigRpcComponent::RemoteObjectRemovedHandler(const RequestMessage& request) @@ -187,8 +220,16 @@ void ConfigRpcComponent::RemoteObjectRemovedHandler(const RequestMessage& reques if (!object) return; - if (!object->IsLocal()) - object->Unregister(); + if (!object->IsLocal()) { + try { + m_Syncing = true; + object->Unregister(); + m_Syncing = false; + } catch (const std::exception& ex) { + m_Syncing = false; + throw; + } + } } EXPORT_COMPONENT(configrpc, ConfigRpcComponent); diff --git a/components/configrpc/configrpccomponent.h b/components/configrpc/configrpccomponent.h index c3f602de5..5e9bc073f 100644 --- a/components/configrpc/configrpccomponent.h +++ b/components/configrpc/configrpccomponent.h @@ -35,6 +35,7 @@ public: private: VirtualEndpoint::Ptr m_Endpoint; + bool m_Syncing; void NewEndpointHandler(const Endpoint::Ptr& endpoint); void SessionEstablishedHandler(const Endpoint::Ptr& endpoint); @@ -43,7 +44,7 @@ private: void LocalObjectRemovedHandler(const ConfigObject::Ptr& object); void FetchObjectsHandler(const Endpoint::Ptr& sender); - void RemoteObjectCommittedHandler(const RequestMessage& request); + void RemoteObjectCommittedHandler(const Endpoint::Ptr& sender, const RequestMessage& request); void RemoteObjectRemovedHandler(const RequestMessage& request); static RequestMessage MakeObjectMessage(const ConfigObject::Ptr& object, diff --git a/components/delegation/delegationcomponent.cpp b/components/delegation/delegationcomponent.cpp index e85316633..b3139ecc8 100644 --- a/components/delegation/delegationcomponent.cpp +++ b/components/delegation/delegationcomponent.cpp @@ -64,8 +64,18 @@ void DelegationComponent::ObjectCommittedHandler(const ConfigObject::Ptr& object { Service service(object); - /* object was updated, clear its checker to make sure it's re-delegated by the delegation timer */ - service.SetChecker(""); + string checker = service.GetChecker(); + + if (!checker.empty()) { + /* object was updated, clear its checker to make sure it's re-delegated by the delegation timer */ + service.SetChecker(""); + + /* TODO: figure out a better way to clear individual services */ + Endpoint::Ptr endpoint = EndpointManager::GetInstance()->GetEndpointByIdentity(checker); + + if (endpoint) + ClearServices(endpoint); + } } void DelegationComponent::AssignService(const Endpoint::Ptr& checker, const Service& service) @@ -84,6 +94,10 @@ void DelegationComponent::AssignService(const Endpoint::Ptr& checker, const Serv void DelegationComponent::ClearServices(const Endpoint::Ptr& checker) { + stringstream msgbuf; + msgbuf << "Clearing assigned services for endpoint '" << checker->GetIdentity() << "'"; + Application::Log(LogInformation, "delegation", msgbuf.str()); + RequestMessage request; request.SetMethod("checker::ClearServices"); @@ -134,10 +148,6 @@ void DelegationComponent::SessionEstablishedHandler(const Endpoint::Ptr& endpoin if (!IsEndpointChecker(endpoint)) return; - stringstream msgbuf; - msgbuf << "Clearing assigned services for endpoint '" << endpoint->GetIdentity() << "'"; - Application::Log(LogInformation, "delegation", msgbuf.str()); - /* locally clear checker for all services that previously belonged to this endpoint */ ConfigObject::Set::Iterator it; for (it = m_AllServices->Begin(); it != m_AllServices->End(); it++) {