Removed roles/publications.

This commit is contained in:
Gunnar Beutner 2012-08-14 10:53:04 +02:00
parent 645a767ecc
commit 8b87e30197
19 changed files with 17 additions and 221 deletions

View File

@ -24,7 +24,11 @@ using namespace icinga;
void CheckerComponent::Start(void)
{
m_Endpoint = boost::make_shared<VirtualEndpoint>();
m_Endpoint->RegisterPublication("checker::ServiceStateChange");
/* dummy registration so the delegation module knows this is a checker
TODO: figure out a better way for this */
m_Endpoint->RegisterSubscription("checker");
EndpointManager::GetInstance()->RegisterEndpoint(m_Endpoint);
Service::OnCheckerChanged.connect(bind(&CheckerComponent::CheckerChangedHandler, this, _1));

View File

@ -36,12 +36,8 @@ void CIBSyncComponent::Start(void)
DynamicObject::OnUnregistered.connect(boost::bind(&CIBSyncComponent::LocalObjectUnregisteredHandler, this, _1));
DynamicObject::OnTransactionClosing.connect(boost::bind(&CIBSyncComponent::TransactionClosingHandler, this, _1));
m_Endpoint->RegisterPublication("config::ObjectUpdate");
m_Endpoint->RegisterPublication("config::ObjectRemoved");
EndpointManager::GetInstance()->OnNewEndpoint.connect(boost::bind(&CIBSyncComponent::NewEndpointHandler, this, _2));
m_Endpoint->RegisterPublication("config::FetchObjects");
m_Endpoint->RegisterTopicHandler("config::ObjectUpdate",
boost::bind(&CIBSyncComponent::RemoteObjectUpdateHandler, this, _2, _3));
m_Endpoint->RegisterTopicHandler("config::ObjectRemoved",

View File

@ -33,7 +33,7 @@ void DelegationComponent::Start(void)
bool DelegationComponent::IsEndpointChecker(const Endpoint::Ptr& endpoint)
{
return (endpoint->HasPublication("checker::ServiceStateChange"));
return (endpoint->HasSubscription("checker"));
}
vector<Endpoint::Ptr> DelegationComponent::GetCheckerCandidates(const Service::Ptr& service) const

View File

@ -29,7 +29,6 @@ void DemoComponent::Start(void)
m_Endpoint = boost::make_shared<VirtualEndpoint>();
m_Endpoint->RegisterTopicHandler("demo::HelloWorld",
boost::bind(&DemoComponent::HelloWorldRequestHandler, this, _2, _3));
m_Endpoint->RegisterPublication("demo::HelloWorld");
EndpointManager::GetInstance()->RegisterEndpoint(m_Endpoint);
m_DemoTimer = boost::make_shared<Timer>();

View File

@ -28,11 +28,9 @@ void DiscoveryComponent::Start(void)
{
m_Endpoint = boost::make_shared<VirtualEndpoint>();
m_Endpoint->RegisterPublication("discovery::RegisterComponent");
m_Endpoint->RegisterTopicHandler("discovery::RegisterComponent",
boost::bind(&DiscoveryComponent::RegisterComponentMessageHandler, this, _2, _3));
m_Endpoint->RegisterPublication("discovery::NewComponent");
m_Endpoint->RegisterTopicHandler("discovery::NewComponent",
boost::bind(&DiscoveryComponent::NewComponentMessageHandler, this, _3));
@ -101,12 +99,6 @@ void DiscoveryComponent::NewEndpointHandler(const Endpoint::Ptr& endpoint)
return;
}
/* accept discovery::RegisterComponent messages from any endpoint */
endpoint->RegisterPublication("discovery::RegisterComponent");
/* accept discovery::Welcome messages from any endpoint */
endpoint->RegisterPublication("discovery::Welcome");
String identity = endpoint->GetIdentity();
if (identity == EndpointManager::GetInstance()->GetIdentity()) {
@ -156,10 +148,6 @@ void DiscoveryComponent::NewEndpointHandler(const Endpoint::Ptr& endpoint)
// register published/subscribed topics for this endpoint
ComponentDiscoveryInfo::Ptr info = ic->second;
BOOST_FOREACH(String publication, info->Publications) {
endpoint->RegisterPublication(publication);
}
BOOST_FOREACH(String subscription, info->Subscriptions) {
endpoint->RegisterSubscription(subscription);
}
@ -180,9 +168,6 @@ void DiscoveryComponent::DiscoveryEndpointHandler(const Endpoint::Ptr& endpoint,
for (i = endpoint->BeginSubscriptions(); i != endpoint->EndSubscriptions(); i++)
info->Subscriptions.insert(*i);
for (i = endpoint->BeginPublications(); i != endpoint->EndPublications(); i++)
info->Publications.insert(*i);
}
/**
@ -295,41 +280,12 @@ void DiscoveryComponent::SendDiscoveryMessage(const String& method, const String
params.SetSubscriptions(subscriptions);
Dictionary::Ptr publications = boost::make_shared<Dictionary>();
BOOST_FOREACH(String publication, info->Publications) {
publications->Add(publication);
}
params.SetPublications(publications);
if (recipient)
EndpointManager::GetInstance()->SendUnicastMessage(m_Endpoint, recipient, request);
else
EndpointManager::GetInstance()->SendMulticastMessage(m_Endpoint, request);
}
bool DiscoveryComponent::HasMessagePermission(const Dictionary::Ptr& roles, const String& messageType, const String& message)
{
if (!roles)
return false;
Value roleName;
BOOST_FOREACH(tie(tuples::ignore, roleName), roles) {
DynamicObject::Ptr role = DynamicObject::GetObject("Role", roleName);
Dictionary::Ptr permissions = role->Get(messageType);
if (!permissions)
continue;
Value permission;
BOOST_FOREACH(tie(tuples::ignore, permission), permissions) {
if (Utility::Match(permission, message))
return true;
}
}
return false;
}
/**
* Processes a discovery message by registering the component in the
* discovery component registry.
@ -363,27 +319,13 @@ void DiscoveryComponent::ProcessDiscoveryMessage(const String& identity, const D
Endpoint::Ptr endpoint = EndpointManager::GetInstance()->GetEndpointByIdentity(identity);
Dictionary::Ptr publications;
if (message.GetPublications(&publications)) {
Value publication;
BOOST_FOREACH(tie(tuples::ignore, publication), publications) {
if (trusted || HasMessagePermission(roles, "publications", publication)) {
info->Publications.insert(publication);
if (endpoint)
endpoint->RegisterPublication(publication);
}
}
}
Dictionary::Ptr subscriptions;
if (message.GetSubscriptions(&subscriptions)) {
Value subscription;
BOOST_FOREACH(tie(tuples::ignore, subscription), subscriptions) {
if (trusted || HasMessagePermission(roles, "subscriptions", subscription)) {
info->Subscriptions.insert(subscription);
if (endpoint)
endpoint->RegisterSubscription(subscription);
}
info->Subscriptions.insert(subscription);
if (endpoint)
endpoint->RegisterSubscription(subscription);
}
}
@ -490,7 +432,6 @@ void DiscoveryComponent::DiscoveryTimerHandler(void)
/* update LastSeen if we're still connected to this endpoint */
info->LastSeen = now;
} else {
/* TODO: figure out whether we actually want to connect to this component */
/* try and reconnect to this component */
try {
if (!info->Node.IsEmpty() && !info->Service.IsEmpty())

View File

@ -74,8 +74,6 @@ private:
void FinishDiscoverySetup(const Endpoint::Ptr& endpoint);
bool HasMessagePermission(const Dictionary::Ptr& roles, const String& messageType, const String& message);
static const int RegistrationTTL = 300;
};

View File

@ -69,12 +69,3 @@ void DiscoveryMessage::SetSubscriptions(const Dictionary::Ptr& value)
Set("subscriptions", value);
}
bool DiscoveryMessage::GetPublications(Dictionary::Ptr *value) const
{
return Get("publications", value);
}
void DiscoveryMessage::SetPublications(const Dictionary::Ptr& value)
{
Set("publications", value);
}

View File

@ -43,9 +43,6 @@ public:
bool GetSubscriptions(Dictionary::Ptr *value) const;
void SetSubscriptions(const Dictionary::Ptr& value);
bool GetPublications(Dictionary::Ptr *value) const;
void SetPublications(const Dictionary::Ptr& value);
};
}

