Multi-threaded nagios checks.

This commit is contained in:
Gunnar Beutner 2012-06-17 20:35:56 +02:00
parent 40b0cf8140
commit 4c658eb889
22 changed files with 363 additions and 372 deletions

View File

@ -32,6 +32,8 @@ libbase_la_SOURCES = \
tcpserver.h \ tcpserver.h \
tcpsocket.cpp \ tcpsocket.cpp \
tcpsocket.h \ tcpsocket.h \
threadpool.cpp \
threadpool.h \
timer.cpp \ timer.cpp \
timer.h \ timer.h \
tlsclient.cpp \ tlsclient.cpp \
@ -63,4 +65,5 @@ libbase_la_LIBADD = \
$(LIBLTDL) \ $(LIBLTDL) \
$(OPENSSL_LIBS) \ $(OPENSSL_LIBS) \
$(BOOST_SIGNALS_LIB) \ $(BOOST_SIGNALS_LIB) \
$(BOOST_THREAD_LIB) \
${top_builddir}/third-party/mmatch/libmmatch.la ${top_builddir}/third-party/mmatch/libmmatch.la

View File

@ -24,6 +24,7 @@
<ClCompile Include="tcpclient.cpp" /> <ClCompile Include="tcpclient.cpp" />
<ClCompile Include="tcpserver.cpp" /> <ClCompile Include="tcpserver.cpp" />
<ClCompile Include="tcpsocket.cpp" /> <ClCompile Include="tcpsocket.cpp" />
<ClCompile Include="threadpool.cpp" />
<ClCompile Include="timer.cpp" /> <ClCompile Include="timer.cpp" />
<ClCompile Include="tlsclient.cpp" /> <ClCompile Include="tlsclient.cpp" />
<ClCompile Include="unix.cpp" /> <ClCompile Include="unix.cpp" />
@ -46,6 +47,7 @@
<ClInclude Include="tcpclient.h" /> <ClInclude Include="tcpclient.h" />
<ClInclude Include="tcpserver.h" /> <ClInclude Include="tcpserver.h" />
<ClInclude Include="tcpsocket.h" /> <ClInclude Include="tcpsocket.h" />
<ClInclude Include="threadpool.h" />
<ClInclude Include="timer.h" /> <ClInclude Include="timer.h" />
<ClInclude Include="tlsclient.h" /> <ClInclude Include="tlsclient.h" />
<ClInclude Include="unix.h" /> <ClInclude Include="unix.h" />

View File

@ -91,6 +91,7 @@
#include <map> #include <map>
#include <list> #include <list>
#include <algorithm> #include <algorithm>
#include <stack>
using std::string; using std::string;
using std::vector; using std::vector;
@ -99,6 +100,7 @@ using std::list;
using std::set; using std::set;
using std::multimap; using std::multimap;
using std::pair; using std::pair;
using std::stack;
using std::stringstream; using std::stringstream;
@ -114,6 +116,7 @@ using std::domain_error;
#include <boost/signal.hpp> #include <boost/signal.hpp>
#include <boost/algorithm/string/trim.hpp> #include <boost/algorithm/string/trim.hpp>
#include <boost/algorithm/string/split.hpp> #include <boost/algorithm/string/split.hpp>
#include <boost/thread.hpp>
using boost::shared_ptr; using boost::shared_ptr;
using boost::weak_ptr; using boost::weak_ptr;
@ -121,6 +124,11 @@ using boost::enable_shared_from_this;
using boost::dynamic_pointer_cast; using boost::dynamic_pointer_cast;
using boost::static_pointer_cast; using boost::static_pointer_cast;
using boost::function; using boost::function;
using boost::thread;
using boost::thread_group;
using boost::mutex;
using boost::unique_lock;
using boost::condition_variable;
#if defined(__APPLE__) && defined(__MACH__) #if defined(__APPLE__) && defined(__MACH__)
# pragma GCC diagnostic ignored "-Wdeprecated-declarations" # pragma GCC diagnostic ignored "-Wdeprecated-declarations"
@ -158,5 +166,6 @@ using boost::function;
#include "configobject.h" #include "configobject.h"
#include "application.h" #include "application.h"
#include "component.h" #include "component.h"
#include "threadpool.h"
#endif /* I2BASE_H */ #endif /* I2BASE_H */

63
base/threadpool.cpp Normal file
View File

@ -0,0 +1,63 @@
#include "i2-base.h"
using namespace icinga;
ThreadPool::ThreadPool(long numThreads)
: m_Alive(true)
{
for (long i = 0; i < numThreads; i++)
m_Threads.create_thread(boost::bind(&ThreadPool::WorkerThreadProc, this));
}
ThreadPool::~ThreadPool(void)
{
unique_lock<mutex> lock(m_Lock);
/* wait for all pending tasks */
while (m_Tasks.size() > 0)
m_CV.wait(lock);
/* notify worker threads to exit */
m_Alive = false;
m_CV.notify_all();
}
void ThreadPool::EnqueueTask(Task task)
{
unique_lock<mutex> lock(m_Lock);
m_Tasks.push(task);
m_CV.notify_one();
}
void ThreadPool::WorkerThreadProc(void)
{
while (true) {
Task task;
{
unique_lock<mutex> lock(m_Lock);
while (m_Tasks.size() == 0) {
m_CV.wait(lock);
if (!m_Alive)
return;
}
task = m_Tasks.top();
m_Tasks.pop();
}
task();
}
}
ThreadPool::Ptr ThreadPool::GetDefaultPool(void)
{
static ThreadPool::Ptr threadPool;
if (!threadPool)
threadPool = boost::make_shared<ThreadPool>();
return threadPool;
}

