Delegation bugfixes.

This commit is contained in:
Gunnar Beutner 2012-06-21 12:51:50 +02:00
parent 33c37f4a27
commit ddeda8c512
9 changed files with 77 additions and 102 deletions

View File

@ -100,6 +100,14 @@ shared_ptr<SSL_CTX> Utility::MakeSSLContext(string pubkey, string privkey, strin
if (!SSL_CTX_load_verify_locations(sslContext.get(), cakey.c_str(), NULL))
throw OpenSSLException("Could not load public CA key file", ERR_get_error());
STACK_OF(X509_NAME) *cert_names;
cert_names = SSL_load_client_CA_file(cakey.c_str());
if (cert_names == NULL)
throw OpenSSLException("SSL_load_client_CA_file() failed", ERR_get_error());
SSL_CTX_set_client_CA_list(sslContext.get(), cert_names);
return sslContext;
}

View File

@ -31,8 +31,6 @@ void CheckerComponent::Start(void)
m_CheckerEndpoint = boost::make_shared<VirtualEndpoint>();
m_CheckerEndpoint->RegisterTopicHandler("checker::AssignService",
boost::bind(&CheckerComponent::AssignServiceRequestHandler, this, _2, _3));
m_CheckerEndpoint->RegisterTopicHandler("checker::RevokeService",
boost::bind(&CheckerComponent::RevokeServiceRequestHandler, this, _2, _3));
m_CheckerEndpoint->RegisterTopicHandler("checker::ClearServices",
boost::bind(&CheckerComponent::ClearServicesRequestHandler, this, _2, _3));
m_CheckerEndpoint->RegisterPublication("checker::CheckResult");
@ -78,6 +76,8 @@ void CheckerComponent::CheckTimerHandler(void)
Application::Log(LogDebug, "checker", "Executing service check for '" + service.GetName() + "'");
m_PendingServices.insert(service.GetName());
CheckTask::Ptr task = CheckTask::CreateTask(service);
task->Enqueue();
@ -109,6 +109,11 @@ void CheckerComponent::ResultTimerHandler(void)
Service service = task->GetService();
/* if the service isn't in the set of pending services
* it was removed and we need to ignore this check result. */
if (m_PendingServices.find(service.GetName()) == m_PendingServices.end())
continue;
CheckResult result = task->GetResult();
Application::Log(LogDebug, "checker", "Got result for service '" + service.GetName() + "'");
@ -127,6 +132,7 @@ void CheckerComponent::ResultTimerHandler(void)
failed++;
service.SetNextCheck(now + service.GetCheckInterval());
m_PendingServices.erase(service.GetName());
m_Services.push(service);
}
@ -157,49 +163,6 @@ void CheckerComponent::AssignServiceRequestHandler(const Endpoint::Ptr& sender,
Application::Log(LogDebug, "checker", "Accepted delegation for service '" + service.GetName() + "'");
/* force a service check */
m_CheckTimer->Reschedule(0);
string id;
if (request.GetID(&id)) {
ResponseMessage rm;
rm.SetID(id);
MessagePart result;
rm.SetResult(result);
GetEndpointManager()->SendUnicastMessage(m_CheckerEndpoint, sender, rm);
}
}
void CheckerComponent::RevokeServiceRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request)
{
MessagePart params;
if (!request.GetParams(&params))
return;
string name;
if (!params.GetProperty("service", &name))
return;
vector<Service> services;
while (!m_Services.empty()) {
Service service = m_Services.top();
if (service.GetName() == name)
continue;
// TODO: take care of services that are currently being checked
services.push_back(service);
}
vector<Service>::const_iterator it;
for (it = services.begin(); it != services.end(); it++)
m_Services.push(*it);
Application::Log(LogDebug, "checker", "Revoked delegation for service '" + name + "'");
string id;
if (request.GetID(&id)) {
ResponseMessage rm;
@ -214,7 +177,12 @@ void CheckerComponent::RevokeServiceRequestHandler(const Endpoint::Ptr& sender,
void CheckerComponent::ClearServicesRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request)
{
Application::Log(LogDebug, "checker", "Clearing service delegations.");
/* clear the services lists */
m_Services = ServiceQueue();
m_PendingServices.clear();
/* TODO: clear checks we've already sent to the thread pool */
string id;
if (request.GetID(&id)) {

View File

@ -51,7 +51,7 @@ private:
VirtualEndpoint::Ptr m_CheckerEndpoint;
ServiceQueue m_Services;
set<Service> m_PendingServices;
set<string> m_PendingServices;
Timer::Ptr m_CheckTimer;

View File

@ -43,7 +43,7 @@ void DelegationComponent::Start(void)
m_DelegationEndpoint = boost::make_shared<VirtualEndpoint>();
m_DelegationEndpoint->RegisterPublication("checker::AssignService");
m_DelegationEndpoint->RegisterPublication("checker::RevokeService");
m_DelegationEndpoint->RegisterPublication("checker::ClearServices");
GetEndpointManager()->RegisterEndpoint(m_DelegationEndpoint);
}
@ -112,10 +112,6 @@ void DelegationComponent::DelegationTimerHandler(void)
for (eit = GetEndpointManager()->Begin(); eit != GetEndpointManager()->End(); eit++)
histogram[eit->second] = 0;
/* nothing to do if we have no checkers */
if (histogram.size() == 0)
return;
vector<Service> services;
/* build "checker -> service count" histogram */
@ -152,19 +148,23 @@ void DelegationComponent::DelegationTimerHandler(void)
oldEndpoint = GetEndpointManager()->GetEndpointByIdentity(checker);
vector<Endpoint::Ptr> candidates = GetCheckerCandidates(service);
int avg_services = 0, overflow_tolerance = 0;
vector<Endpoint::Ptr>::iterator cit;
if (candidates.size() > 0) {
std::random_shuffle(candidates.begin(), candidates.end());
stringstream msgbuf;
msgbuf << "Service: " << service.GetName() << ", candidates: " << candidates.size();
Application::Log(LogDebug, "delegation", msgbuf.str());
int avg_services = 0;
vector<Endpoint::Ptr>::iterator cit;
for (cit = candidates.begin(); cit != candidates.end(); cit++)
avg_services += histogram[*cit];
avg_services /= candidates.size();
int overflow_tolerance = candidates.size() * 2;
overflow_tolerance = candidates.size() * 2;
}
/* don't re-assign service if the checker is still valid
* and doesn't have too many services */
@ -194,7 +194,7 @@ void DelegationComponent::DelegationTimerHandler(void)
delegated++;
}
assert(!service.GetChecker().empty());
assert(candidates.size() == 0 || !service.GetChecker().empty());
}
if (delegated > 0) {

View File

@ -105,6 +105,10 @@ void DiscoveryComponent::CheckExistingEndpoint(const Endpoint::Ptr& self, const
*/
void DiscoveryComponent::NewEndpointHandler(const Endpoint::Ptr& endpoint)
{
/* ignore local endpoints */
if (endpoint->IsLocal())
return;
/* accept discovery::RegisterComponent messages from any endpoint */
endpoint->RegisterPublication("discovery::RegisterComponent");
@ -354,8 +358,13 @@ void DiscoveryComponent::ProcessDiscoveryMessage(const string& identity, const D
time(&(info->LastSeen));
message.GetNode(&info->Node);
message.GetService(&info->Service);
string node;
if (message.GetNode(&node) && !node.empty())
info->Node = node;
string service;
if (message.GetService(&service) && !service.empty())
info->Service = service;
ConfigObject::Ptr endpointConfig = ConfigObject::GetObject("endpoint", identity);
Dictionary::Ptr roles;
@ -472,6 +481,10 @@ void DiscoveryComponent::DiscoveryTimerHandler(void)
string identity = i->first;
ComponentDiscoveryInfo::Ptr info = i->second;
/* there's no need to reconnect to ourself */
if (identity == GetEndpointManager()->GetIdentity())
continue;
curr = i;
i++;
@ -492,7 +505,14 @@ void DiscoveryComponent::DiscoveryTimerHandler(void)
} else {
/* TODO: figure out whether we actually want to connect to this component */
/* try and reconnect to this component */
try {
if (!info->Node.empty() && !info->Service.empty())
endpointManager->AddConnection(info->Node, info->Service);
} catch (const std::exception& ex) {
stringstream msgbuf;
msgbuf << "Exception while trying to reconnect to endpoint '" << endpoint->GetIdentity() << "': " << ex.what();;
Application::Log(LogInformation, "discovery", msgbuf.str());
}
}
}
}

View File

@ -21,6 +21,18 @@
using namespace icinga;
/**
* Constructor for the EndpointManager class.
*/
EndpointManager::EndpointManager(void)
: m_NextMessageID(0)
{
m_RequestTimer = boost::make_shared<Timer>();
m_RequestTimer->OnTimerExpired.connect(boost::bind(&EndpointManager::RequestTimerHandler, this));
m_RequestTimer->SetInterval(5);
m_RequestTimer->Start();
}
/**
* Sets the identity of the endpoint manager. This identity is used when
* connecting to remote peers.
@ -147,9 +159,6 @@ void EndpointManager::UnregisterServer(JsonRpcServer::Ptr server)
*/
void EndpointManager::RegisterEndpoint(Endpoint::Ptr endpoint)
{
if (!endpoint->IsLocal() && endpoint->GetIdentity() != "")
throw invalid_argument("Identity must be empty.");
endpoint->SetEndpointManager(GetSelf());
UnregisterEndpoint(endpoint);
@ -310,7 +319,6 @@ void EndpointManager::SendAPIMessage(const Endpoint::Ptr& sender, const Endpoint
pr.Timeout = time(NULL) + timeout;
m_Requests[id] = pr;
RescheduleRequestTimer();
if (!recipient)
SendAnycastMessage(sender, message);
@ -324,29 +332,6 @@ bool EndpointManager::RequestTimeoutLessComparer(const pair<string, PendingReque
return a.second.Timeout < b.second.Timeout;
}
void EndpointManager::RescheduleRequestTimer(void)
{
map<string, PendingRequest>::iterator it;
it = min_element(m_Requests.begin(), m_Requests.end(),
&EndpointManager::RequestTimeoutLessComparer);
if (!m_RequestTimer) {
m_RequestTimer = boost::make_shared<Timer>();
m_RequestTimer->OnTimerExpired.connect(boost::bind(&EndpointManager::RequestTimerHandler, this));
}
if (it != m_Requests.end()) {
time_t now;
time(&now);
time_t next_timeout = (it->second.Timeout < now) ? now : it->second.Timeout;
m_RequestTimer->SetInterval(next_timeout - now);
m_RequestTimer->Start();
} else {
m_RequestTimer->Stop();
}
}
void EndpointManager::RequestTimerHandler(void)
{
map<string, PendingRequest>::iterator it;
@ -359,8 +344,6 @@ void EndpointManager::RequestTimerHandler(void)
break;
}
}
RescheduleRequestTimer();
}
void EndpointManager::ProcessResponseMessage(const Endpoint::Ptr& sender, const ResponseMessage& message)
@ -378,7 +361,6 @@ void EndpointManager::ProcessResponseMessage(const Endpoint::Ptr& sender, const
it->second.Callback(GetSelf(), sender, it->second.Request, message, false);
m_Requests.erase(it);
RescheduleRequestTimer();
}
EndpointManager::Iterator EndpointManager::Begin(void)

View File

@ -36,9 +36,7 @@ public:
typedef map<string, Endpoint::Ptr>::iterator Iterator;
EndpointManager(void)
: m_NextMessageID(0)
{ }
EndpointManager(void);
void SetIdentity(string identity);
string GetIdentity(void) const;
@ -102,7 +100,6 @@ private:
void UnregisterServer(JsonRpcServer::Ptr server);
static bool RequestTimeoutLessComparer(const pair<string, PendingRequest>& a, const pair<string, PendingRequest>& b);
void RescheduleRequestTimer(void);
void RequestTimerHandler(void);
void NewClientHandler(const TcpClient::Ptr& client);

View File

@ -142,9 +142,9 @@ void JsonRpcEndpoint::ClientErrorHandler(const std::exception& ex)
Application::Log(LogWarning, "jsonrpc", message.str());
}
void JsonRpcEndpoint::VerifyCertificateHandler(bool& valid, const shared_ptr<X509>& certificate)
void JsonRpcEndpoint::VerifyCertificateHandler(bool *valid, const shared_ptr<X509>& certificate)
{
if (certificate && valid) {
if (certificate && *valid) {
string identity = Utility::GetCertificateCN(certificate);
if (GetIdentity().empty() && !identity.empty()) {

View File

@ -65,7 +65,7 @@ private:
void NewMessageHandler(const MessagePart& message);
void ClientClosedHandler(void);
void ClientErrorHandler(const std::exception& ex);
void VerifyCertificateHandler(bool& valid, const shared_ptr<X509>& certificate);
void VerifyCertificateHandler(bool *valid, const shared_ptr<X509>& certificate);
};
}