View File

@ -53,6 +53,7 @@ AX_CXX_GCC_ABI_DEMANGLE
AX_BOOST_BASE
AX_BOOST_SIGNALS
AX_BOOST_THREAD
AX_BOOST_SYSTEM
AX_BOOST_UNIT_TEST_FRAMEWORK
AX_CHECK_OPENSSL([], [AC_MSG_ERROR([You need the OpenSSL headers and libraries in order to build this application])])
AC_CHECK_LIB(ssl, SSL_new)

Binary file not shown.

View File

@ -20,11 +20,5 @@ local object Component "discovery" {
local object Endpoint "icinga-c1" {
node = "192.168.5.46",
service = 7777,
roles = { "all" }
}
local object Role "all" {
publications = { "*" },
subscriptions = { "*" }
}

View File

@ -14,11 +14,5 @@ local object component "discovery" {
local object endpoint "icinga-c1" {
node = "192.168.5.46",
service = 7777,
roles = { "all" }
}
local object role "all" {
publications = { "*" },
subscriptions = { "*" }
}

View File

@ -3,7 +3,11 @@ local object Application "icinga" {
ca = "ca.crt",
node = "192.168.2.235",
service = 7777
service = 7777,
macros = {
plugindir = "/usr/local/icinga/libexec"
}
}
local object Component "discovery" {
@ -22,23 +26,6 @@ local object Component "compat" {
}
local object Endpoint "icinga-c2" {
roles = { "all" }
}
local object Endpoint "icinga-c3" {
roles = { "all" }
}
local object Endpoint "icinga-c4" {
roles = { "all" }
}
local object Role "all" {
publications = { "*" },
subscriptions = { "*" }
}
object Host "localhost" {
}
@ -46,10 +33,6 @@ object Host "localhost" {
abstract object Service "nagios-service" {
methods = {
check = "native::NagiosCheck"
},
macros = {
plugindir = "/usr/local/icinga/libexec"
}
}

View File

@ -17,19 +17,3 @@ local object Component "discovery" {
broker = 1
}
local object Endpoint "icinga-c2" {
roles = { "demo" }
}
local object Endpoint "icinga-c3" {
roles = { "demo" }
}
local object Role "broker" {
publications = { "discovery::NewComponent" }
}
local object Role "demo" {
publications = { "demo::*" },
subscriptions = { "demo::*" }
}

View File

@ -22,13 +22,6 @@ local object component "discovery" {
local object endpoint "icinga-c3" {
node = "192.168.5.46",
service = 9999,
roles = { "all" }
}
local object role "all" {
publications = { "*" },
subscriptions = { "*" }
}
# --------------------------------------------

View File

@ -19,11 +19,5 @@ local object component "discovery" {
local object endpoint "icinga-c2" {
node = "192.168.2.235",
service = 7777,
roles = { "all" }
}
local object role "all" {
publications = { "*" },
subscriptions = { "*" }
}

View File

@ -72,37 +72,6 @@ bool Endpoint::HasSubscription(String topic) const
return (m_Subscriptions.find(topic) != m_Subscriptions.end());
}
/**
* Registers a topic publication for this endpoint.
*
* @param topic The name of the topic.
*/
void Endpoint::RegisterPublication(String topic)
{
m_Publications.insert(topic);
}
/**
* Removes a topic publication from this endpoint.
*
* @param topic The name of the topic.
*/
void Endpoint::UnregisterPublication(String topic)
{
m_Publications.erase(topic);
}
/**
* Checks whether the endpoint has a publication for the specified topic.
*
* @param topic The name of the topic.
* @returns true if the endpoint is publishing this topic, false otherwise.
*/
bool Endpoint::HasPublication(String topic) const
{
return (m_Publications.find(topic) != m_Publications.end());
}
/**
* Removes all subscriptions for the endpoint.
*/
@ -111,14 +80,6 @@ void Endpoint::ClearSubscriptions(void)
m_Subscriptions.clear();
}
/**
* Removes all publications for the endpoint.
*/
void Endpoint::ClearPublications(void)
{
m_Publications.clear();
}
/**
* Returns the beginning of the subscriptions list.
*
@ -139,26 +100,6 @@ Endpoint::ConstTopicIterator Endpoint::EndSubscriptions(void) const
return m_Subscriptions.end();
}
/**
* Returns the beginning of the publications list.
*
* @returns An iterator that points to the first publication.
*/
Endpoint::ConstTopicIterator Endpoint::BeginPublications(void) const
{
return m_Publications.begin();
}
/**
* Returns the end of the publications list.
*
* @returns An iterator that points past the last publication.
*/
Endpoint::ConstTopicIterator Endpoint::EndPublications(void) const
{
return m_Publications.end();
}
/**
* Sets whether a welcome message has been received from this endpoint.
*

View File

@ -58,10 +58,6 @@ public:
void UnregisterSubscription(String topic);
bool HasSubscription(String topic) const;
void RegisterPublication(String topic);
void UnregisterPublication(String topic);
bool HasPublication(String topic) const;
virtual bool IsLocal(void) const = 0;
virtual bool IsConnected(void) const = 0;
@ -71,21 +67,15 @@ public:
virtual void Stop(void) = 0;
void ClearSubscriptions(void);
void ClearPublications(void);
ConstTopicIterator BeginSubscriptions(void) const;
ConstTopicIterator EndSubscriptions(void) const;
ConstTopicIterator BeginPublications(void) const;
ConstTopicIterator EndPublications(void) const;
boost::signal<void (const Endpoint::Ptr&)> OnSessionEstablished;
private:
set<String> m_Subscriptions; /**< The topics this endpoint is
subscribed to. */
set<String> m_Publications; /**< The topics this endpoint is
publishing. */
bool m_ReceivedWelcome; /**< Have we received a welcome message
from this endpoint? */
bool m_SentWelcome; /**< Have we sent a welcome message to this

View File

@ -96,9 +96,6 @@ void JsonRpcEndpoint::NewMessageHandler(const MessagePart& message)
if (!request.GetMethod(&method))
return;
if (!HasPublication(method))
return;
String id;
if (request.GetID(&id))
GetEndpointManager()->SendAnycastMessage(sender, request);
@ -119,10 +116,9 @@ void JsonRpcEndpoint::ClientClosedHandler(void)
Logger::Write(LogWarning, "jsonrpc", "Lost connection to endpoint: identity=" + GetIdentity());
// TODO: _only_ clear non-persistent publications/subscriptions
// unregister ourselves if no persistent publications/subscriptions are left (use a timer for that, once we have a TTL property for the topics)
// TODO: _only_ clear non-persistent subscriptions
// unregister ourselves if no persistent subscriptions are left (use a timer for that, once we have a TTL property for the topics)
ClearSubscriptions();
ClearPublications();
// remove the endpoint if there are no more subscriptions */
if (BeginSubscriptions() == EndSubscriptions()) {