36
base/threadpool.h Normal file
View File

@ -0,0 +1,36 @@
#ifndef THREADPOOL_H
#define THREADPOOL_H
namespace icinga
{
class I2_BASE_API ThreadPool : public Object
{
public:
typedef shared_ptr<ThreadPool> Ptr;
typedef weak_ptr<ThreadPool> WeakPtr;
typedef function<void()> Task;
ThreadPool(long numThreads = 16);
~ThreadPool(void);
static ThreadPool::Ptr GetDefaultPool(void);
void EnqueueTask(Task task);
private:
mutex m_Lock;
condition_variable m_CV;
stack<Task> m_Tasks;
thread_group m_Threads;
bool m_Alive;
void WorkerThreadProc(void);
};
}
#endif /* THREADPOOL_H */

View File

@ -45,13 +45,10 @@ void CheckerComponent::Start(void)
CheckTask::RegisterType("nagios", NagiosCheckTask::CreateTask); CheckTask::RegisterType("nagios", NagiosCheckTask::CreateTask);
ConfigObject::TMap::Range range = ConfigObject::GetObjects("service"); m_ResultTimer = boost::make_shared<Timer>();
m_ResultTimer->SetInterval(10);
for (ConfigObject::TMap::Iterator it = range.first; it != range.second; it++) { m_ResultTimer->OnTimerExpired.connect(boost::bind(&CheckerComponent::ResultTimerHandler, this));
Service svc = it->second; m_ResultTimer->Start();
CheckTask::Ptr ct = CheckTask::CreateTask(svc);
CheckResult cr = ct->Execute();
}
} }
void CheckerComponent::Stop(void) void CheckerComponent::Stop(void)
@ -76,9 +73,11 @@ void CheckerComponent::CheckTimerHandler(void)
if (service.GetNextCheck() > now) if (service.GetNextCheck() > now)
break; break;
CheckTask::Ptr ct = CheckTask::CreateTask(service);
Application::Log(LogInformation, "checker", "Executing service check for '" + service.GetName() + "'"); Application::Log(LogInformation, "checker", "Executing service check for '" + service.GetName() + "'");
CheckResult cr = ct->Execute();
CheckTask::Ptr task = CheckTask::CreateTask(service);
task->Execute();
m_PendingTasks.push_back(task);
m_Services.pop(); m_Services.pop();
service.SetNextCheck(now + service.GetCheckInterval()); service.SetNextCheck(now + service.GetCheckInterval());
@ -90,6 +89,25 @@ void CheckerComponent::CheckTimerHandler(void)
m_CheckTimer->SetInterval(service.GetNextCheck() - now); m_CheckTimer->SetInterval(service.GetNextCheck() - now);
} }
void CheckerComponent::ResultTimerHandler(void)
{
vector<CheckTask::Ptr> unfinishedTasks;
for (vector<CheckTask::Ptr>::iterator it = m_PendingTasks.begin(); it != m_PendingTasks.end(); it++) {
CheckTask::Ptr task = *it;
if (!task->IsFinished()) {
unfinishedTasks.push_back(task);
break;
}
CheckResult result = task->GetResult();
Application::Log(LogInformation, "checker", "Got result! Plugin output: " + result.Output);
}
m_PendingTasks = unfinishedTasks;
}
void CheckerComponent::AssignServiceRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request) void CheckerComponent::AssignServiceRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request)
{ {
MessagePart params; MessagePart params;

View File

@ -52,7 +52,11 @@ private:
Timer::Ptr m_CheckTimer; Timer::Ptr m_CheckTimer;
VirtualEndpoint::Ptr m_CheckerEndpoint; VirtualEndpoint::Ptr m_CheckerEndpoint;
Timer::Ptr m_ResultTimer;
vector<CheckTask::Ptr> m_PendingTasks;
void CheckTimerHandler(void); void CheckTimerHandler(void);
void ResultTimerHandler(void);
void AssignServiceRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request); void AssignServiceRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request);
void RevokeServiceRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request); void RevokeServiceRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request);

149
config/ax_boost_thread.m4 Normal file
View File

