mirror of https://github.com/Icinga/icinga2.git
parent
f704468c4b
commit
bd610a74b1
|
@ -39,9 +39,11 @@ INITIALIZE_ONCE(&ApiClient::StaticInitialize);
|
||||||
static Value SetLogPositionHandler(const MessageOrigin& origin, const Dictionary::Ptr& params);
|
static Value SetLogPositionHandler(const MessageOrigin& origin, const Dictionary::Ptr& params);
|
||||||
REGISTER_APIFUNCTION(SetLogPosition, log, &SetLogPositionHandler);
|
REGISTER_APIFUNCTION(SetLogPosition, log, &SetLogPositionHandler);
|
||||||
|
|
||||||
ApiClient::ApiClient(const Endpoint::Ptr& endpoint, const Stream::Ptr& stream, ConnectionRole role)
|
ApiClient::ApiClient(const String& identity, const Stream::Ptr& stream, ConnectionRole role)
|
||||||
: m_Endpoint(endpoint), m_Stream(stream), m_Role(role), m_Seen(Utility::GetTime())
|
: m_Identity(identity), m_Stream(stream), m_Role(role), m_Seen(Utility::GetTime())
|
||||||
{ }
|
{
|
||||||
|
m_Endpoint = Endpoint::GetByName(identity);
|
||||||
|
}
|
||||||
|
|
||||||
void ApiClient::StaticInitialize(void)
|
void ApiClient::StaticInitialize(void)
|
||||||
{
|
{
|
||||||
|
@ -57,6 +59,11 @@ void ApiClient::Start(void)
|
||||||
thread.detach();
|
thread.detach();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
String ApiClient::GetIdentity(void) const
|
||||||
|
{
|
||||||
|
return m_Identity;
|
||||||
|
}
|
||||||
|
|
||||||
Endpoint::Ptr ApiClient::GetEndpoint(void) const
|
Endpoint::Ptr ApiClient::GetEndpoint(void) const
|
||||||
{
|
{
|
||||||
return m_Endpoint;
|
return m_Endpoint;
|
||||||
|
@ -81,7 +88,7 @@ void ApiClient::SendMessage(const Dictionary::Ptr& message)
|
||||||
m_Seen = Utility::GetTime();
|
m_Seen = Utility::GetTime();
|
||||||
} catch (const std::exception& ex) {
|
} catch (const std::exception& ex) {
|
||||||
std::ostringstream msgbuf;
|
std::ostringstream msgbuf;
|
||||||
msgbuf << "Error while sending JSON-RPC message for endpoint '" << m_Endpoint->GetName() << "': " << DiagnosticInformation(ex);
|
msgbuf << "Error while sending JSON-RPC message for identity '" << m_Identity << "': " << DiagnosticInformation(ex);
|
||||||
Log(LogWarning, "remote", msgbuf.str());
|
Log(LogWarning, "remote", msgbuf.str());
|
||||||
|
|
||||||
Disconnect();
|
Disconnect();
|
||||||
|
@ -90,8 +97,10 @@ void ApiClient::SendMessage(const Dictionary::Ptr& message)
|
||||||
|
|
||||||
void ApiClient::Disconnect(void)
|
void ApiClient::Disconnect(void)
|
||||||
{
|
{
|
||||||
Log(LogWarning, "remote", "API client disconnected for endpoint '" + m_Endpoint->GetName() + "'");
|
Log(LogWarning, "remote", "API client disconnected for identity '" + m_Identity + "'");
|
||||||
m_Stream->Close();
|
m_Stream->Close();
|
||||||
|
|
||||||
|
if (m_Endpoint)
|
||||||
m_Endpoint->RemoveClient(GetSelf());
|
m_Endpoint->RemoveClient(GetSelf());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -105,7 +114,7 @@ bool ApiClient::ProcessMessage(void)
|
||||||
if (message->Get("method") != "log::SetLogPosition")
|
if (message->Get("method") != "log::SetLogPosition")
|
||||||
m_Seen = Utility::GetTime();
|
m_Seen = Utility::GetTime();
|
||||||
|
|
||||||
if (message->Contains("ts")) {
|
if (m_Endpoint && message->Contains("ts")) {
|
||||||
double ts = message->Get("ts");
|
double ts = message->Get("ts");
|
||||||
|
|
||||||
/* ignore old messages */
|
/* ignore old messages */
|
||||||
|
@ -118,14 +127,16 @@ bool ApiClient::ProcessMessage(void)
|
||||||
MessageOrigin origin;
|
MessageOrigin origin;
|
||||||
origin.FromClient = GetSelf();
|
origin.FromClient = GetSelf();
|
||||||
|
|
||||||
|
if (m_Endpoint) {
|
||||||
if (m_Endpoint->GetZone() != Zone::GetLocalZone())
|
if (m_Endpoint->GetZone() != Zone::GetLocalZone())
|
||||||
origin.FromZone = m_Endpoint->GetZone();
|
origin.FromZone = m_Endpoint->GetZone();
|
||||||
else
|
else
|
||||||
origin.FromZone = Zone::GetByName(message->Get("originZone"));
|
origin.FromZone = Zone::GetByName(message->Get("originZone"));
|
||||||
|
}
|
||||||
|
|
||||||
String method = message->Get("method");
|
String method = message->Get("method");
|
||||||
|
|
||||||
Log(LogDebug, "remote", "Received '" + method + "' message from '" + m_Endpoint->GetName() + "'");
|
Log(LogDebug, "remote", "Received '" + method + "' message from '" + m_Identity + "'");
|
||||||
|
|
||||||
Dictionary::Ptr resultMessage = make_shared<Dictionary>();
|
Dictionary::Ptr resultMessage = make_shared<Dictionary>();
|
||||||
|
|
||||||
|
@ -159,7 +170,7 @@ void ApiClient::MessageThreadProc(void)
|
||||||
|
|
||||||
Disconnect();
|
Disconnect();
|
||||||
} catch (const std::exception& ex) {
|
} catch (const std::exception& ex) {
|
||||||
Log(LogWarning, "remote", "Error while reading JSON-RPC message for endpoint '" + m_Endpoint->GetName() + "': " + DiagnosticInformation(ex));
|
Log(LogWarning, "remote", "Error while reading JSON-RPC message for identity '" + m_Identity + "': " + DiagnosticInformation(ex));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -193,6 +204,9 @@ Value SetLogPositionHandler(const MessageOrigin& origin, const Dictionary::Ptr&
|
||||||
double log_position = params->Get("log_position");
|
double log_position = params->Get("log_position");
|
||||||
Endpoint::Ptr endpoint = origin.FromClient->GetEndpoint();
|
Endpoint::Ptr endpoint = origin.FromClient->GetEndpoint();
|
||||||
|
|
||||||
|
if (!endpoint)
|
||||||
|
return Empty;
|
||||||
|
|
||||||
if (log_position > endpoint->GetLocalLogPosition())
|
if (log_position > endpoint->GetLocalLogPosition())
|
||||||
endpoint->SetLocalLogPosition(log_position);
|
endpoint->SetLocalLogPosition(log_position);
|
||||||
|
|
||||||
|
|
|
@ -46,12 +46,13 @@ class I2_REMOTE_API ApiClient : public Object
|
||||||
public:
|
public:
|
||||||
DECLARE_PTR_TYPEDEFS(ApiClient);
|
DECLARE_PTR_TYPEDEFS(ApiClient);
|
||||||
|
|
||||||
ApiClient(const Endpoint::Ptr& endpoint, const Stream::Ptr& stream, ConnectionRole role);
|
ApiClient(const String& identity, const Stream::Ptr& stream, ConnectionRole role);
|
||||||
|
|
||||||
static void StaticInitialize(void);
|
static void StaticInitialize(void);
|
||||||
|
|
||||||
void Start(void);
|
void Start(void);
|
||||||
|
|
||||||
|
String GetIdentity(void) const;
|
||||||
Endpoint::Ptr GetEndpoint(void) const;
|
Endpoint::Ptr GetEndpoint(void) const;
|
||||||
Stream::Ptr GetStream(void) const;
|
Stream::Ptr GetStream(void) const;
|
||||||
ConnectionRole GetRole(void) const;
|
ConnectionRole GetRole(void) const;
|
||||||
|
@ -61,6 +62,7 @@ public:
|
||||||
void SendMessage(const Dictionary::Ptr& request);
|
void SendMessage(const Dictionary::Ptr& request);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
String m_Identity;
|
||||||
Endpoint::Ptr m_Endpoint;
|
Endpoint::Ptr m_Endpoint;
|
||||||
Stream::Ptr m_Stream;
|
Stream::Ptr m_Stream;
|
||||||
ConnectionRole m_Role;
|
ConnectionRole m_Role;
|
||||||
|
|
|
@ -201,18 +201,14 @@ void ApiListener::NewClientHandler(const Socket::Ptr& client, ConnectionRole rol
|
||||||
shared_ptr<X509> cert = tlsStream->GetPeerCertificate();
|
shared_ptr<X509> cert = tlsStream->GetPeerCertificate();
|
||||||
String identity = GetCertificateCN(cert);
|
String identity = GetCertificateCN(cert);
|
||||||
|
|
||||||
Endpoint::Ptr endpoint = Endpoint::GetByName(identity);
|
|
||||||
|
|
||||||
if (!endpoint) {
|
|
||||||
Log(LogInformation, "remote", "New client for unknown endpoint '" + identity + "'");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
Log(LogInformation, "remote", "New client connection for identity '" + identity + "'");
|
Log(LogInformation, "remote", "New client connection for identity '" + identity + "'");
|
||||||
|
|
||||||
|
Endpoint::Ptr endpoint = Endpoint::GetByName(identity);
|
||||||
|
|
||||||
|
if (endpoint) {
|
||||||
bool need_sync = !endpoint->IsConnected();
|
bool need_sync = !endpoint->IsConnected();
|
||||||
|
|
||||||
ApiClient::Ptr aclient = make_shared<ApiClient>(endpoint, tlsStream, role);
|
ApiClient::Ptr aclient = make_shared<ApiClient>(identity, tlsStream, role);
|
||||||
aclient->Start();
|
aclient->Start();
|
||||||
|
|
||||||
if (need_sync) {
|
if (need_sync) {
|
||||||
|
@ -227,6 +223,7 @@ void ApiListener::NewClientHandler(const Socket::Ptr& client, ConnectionRole rol
|
||||||
|
|
||||||
endpoint->AddClient(aclient);
|
endpoint->AddClient(aclient);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void ApiListener::ApiTimerHandler(void)
|
void ApiListener::ApiTimerHandler(void)
|
||||||
{
|
{
|
||||||
|
@ -262,21 +259,37 @@ void ApiListener::ApiTimerHandler(void)
|
||||||
if (IsMaster()) {
|
if (IsMaster()) {
|
||||||
Zone::Ptr my_zone = Zone::GetLocalZone();
|
Zone::Ptr my_zone = Zone::GetLocalZone();
|
||||||
|
|
||||||
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
|
BOOST_FOREACH(const Zone::Ptr& zone, DynamicType::GetObjects<Zone>()) {
|
||||||
if (endpoint->IsConnected() || endpoint->GetName() == GetIdentity())
|
/* only connect to endpoints in a) the same zone b) our parent zone c) immediate child zones */
|
||||||
|
if (my_zone != zone && my_zone != zone->GetParent() && zone != my_zone->GetParent())
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
|
bool connected = false;
|
||||||
|
|
||||||
|
BOOST_FOREACH(const Endpoint::Ptr& endpoint, zone->GetEndpoints()) {
|
||||||
|
if (endpoint->IsConnected()) {
|
||||||
|
connected = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* don't connect to an endpoint if we already have a connection to the zone */
|
||||||
|
if (connected)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
BOOST_FOREACH(const Endpoint::Ptr& endpoint, zone->GetEndpoints()) {
|
||||||
|
/* don't connect to ourselves */
|
||||||
|
if (endpoint->GetName() == GetIdentity())
|
||||||
|
continue;
|
||||||
|
|
||||||
|
/* don't try to connect to endpoints which don't have a host and port */
|
||||||
if (endpoint->GetHost().IsEmpty() || endpoint->GetPort().IsEmpty())
|
if (endpoint->GetHost().IsEmpty() || endpoint->GetPort().IsEmpty())
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
Zone::Ptr their_zone = endpoint->GetZone();
|
|
||||||
|
|
||||||
if (my_zone != their_zone && my_zone != their_zone->GetParent() && their_zone != my_zone->GetParent())
|
|
||||||
continue;
|
|
||||||
|
|
||||||
AddConnection(endpoint->GetHost(), endpoint->GetPort());
|
AddConnection(endpoint->GetHost(), endpoint->GetPort());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
|
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
|
||||||
if (!endpoint->IsConnected())
|
if (!endpoint->IsConnected())
|
||||||
|
|
|
@ -25,8 +25,3 @@ bool MessageOrigin::IsLocal(void) const
|
||||||
{
|
{
|
||||||
return !FromClient;
|
return !FromClient;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool MessageOrigin::IsSameZone(void) const
|
|
||||||
{
|
|
||||||
return !FromZone;
|
|
||||||
}
|
|
||||||
|
|
|
@ -35,7 +35,6 @@ struct I2_REMOTE_API MessageOrigin
|
||||||
ApiClient::Ptr FromClient;
|
ApiClient::Ptr FromClient;
|
||||||
|
|
||||||
bool IsLocal(void) const;
|
bool IsLocal(void) const;
|
||||||
bool IsSameZone(void) const;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue