mirror of
https://github.com/Icinga/icinga2.git
synced 2025-07-27 07:34:15 +02:00
Implemented broker authorisation.
This commit is contained in:
parent
203d788ea5
commit
4c04c47018
@ -27,15 +27,15 @@ void DiscoveryComponent::Start(void)
|
|||||||
m_DiscoveryEndpoint->RegisterMethodHandler("discovery::Welcome",
|
m_DiscoveryEndpoint->RegisterMethodHandler("discovery::Welcome",
|
||||||
bind_weak(&DiscoveryComponent::WelcomeMessageHandler, shared_from_this()));
|
bind_weak(&DiscoveryComponent::WelcomeMessageHandler, shared_from_this()));
|
||||||
|
|
||||||
GetEndpointManager()->ForeachEndpoint(bind(&DiscoveryComponent::NewEndpointHandler, this, _1));
|
GetEndpointManager()->ForEachEndpoint(bind(&DiscoveryComponent::NewEndpointHandler, this, _1));
|
||||||
GetEndpointManager()->OnNewEndpoint += bind_weak(&DiscoveryComponent::NewEndpointHandler, shared_from_this());
|
GetEndpointManager()->OnNewEndpoint += bind_weak(&DiscoveryComponent::NewEndpointHandler, shared_from_this());
|
||||||
|
|
||||||
GetEndpointManager()->RegisterEndpoint(m_DiscoveryEndpoint);
|
GetEndpointManager()->RegisterEndpoint(m_DiscoveryEndpoint);
|
||||||
|
|
||||||
m_DiscoveryConnectTimer = make_shared<Timer>();
|
m_DiscoveryTimer = make_shared<Timer>();
|
||||||
m_DiscoveryConnectTimer->SetInterval(30);
|
m_DiscoveryTimer->SetInterval(30);
|
||||||
m_DiscoveryConnectTimer->OnTimerExpired += bind_weak(&DiscoveryComponent::ReconnectTimerHandler, shared_from_this());
|
m_DiscoveryTimer->OnTimerExpired += bind_weak(&DiscoveryComponent::DiscoveryTimerHandler, shared_from_this());
|
||||||
m_DiscoveryConnectTimer->Start();
|
m_DiscoveryTimer->Start();
|
||||||
}
|
}
|
||||||
|
|
||||||
void DiscoveryComponent::Stop(void)
|
void DiscoveryComponent::Stop(void)
|
||||||
@ -73,13 +73,9 @@ int DiscoveryComponent::NewEndpointHandler(const NewEndpointEventArgs& neea)
|
|||||||
neea.Endpoint->RegisterMethodSource("discovery::RegisterComponent");
|
neea.Endpoint->RegisterMethodSource("discovery::RegisterComponent");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* accept discovery::Welcome messages from any endpoint */
|
||||||
neea.Endpoint->RegisterMethodSource("discovery::Welcome");
|
neea.Endpoint->RegisterMethodSource("discovery::Welcome");
|
||||||
|
|
||||||
/* TODO: implement message broker authorisation */
|
|
||||||
neea.Endpoint->RegisterMethodSource("discovery::NewComponent");
|
|
||||||
|
|
||||||
/* TODO: register handler to unregister this endpoint when it's closed */
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -97,8 +93,8 @@ int DiscoveryComponent::DiscoverySourceHandler(const NewMethodEventArgs& nmea, C
|
|||||||
|
|
||||||
int DiscoveryComponent::DiscoveryEndpointHandler(const NewEndpointEventArgs& neea, ComponentDiscoveryInfo::Ptr info) const
|
int DiscoveryComponent::DiscoveryEndpointHandler(const NewEndpointEventArgs& neea, ComponentDiscoveryInfo::Ptr info) const
|
||||||
{
|
{
|
||||||
neea.Endpoint->ForeachMethodSink(bind(&DiscoveryComponent::DiscoverySinkHandler, this, _1, info));
|
neea.Endpoint->ForEachMethodSink(bind(&DiscoveryComponent::DiscoverySinkHandler, this, _1, info));
|
||||||
neea.Endpoint->ForeachMethodSource(bind(&DiscoveryComponent::DiscoverySourceHandler, this, _1, info));
|
neea.Endpoint->ForEachMethodSource(bind(&DiscoveryComponent::DiscoverySourceHandler, this, _1, info));
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -107,8 +103,9 @@ bool DiscoveryComponent::GetComponentDiscoveryInfo(string component, ComponentDi
|
|||||||
if (component == GetEndpointManager()->GetIdentity()) {
|
if (component == GetEndpointManager()->GetIdentity()) {
|
||||||
/* Build fake discovery info for ourselves */
|
/* Build fake discovery info for ourselves */
|
||||||
*info = make_shared<ComponentDiscoveryInfo>();
|
*info = make_shared<ComponentDiscoveryInfo>();
|
||||||
GetEndpointManager()->ForeachEndpoint(bind(&DiscoveryComponent::DiscoveryEndpointHandler, this, _1, *info));
|
GetEndpointManager()->ForEachEndpoint(bind(&DiscoveryComponent::DiscoveryEndpointHandler, this, _1, *info));
|
||||||
|
|
||||||
|
(*info)->LastSeen = 0;
|
||||||
(*info)->Node = GetIcingaApplication()->GetNode();
|
(*info)->Node = GetIcingaApplication()->GetNode();
|
||||||
(*info)->Service = GetIcingaApplication()->GetService();
|
(*info)->Service = GetIcingaApplication()->GetService();
|
||||||
|
|
||||||
@ -146,9 +143,16 @@ int DiscoveryComponent::NewIdentityHandler(const EventArgs& ea)
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
GetEndpointManager()->ForeachEndpoint(bind(&DiscoveryComponent::CheckExistingEndpoint, this, endpoint, _1));
|
GetEndpointManager()->ForEachEndpoint(bind(&DiscoveryComponent::CheckExistingEndpoint, this, endpoint, _1));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ConfigCollection::Ptr brokerCollection = GetApplication()->GetConfigHive()->GetCollection("broker");
|
||||||
|
if (brokerCollection->GetObject(identity)) {
|
||||||
|
/* accept discovery::NewComponent messages from brokers */
|
||||||
|
endpoint->RegisterMethodSource("discovery::NewComponent");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// we assume the other component _always_ wants
|
// we assume the other component _always_ wants
|
||||||
// discovery::RegisterComponent messages from us
|
// discovery::RegisterComponent messages from us
|
||||||
endpoint->RegisterMethodSink("discovery::RegisterComponent");
|
endpoint->RegisterMethodSink("discovery::RegisterComponent");
|
||||||
@ -281,8 +285,14 @@ void DiscoveryComponent::SendDiscoveryMessage(string method, string identity, En
|
|||||||
|
|
||||||
void DiscoveryComponent::ProcessDiscoveryMessage(string identity, DiscoveryMessage message)
|
void DiscoveryComponent::ProcessDiscoveryMessage(string identity, DiscoveryMessage message)
|
||||||
{
|
{
|
||||||
|
/* ignore discovery messages that are about ourselves */
|
||||||
|
if (identity == GetEndpointManager()->GetIdentity())
|
||||||
|
return;
|
||||||
|
|
||||||
ComponentDiscoveryInfo::Ptr info = make_shared<ComponentDiscoveryInfo>();
|
ComponentDiscoveryInfo::Ptr info = make_shared<ComponentDiscoveryInfo>();
|
||||||
|
|
||||||
|
time(&(info->LastSeen));
|
||||||
|
|
||||||
message.GetNode(&info->Node);
|
message.GetNode(&info->Node);
|
||||||
message.GetService(&info->Service);
|
message.GetService(&info->Service);
|
||||||
|
|
||||||
@ -290,6 +300,9 @@ void DiscoveryComponent::ProcessDiscoveryMessage(string identity, DiscoveryMessa
|
|||||||
if (message.GetProvides(&provides)) {
|
if (message.GetProvides(&provides)) {
|
||||||
DictionaryIterator i;
|
DictionaryIterator i;
|
||||||
for (i = provides.GetDictionary()->Begin(); i != provides.GetDictionary()->End(); i++) {
|
for (i = provides.GetDictionary()->Begin(); i != provides.GetDictionary()->End(); i++) {
|
||||||
|
if (IsBroker()) {
|
||||||
|
/* TODO: Add authorisation checks here */
|
||||||
|
}
|
||||||
info->PublishedMethods.insert(i->second);
|
info->PublishedMethods.insert(i->second);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -298,6 +311,9 @@ void DiscoveryComponent::ProcessDiscoveryMessage(string identity, DiscoveryMessa
|
|||||||
if (message.GetSubscribes(&subscribes)) {
|
if (message.GetSubscribes(&subscribes)) {
|
||||||
DictionaryIterator i;
|
DictionaryIterator i;
|
||||||
for (i = subscribes.GetDictionary()->Begin(); i != subscribes.GetDictionary()->End(); i++) {
|
for (i = subscribes.GetDictionary()->Begin(); i != subscribes.GetDictionary()->End(); i++) {
|
||||||
|
if (IsBroker()) {
|
||||||
|
/* TODO: Add authorisation checks here */
|
||||||
|
}
|
||||||
info->SubscribedMethods.insert(i->second);
|
info->SubscribedMethods.insert(i->second);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -334,24 +350,78 @@ int DiscoveryComponent::NewComponentMessageHandler(const NewRequestEventArgs& nr
|
|||||||
|
|
||||||
int DiscoveryComponent::RegisterComponentMessageHandler(const NewRequestEventArgs& nrea)
|
int DiscoveryComponent::RegisterComponentMessageHandler(const NewRequestEventArgs& nrea)
|
||||||
{
|
{
|
||||||
|
/* ignore discovery::RegisterComponent messages when we're not a broker */
|
||||||
|
if (!IsBroker())
|
||||||
|
return 0;
|
||||||
|
|
||||||
DiscoveryMessage message;
|
DiscoveryMessage message;
|
||||||
nrea.Request.GetParams(&message);
|
nrea.Request.GetParams(&message);
|
||||||
ProcessDiscoveryMessage(nrea.Sender->GetIdentity(), message);
|
ProcessDiscoveryMessage(nrea.Sender->GetIdentity(), message);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int DiscoveryComponent::ReconnectTimerHandler(const TimerEventArgs& tea)
|
int DiscoveryComponent::BrokerConfigHandler(const EventArgs& ea)
|
||||||
{
|
{
|
||||||
|
ConfigObject::Ptr object = static_pointer_cast<ConfigObject>(ea.Source);
|
||||||
|
|
||||||
EndpointManager::Ptr endpointManager = GetEndpointManager();
|
EndpointManager::Ptr endpointManager = GetEndpointManager();
|
||||||
|
|
||||||
map<string, ComponentDiscoveryInfo::Ptr>::iterator i;
|
/* Check if we're already connected to this broker. */
|
||||||
for (i = m_Components.begin(); i != m_Components.end(); i++) {
|
if (endpointManager->GetEndpointByIdentity(object->GetName()))
|
||||||
Endpoint::Ptr endpoint = endpointManager->GetEndpointByIdentity(i->first);
|
return 0;
|
||||||
if (endpoint)
|
|
||||||
continue;
|
|
||||||
|
|
||||||
|
string node;
|
||||||
|
if (!object->GetPropertyString("node", &node))
|
||||||
|
throw InvalidArgumentException("'node' property required for 'broker' config object.");
|
||||||
|
|
||||||
|
string service;
|
||||||
|
if (!object->GetPropertyString("service", &service))
|
||||||
|
throw InvalidArgumentException("'service' property required for 'broker' config object.");
|
||||||
|
|
||||||
|
/* reconnect to this broker */
|
||||||
|
endpointManager->AddConnection(node, service);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int DiscoveryComponent::DiscoveryTimerHandler(const TimerEventArgs& tea)
|
||||||
|
{
|
||||||
|
EndpointManager::Ptr endpointManager = GetEndpointManager();
|
||||||
|
|
||||||
|
time_t now;
|
||||||
|
time(&now);
|
||||||
|
|
||||||
|
ConfigCollection::Ptr brokerCollection = GetApplication()->GetConfigHive()->GetCollection("broker");
|
||||||
|
brokerCollection->ForEachObject(bind(&DiscoveryComponent::BrokerConfigHandler, this, _1));
|
||||||
|
|
||||||
|
map<string, ComponentDiscoveryInfo::Ptr>::iterator i;
|
||||||
|
for (i = m_Components.begin(); i != m_Components.end(); ) {
|
||||||
|
string identity = i->first;
|
||||||
ComponentDiscoveryInfo::Ptr info = i->second;
|
ComponentDiscoveryInfo::Ptr info = i->second;
|
||||||
endpointManager->AddConnection(info->Node, info->Service);
|
|
||||||
|
if (info->LastSeen < now - DiscoveryComponent::RegistrationTTL) {
|
||||||
|
/* unregister this component if its registration has expired */
|
||||||
|
i = m_Components.erase(i);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (IsBroker()) {
|
||||||
|
/* send discovery message to all connected components to
|
||||||
|
refresh their TTL for this component */
|
||||||
|
SendDiscoveryMessage("discovery::NewComponent", i->first, Endpoint::Ptr());
|
||||||
|
}
|
||||||
|
|
||||||
|
Endpoint::Ptr endpoint = endpointManager->GetEndpointByIdentity(identity);
|
||||||
|
if (endpoint) {
|
||||||
|
/* update LastSeen if we're still connected to this endpoint */
|
||||||
|
info->LastSeen = now;
|
||||||
|
} else {
|
||||||
|
/* try and reconnect to this component */
|
||||||
|
endpointManager->AddConnection(info->Node, info->Service);
|
||||||
|
}
|
||||||
|
|
||||||
|
i++;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -15,6 +15,8 @@ public:
|
|||||||
|
|
||||||
set<string> SubscribedMethods;
|
set<string> SubscribedMethods;
|
||||||
set<string> PublishedMethods;
|
set<string> PublishedMethods;
|
||||||
|
|
||||||
|
time_t LastSeen;
|
||||||
};
|
};
|
||||||
|
|
||||||
class DiscoveryComponent : public IcingaComponent
|
class DiscoveryComponent : public IcingaComponent
|
||||||
@ -23,7 +25,7 @@ private:
|
|||||||
VirtualEndpoint::Ptr m_DiscoveryEndpoint;
|
VirtualEndpoint::Ptr m_DiscoveryEndpoint;
|
||||||
map<string, ComponentDiscoveryInfo::Ptr> m_Components;
|
map<string, ComponentDiscoveryInfo::Ptr> m_Components;
|
||||||
bool m_Broker;
|
bool m_Broker;
|
||||||
Timer::Ptr m_DiscoveryConnectTimer;
|
Timer::Ptr m_DiscoveryTimer;
|
||||||
|
|
||||||
int NewEndpointHandler(const NewEndpointEventArgs& neea);
|
int NewEndpointHandler(const NewEndpointEventArgs& neea);
|
||||||
int NewIdentityHandler(const EventArgs& ea);
|
int NewIdentityHandler(const EventArgs& ea);
|
||||||
@ -43,12 +45,16 @@ private:
|
|||||||
int DiscoverySinkHandler(const NewMethodEventArgs& nmea, ComponentDiscoveryInfo::Ptr info) const;
|
int DiscoverySinkHandler(const NewMethodEventArgs& nmea, ComponentDiscoveryInfo::Ptr info) const;
|
||||||
int DiscoverySourceHandler(const NewMethodEventArgs& nmea, ComponentDiscoveryInfo::Ptr info) const;
|
int DiscoverySourceHandler(const NewMethodEventArgs& nmea, ComponentDiscoveryInfo::Ptr info) const;
|
||||||
|
|
||||||
int ReconnectTimerHandler(const TimerEventArgs& tea);
|
int DiscoveryTimerHandler(const TimerEventArgs& tea);
|
||||||
|
|
||||||
bool IsBroker(void) const;
|
bool IsBroker(void) const;
|
||||||
|
|
||||||
void FinishDiscoverySetup(Endpoint::Ptr endpoint);
|
void FinishDiscoverySetup(Endpoint::Ptr endpoint);
|
||||||
|
|
||||||
|
int BrokerConfigHandler(const EventArgs& ea);
|
||||||
|
|
||||||
|
static const int RegistrationTTL = 300;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
virtual string GetName(void) const;
|
virtual string GetName(void) const;
|
||||||
virtual void Start(void);
|
virtual void Start(void);
|
||||||
|
@ -16,7 +16,7 @@
|
|||||||
"rpclistener": {
|
"rpclistener": {
|
||||||
"kekslistener": { "replicate": "0", "service": "8888" }
|
"kekslistener": { "replicate": "0", "service": "8888" }
|
||||||
},
|
},
|
||||||
"rpcconnection": {
|
"broker": {
|
||||||
"keksclient": { "replicate": "0", "node": "127.0.0.1", "service": "7777" }
|
"icinga-c1": { "replicate": "0", "node": "127.0.0.1", "service": "7777" }
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -51,7 +51,7 @@ bool Endpoint::IsMethodSink(string method) const
|
|||||||
return (m_MethodSinks.find(method) != m_MethodSinks.end());
|
return (m_MethodSinks.find(method) != m_MethodSinks.end());
|
||||||
}
|
}
|
||||||
|
|
||||||
void Endpoint::ForeachMethodSink(function<int (const NewMethodEventArgs&)> callback)
|
void Endpoint::ForEachMethodSink(function<int (const NewMethodEventArgs&)> callback)
|
||||||
{
|
{
|
||||||
for (set<string>::iterator i = m_MethodSinks.begin(); i != m_MethodSinks.end(); i++) {
|
for (set<string>::iterator i = m_MethodSinks.begin(); i != m_MethodSinks.end(); i++) {
|
||||||
NewMethodEventArgs nmea;
|
NewMethodEventArgs nmea;
|
||||||
@ -76,7 +76,7 @@ bool Endpoint::IsMethodSource(string method) const
|
|||||||
return (m_MethodSources.find(method) != m_MethodSources.end());
|
return (m_MethodSources.find(method) != m_MethodSources.end());
|
||||||
}
|
}
|
||||||
|
|
||||||
void Endpoint::ForeachMethodSource(function<int (const NewMethodEventArgs&)> callback)
|
void Endpoint::ForEachMethodSource(function<int (const NewMethodEventArgs&)> callback)
|
||||||
{
|
{
|
||||||
for (set<string>::iterator i = m_MethodSources.begin(); i != m_MethodSources.end(); i++) {
|
for (set<string>::iterator i = m_MethodSources.begin(); i != m_MethodSources.end(); i++) {
|
||||||
NewMethodEventArgs nmea;
|
NewMethodEventArgs nmea;
|
||||||
|
@ -58,8 +58,8 @@ public:
|
|||||||
Event<NewMethodEventArgs> OnNewMethodSink;
|
Event<NewMethodEventArgs> OnNewMethodSink;
|
||||||
Event<NewMethodEventArgs> OnNewMethodSource;
|
Event<NewMethodEventArgs> OnNewMethodSource;
|
||||||
|
|
||||||
void ForeachMethodSink(function<int (const NewMethodEventArgs&)> callback);
|
void ForEachMethodSink(function<int (const NewMethodEventArgs&)> callback);
|
||||||
void ForeachMethodSource(function<int (const NewMethodEventArgs&)> callback);
|
void ForEachMethodSource(function<int (const NewMethodEventArgs&)> callback);
|
||||||
|
|
||||||
void ClearMethodSinks(void);
|
void ClearMethodSinks(void);
|
||||||
void ClearMethodSources(void);
|
void ClearMethodSources(void);
|
||||||
|
@ -132,7 +132,7 @@ void EndpointManager::SendMulticastRequest(Endpoint::Ptr sender, const JsonRpcRe
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void EndpointManager::ForeachEndpoint(function<int (const NewEndpointEventArgs&)> callback)
|
void EndpointManager::ForEachEndpoint(function<int (const NewEndpointEventArgs&)> callback)
|
||||||
{
|
{
|
||||||
NewEndpointEventArgs neea;
|
NewEndpointEventArgs neea;
|
||||||
neea.Source = shared_from_this();
|
neea.Source = shared_from_this();
|
||||||
|
@ -42,7 +42,7 @@ public:
|
|||||||
void SendAnycastRequest(Endpoint::Ptr sender, const JsonRpcRequest& request, bool fromLocal = true);
|
void SendAnycastRequest(Endpoint::Ptr sender, const JsonRpcRequest& request, bool fromLocal = true);
|
||||||
void SendMulticastRequest(Endpoint::Ptr sender, const JsonRpcRequest& request, bool fromLocal = true);
|
void SendMulticastRequest(Endpoint::Ptr sender, const JsonRpcRequest& request, bool fromLocal = true);
|
||||||
|
|
||||||
void ForeachEndpoint(function<int (const NewEndpointEventArgs&)> callback);
|
void ForEachEndpoint(function<int (const NewEndpointEventArgs&)> callback);
|
||||||
|
|
||||||
Endpoint::Ptr GetEndpointByIdentity(string identity) const;
|
Endpoint::Ptr GetEndpointByIdentity(string identity) const;
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user