@ -0,0 +1,149 @@
# ===========================================================================
# http://www.gnu.org/software/autoconf-archive/ax_boost_thread.html
# ===========================================================================
#
# SYNOPSIS
#
# AX_BOOST_THREAD
#
# DESCRIPTION
#
# Test for Thread library from the Boost C++ libraries. The macro requires
# a preceding call to AX_BOOST_BASE. Further documentation is available at
# <http://randspringer.de/boost/index.html>.
#
# This macro calls:
#
# AC_SUBST(BOOST_THREAD_LIB)
#
# And sets:
#
# HAVE_BOOST_THREAD
#
# LICENSE
#
# Copyright (c) 2009 Thomas Porschberg <thomas@randspringer.de>
# Copyright (c) 2009 Michael Tindal
#
# Copying and distribution of this file, with or without modification, are
# permitted in any medium without royalty provided the copyright notice
# and this notice are preserved. This file is offered as-is, without any
# warranty.
#serial 25
AC_DEFUN([AX_BOOST_THREAD],
[
AC_ARG_WITH([boost-thread],
AS_HELP_STRING([--with-boost-thread@<:@=special-lib@:>@],
[use the Thread library from boost - it is possible to specify a certain library for the linker
e.g. --with-boost-thread=boost_thread-gcc-mt ]),
[
if test "$withval" = "no"; then
want_boost="no"
elif test "$withval" = "yes"; then
want_boost="yes"
ax_boost_user_thread_lib=""
else
want_boost="yes"
ax_boost_user_thread_lib="$withval"
fi
],
[want_boost="yes"]
)
if test "x$want_boost" = "xyes"; then
AC_REQUIRE([AC_PROG_CC])
AC_REQUIRE([AC_CANONICAL_BUILD])
CPPFLAGS_SAVED="$CPPFLAGS"
CPPFLAGS="$CPPFLAGS $BOOST_CPPFLAGS"
export CPPFLAGS
LDFLAGS_SAVED="$LDFLAGS"
LDFLAGS="$LDFLAGS $BOOST_LDFLAGS"
export LDFLAGS
AC_CACHE_CHECK(whether the Boost::Thread library is available,
ax_cv_boost_thread,
[AC_LANG_PUSH([C++])
CXXFLAGS_SAVE=$CXXFLAGS
if test "x$host_os" = "xsolaris" ; then
CXXFLAGS="-pthreads $CXXFLAGS"
elif test "x$host_os" = "xmingw32" ; then
CXXFLAGS="-mthreads $CXXFLAGS"
else
CXXFLAGS="-pthread $CXXFLAGS"
fi
AC_COMPILE_IFELSE([AC_LANG_PROGRAM([[@%:@include <boost/thread/thread.hpp>]],
[[boost::thread_group thrds;
return 0;]])],
ax_cv_boost_thread=yes, ax_cv_boost_thread=no)
CXXFLAGS=$CXXFLAGS_SAVE
AC_LANG_POP([C++])
])
if test "x$ax_cv_boost_thread" = "xyes"; then
if test "x$host_os" = "xsolaris" ; then
BOOST_CPPFLAGS="-pthreads $BOOST_CPPFLAGS"
elif test "x$host_os" = "xmingw32" ; then
BOOST_CPPFLAGS="-mthreads $BOOST_CPPFLAGS"
else
BOOST_CPPFLAGS="-pthread $BOOST_CPPFLAGS"
fi
AC_SUBST(BOOST_CPPFLAGS)
AC_DEFINE(HAVE_BOOST_THREAD,,[define if the Boost::Thread library is available])
BOOSTLIBDIR=`echo $BOOST_LDFLAGS | sed -e 's/@<:@^\/@:>@*//'`
LDFLAGS_SAVE=$LDFLAGS
case "x$host_os" in
*bsd* )
LDFLAGS="-pthread $LDFLAGS"
break;
;;
esac
if test "x$ax_boost_user_thread_lib" = "x"; then
for libextension in `ls $BOOSTLIBDIR/libboost_thread*.so* 2>/dev/null | sed 's,.*/,,' | sed -e 's;^lib\(boost_thread.*\)\.so.*$;\1;'` `ls $BOOSTLIBDIR/libboost_thread*.dylib* 2>/dev/null | sed 's,.*/,,' | sed -e 's;^lib\(libboost_thread.*\)\.dylib.*$;\1;'` `ls $BOOSTLIBDIR/libboost_thread*.a* 2>/dev/null | sed 's,.*/,,' | sed -e 's;^lib\(boost_thread.*\)\.a.*$;\1;'`; do
ax_lib=${libextension}
AC_CHECK_LIB($ax_lib, exit,
[BOOST_THREAD_LIB="-l$ax_lib"; AC_SUBST(BOOST_THREAD_LIB) link_thread="yes"; break],
[link_thread="no"])
done
if test "x$link_thread" != "xyes"; then
for libextension in `ls $BOOSTLIBDIR/boost_thread*.dll* 2>/dev/null | sed 's,.*/,,' | sed -e 's;^\(boost_thread.*\)\.dll.*$;\1;'` `ls $BOOSTLIBDIR/boost_thread*.a* 2>/dev/null | sed 's,.*/,,' | sed -e 's;^\(boost_thread.*\)\.a.*$;\1;'` ; do
ax_lib=${libextension}
AC_CHECK_LIB($ax_lib, exit,
[BOOST_THREAD_LIB="-l$ax_lib"; AC_SUBST(BOOST_THREAD_LIB) link_thread="yes"; break],
[link_thread="no"])
done
fi
else
for ax_lib in $ax_boost_user_thread_lib boost_thread-$ax_boost_user_thread_lib; do
AC_CHECK_LIB($ax_lib, exit,
[BOOST_THREAD_LIB="-l$ax_lib"; AC_SUBST(BOOST_THREAD_LIB) link_thread="yes"; break],
[link_thread="no"])
done
fi
if test "x$ax_lib" = "x"; then
AC_MSG_ERROR(Could not find a version of the library!)
fi
if test "x$link_thread" = "xno"; then
AC_MSG_ERROR(Could not link against $ax_lib !)
else
case "x$host_os" in
*bsd* )
BOOST_LDFLAGS="-pthread $BOOST_LDFLAGS"
break;
;;
esac
fi
fi
CPPFLAGS="$CPPFLAGS_SAVED"
LDFLAGS="$LDFLAGS_SAVED"
fi
])

