Implemented asynchronous (internal) API calls.

This commit is contained in:
Gunnar Beutner 2012-06-14 11:18:20 +02:00
parent ee0ee227da
commit 739d0c1c29
21 changed files with 268 additions and 48 deletions

View File

@ -50,3 +50,8 @@ long Dictionary::GetLength(void) const
{
return m_Data.size();
}
bool Dictionary::Contains(const string& key) const
{
return (m_Data.find(key) != m_Data.end());
}

View File

@ -116,6 +116,8 @@ public:
SetProperty(key, value);
}
bool Contains(const string& key) const;
DictionaryIterator Begin(void);
DictionaryIterator End(void);

View File

@ -151,6 +151,8 @@ EventArgs Timer::GetUserArgs(void) const
*/
void Timer::Start(void)
{
Stop();
Timers.push_back(static_pointer_cast<Timer>(shared_from_this()));
Reschedule(time(NULL) + m_Interval);

View File

@ -1,7 +1,9 @@
## Process this file with automake to produce Makefile.in
## Created by Anjuta
SUBDIRS = configfile \
SUBDIRS = \
checker \
configfile \
configrpc \
demo \
discovery

View File

@ -40,9 +40,7 @@ void DemoComponent::Start(void)
m_DemoEndpoint->RegisterTopicHandler("demo::HelloWorld",
bind_weak(&DemoComponent::HelloWorldRequestHandler, shared_from_this()));
m_DemoEndpoint->RegisterPublication("demo::HelloWorld");
EndpointManager::Ptr endpointManager = GetIcingaApplication()->GetEndpointManager();
endpointManager->RegisterEndpoint(m_DemoEndpoint);
GetEndpointManager()->RegisterEndpoint(m_DemoEndpoint);
m_DemoTimer = make_shared<Timer>();
m_DemoTimer->SetInterval(5);

View File

@ -69,6 +69,7 @@ Makefile
compat/Makefile
base/Makefile
components/Makefile
components/checker/Makefile
components/configfile/Makefile
components/configrpc/Makefile
components/demo/Makefile

View File

@ -2,7 +2,7 @@ local object application "icinga" {
}
local object component "demo" {
local object component "checker" {
}
@ -20,7 +20,8 @@ abstract object service "nagios-service" {
abstract object service "ping" inherits "nagios-service" {
check_type = "nagios",
check_command = "$plugindir$/check_ping -H $address$"
check_command = "$plugindir$/check_ping -H $address$",
check_interval = 30
}
object service "localhost-ping" inherits "ping" {

View File

@ -24,6 +24,7 @@ Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "icinga-app", "icinga-app\ic
{2E6C1133-730F-4875-A72C-B455B1DD4C5C} = {2E6C1133-730F-4875-A72C-B455B1DD4C5C}
{697C6D7E-3109-484C-A7AF-384D28711610} = {697C6D7E-3109-484C-A7AF-384D28711610}
{E58F1DA7-B723-412B-B2B7-7FF58E2A944E} = {E58F1DA7-B723-412B-B2B7-7FF58E2A944E}
{38CE81CC-2660-4EF0-A936-4A337591DA3E} = {38CE81CC-2660-4EF0-A936-4A337591DA3E}
{C1FC77E1-04A4-481B-A78B-2F7AF489C2F8} = {C1FC77E1-04A4-481B-A78B-2F7AF489C2F8}
EndProjectSection
EndProject
@ -62,6 +63,11 @@ Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "dyntest", "dyntest\dyntest.
{B26AFFA6-2BDF-42E6-A224-2591FFD9BFB7} = {B26AFFA6-2BDF-42E6-A224-2591FFD9BFB7}
EndProjectSection
EndProject
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "checker", "components\checker\checker.vcxproj", "{38CE81CC-2660-4EF0-A936-4A337591DA3E}"
ProjectSection(ProjectDependencies) = postProject
{C1FC77E1-04A4-481B-A78B-2F7AF489C2F8} = {C1FC77E1-04A4-481B-A78B-2F7AF489C2F8}
EndProjectSection
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Win32 = Debug|Win32
@ -116,6 +122,10 @@ Global
{E6FA740D-0939-4711-AFBC-3D9E913510A1}.Debug|Win32.Build.0 = Debug|Win32
{E6FA740D-0939-4711-AFBC-3D9E913510A1}.Release|Win32.ActiveCfg = Release|Win32
{E6FA740D-0939-4711-AFBC-3D9E913510A1}.Release|Win32.Build.0 = Release|Win32
{38CE81CC-2660-4EF0-A936-4A337591DA3E}.Debug|Win32.ActiveCfg = Debug|Win32
{38CE81CC-2660-4EF0-A936-4A337591DA3E}.Debug|Win32.Build.0 = Debug|Win32
{38CE81CC-2660-4EF0-A936-4A337591DA3E}.Release|Win32.ActiveCfg = Release|Win32
{38CE81CC-2660-4EF0-A936-4A337591DA3E}.Release|Win32.Build.0 = Release|Win32
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE

View File

@ -24,7 +24,7 @@ struct CheckResult
Dictionary::Ptr PerformanceData;
};
class CheckTask : public Object
class I2_ICINGA_API CheckTask : public Object
{
public:
typedef shared_ptr<CheckTask> Ptr;

View File

@ -184,15 +184,14 @@ void EndpointManager::UnregisterEndpoint(Endpoint::Ptr endpoint)
void EndpointManager::SendUnicastMessage(Endpoint::Ptr sender,
Endpoint::Ptr recipient, const MessagePart& message)
{
/* don't forward messages back to the sender */
if (sender == recipient)
return;
/* don't forward messages between non-local endpoints */
if (!sender->IsLocal() && !recipient->IsLocal())
return;
recipient->ProcessRequest(sender, message);
if (ResponseMessage::IsResponseMessage(message))
recipient->ProcessResponse(sender, message);
else
recipient->ProcessRequest(sender, message);
}
/**
@ -205,7 +204,23 @@ void EndpointManager::SendUnicastMessage(Endpoint::Ptr sender,
void EndpointManager::SendAnycastMessage(Endpoint::Ptr sender,
const RequestMessage& message)
{
throw NotImplementedException();
string method;
if (!message.GetMethod(&method))
throw invalid_argument("Message is missing the 'method' property.");
vector<Endpoint::Ptr> candidates;
for (vector<Endpoint::Ptr>::iterator i = m_Endpoints.begin(); i != m_Endpoints.end(); i++)
{
Endpoint::Ptr endpoint = *i;
if (endpoint->HasSubscription(method))
candidates.push_back(endpoint);
}
if (candidates.size() == 0)
return;
Endpoint::Ptr recipient = candidates[rand() % candidates.size()];
SendUnicastMessage(sender, recipient, message);
}
/**
@ -229,6 +244,11 @@ void EndpointManager::SendMulticastMessage(Endpoint::Ptr sender,
for (vector<Endpoint::Ptr>::iterator i = m_Endpoints.begin(); i != m_Endpoints.end(); i++)
{
Endpoint::Ptr recipient = *i;
/* don't forward messages back to the sender */
if (sender == recipient)
continue;
if (recipient->HasSubscription(method))
SendUnicastMessage(sender, recipient, message);
}
@ -269,3 +289,103 @@ Endpoint::Ptr EndpointManager::GetEndpointByIdentity(string identity) const
return Endpoint::Ptr();
}
void EndpointManager::SendAPIMessage(Endpoint::Ptr sender,
RequestMessage& message,
function<int(const NewResponseEventArgs&)> callback, time_t timeout)
{
m_NextMessageID++;
stringstream idstream;
idstream << m_NextMessageID;
string id = idstream.str();
message.SetID(id);
PendingRequest pr;
pr.Request = message;
pr.Callback = callback;
pr.Timeout = time(NULL) + timeout;
m_Requests[id] = pr;
RescheduleRequestTimer();
SendAnycastMessage(sender, message);
}
bool EndpointManager::RequestTimeoutLessComparer(const pair<string, PendingRequest>& a,
const pair<string, PendingRequest>& b)
{
return a.second.Timeout < b.second.Timeout;
}
void EndpointManager::RescheduleRequestTimer(void)
{
map<string, PendingRequest>::iterator it;
it = min_element(m_Requests.begin(), m_Requests.end(),
&EndpointManager::RequestTimeoutLessComparer);
if (!m_RequestTimer) {
m_RequestTimer = make_shared<Timer>();
m_RequestTimer->OnTimerExpired += bind_weak(&EndpointManager::RequestTimerHandler, shared_from_this());
}
if (it != m_Requests.end()) {
time_t now;
time(&now);
time_t next_timeout = (it->second.Timeout < now) ? now : it->second.Timeout;
m_RequestTimer->SetInterval(next_timeout - now);
m_RequestTimer->Start();
} else {
m_RequestTimer->Stop();
}
}
int EndpointManager::RequestTimerHandler(const TimerEventArgs& ea)
{
map<string, PendingRequest>::iterator it;
for (it = m_Requests.begin(); it != m_Requests.end(); it++) {
if (it->second.HasTimedOut()) {
NewResponseEventArgs nrea;
nrea.Request = it->second.Request;
nrea.Source = shared_from_this();
nrea.TimedOut = true;
it->second.Callback(nrea);
m_Requests.erase(it);
break;
}
}
RescheduleRequestTimer();
return 0;
}
void EndpointManager::ProcessResponseMessage(const Endpoint::Ptr& sender, const ResponseMessage& message)
{
string id;
if (!message.GetID(&id))
throw invalid_argument("Response message must have a message ID.");
map<string, PendingRequest>::iterator it;
it = m_Requests.find(id);
if (it == m_Requests.end())
return;
NewResponseEventArgs nrea;
nrea.Sender = sender;
nrea.Request = it->second.Request;
nrea.Response = message;
nrea.Source = shared_from_this();
nrea.TimedOut = false;
it->second.Callback(nrea);
m_Requests.erase(it);
RescheduleRequestTimer();
}

View File

@ -33,6 +33,38 @@ struct I2_ICINGA_API NewEndpointEventArgs : public EventArgs
icinga::Endpoint::Ptr Endpoint; /**< The new endpoint. */
};
struct NewResponseEventArgs;
/**
* Information about a pending API request.
*
* @ingroup icinga
*/
struct I2_ICINGA_API PendingRequest
{
time_t Timeout;
RequestMessage Request;
function<int(const NewResponseEventArgs&)> Callback;
bool HasTimedOut(void) const
{
return time(NULL) > Timeout;
}
};
/**
* Event arguments for the "new response" event.
*
* @ingroup icinga
*/
struct I2_ICINGA_API NewResponseEventArgs : public EventArgs
{
Endpoint::Ptr Sender;
RequestMessage Request;
ResponseMessage Response;
bool TimedOut;
};
/**
* Forwards messages between endpoints.
*
@ -44,6 +76,10 @@ public:
typedef shared_ptr<EndpointManager> Ptr;
typedef weak_ptr<EndpointManager> WeakPtr;
EndpointManager(void)
: m_NextMessageID(0)
{ }
void SetIdentity(string identity);
string GetIdentity(void) const;
@ -60,6 +96,11 @@ public:
void SendAnycastMessage(Endpoint::Ptr sender, const RequestMessage& message);
void SendMulticastMessage(Endpoint::Ptr sender, const RequestMessage& message);
void SendAPIMessage(Endpoint::Ptr sender, RequestMessage& message,
function<int(const NewResponseEventArgs&)> callback, time_t timeout = 10);
void ProcessResponseMessage(const Endpoint::Ptr& sender, const ResponseMessage& message);
void ForEachEndpoint(function<int (const NewEndpointEventArgs&)> callback);
Endpoint::Ptr GetEndpointByIdentity(string identity) const;
@ -73,9 +114,17 @@ private:
vector<JsonRpcServer::Ptr> m_Servers;
vector<Endpoint::Ptr> m_Endpoints;
long m_NextMessageID;
map<string, PendingRequest> m_Requests;
Timer::Ptr m_RequestTimer;
void RegisterServer(JsonRpcServer::Ptr server);
void UnregisterServer(JsonRpcServer::Ptr server);
static bool RequestTimeoutLessComparer(const pair<string, PendingRequest>& a, const pair<string, PendingRequest>& b);
void RescheduleRequestTimer(void);
int RequestTimerHandler(const TimerEventArgs& ea);
int NewClientHandler(const NewClientEventArgs& ncea);
};

View File

@ -91,16 +91,6 @@ int IcingaApplication::Main(const vector<string>& args)
m_EndpointManager->SetSSLContext(sslContext);
}
CheckTask::RegisterType("nagios", NagiosCheckTask::CreateTask);
ConfigObject::TMap::Range range = ConfigObject::GetObjects("service");
for (ConfigObject::TMap::Iterator it = range.first; it != range.second; it++) {
Service svc(it->second);
CheckTask::Ptr ct = CheckTask::CreateTask(svc);
CheckResult cr = ct->Execute();
}
/* create the primary RPC listener */
string service = GetService();
if (!service.empty())

View File

@ -77,8 +77,7 @@ void JsonRpcEndpoint::ProcessRequest(Endpoint::Ptr sender, const RequestMessage&
void JsonRpcEndpoint::ProcessResponse(Endpoint::Ptr sender, const ResponseMessage& message)
{
if (IsConnected())
m_Client->SendMessage(message);
m_Client->SendMessage(message);
}
int JsonRpcEndpoint::NewMessageHandler(const NewMessageEventArgs& nmea)
@ -86,25 +85,28 @@ int JsonRpcEndpoint::NewMessageHandler(const NewMessageEventArgs& nmea)
const MessagePart& message = nmea.Message;
Endpoint::Ptr sender = static_pointer_cast<Endpoint>(shared_from_this());
string method;
if (message.GetProperty("method", &method)) {
if (!HasPublication(method))
return 0;
RequestMessage request = message;
string id;
if (request.GetID(&id))
GetEndpointManager()->SendAnycastMessage(sender, request);
else
GetEndpointManager()->SendMulticastMessage(sender, request);
} else {
ResponseMessage response = message;
// TODO: deal with response messages
throw NotImplementedException();
if (ResponseMessage::IsResponseMessage(message)) {
/* rather than routing the message to the right virtual
* endpoint we just process it here right away. */
GetEndpointManager()->ProcessResponseMessage(sender, message);
return 0;
}
string method;
if (!message.GetProperty("method", &method))
return 0;
if (!HasPublication(method))
return 0;
RequestMessage request = message;
string id;
if (request.GetID(&id))
GetEndpointManager()->SendAnycastMessage(sender, request);
else
GetEndpointManager()->SendMulticastMessage(sender, request);
return 0;
}

View File

@ -4,7 +4,7 @@
namespace icinga
{
class MacroProcessor
class I2_ICINGA_API MacroProcessor
{
public:
static string ResolveMacros(string str, Dictionary::Ptr macros);

View File

@ -4,7 +4,7 @@
namespace icinga
{
class NagiosCheckTask : public CheckTask
class I2_ICINGA_API NagiosCheckTask : public CheckTask
{
public:
NagiosCheckTask(const Service& service);

View File

@ -51,14 +51,26 @@ long Service::GetMaxCheckAttempts(void) const
long Service::GetCheckInterval(void) const
{
long value = 1;
long value = 300;
GetConfigObject()->GetProperty("check_interval", &value);
return value;
}
long Service::GetRetryInterval(void) const
{
long value = 1;
long value = 15;
GetConfigObject()->GetProperty("retry_interval", &value);
return value;
}
void Service::SetNextCheck(time_t nextCheck)
{
GetConfigObject()->SetTag("next_check", static_cast<long>(nextCheck));
}
time_t Service::GetNextCheck(void) const
{
long value = 0;
GetConfigObject()->GetTag("next_check", &value);
return value;
}

View File

@ -19,6 +19,9 @@ public:
long GetMaxCheckAttempts(void) const;
long GetCheckInterval(void) const;
long GetRetryInterval(void) const;
void SetNextCheck(time_t nextCheck);
time_t GetNextCheck(void) const;
};
}

View File

@ -74,8 +74,7 @@ void VirtualEndpoint::ProcessRequest(Endpoint::Ptr sender, const RequestMessage&
void VirtualEndpoint::ProcessResponse(Endpoint::Ptr sender, const ResponseMessage& response)
{
// TODO: figure out which request this response belongs to and notify the caller
throw NotImplementedException();
GetEndpointManager()->ProcessResponseMessage(sender, response);
}
void VirtualEndpoint::Stop(void)

View File

@ -232,3 +232,14 @@ DictionaryIterator MessagePart::End(void)
{
return GetDictionary()->End();
}
/**
* Checks whether the message contains the specified element.
*
* @param key The name of the element.
* @returns true if the message contains the element, false otherwise.
*/
bool MessagePart::Contains(const string& key) const
{
return GetDictionary()->Contains(key);
}

View File

@ -85,6 +85,8 @@ public:
void AddUnnamedProperty(const MessagePart& value);
bool Contains(const string& key) const;
DictionaryIterator Begin(void);
DictionaryIterator End(void);

View File

@ -128,6 +128,17 @@ public:
{
SetProperty("id", value);
}
/**
* Checks whether a message is a response message.
*
* @param message The message.
* @returns true if the message is a response message, false otherwise.
*/
static bool IsResponseMessage(const MessagePart& message)
{
return (message.Contains("result"));
}
};
}