View File

@ -1,309 +0,0 @@
# ===========================================================================
# http://www.gnu.org/software/autoconf-archive/ax_pthread.html
# ===========================================================================
#
# SYNOPSIS
#
# AX_PTHREAD([ACTION-IF-FOUND[, ACTION-IF-NOT-FOUND]])
#
# DESCRIPTION
#
# This macro figures out how to build C programs using POSIX threads. It
# sets the PTHREAD_LIBS output variable to the threads library and linker
# flags, and the PTHREAD_CFLAGS output variable to any special C compiler
# flags that are needed. (The user can also force certain compiler
# flags/libs to be tested by setting these environment variables.)
#
# Also sets PTHREAD_CC to any special C compiler that is needed for
# multi-threaded programs (defaults to the value of CC otherwise). (This
# is necessary on AIX to use the special cc_r compiler alias.)
#
# NOTE: You are assumed to not only compile your program with these flags,
# but also link it with them as well. e.g. you should link with
# $PTHREAD_CC $CFLAGS $PTHREAD_CFLAGS $LDFLAGS ... $PTHREAD_LIBS $LIBS
#
# If you are only building threads programs, you may wish to use these
# variables in your default LIBS, CFLAGS, and CC:
#
# LIBS="$PTHREAD_LIBS $LIBS"
# CFLAGS="$CFLAGS $PTHREAD_CFLAGS"
# CC="$PTHREAD_CC"
#
# In addition, if the PTHREAD_CREATE_JOINABLE thread-attribute constant
# has a nonstandard name, defines PTHREAD_CREATE_JOINABLE to that name
# (e.g. PTHREAD_CREATE_UNDETACHED on AIX).
#
# Also HAVE_PTHREAD_PRIO_INHERIT is defined if pthread is found and the
# PTHREAD_PRIO_INHERIT symbol is defined when compiling with
# PTHREAD_CFLAGS.
#
# ACTION-IF-FOUND is a list of shell commands to run if a threads library
# is found, and ACTION-IF-NOT-FOUND is a list of commands to run it if it
# is not found. If ACTION-IF-FOUND is not specified, the default action
# will define HAVE_PTHREAD.
#
# Please let the authors know if this macro fails on any platform, or if
# you have any other suggestions or comments. This macro was based on work
# by SGJ on autoconf scripts for FFTW (http://www.fftw.org/) (with help
# from M. Frigo), as well as ac_pthread and hb_pthread macros posted by
# Alejandro Forero Cuervo to the autoconf macro repository. We are also
# grateful for the helpful feedback of numerous users.
#
# Updated for Autoconf 2.68 by Daniel Richard G.
#
# LICENSE
#
# Copyright (c) 2008 Steven G. Johnson <stevenj@alum.mit.edu>
# Copyright (c) 2011 Daniel Richard G. <skunk@iSKUNK.ORG>
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
#
# As a special exception, the respective Autoconf Macro's copyright owner
# gives unlimited permission to copy, distribute and modify the configure
# scripts that are the output of Autoconf when processing the Macro. You
# need not follow the terms of the GNU General Public License when using
# or distributing such scripts, even though portions of the text of the
# Macro appear in them. The GNU General Public License (GPL) does govern
# all other use of the material that constitutes the Autoconf Macro.
#
# This special exception to the GPL applies to versions of the Autoconf
# Macro released by the Autoconf Archive. When you make and distribute a
# modified version of the Autoconf Macro, you may extend this special
# exception to the GPL to apply to your modified version as well.
#serial 18
AU_ALIAS([ACX_PTHREAD], [AX_PTHREAD])
AC_DEFUN([AX_PTHREAD], [
AC_REQUIRE([AC_CANONICAL_HOST])
AC_LANG_PUSH([C])
ax_pthread_ok=no
# We used to check for pthread.h first, but this fails if pthread.h
# requires special compiler flags (e.g. on True64 or Sequent).
# It gets checked for in the link test anyway.
# First of all, check if the user has set any of the PTHREAD_LIBS,
# etcetera environment variables, and if threads linking works using
# them:
if test x"$PTHREAD_LIBS$PTHREAD_CFLAGS" != x; then
save_CFLAGS="$CFLAGS"
CFLAGS="$CFLAGS $PTHREAD_CFLAGS"
save_LIBS="$LIBS"
LIBS="$PTHREAD_LIBS $LIBS"
AC_MSG_CHECKING([for pthread_join in LIBS=$PTHREAD_LIBS with CFLAGS=$PTHREAD_CFLAGS])
AC_TRY_LINK_FUNC(pthread_join, ax_pthread_ok=yes)
AC_MSG_RESULT($ax_pthread_ok)
if test x"$ax_pthread_ok" = xno; then
PTHREAD_LIBS=""
PTHREAD_CFLAGS=""
fi
LIBS="$save_LIBS"
CFLAGS="$save_CFLAGS"
fi
# We must check for the threads library under a number of different
# names; the ordering is very important because some systems
# (e.g. DEC) have both -lpthread and -lpthreads, where one of the
# libraries is broken (non-POSIX).
# Create a list of thread flags to try. Items starting with a "-" are
# C compiler flags, and other items are library names, except for "none"
# which indicates that we try without any flags at all, and "pthread-config"
# which is a program returning the flags for the Pth emulation library.
ax_pthread_flags="pthreads none -Kthread -kthread lthread -pthread -pthreads -mthreads pthread --thread-safe -mt pthread-config"
# The ordering *is* (sometimes) important. Some notes on the
# individual items follow:
# pthreads: AIX (must check this before -lpthread)
# none: in case threads are in libc; should be tried before -Kthread and
# other compiler flags to prevent continual compiler warnings
# -Kthread: Sequent (threads in libc, but -Kthread needed for pthread.h)
# -kthread: FreeBSD kernel threads (preferred to -pthread since SMP-able)
# lthread: LinuxThreads port on FreeBSD (also preferred to -pthread)
# -pthread: Linux/gcc (kernel threads), BSD/gcc (userland threads)
# -pthreads: Solaris/gcc
# -mthreads: Mingw32/gcc, Lynx/gcc
# -mt: Sun Workshop C (may only link SunOS threads [-lthread], but it
# doesn't hurt to check since this sometimes defines pthreads too;
# also defines -D_REENTRANT)
# ... -mt is also the pthreads flag for HP/aCC
# pthread: Linux, etcetera
# --thread-safe: KAI C++
# pthread-config: use pthread-config program (for GNU Pth library)
case ${host_os} in
solaris*)
# On Solaris (at least, for some versions), libc contains stubbed
# (non-functional) versions of the pthreads routines, so link-based
# tests will erroneously succeed. (We need to link with -pthreads/-mt/
# -lpthread.) (The stubs are missing pthread_cleanup_push, or rather
# a function called by this macro, so we could check for that, but
# who knows whether they'll stub that too in a future libc.) So,
# we'll just look for -pthreads and -lpthread first:
ax_pthread_flags="-pthreads pthread -mt -pthread $ax_pthread_flags"
;;
darwin*)
ax_pthread_flags="-pthread $ax_pthread_flags"
;;
esac
if test x"$ax_pthread_ok" = xno; then
for flag in $ax_pthread_flags; do
case $flag in
none)
AC_MSG_CHECKING([whether pthreads work without any flags])
;;
-*)
AC_MSG_CHECKING([whether pthreads work with $flag])
PTHREAD_CFLAGS="$flag"
;;
pthread-config)
AC_CHECK_PROG(ax_pthread_config, pthread-config, yes, no)
if test x"$ax_pthread_config" = xno; then continue; fi
PTHREAD_CFLAGS="`pthread-config --cflags`"
PTHREAD_LIBS="`pthread-config --ldflags` `pthread-config --libs`"
;;
*)
AC_MSG_CHECKING([for the pthreads library -l$flag])
PTHREAD_LIBS="-l$flag"
;;
esac
save_LIBS="$LIBS"
save_CFLAGS="$CFLAGS"
LIBS="$PTHREAD_LIBS $LIBS"
CFLAGS="$CFLAGS $PTHREAD_CFLAGS"
# Check for various functions. We must include pthread.h,
# since some functions may be macros. (On the Sequent, we
# need a special flag -Kthread to make this header compile.)
# We check for pthread_join because it is in -lpthread on IRIX
# while pthread_create is in libc. We check for pthread_attr_init
# due to DEC craziness with -lpthreads. We check for
# pthread_cleanup_push because it is one of the few pthread
# functions on Solaris that doesn't have a non-functional libc stub.
# We try pthread_create on general principles.
AC_LINK_IFELSE([AC_LANG_PROGRAM([#include <pthread.h>
static void routine(void *a) { a = 0; }
static void *start_routine(void *a) { return a; }],
[pthread_t th; pthread_attr_t attr;
pthread_create(&th, 0, start_routine, 0);
pthread_join(th, 0);
pthread_attr_init(&attr);
pthread_cleanup_push(routine, 0);
pthread_cleanup_pop(0) /* ; */])],
[ax_pthread_ok=yes],
[])
LIBS="$save_LIBS"
CFLAGS="$save_CFLAGS"
AC_MSG_RESULT($ax_pthread_ok)
if test "x$ax_pthread_ok" = xyes; then
break;
fi
PTHREAD_LIBS=""
PTHREAD_CFLAGS=""
done
fi
# Various other checks:
if test "x$ax_pthread_ok" = xyes; then
save_LIBS="$LIBS"
LIBS="$PTHREAD_LIBS $LIBS"
save_CFLAGS="$CFLAGS"
CFLAGS="$CFLAGS $PTHREAD_CFLAGS"
# Detect AIX lossage: JOINABLE attribute is called UNDETACHED.
AC_MSG_CHECKING([for joinable pthread attribute])
attr_name=unknown
for attr in PTHREAD_CREATE_JOINABLE PTHREAD_CREATE_UNDETACHED; do
AC_LINK_IFELSE([AC_LANG_PROGRAM([#include <pthread.h>],
[int attr = $attr; return attr /* ; */])],
[attr_name=$attr; break],
[])
done
AC_MSG_RESULT($attr_name)
if test "$attr_name" != PTHREAD_CREATE_JOINABLE; then
AC_DEFINE_UNQUOTED(PTHREAD_CREATE_JOINABLE, $attr_name,
[Define to necessary symbol if this constant
uses a non-standard name on your system.])
fi
AC_MSG_CHECKING([if more special flags are required for pthreads])
flag=no
case ${host_os} in
aix* | freebsd* | darwin*) flag="-D_THREAD_SAFE";;
osf* | hpux*) flag="-D_REENTRANT";;
solaris*)
if test "$GCC" = "yes"; then
flag="-D_REENTRANT"
else
flag="-mt -D_REENTRANT"
fi
;;
esac
AC_MSG_RESULT(${flag})
if test "x$flag" != xno; then
PTHREAD_CFLAGS="$flag $PTHREAD_CFLAGS"
fi
AC_CACHE_CHECK([for PTHREAD_PRIO_INHERIT],
ax_cv_PTHREAD_PRIO_INHERIT, [
AC_LINK_IFELSE([
AC_LANG_PROGRAM([[#include <pthread.h>]], [[int i = PTHREAD_PRIO_INHERIT;]])],
[ax_cv_PTHREAD_PRIO_INHERIT=yes],
[ax_cv_PTHREAD_PRIO_INHERIT=no])
])
AS_IF([test "x$ax_cv_PTHREAD_PRIO_INHERIT" = "xyes"],
AC_DEFINE([HAVE_PTHREAD_PRIO_INHERIT], 1, [Have PTHREAD_PRIO_INHERIT.]))
LIBS="$save_LIBS"
CFLAGS="$save_CFLAGS"
# More AIX lossage: must compile with xlc_r or cc_r
if test x"$GCC" != xyes; then
AC_CHECK_PROGS(PTHREAD_CC, xlc_r cc_r, ${CC})
else
PTHREAD_CC=$CC
fi
else
PTHREAD_CC="$CC"
fi
AC_SUBST(PTHREAD_LIBS)
AC_SUBST(PTHREAD_CFLAGS)
AC_SUBST(PTHREAD_CC)
# Finally, execute ACTION-IF-FOUND/ACTION-IF-NOT-FOUND:
if test x"$ax_pthread_ok" = xyes; then
ifelse([$1],,AC_DEFINE(HAVE_PTHREAD,1,[Define if you have POSIX threads libraries and header files.]),[$1])
:
else
ax_pthread_ok=no
$2
fi
AC_LANG_POP
])dnl AX_PTHREAD

View File

@ -50,16 +50,15 @@ AM_PROG_LEX
AC_PROG_YACC AC_PROG_YACC
AC_PROG_LIBTOOL AC_PROG_LIBTOOL
AX_CXX_GCC_ABI_DEMANGLE AX_CXX_GCC_ABI_DEMANGLE
AX_PTHREAD
AX_BOOST_BASE AX_BOOST_BASE
AX_BOOST_SIGNALS AX_BOOST_SIGNALS
AX_BOOST_THREAD
AX_BOOST_UNIT_TEST_FRAMEWORK AX_BOOST_UNIT_TEST_FRAMEWORK
AX_CHECK_OPENSSL([], [AC_MSG_ERROR([You need the OpenSSL headers and libraries in order to build this application])]) AX_CHECK_OPENSSL([], [AC_MSG_ERROR([You need the OpenSSL headers and libraries in order to build this application])])
AC_CHECK_LIB(ssl, SSL_new) AC_CHECK_LIB(ssl, SSL_new)
AC_CHECK_LIB(crypto, X509_NAME_oneline) AC_CHECK_LIB(crypto, X509_NAME_oneline)
AC_CHECK_LIB(eay32, X509_NAME_oneline) AC_CHECK_LIB(eay32, X509_NAME_oneline)
AC_CHECK_LIB(m, floor) AC_CHECK_LIB(m, floor)
AC_CHECK_LIB(pthread, pthread_create)
AC_CHECK_LIB(socket, getsockname) AC_CHECK_LIB(socket, getsockname)
AC_CHECK_LIB(ws2_32, getsockname) AC_CHECK_LIB(ws2_32, getsockname)
AC_CHECK_LIB(shlwapi, PathRemoveFileSpecA) AC_CHECK_LIB(shlwapi, PathRemoveFileSpecA)

View File

@ -29,10 +29,8 @@
#include <i2-base.h> #include <i2-base.h>
#include <stack>
#include <fstream> #include <fstream>
using std::stack;
using std::istream; using std::istream;
using std::ostream; using std::ostream;
using std::cin; using std::cin;

View File

@ -46,5 +46,6 @@ libicinga_la_LDFLAGS = \
@VERSION_INFO@ @VERSION_INFO@
libicinga_la_LIBADD = \ libicinga_la_LIBADD = \
$(BOOST_THREAD_LIB) \
${top_builddir}/jsonrpc/libjsonrpc.la \ ${top_builddir}/jsonrpc/libjsonrpc.la \
${top_builddir}/base/libbase.la ${top_builddir}/base/libbase.la

View File

@ -32,7 +32,9 @@ public:
typedef function<CheckTask::Ptr(const Service&)> Factory; typedef function<CheckTask::Ptr(const Service&)> Factory;
virtual CheckResult Execute(void) const = 0; virtual void Execute(void) = 0;
virtual bool IsFinished(void) const = 0;
virtual CheckResult GetResult(void) = 0;
static void RegisterType(string type, Factory factory); static void RegisterType(string type, Factory factory);
static CheckTask::Ptr CreateTask(const Service& service); static CheckTask::Ptr CreateTask(const Service& service);

View File

@ -21,36 +21,6 @@
using namespace icinga; using namespace icinga;
/**
* Constructor for the Endpoint class.
*/
Endpoint::Endpoint(void)
{
m_ReceivedWelcome = false;
m_SentWelcome = false;
}
/**
* Retrieves the identity of this endpoint.
*
* @returns The identity of the endpoint.
*/
string Endpoint::GetIdentity(void) const
{
return m_Identity;
}
/**
* Sets the identity of this endpoint.
*
* @param identity The new identity of the endpoint.
*/
void Endpoint::SetIdentity(string identity)
{
m_Identity = identity;
OnIdentityChanged(GetSelf());
}
/** /**
* Retrieves the endpoint manager this endpoint is registered with. * Retrieves the endpoint manager this endpoint is registered with.
* *

View File

@ -38,13 +38,13 @@ public:
typedef set<string>::const_iterator ConstTopicIterator; typedef set<string>::const_iterator ConstTopicIterator;
Endpoint(void); Endpoint(void)
: m_ReceivedWelcome(false), m_SentWelcome(false)
{ }
virtual string GetIdentity(void) const = 0;
virtual string GetAddress(void) const = 0; virtual string GetAddress(void) const = 0;
string GetIdentity(void) const;
void SetIdentity(string identity);
void SetReceivedWelcome(bool value); void SetReceivedWelcome(bool value);
bool HasReceivedWelcome(void) const; bool HasReceivedWelcome(void) const;
@ -83,7 +83,6 @@ public:
boost::signal<void (const Endpoint::Ptr&)> OnSessionEstablished; boost::signal<void (const Endpoint::Ptr&)> OnSessionEstablished;
private: private:
string m_Identity; /**< The identity of this endpoint. */
set<string> m_Subscriptions; /**< The topics this endpoint is set<string> m_Subscriptions; /**< The topics this endpoint is
subscribed to. */ subscribed to. */
set<string> m_Publications; /**< The topics this endpoint is set<string> m_Publications; /**< The topics this endpoint is

View File

@ -31,6 +31,10 @@
#include <i2-jsonrpc.h> #include <i2-jsonrpc.h>
#include <set> #include <set>
#include <boost/thread/future.hpp>
using boost::packaged_task;
using boost::unique_future;
#ifdef I2_ICINGA_BUILD #ifdef I2_ICINGA_BUILD
# define I2_ICINGA_API I2_EXPORT # define I2_ICINGA_API I2_EXPORT
#else /* I2_ICINGA_BUILD */ #else /* I2_ICINGA_BUILD */

View File

@ -21,6 +21,11 @@
using namespace icinga; using namespace icinga;
string JsonRpcEndpoint::GetIdentity(void) const
{
return m_Identity;
}
string JsonRpcEndpoint::GetAddress(void) const string JsonRpcEndpoint::GetAddress(void) const
{ {
if (!m_Client) if (!m_Client)
@ -142,8 +147,10 @@ void JsonRpcEndpoint::VerifyCertificateHandler(bool& valid, const shared_ptr<X50
if (certificate && valid) { if (certificate && valid) {
string identity = Utility::GetCertificateCN(certificate); string identity = Utility::GetCertificateCN(certificate);
if (GetIdentity().empty() && !identity.empty()) if (GetIdentity().empty() && !identity.empty()) {
SetIdentity(identity); m_Identity = identity;
OnIdentityChanged(GetSelf());
}
} }
} }

View File

@ -41,7 +41,7 @@ public:
JsonRpcClient::Ptr GetClient(void); JsonRpcClient::Ptr GetClient(void);
void SetClient(JsonRpcClient::Ptr client); void SetClient(JsonRpcClient::Ptr client);
void SetAddress(string address); virtual string GetIdentity(void) const;
virtual string GetAddress(void) const; virtual string GetAddress(void) const;
virtual bool IsLocal(void) const; virtual bool IsLocal(void) const;
@ -53,11 +53,15 @@ public:
virtual void Stop(void); virtual void Stop(void);
private: private:
string m_Identity; /**< The identity of this endpoint. */
shared_ptr<SSL_CTX> m_SSLContext; shared_ptr<SSL_CTX> m_SSLContext;
string m_Address; string m_Address;
JsonRpcClient::Ptr m_Client; JsonRpcClient::Ptr m_Client;
map<string, Endpoint::Ptr> m_PendingCalls; map<string, Endpoint::Ptr> m_PendingCalls;
void SetAddress(string address);
void NewMessageHandler(const MessagePart& message); void NewMessageHandler(const MessagePart& message);
void ClientClosedHandler(void); void ClientClosedHandler(void);
void ClientErrorHandler(const std::exception& ex); void ClientErrorHandler(const std::exception& ex);

View File

@ -5,24 +5,45 @@ using namespace icinga;
NagiosCheckTask::NagiosCheckTask(const Service& service) NagiosCheckTask::NagiosCheckTask(const Service& service)
{ {
string checkCommand = service.GetCheckCommand(); string checkCommand = service.GetCheckCommand();
m_Command = MacroProcessor::ResolveMacros(checkCommand, service.GetMacros()); m_Command = MacroProcessor::ResolveMacros(checkCommand, service.GetMacros()) + " 2>&1";
m_Task = packaged_task<CheckResult>(boost::bind(&NagiosCheckTask::RunCheck, this));
m_Result = m_Task.get_future();
} }
CheckResult NagiosCheckTask::Execute(void) const void NagiosCheckTask::Execute(void)
{
Application::Log(LogDebug, "icinga", "Nagios check command: " + m_Command);
ThreadPool::GetDefaultPool()->EnqueueTask(boost::bind(&NagiosCheckTask::InternalExecute, this));
}
void NagiosCheckTask::InternalExecute(void)
{
m_Task();
}
bool NagiosCheckTask::IsFinished(void) const
{
return m_Result.is_ready();
}
CheckResult NagiosCheckTask::GetResult(void)
{
return m_Result.get();
}
CheckResult NagiosCheckTask::RunCheck(void) const
{ {
CheckResult cr; CheckResult cr;
FILE *fp; FILE *fp;
time(&cr.StartTime); time(&cr.StartTime);
string command = m_Command + " 2>&1";
Application::Log(LogDebug, "icinga", "Nagios check command: " + command);
#ifdef _MSC_VER #ifdef _MSC_VER
fp = _popen(command.c_str(), "r"); fp = _popen(m_Command.c_str(), "r");
#else /* _MSC_VER */ #else /* _MSC_VER */
fp = popen(command.c_str(), "r"); fp = popen(m_Command.c_str(), "r");
#endif /* _MSC_VER */ #endif /* _MSC_VER */
stringstream outputbuf; stringstream outputbuf;
@ -40,8 +61,6 @@ CheckResult NagiosCheckTask::Execute(void) const
cr.Output = outputbuf.str(); cr.Output = outputbuf.str();
boost::algorithm::trim(cr.Output); boost::algorithm::trim(cr.Output);
Application::Log(LogDebug, "icinga", "Nagios plugin output: " + cr.Output);
int status, exitcode; int status, exitcode;
#ifdef _MSC_VER #ifdef _MSC_VER
status = _pclose(fp); status = _pclose(fp);

View File

@ -9,12 +9,19 @@ class I2_ICINGA_API NagiosCheckTask : public CheckTask
public: public:
NagiosCheckTask(const Service& service); NagiosCheckTask(const Service& service);
virtual CheckResult Execute(void) const; virtual void Execute(void);
virtual bool IsFinished(void) const;
virtual CheckResult GetResult(void);
static CheckTask::Ptr CreateTask(const Service& service); static CheckTask::Ptr CreateTask(const Service& service);
private: private:
string m_Command; string m_Command;
packaged_task<CheckResult> m_Task;
unique_future<CheckResult> m_Result;
void InternalExecute(void);
CheckResult RunCheck(void) const;
}; };
} }

View File

@ -21,6 +21,11 @@
using namespace icinga; using namespace icinga;
string VirtualEndpoint::GetIdentity(void) const
{
return "__" + GetAddress();
}
string VirtualEndpoint::GetAddress(void) const string VirtualEndpoint::GetAddress(void) const
{ {
char address[50]; char address[50];

View File

@ -37,6 +37,7 @@ public:
void RegisterTopicHandler(string topic, function<void (const VirtualEndpoint::Ptr&, const Endpoint::Ptr, const RequestMessage&)> callback); void RegisterTopicHandler(string topic, function<void (const VirtualEndpoint::Ptr&, const Endpoint::Ptr, const RequestMessage&)> callback);
void UnregisterTopicHandler(string topic, function<void (const VirtualEndpoint::Ptr&, const Endpoint::Ptr, const RequestMessage&)> callback); void UnregisterTopicHandler(string topic, function<void (const VirtualEndpoint::Ptr&, const Endpoint::Ptr, const RequestMessage&)> callback);
virtual string GetIdentity(void) const;
virtual string GetAddress(void) const; virtual string GetAddress(void) const;
virtual bool IsLocal(void) const; virtual bool IsLocal(void) const;