2013-08-27 12:21:41 +02:00
/******************************************************************************
* Icinga 2 *
2014-01-09 00:32:11 +01:00
* Copyright ( C ) 2012 - present Icinga Development Team ( http : //www.icinga.org) *
2013-08-27 12:21:41 +02:00
* *
* This program is free software ; you can redistribute it and / or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation ; either version 2 *
* of the License , or ( at your option ) any later version . *
* *
* This program is distributed in the hope that it will be useful , *
* but WITHOUT ANY WARRANTY ; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE . See the *
* GNU General Public License for more details . *
* *
* You should have received a copy of the GNU General Public License *
* along with this program ; if not , write to the Free Software Foundation *
* Inc . , 51 Franklin St , Fifth Floor , Boston , MA 02110 - 1301 , USA . *
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
2013-09-25 09:36:55 +02:00
# include "cluster/clusterlistener.h"
2013-08-27 12:21:41 +02:00
# include "cluster/endpoint.h"
2014-02-13 15:15:16 +01:00
# include "icinga/cib.h"
2013-09-17 13:18:26 +02:00
# include "icinga/domain.h"
2013-10-08 11:57:35 +02:00
# include "icinga/icingaapplication.h"
2013-09-03 10:08:02 +02:00
# include "base/netstring.h"
2013-08-27 12:21:41 +02:00
# include "base/dynamictype.h"
# include "base/logger_fwd.h"
# include "base/objectlock.h"
# include "base/networkstream.h"
2013-09-03 14:05:03 +02:00
# include "base/zlibstream.h"
2013-09-03 10:08:02 +02:00
# include "base/application.h"
# include "base/convert.h"
2013-11-19 07:49:41 +01:00
# include "base/context.h"
2014-02-17 16:34:18 +01:00
# include "base/statsfunction.h"
2013-09-03 10:08:02 +02:00
# include <fstream>
2013-08-27 12:21:41 +02:00
using namespace icinga ;
2013-09-25 09:36:55 +02:00
/* Invokes the REGISTER_TYPE macro for ClusterListener. The macro is defined
 * outside this file; presumably it registers the class with the dynamic type
 * system -- confirm against the macro definition. */
REGISTER_TYPE ( ClusterListener ) ;
2013-08-27 12:21:41 +02:00
2014-02-17 16:34:18 +01:00
REGISTER_STATSFUNCTION(ClusterListenerStats, &ClusterListener::StatsFunc);

/**
 * Gathers status and performance data from every ClusterListener object.
 *
 * For each listener the first dictionary returned by GetClusterStatus() is
 * stored under the listener's name in a per-listener dictionary, and every
 * entry of the second dictionary is copied into perfdata under a key that is
 * prefixed with the listener's name.
 *
 * @param status Dictionary that receives the per-listener status dictionary.
 * @param perfdata Dictionary that receives the prefixed perfdata entries.
 * @returns Always 0.
 */
Value ClusterListener::StatsFunc(Dictionary::Ptr& status, Dictionary::Ptr& perfdata)
{
	Dictionary::Ptr listenerNodes = make_shared<Dictionary>();

	BOOST_FOREACH(const ClusterListener::Ptr& listener, DynamicType::GetObjects<ClusterListener>()) {
		/* first: status dictionary, second: perfdata dictionary */
		std::pair<Dictionary::Ptr, Dictionary::Ptr> clusterStats = listener->GetClusterStatus();

		listenerNodes->Set(listener->GetName(), clusterStats.first);

		String keyPrefix = " clusterlistener_ " + listener->GetName() + " _ ";

		BOOST_FOREACH(Dictionary::Pair const& entry, clusterStats.second) {
			perfdata->Set(keyPrefix + entry.first, entry.second);
		}
	}

	status->Set(" clusterlistener ", listenerNodes);

	return 0;
}
2013-08-27 12:21:41 +02:00
/**
* Starts the component .
*/
2013-09-25 09:36:55 +02:00
void ClusterListener : : Start ( void )
2013-08-27 12:21:41 +02:00
{
DynamicObject : : Start ( ) ;
2013-09-03 10:30:28 +02:00
{
ObjectLock olock ( this ) ;
2013-09-03 15:56:48 +02:00
RotateLogFile ( ) ;
2013-09-03 10:30:28 +02:00
OpenLogFile ( ) ;
}
2013-09-03 10:08:02 +02:00
2013-08-27 12:21:41 +02:00
/* set up SSL context */
2013-10-26 09:41:45 +02:00
shared_ptr < X509 > cert = GetX509Certificate ( GetCertPath ( ) ) ;
SetIdentity ( GetCertificateCN ( cert ) ) ;
Log ( LogInformation , " cluster " , " My identity: " + GetIdentity ( ) ) ;
2013-08-27 12:21:41 +02:00
2013-09-04 15:47:15 +02:00
Endpoint : : Ptr self = Endpoint : : GetByName ( GetIdentity ( ) ) ;
if ( ! self )
BOOST_THROW_EXCEPTION ( std : : invalid_argument ( " No configuration available for the local endpoint. " ) ) ;
2013-10-26 09:41:45 +02:00
m_SSLContext = MakeSSLContext ( GetCertPath ( ) , GetKeyPath ( ) , GetCaPath ( ) ) ;
2013-08-27 12:21:41 +02:00
2013-11-13 10:30:40 +01:00
if ( ! GetCrlPath ( ) . IsEmpty ( ) )
AddCRLToSSLContext ( m_SSLContext , GetCrlPath ( ) ) ;
2013-08-27 12:21:41 +02:00
/* create the primary JSON-RPC listener */
if ( ! GetBindPort ( ) . IsEmpty ( ) )
AddListener ( GetBindPort ( ) ) ;
2013-11-06 08:51:56 +01:00
m_ClusterTimer = make_shared < Timer > ( ) ;
2013-09-25 09:36:55 +02:00
m_ClusterTimer - > OnTimerExpired . connect ( boost : : bind ( & ClusterListener : : ClusterTimerHandler , this ) ) ;
2013-08-30 09:34:58 +02:00
m_ClusterTimer - > SetInterval ( 5 ) ;
m_ClusterTimer - > Start ( ) ;
2013-08-27 12:21:41 +02:00
2013-11-20 15:33:04 +01:00
m_MessageQueue . SetExceptionCallback ( & ClusterListener : : MessageExceptionHandler ) ;
2013-09-25 09:36:55 +02:00
Service : : OnNewCheckResult . connect ( boost : : bind ( & ClusterListener : : CheckResultHandler , this , _1 , _2 , _3 ) ) ;
Service : : OnNextCheckChanged . connect ( boost : : bind ( & ClusterListener : : NextCheckChangedHandler , this , _1 , _2 , _3 ) ) ;
Notification : : OnNextNotificationChanged . connect ( boost : : bind ( & ClusterListener : : NextNotificationChangedHandler , this , _1 , _2 , _3 ) ) ;
Service : : OnForceNextCheckChanged . connect ( boost : : bind ( & ClusterListener : : ForceNextCheckChangedHandler , this , _1 , _2 , _3 ) ) ;
Service : : OnForceNextNotificationChanged . connect ( boost : : bind ( & ClusterListener : : ForceNextNotificationChangedHandler , this , _1 , _2 , _3 ) ) ;
Service : : OnEnableActiveChecksChanged . connect ( boost : : bind ( & ClusterListener : : EnableActiveChecksChangedHandler , this , _1 , _2 , _3 ) ) ;
Service : : OnEnablePassiveChecksChanged . connect ( boost : : bind ( & ClusterListener : : EnablePassiveChecksChangedHandler , this , _1 , _2 , _3 ) ) ;
Service : : OnEnableNotificationsChanged . connect ( boost : : bind ( & ClusterListener : : EnableNotificationsChangedHandler , this , _1 , _2 , _3 ) ) ;
Service : : OnEnableFlappingChanged . connect ( boost : : bind ( & ClusterListener : : EnableFlappingChangedHandler , this , _1 , _2 , _3 ) ) ;
Service : : OnCommentAdded . connect ( boost : : bind ( & ClusterListener : : CommentAddedHandler , this , _1 , _2 , _3 ) ) ;
Service : : OnCommentRemoved . connect ( boost : : bind ( & ClusterListener : : CommentRemovedHandler , this , _1 , _2 , _3 ) ) ;
Service : : OnDowntimeAdded . connect ( boost : : bind ( & ClusterListener : : DowntimeAddedHandler , this , _1 , _2 , _3 ) ) ;
Service : : OnDowntimeRemoved . connect ( boost : : bind ( & ClusterListener : : DowntimeRemovedHandler , this , _1 , _2 , _3 ) ) ;
Service : : OnAcknowledgementSet . connect ( boost : : bind ( & ClusterListener : : AcknowledgementSetHandler , this , _1 , _2 , _3 , _4 , _5 , _6 ) ) ;
Service : : OnAcknowledgementCleared . connect ( boost : : bind ( & ClusterListener : : AcknowledgementClearedHandler , this , _1 , _2 ) ) ;
Endpoint : : OnMessageReceived . connect ( boost : : bind ( & ClusterListener : : AsyncMessageHandler , this , _1 , _2 ) ) ;
2013-09-17 13:18:26 +02:00
BOOST_FOREACH ( const DynamicType : : Ptr & type , DynamicType : : GetTypes ( ) ) {
BOOST_FOREACH ( const DynamicObject : : Ptr & object , type - > GetObjects ( ) ) {
BOOST_FOREACH ( const Endpoint : : Ptr & endpoint , DynamicType : : GetObjects < Endpoint > ( ) ) {
int privs = 0 ;
Array : : Ptr domains = object - > GetDomains ( ) ;
if ( domains ) {
ObjectLock olock ( domains ) ;
BOOST_FOREACH ( const String & domain , domains ) {
Domain : : Ptr domainObj = Domain : : GetByName ( domain ) ;
if ( ! domainObj )
BOOST_THROW_EXCEPTION ( std : : invalid_argument ( " Invalid domain: " + domain ) ) ;
privs | = domainObj - > GetPrivileges ( endpoint - > GetName ( ) ) ;
}
} else {
2013-09-17 15:56:54 +02:00
privs = INT_MAX ;
2013-09-17 13:18:26 +02:00
}
2013-09-17 14:39:43 +02:00
Log ( LogDebug , " cluster " , " Privileges for object ' " + object - > GetName ( ) + " ' of type ' " + object - > GetType ( ) - > GetName ( ) + " ' for instance ' " + endpoint - > GetName ( ) + " ' are ' " + Convert : : ToString ( privs ) + " ' " ) ;
2013-09-17 13:18:26 +02:00
object - > SetPrivileges ( endpoint - > GetName ( ) , privs ) ;
}
}
}
2013-08-27 12:21:41 +02:00
}
/**
* Stops the component .
*/
2013-09-25 09:36:55 +02:00
void ClusterListener : : Stop ( void )
2013-08-27 12:21:41 +02:00
{
2013-09-03 10:30:28 +02:00
ObjectLock olock ( this ) ;
2013-09-03 10:08:02 +02:00
CloseLogFile ( ) ;
2013-09-03 15:56:48 +02:00
RotateLogFile ( ) ;
2013-08-27 12:21:41 +02:00
}
2013-09-25 09:36:55 +02:00
/**
 * Retrieves the SSL context stored in m_SSLContext.
 *
 * @returns The SSL context (null until Start() has assigned it).
 */
shared_ptr<SSL_CTX> ClusterListener::GetSSLContext(void) const
{
	return m_SSLContext;
}
/**
* Creates a new JSON - RPC listener on the specified port .
*
* @ param service The port to listen on .
*/
2013-09-25 09:36:55 +02:00
void ClusterListener : : AddListener ( const String & service )
2013-08-27 12:21:41 +02:00
{
ObjectLock olock ( this ) ;
shared_ptr < SSL_CTX > sslContext = m_SSLContext ;
if ( ! sslContext )
BOOST_THROW_EXCEPTION ( std : : logic_error ( " SSL context is required for AddListener() " ) ) ;
std : : ostringstream s ;
s < < " Adding new listener: port " < < service ;
Log ( LogInformation , " cluster " , s . str ( ) ) ;
2013-11-06 08:51:56 +01:00
TcpSocket : : Ptr server = make_shared < TcpSocket > ( ) ;
2013-08-27 12:21:41 +02:00
server - > Bind ( service , AF_INET6 ) ;
2013-09-25 09:36:55 +02:00
boost : : thread thread ( boost : : bind ( & ClusterListener : : ListenerThreadProc , this , server ) ) ;
2013-08-27 12:21:41 +02:00
thread . detach ( ) ;
m_Servers . insert ( server ) ;
}
2013-09-25 09:36:55 +02:00
void ClusterListener : : ListenerThreadProc ( const Socket : : Ptr & server )
2013-08-27 12:21:41 +02:00
{
2013-08-30 10:19:32 +02:00
Utility : : SetThreadName ( " Cluster Listener " ) ;
2013-08-27 12:21:41 +02:00
server - > Listen ( ) ;
for ( ; ; ) {
Socket : : Ptr client = server - > Accept ( ) ;
2013-09-25 09:36:55 +02:00
Utility : : QueueAsyncCallback ( boost : : bind ( & ClusterListener : : NewClientHandler , this , client , TlsRoleServer ) ) ;
2013-08-27 12:21:41 +02:00
}
}
/**
* Creates a new JSON - RPC client and connects to the specified host and port .
*
* @ param node The remote host .
* @ param service The remote port .
*/
2013-09-25 09:36:55 +02:00
void ClusterListener : : AddConnection ( const String & node , const String & service ) {
2013-08-27 12:21:41 +02:00
{
ObjectLock olock ( this ) ;
shared_ptr < SSL_CTX > sslContext = m_SSLContext ;
if ( ! sslContext )
BOOST_THROW_EXCEPTION ( std : : logic_error ( " SSL context is required for AddConnection() " ) ) ;
}
2013-11-06 08:51:56 +01:00
TcpSocket : : Ptr client = make_shared < TcpSocket > ( ) ;
2013-08-27 12:21:41 +02:00
2013-08-30 10:34:32 +02:00
client - > Connect ( node , service ) ;
2013-09-25 09:36:55 +02:00
Utility : : QueueAsyncCallback ( boost : : bind ( & ClusterListener : : NewClientHandler , this , client , TlsRoleClient ) ) ;
2013-08-27 12:21:41 +02:00
}
2013-09-25 09:36:55 +02:00
void ClusterListener : : AsyncRelayMessage ( const Endpoint : : Ptr & source , const Dictionary : : Ptr & message , bool persistent )
2013-09-02 15:12:20 +02:00
{
2013-09-25 09:36:55 +02:00
m_RelayQueue . Enqueue ( boost : : bind ( & ClusterListener : : RelayMessage , this , source , message , persistent ) ) ;
2013-09-18 09:09:16 +02:00
}
2013-09-03 10:08:02 +02:00
2013-09-25 09:36:55 +02:00
void ClusterListener : : PersistMessage ( const Endpoint : : Ptr & source , const Dictionary : : Ptr & message )
2013-09-18 09:09:16 +02:00
{
double ts = message - > Get ( " ts " ) ;
2013-09-03 10:48:34 +02:00
2013-09-18 09:09:16 +02:00
ASSERT ( ts ! = 0 ) ;
2013-09-17 13:18:26 +02:00
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr pmessage = make_shared < Dictionary > ( ) ;
2013-09-18 09:09:16 +02:00
pmessage - > Set ( " timestamp " , ts ) ;
2013-09-03 10:08:02 +02:00
2013-09-18 09:09:16 +02:00
if ( source )
pmessage - > Set ( " source " , source - > GetName ( ) ) ;
2013-11-08 11:17:46 +01:00
pmessage - > Set ( " message " , JsonSerialize ( message ) ) ;
2013-09-18 09:09:16 +02:00
pmessage - > Set ( " security " , message - > Get ( " security " ) ) ;
ObjectLock olock ( this ) ;
if ( m_LogFile ) {
2013-11-08 11:17:46 +01:00
NetString : : WriteStringToStream ( m_LogFile , JsonSerialize ( pmessage ) ) ;
2013-09-18 09:09:16 +02:00
m_LogMessageCount + + ;
2013-10-26 09:41:45 +02:00
SetLogMessageTimestamp ( ts ) ;
2013-09-18 09:09:16 +02:00
if ( m_LogMessageCount > 50000 ) {
CloseLogFile ( ) ;
RotateLogFile ( ) ;
OpenLogFile ( ) ;
2013-09-03 10:08:02 +02:00
}
}
2013-09-18 09:09:16 +02:00
}
2013-09-25 09:36:55 +02:00
void ClusterListener : : RelayMessage ( const Endpoint : : Ptr & source , const Dictionary : : Ptr & message , bool persistent )
2013-09-18 09:09:16 +02:00
{
double ts = Utility : : GetTime ( ) ;
message - > Set ( " ts " , ts ) ;
if ( persistent )
2013-09-25 09:36:55 +02:00
m_LogQueue . Enqueue ( boost : : bind ( & ClusterListener : : PersistMessage , this , source , message ) ) ;
2013-09-03 10:08:02 +02:00
2013-09-17 13:18:26 +02:00
Dictionary : : Ptr security = message - > Get ( " security " ) ;
DynamicObject : : Ptr secobj ;
2013-09-18 10:43:23 +02:00
int privs = 0 ;
2013-09-17 13:18:26 +02:00
if ( security ) {
String type = security - > Get ( " type " ) ;
DynamicType : : Ptr dtype = DynamicType : : GetByName ( type ) ;
if ( ! dtype ) {
Log ( LogWarning , " cluster " , " Invalid type in security attribute: " + type ) ;
return ;
}
String name = security - > Get ( " name " ) ;
secobj = dtype - > GetObject ( name ) ;
if ( ! secobj ) {
Log ( LogWarning , " cluster " , " Invalid object name in security attribute: " + name + " (of type ' " + type + " ') " ) ;
return ;
}
privs = security - > Get ( " privs " ) ;
}
2013-09-02 15:12:20 +02:00
BOOST_FOREACH ( const Endpoint : : Ptr & endpoint , DynamicType : : GetObjects < Endpoint > ( ) ) {
if ( ! persistent & & ! endpoint - > IsConnected ( ) )
continue ;
2013-09-17 13:18:26 +02:00
if ( endpoint = = source )
2013-09-02 15:12:20 +02:00
continue ;
if ( endpoint - > GetName ( ) = = GetIdentity ( ) )
continue ;
2013-09-17 13:18:26 +02:00
if ( secobj & & ! secobj - > HasPrivileges ( endpoint - > GetName ( ) , privs ) ) {
2013-09-17 14:38:27 +02:00
Log ( LogDebug , " cluster " , " Not sending message to endpoint ' " + endpoint - > GetName ( ) + " ': Insufficient privileges. " ) ;
2013-09-17 13:18:26 +02:00
continue ;
}
2013-09-16 09:30:31 +02:00
{
ObjectLock olock ( endpoint ) ;
2013-10-26 09:41:45 +02:00
if ( ! endpoint - > GetSyncing ( ) )
2013-09-16 09:30:31 +02:00
endpoint - > SendMessage ( message ) ;
}
2013-09-02 15:12:20 +02:00
}
}
2013-09-25 09:36:55 +02:00
/**
 * Builds the path of the cluster state directory below the application's
 * local state directory.
 *
 * @returns The directory path.
 */
String ClusterListener::GetClusterDir(void) const
{
	String clusterDir = Application::GetLocalStateDir() + " /lib/icinga2/cluster/ ";
	return clusterDir;
}
2013-09-25 09:36:55 +02:00
void ClusterListener : : OpenLogFile ( void )
2013-09-03 10:08:02 +02:00
{
2013-09-03 10:30:28 +02:00
ASSERT ( OwnsLock ( ) ) ;
2013-09-04 15:47:15 +02:00
String path = GetClusterDir ( ) + " log/current " ;
2013-09-03 10:08:02 +02:00
std : : fstream * fp = new std : : fstream ( path . CStr ( ) , std : : fstream : : out | std : : ofstream : : app ) ;
if ( ! fp - > good ( ) ) {
Log ( LogWarning , " cluster " , " Could not open spool file: " + path ) ;
return ;
}
2013-11-06 08:51:56 +01:00
StdioStream : : Ptr logStream = make_shared < StdioStream > ( fp , true ) ;
2013-10-15 12:59:29 +02:00
# ifdef HAVE_BIOZLIB
2013-11-06 08:51:56 +01:00
m_LogFile = make_shared < ZlibStream > ( logStream ) ;
2013-10-15 12:59:29 +02:00
# else /* HAVE_BIOZLIB */
2013-10-08 15:41:23 +02:00
m_LogFile = logStream ;
2013-10-15 12:59:29 +02:00
# endif /* HAVE_BIOZLIB */
2013-09-03 10:08:02 +02:00
m_LogMessageCount = 0 ;
2013-10-26 09:41:45 +02:00
SetLogMessageTimestamp ( 0 ) ;
2013-09-03 10:08:02 +02:00
}
2013-09-25 09:36:55 +02:00
void ClusterListener : : CloseLogFile ( void )
2013-09-03 10:08:02 +02:00
{
2013-09-03 10:30:28 +02:00
ASSERT ( OwnsLock ( ) ) ;
2013-09-03 14:05:03 +02:00
if ( ! m_LogFile )
return ;
2013-09-03 10:08:02 +02:00
m_LogFile - > Close ( ) ;
m_LogFile . reset ( ) ;
2013-09-03 10:30:28 +02:00
2013-09-03 15:56:48 +02:00
}
2013-09-25 09:36:55 +02:00
void ClusterListener : : RotateLogFile ( void )
2013-09-03 15:56:48 +02:00
{
ASSERT ( OwnsLock ( ) ) ;
2013-10-26 09:41:45 +02:00
double ts = GetLogMessageTimestamp ( ) ;
2013-09-03 15:56:48 +02:00
if ( ts = = 0 )
ts = Utility : : GetTime ( ) ;
2013-09-04 15:47:15 +02:00
String oldpath = GetClusterDir ( ) + " log/current " ;
String newpath = GetClusterDir ( ) + " log/ " + Convert : : ToString ( static_cast < int > ( ts ) + 1 ) ;
2013-09-03 15:56:48 +02:00
( void ) rename ( oldpath . CStr ( ) , newpath . CStr ( ) ) ;
2013-09-03 10:08:02 +02:00
}
2013-09-25 09:36:55 +02:00
void ClusterListener : : LogGlobHandler ( std : : vector < int > & files , const String & file )
2013-09-03 10:08:02 +02:00
{
String name = Utility : : BaseName ( file ) ;
2013-09-03 10:32:42 +02:00
int ts ;
try {
ts = Convert : : ToLong ( name ) ;
} catch ( const std : : exception & ) {
return ;
}
2013-09-03 10:08:02 +02:00
files . push_back ( ts ) ;
}
2013-09-25 09:36:55 +02:00
void ClusterListener : : ReplayLog ( const Endpoint : : Ptr & endpoint , const Stream : : Ptr & stream )
2013-09-03 10:08:02 +02:00
{
2013-11-19 07:49:41 +01:00
CONTEXT ( " Replaying log for Endpoint ' " + endpoint - > GetName ( ) + " ' " ) ;
2013-09-16 10:00:06 +02:00
int count = - 1 ;
2013-09-16 09:49:28 +02:00
double peer_ts = endpoint - > GetLocalLogPosition ( ) ;
bool last_sync = false ;
2013-09-03 10:08:02 +02:00
2013-09-16 09:30:31 +02:00
ASSERT ( ! OwnsLock ( ) ) ;
2013-09-03 10:08:02 +02:00
2013-09-16 09:30:31 +02:00
for ( ; ; ) {
ObjectLock olock ( this ) ;
2013-09-03 10:08:02 +02:00
2013-09-16 09:30:31 +02:00
CloseLogFile ( ) ;
RotateLogFile ( ) ;
2013-09-03 10:08:02 +02:00
2013-09-16 10:00:06 +02:00
if ( count = = - 1 | | count > 50000 ) {
2013-09-16 09:30:31 +02:00
OpenLogFile ( ) ;
olock . Unlock ( ) ;
2013-09-16 09:49:28 +02:00
} else {
last_sync = true ;
2013-09-16 09:30:31 +02:00
}
2013-09-03 10:30:28 +02:00
2013-09-16 09:57:30 +02:00
count = 0 ;
2013-09-16 10:06:09 +02:00
std : : vector < int > files ;
2013-11-22 09:03:52 +01:00
Utility : : Glob ( GetClusterDir ( ) + " log/* " , boost : : bind ( & ClusterListener : : LogGlobHandler , boost : : ref ( files ) , _1 ) , GlobFile ) ;
2013-09-16 10:06:09 +02:00
std : : sort ( files . begin ( ) , files . end ( ) ) ;
2013-09-16 09:30:31 +02:00
BOOST_FOREACH ( int ts , files ) {
String path = GetClusterDir ( ) + " log/ " + Convert : : ToString ( ts ) ;
2013-09-03 10:08:02 +02:00
2013-09-16 09:49:28 +02:00
if ( ts < peer_ts )
2013-09-16 09:30:31 +02:00
continue ;
Log ( LogInformation , " cluster " , " Replaying log: " + path ) ;
std : : fstream * fp = new std : : fstream ( path . CStr ( ) , std : : fstream : : in ) ;
2013-11-06 08:51:56 +01:00
StdioStream : : Ptr logStream = make_shared < StdioStream > ( fp , true ) ;
2013-10-15 12:59:29 +02:00
# ifdef HAVE_BIOZLIB
2013-11-06 08:51:56 +01:00
ZlibStream : : Ptr lstream = make_shared < ZlibStream > ( logStream ) ;
2013-10-15 12:59:29 +02:00
# else /* HAVE_BIOZLIB */
2013-10-08 15:41:23 +02:00
Stream : : Ptr lstream = logStream ;
2013-10-15 12:59:29 +02:00
# endif /* HAVE_BIOZLIB */
2013-09-03 10:08:02 +02:00
2013-09-16 09:30:31 +02:00
String message ;
while ( true ) {
2013-10-11 08:19:58 +02:00
Dictionary : : Ptr pmessage ;
2013-09-16 09:30:31 +02:00
try {
if ( ! NetString : : ReadStringFromStream ( lstream , & message ) )
break ;
2013-10-11 08:19:58 +02:00
2013-11-08 11:17:46 +01:00
pmessage = JsonDeserialize ( message ) ;
2013-09-16 09:30:31 +02:00
} catch ( std : : exception & ) {
2013-09-19 15:08:34 +02:00
Log ( LogWarning , " cluster " , " Unexpected end-of-file for cluster log: " + path ) ;
2013-09-16 09:30:31 +02:00
/* Log files may be incomplete or corrupted. This is perfectly OK. */
2013-09-03 14:05:03 +02:00
break ;
2013-09-16 09:30:31 +02:00
}
2013-09-03 14:05:03 +02:00
2013-09-16 09:49:28 +02:00
if ( pmessage - > Get ( " timestamp " ) < peer_ts )
2013-09-16 09:30:31 +02:00
continue ;
2013-09-03 10:08:02 +02:00
2013-09-17 13:18:26 +02:00
if ( pmessage - > Get ( " source " ) = = endpoint - > GetName ( ) )
2013-09-16 09:30:31 +02:00
continue ;
2013-09-17 13:18:26 +02:00
Dictionary : : Ptr security = pmessage - > Get ( " security " ) ;
DynamicObject : : Ptr secobj ;
int privs ;
if ( security ) {
String type = security - > Get ( " type " ) ;
DynamicType : : Ptr dtype = DynamicType : : GetByName ( type ) ;
if ( ! dtype ) {
2013-09-19 15:14:32 +02:00
Log ( LogDebug , " cluster " , " Invalid type in security attribute: " + type ) ;
2013-09-19 15:08:34 +02:00
continue ;
2013-09-17 13:18:26 +02:00
}
String name = security - > Get ( " name " ) ;
secobj = dtype - > GetObject ( name ) ;
if ( ! secobj ) {
2013-09-19 15:14:32 +02:00
Log ( LogDebug , " cluster " , " Invalid object name in security attribute: " + name + " (of type ' " + type + " ') " ) ;
2013-09-19 15:08:34 +02:00
continue ;
2013-09-17 13:18:26 +02:00
}
privs = security - > Get ( " privs " ) ;
}
if ( secobj & & ! secobj - > HasPrivileges ( endpoint - > GetName ( ) , privs ) ) {
2013-09-17 14:38:27 +02:00
Log ( LogDebug , " cluster " , " Not replaying message to endpoint ' " + endpoint - > GetName ( ) + " ': Insufficient privileges. " ) ;
2013-09-17 13:18:26 +02:00
continue ;
}
2013-09-16 09:30:31 +02:00
NetString : : WriteStringToStream ( stream , pmessage - > Get ( " message " ) ) ;
count + + ;
2013-09-16 09:49:28 +02:00
peer_ts = pmessage - > Get ( " timestamp " ) ;
2013-09-16 09:30:31 +02:00
}
2013-09-03 10:48:34 +02:00
2013-09-16 09:30:31 +02:00
lstream - > Close ( ) ;
2013-09-03 10:08:02 +02:00
}
2013-09-16 09:30:31 +02:00
Log ( LogInformation , " cluster " , " Replayed " + Convert : : ToString ( count ) + " messages. " ) ;
2013-09-03 10:08:02 +02:00
2013-09-16 09:49:28 +02:00
if ( last_sync ) {
2013-09-16 10:06:09 +02:00
{
ObjectLock olock2 ( endpoint ) ;
endpoint - > SetSyncing ( false ) ;
}
OpenLogFile ( ) ;
2013-09-03 10:08:02 +02:00
2013-09-16 09:30:31 +02:00
break ;
}
}
2013-09-03 10:08:02 +02:00
}
2013-09-25 09:36:55 +02:00
void ClusterListener : : ConfigGlobHandler ( const Dictionary : : Ptr & config , const String & file , bool basename )
2013-09-04 15:47:15 +02:00
{
2013-11-22 09:03:52 +01:00
CONTEXT ( " Creating config update for file ' " + file + " ' " ) ;
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr elem = make_shared < Dictionary > ( ) ;
2013-09-04 15:47:15 +02:00
std : : ifstream fp ( file . CStr ( ) ) ;
if ( ! fp )
return ;
String content ( ( std : : istreambuf_iterator < char > ( fp ) ) , std : : istreambuf_iterator < char > ( ) ) ;
elem - > Set ( " content " , content ) ;
2013-09-05 12:09:09 +02:00
config - > Set ( basename ? Utility : : BaseName ( file ) : file , elem ) ;
2013-09-04 15:47:15 +02:00
}
2013-08-27 12:21:41 +02:00
/**
* Processes a new client connection .
*
* @ param client The new client .
*/
2013-09-25 09:36:55 +02:00
void ClusterListener : : NewClientHandler ( const Socket : : Ptr & client , TlsRole role )
2013-08-27 12:21:41 +02:00
{
2013-11-19 07:49:41 +01:00
CONTEXT ( " Handling new cluster client connection " ) ;
2013-11-06 08:51:56 +01:00
NetworkStream : : Ptr netStream = make_shared < NetworkStream > ( client ) ;
2013-08-27 12:21:41 +02:00
2013-11-06 08:51:56 +01:00
TlsStream : : Ptr tlsStream = make_shared < TlsStream > ( netStream , role , m_SSLContext ) ;
2013-08-27 12:21:41 +02:00
tlsStream - > Handshake ( ) ;
shared_ptr < X509 > cert = tlsStream - > GetPeerCertificate ( ) ;
String identity = GetCertificateCN ( cert ) ;
Log ( LogInformation , " cluster " , " New client connection for identity ' " + identity + " ' " ) ;
Endpoint : : Ptr endpoint = Endpoint : : GetByName ( identity ) ;
if ( ! endpoint ) {
Log ( LogInformation , " cluster " , " Closing endpoint ' " + identity + " ': No configuration available. " ) ;
tlsStream - > Close ( ) ;
return ;
}
2013-09-16 09:30:31 +02:00
{
ObjectLock olock ( endpoint ) ;
endpoint - > SetSyncing ( true ) ;
endpoint - > SetSeen ( Utility : : GetTime ( ) ) ;
endpoint - > SetClient ( tlsStream ) ;
}
2013-09-11 10:21:43 +02:00
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr config = make_shared < Dictionary > ( ) ;
2013-09-04 15:47:15 +02:00
Array : : Ptr configFiles = endpoint - > GetConfigFiles ( ) ;
if ( configFiles ) {
2013-09-09 10:06:49 +02:00
ObjectLock olock ( configFiles ) ;
2013-09-04 15:47:15 +02:00
BOOST_FOREACH ( const String & pattern , configFiles ) {
2013-11-22 09:03:52 +01:00
Utility : : Glob ( pattern , boost : : bind ( & ClusterListener : : ConfigGlobHandler , boost : : cref ( config ) , _1 , false ) , GlobFile ) ;
2013-09-04 15:47:15 +02:00
}
}
2014-02-11 09:42:48 +01:00
Array : : Ptr configFilesRecursive = endpoint - > GetConfigFilesRecursive ( ) ;
if ( configFilesRecursive ) {
ObjectLock olock ( configFilesRecursive ) ;
BOOST_FOREACH ( const Value & configFile , configFilesRecursive ) {
if ( configFile . IsObjectType < Dictionary > ( ) ) {
Dictionary : : Ptr configFileDict = configFile ;
String path = configFileDict - > Get ( " path " ) ;
String pattern = configFileDict - > Get ( " pattern " ) ;
Utility : : GlobRecursive ( path , pattern , boost : : bind ( & ClusterListener : : ConfigGlobHandler , boost : : cref ( config ) , _1 , false ) , GlobFile ) ;
} else {
String configFilePath = configFile ;
Utility : : GlobRecursive ( configFilePath , " *.conf " , boost : : bind ( & ClusterListener : : ConfigGlobHandler , boost : : cref ( config ) , _1 , false ) , GlobFile ) ;
}
}
}
2013-10-10 23:46:45 +02:00
Log ( LogInformation , " cluster " , " Sending " + Convert : : ToString ( static_cast < long > ( config - > GetLength ( ) ) ) + " config files to endpoint ' " + endpoint - > GetName ( ) + " '. " ) ;
2013-09-26 09:39:09 +02:00
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr params = make_shared < Dictionary > ( ) ;
2013-09-06 08:48:15 +02:00
params - > Set ( " identity " , GetIdentity ( ) ) ;
2013-09-04 15:47:15 +02:00
params - > Set ( " config_files " , config ) ;
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr message = make_shared < Dictionary > ( ) ;
2013-09-04 15:47:15 +02:00
message - > Set ( " jsonrpc " , " 2.0 " ) ;
message - > Set ( " method " , " cluster::Config " ) ;
message - > Set ( " params " , params ) ;
2013-11-08 11:17:46 +01:00
NetString : : WriteStringToStream ( tlsStream , JsonSerialize ( message ) ) ;
2013-09-04 15:47:15 +02:00
2013-09-16 09:30:31 +02:00
ReplayLog ( endpoint , tlsStream ) ;
2013-08-27 12:21:41 +02:00
}
2013-09-25 09:36:55 +02:00
void ClusterListener : : ClusterTimerHandler ( void )
2013-08-27 12:21:41 +02:00
{
2013-08-30 09:34:58 +02:00
/* broadcast a heartbeat message */
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr params = make_shared < Dictionary > ( ) ;
2013-09-11 09:09:04 +02:00
params - > Set ( " identity " , GetIdentity ( ) ) ;
2013-09-11 09:40:29 +02:00
/* Eww. */
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr features = make_shared < Dictionary > ( ) ;
2013-09-12 10:17:14 +02:00
features - > Set ( " checker " , SupportsChecks ( ) ? 1 : 0 ) ;
features - > Set ( " notification " , SupportsNotifications ( ) ? 1 : 0 ) ;
2013-09-11 09:40:29 +02:00
params - > Set ( " features " , features ) ;
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr message = make_shared < Dictionary > ( ) ;
2013-08-30 09:34:58 +02:00
message - > Set ( " jsonrpc " , " 2.0 " ) ;
message - > Set ( " method " , " cluster::HeartBeat " ) ;
2013-09-11 09:09:04 +02:00
message - > Set ( " params " , params ) ;
2013-08-30 09:34:58 +02:00
2013-09-12 15:12:19 +02:00
Endpoint : : GetByName ( GetIdentity ( ) ) - > SetFeatures ( features ) ;
2013-09-18 09:16:29 +02:00
AsyncRelayMessage ( Endpoint : : Ptr ( ) , message , false ) ;
2013-08-30 09:34:58 +02:00
2013-09-11 09:40:29 +02:00
{
ObjectLock olock ( this ) ;
/* check if we've recently seen heartbeat messages from our peers */
BOOST_FOREACH ( const Endpoint : : Ptr & endpoint , DynamicType : : GetObjects < Endpoint > ( ) ) {
2013-09-11 10:21:43 +02:00
if ( endpoint - > GetSeen ( ) > Utility : : GetTime ( ) - 60 )
2013-09-11 09:40:29 +02:00
continue ;
2013-09-11 07:49:43 +02:00
2013-09-11 09:40:29 +02:00
Stream : : Ptr client = endpoint - > GetClient ( ) ;
2013-09-11 07:49:43 +02:00
2013-09-11 09:40:29 +02:00
if ( client ) {
Log ( LogWarning , " cluster " , " Closing connection for endpoint ' " + endpoint - > GetName ( ) + " ' due to inactivity. " ) ;
client - > Close ( ) ;
2013-09-11 09:42:06 +02:00
endpoint - > SetClient ( Stream : : Ptr ( ) ) ;
2013-09-11 09:40:29 +02:00
}
2013-09-11 09:18:15 +02:00
}
2013-09-11 07:49:43 +02:00
}
2013-09-16 14:01:24 +02:00
std : : vector < int > files ;
2013-11-22 09:03:52 +01:00
Utility : : Glob ( GetClusterDir ( ) + " log/* " , boost : : bind ( & ClusterListener : : LogGlobHandler , boost : : ref ( files ) , _1 ) , GlobFile ) ;
2013-09-16 14:01:24 +02:00
std : : sort ( files . begin ( ) , files . end ( ) ) ;
BOOST_FOREACH ( int ts , files ) {
bool need = false ;
BOOST_FOREACH ( const Endpoint : : Ptr & endpoint , DynamicType : : GetObjects < Endpoint > ( ) ) {
if ( endpoint - > GetName ( ) = = GetIdentity ( ) )
continue ;
double position = endpoint - > GetLocalLogPosition ( ) ;
if ( position ! = 0 & & ts > position ) {
need = true ;
break ;
}
}
if ( ! need ) {
String path = GetClusterDir ( ) + " log/ " + Convert : : ToString ( ts ) ;
Log ( LogInformation , " cluster " , " Removing old log file: " + path ) ;
( void ) unlink ( path . CStr ( ) ) ;
}
}
UpdateAuthority ( ) ;
2013-08-27 12:21:41 +02:00
Array : : Ptr peers = GetPeers ( ) ;
2013-09-16 11:08:13 +02:00
if ( peers ) {
ObjectLock olock ( peers ) ;
BOOST_FOREACH ( const String & peer , peers ) {
Endpoint : : Ptr endpoint = Endpoint : : GetByName ( peer ) ;
2013-08-27 12:21:41 +02:00
2013-09-16 11:08:13 +02:00
if ( ! endpoint ) {
Log ( LogWarning , " cluster " , " Attempted to reconnect to endpoint ' " + peer + " ': No configuration found. " ) ;
continue ;
}
2013-08-27 12:21:41 +02:00
2013-09-16 11:08:13 +02:00
if ( endpoint - > IsConnected ( ) )
continue ;
2013-08-27 12:21:41 +02:00
2013-09-16 11:08:13 +02:00
String host , port ;
host = endpoint - > GetHost ( ) ;
port = endpoint - > GetPort ( ) ;
2013-08-27 12:21:41 +02:00
2013-09-16 11:08:13 +02:00
if ( host . IsEmpty ( ) | | port . IsEmpty ( ) ) {
Log ( LogWarning , " cluster " , " Can't reconnect "
" to endpoint ' " + endpoint - > GetName ( ) + " ': No "
" host/port information. " ) ;
continue ;
}
2013-08-27 12:21:41 +02:00
2013-09-16 11:08:13 +02:00
try {
Log ( LogInformation , " cluster " , " Attempting to reconnect to cluster endpoint ' " + endpoint - > GetName ( ) + " ' via ' " + host + " : " + port + " '. " ) ;
AddConnection ( host , port ) ;
} catch ( std : : exception & ex ) {
std : : ostringstream msgbuf ;
msgbuf < < " Exception occured while reconnecting to endpoint ' "
2013-11-20 21:55:14 +01:00
< < endpoint - > GetName ( ) < < " ': " < < DiagnosticInformation ( ex ) ;
2013-09-16 11:08:13 +02:00
Log ( LogWarning , " cluster " , msgbuf . str ( ) ) ;
}
2013-09-03 15:23:47 +02:00
}
}
2013-08-27 12:21:41 +02:00
}
2013-09-25 09:36:55 +02:00
void ClusterListener : : SetSecurityInfo ( const Dictionary : : Ptr & message , const DynamicObject : : Ptr & object , int privs )
2013-09-17 13:18:26 +02:00
{
ASSERT ( object ) ;
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr security = make_shared < Dictionary > ( ) ;
2013-09-17 13:18:26 +02:00
security - > Set ( " type " , object - > GetType ( ) - > GetName ( ) ) ;
security - > Set ( " name " , object - > GetName ( ) ) ;
security - > Set ( " privs " , privs ) ;
message - > Set ( " security " , security ) ;
}
2013-11-09 14:22:38 +01:00
void ClusterListener : : CheckResultHandler ( const Service : : Ptr & service , const CheckResult : : Ptr & cr , const String & authority )
2013-08-27 12:21:41 +02:00
{
2013-08-28 11:12:20 +02:00
if ( ! authority . IsEmpty ( ) & & authority ! = GetIdentity ( ) )
2013-08-27 12:21:41 +02:00
return ;
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr params = make_shared < Dictionary > ( ) ;
2013-08-27 12:21:41 +02:00
params - > Set ( " service " , service - > GetName ( ) ) ;
2013-11-20 15:33:04 +01:00
params - > Set ( " check_result " , Serialize ( cr ) ) ;
2013-08-27 12:21:41 +02:00
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr message = make_shared < Dictionary > ( ) ;
2013-08-27 12:21:41 +02:00
message - > Set ( " jsonrpc " , " 2.0 " ) ;
message - > Set ( " method " , " cluster::CheckResult " ) ;
message - > Set ( " params " , params ) ;
2013-09-17 13:18:26 +02:00
SetSecurityInfo ( message , service , DomainPrivRead ) ;
2013-09-18 09:16:29 +02:00
AsyncRelayMessage ( Endpoint : : Ptr ( ) , message , true ) ;
2013-08-27 12:21:41 +02:00
}
2013-09-25 09:36:55 +02:00
void ClusterListener : : NextCheckChangedHandler ( const Service : : Ptr & service , double nextCheck , const String & authority )
2013-08-28 11:12:20 +02:00
{
if ( ! authority . IsEmpty ( ) & & authority ! = GetIdentity ( ) )
return ;
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr params = make_shared < Dictionary > ( ) ;
2013-08-28 11:12:20 +02:00
params - > Set ( " service " , service - > GetName ( ) ) ;
params - > Set ( " next_check " , nextCheck ) ;
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr message = make_shared < Dictionary > ( ) ;
2013-08-28 11:12:20 +02:00
message - > Set ( " jsonrpc " , " 2.0 " ) ;
message - > Set ( " method " , " cluster::SetNextCheck " ) ;
message - > Set ( " params " , params ) ;
2013-09-17 13:18:26 +02:00
SetSecurityInfo ( message , service , DomainPrivRead ) ;
2013-09-18 09:16:29 +02:00
AsyncRelayMessage ( Endpoint : : Ptr ( ) , message , true ) ;
2013-08-28 11:12:20 +02:00
}
2013-09-25 09:36:55 +02:00
void ClusterListener : : NextNotificationChangedHandler ( const Notification : : Ptr & notification , double nextNotification , const String & authority )
2013-08-28 14:59:41 +02:00
{
if ( ! authority . IsEmpty ( ) & & authority ! = GetIdentity ( ) )
return ;
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr params = make_shared < Dictionary > ( ) ;
2013-08-28 14:59:41 +02:00
params - > Set ( " notification " , notification - > GetName ( ) ) ;
params - > Set ( " next_notification " , nextNotification ) ;
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr message = make_shared < Dictionary > ( ) ;
2013-08-28 14:59:41 +02:00
message - > Set ( " jsonrpc " , " 2.0 " ) ;
message - > Set ( " method " , " cluster::SetNextNotification " ) ;
message - > Set ( " params " , params ) ;
2013-09-17 13:18:26 +02:00
SetSecurityInfo ( message , notification - > GetService ( ) , DomainPrivRead ) ;
2013-09-18 09:16:29 +02:00
AsyncRelayMessage ( Endpoint : : Ptr ( ) , message , true ) ;
2013-08-28 14:59:41 +02:00
}
2013-09-25 09:36:55 +02:00
void ClusterListener : : ForceNextCheckChangedHandler ( const Service : : Ptr & service , bool forced , const String & authority )
2013-08-28 11:12:20 +02:00
{
if ( ! authority . IsEmpty ( ) & & authority ! = GetIdentity ( ) )
return ;
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr params = make_shared < Dictionary > ( ) ;
2013-08-28 11:12:20 +02:00
params - > Set ( " service " , service - > GetName ( ) ) ;
params - > Set ( " forced " , forced ) ;
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr message = make_shared < Dictionary > ( ) ;
2013-08-28 11:12:20 +02:00
message - > Set ( " jsonrpc " , " 2.0 " ) ;
message - > Set ( " method " , " cluster::SetForceNextCheck " ) ;
message - > Set ( " params " , params ) ;
2013-09-17 13:18:26 +02:00
SetSecurityInfo ( message , service , DomainPrivRead ) ;
2013-09-18 09:16:29 +02:00
AsyncRelayMessage ( Endpoint : : Ptr ( ) , message , true ) ;
2013-08-28 11:12:20 +02:00
}
2013-09-25 09:36:55 +02:00
void ClusterListener : : ForceNextNotificationChangedHandler ( const Service : : Ptr & service , bool forced , const String & authority )
2013-08-29 11:37:51 +02:00
{
if ( ! authority . IsEmpty ( ) & & authority ! = GetIdentity ( ) )
return ;
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr params = make_shared < Dictionary > ( ) ;
2013-08-29 11:37:51 +02:00
params - > Set ( " service " , service - > GetName ( ) ) ;
params - > Set ( " forced " , forced ) ;
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr message = make_shared < Dictionary > ( ) ;
2013-08-29 11:37:51 +02:00
message - > Set ( " jsonrpc " , " 2.0 " ) ;
message - > Set ( " method " , " cluster::SetForceNextNotification " ) ;
message - > Set ( " params " , params ) ;
2013-09-17 13:18:26 +02:00
SetSecurityInfo ( message , service , DomainPrivRead ) ;
2013-09-18 09:16:29 +02:00
AsyncRelayMessage ( Endpoint : : Ptr ( ) , message , true ) ;
2013-08-29 11:37:51 +02:00
}
2013-09-25 09:36:55 +02:00
void ClusterListener : : EnableActiveChecksChangedHandler ( const Service : : Ptr & service , bool enabled , const String & authority )
2013-08-28 11:12:20 +02:00
{
if ( ! authority . IsEmpty ( ) & & authority ! = GetIdentity ( ) )
return ;
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr params = make_shared < Dictionary > ( ) ;
2013-08-28 11:12:20 +02:00
params - > Set ( " service " , service - > GetName ( ) ) ;
params - > Set ( " enabled " , enabled ) ;
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr message = make_shared < Dictionary > ( ) ;
2013-08-28 11:12:20 +02:00
message - > Set ( " jsonrpc " , " 2.0 " ) ;
message - > Set ( " method " , " cluster::SetEnableActiveChecks " ) ;
message - > Set ( " params " , params ) ;
2013-09-17 13:18:26 +02:00
SetSecurityInfo ( message , service , DomainPrivRead ) ;
2013-09-18 09:16:29 +02:00
AsyncRelayMessage ( Endpoint : : Ptr ( ) , message , true ) ;
2013-08-28 11:12:20 +02:00
}
2013-09-25 09:36:55 +02:00
void ClusterListener : : EnablePassiveChecksChangedHandler ( const Service : : Ptr & service , bool enabled , const String & authority )
2013-08-28 11:12:20 +02:00
{
if ( ! authority . IsEmpty ( ) & & authority ! = GetIdentity ( ) )
return ;
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr params = make_shared < Dictionary > ( ) ;
2013-08-28 11:12:20 +02:00
params - > Set ( " service " , service - > GetName ( ) ) ;
params - > Set ( " enabled " , enabled ) ;
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr message = make_shared < Dictionary > ( ) ;
2013-08-28 11:12:20 +02:00
message - > Set ( " jsonrpc " , " 2.0 " ) ;
message - > Set ( " method " , " cluster::SetEnablePassiveChecks " ) ;
message - > Set ( " params " , params ) ;
2013-09-17 13:18:26 +02:00
SetSecurityInfo ( message , service , DomainPrivRead ) ;
2013-09-18 09:16:29 +02:00
AsyncRelayMessage ( Endpoint : : Ptr ( ) , message , true ) ;
2013-08-28 11:12:20 +02:00
}
2013-09-25 09:36:55 +02:00
void ClusterListener : : EnableNotificationsChangedHandler ( const Service : : Ptr & service , bool enabled , const String & authority )
2013-08-29 13:06:36 +02:00
{
if ( ! authority . IsEmpty ( ) & & authority ! = GetIdentity ( ) )
return ;
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr params = make_shared < Dictionary > ( ) ;
2013-08-29 13:06:36 +02:00
params - > Set ( " service " , service - > GetName ( ) ) ;
params - > Set ( " enabled " , enabled ) ;
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr message = make_shared < Dictionary > ( ) ;
2013-08-29 13:06:36 +02:00
message - > Set ( " jsonrpc " , " 2.0 " ) ;
message - > Set ( " method " , " cluster::SetEnableNotifications " ) ;
message - > Set ( " params " , params ) ;
2013-09-17 13:18:26 +02:00
SetSecurityInfo ( message , service , DomainPrivRead ) ;
2013-09-18 09:16:29 +02:00
AsyncRelayMessage ( Endpoint : : Ptr ( ) , message , true ) ;
2013-08-29 13:06:36 +02:00
}
2013-09-25 09:36:55 +02:00
void ClusterListener : : EnableFlappingChangedHandler ( const Service : : Ptr & service , bool enabled , const String & authority )
2013-08-29 13:06:36 +02:00
{
if ( ! authority . IsEmpty ( ) & & authority ! = GetIdentity ( ) )
return ;
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr params = make_shared < Dictionary > ( ) ;
2013-08-29 13:06:36 +02:00
params - > Set ( " service " , service - > GetName ( ) ) ;
params - > Set ( " enabled " , enabled ) ;
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr message = make_shared < Dictionary > ( ) ;
2013-08-29 13:06:36 +02:00
message - > Set ( " jsonrpc " , " 2.0 " ) ;
message - > Set ( " method " , " cluster::SetEnableFlapping " ) ;
message - > Set ( " params " , params ) ;
2013-09-17 13:18:26 +02:00
SetSecurityInfo ( message , service , DomainPrivRead ) ;
2013-09-18 09:16:29 +02:00
AsyncRelayMessage ( Endpoint : : Ptr ( ) , message , true ) ;
2013-08-29 13:06:36 +02:00
}
2013-11-09 21:19:52 +01:00
void ClusterListener : : CommentAddedHandler ( const Service : : Ptr & service , const Comment : : Ptr & comment , const String & authority )
2013-08-28 14:59:41 +02:00
{
if ( ! authority . IsEmpty ( ) & & authority ! = GetIdentity ( ) )
return ;
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr params = make_shared < Dictionary > ( ) ;
2013-08-28 14:59:41 +02:00
params - > Set ( " service " , service - > GetName ( ) ) ;
2013-11-20 15:33:04 +01:00
params - > Set ( " comment " , Serialize ( comment ) ) ;
2013-08-28 14:59:41 +02:00
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr message = make_shared < Dictionary > ( ) ;
2013-08-28 14:59:41 +02:00
message - > Set ( " jsonrpc " , " 2.0 " ) ;
message - > Set ( " method " , " cluster::AddComment " ) ;
message - > Set ( " params " , params ) ;
2013-09-17 13:18:26 +02:00
SetSecurityInfo ( message , service , DomainPrivRead ) ;
2013-09-18 09:16:29 +02:00
AsyncRelayMessage ( Endpoint : : Ptr ( ) , message , true ) ;
2013-08-28 14:59:41 +02:00
}
2013-11-09 21:19:52 +01:00
void ClusterListener : : CommentRemovedHandler ( const Service : : Ptr & service , const Comment : : Ptr & comment , const String & authority )
2013-08-28 14:59:41 +02:00
{
if ( ! authority . IsEmpty ( ) & & authority ! = GetIdentity ( ) )
return ;
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr params = make_shared < Dictionary > ( ) ;
2013-08-28 14:59:41 +02:00
params - > Set ( " service " , service - > GetName ( ) ) ;
2013-11-09 21:19:52 +01:00
params - > Set ( " id " , comment - > GetId ( ) ) ;
2013-08-28 14:59:41 +02:00
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr message = make_shared < Dictionary > ( ) ;
2013-08-28 14:59:41 +02:00
message - > Set ( " jsonrpc " , " 2.0 " ) ;
message - > Set ( " method " , " cluster::RemoveComment " ) ;
message - > Set ( " params " , params ) ;
2013-09-17 13:18:26 +02:00
SetSecurityInfo ( message , service , DomainPrivRead ) ;
2013-09-18 09:16:29 +02:00
AsyncRelayMessage ( Endpoint : : Ptr ( ) , message , true ) ;
2013-08-28 14:59:41 +02:00
}
2013-11-09 22:08:26 +01:00
void ClusterListener : : DowntimeAddedHandler ( const Service : : Ptr & service , const Downtime : : Ptr & downtime , const String & authority )
2013-08-28 16:08:22 +02:00
{
if ( ! authority . IsEmpty ( ) & & authority ! = GetIdentity ( ) )
return ;
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr params = make_shared < Dictionary > ( ) ;
2013-08-28 16:08:22 +02:00
params - > Set ( " service " , service - > GetName ( ) ) ;
2013-11-20 15:33:04 +01:00
params - > Set ( " downtime " , Serialize ( downtime ) ) ;
2013-08-28 16:08:22 +02:00
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr message = make_shared < Dictionary > ( ) ;
2013-08-28 16:08:22 +02:00
message - > Set ( " jsonrpc " , " 2.0 " ) ;
message - > Set ( " method " , " cluster::AddDowntime " ) ;
message - > Set ( " params " , params ) ;
2013-09-17 13:18:26 +02:00
SetSecurityInfo ( message , service , DomainPrivRead ) ;
2013-09-18 09:16:29 +02:00
AsyncRelayMessage ( Endpoint : : Ptr ( ) , message , true ) ;
2013-08-28 16:08:22 +02:00
}
2013-11-09 22:08:26 +01:00
void ClusterListener : : DowntimeRemovedHandler ( const Service : : Ptr & service , const Downtime : : Ptr & downtime , const String & authority )
2013-08-28 16:08:22 +02:00
{
if ( ! authority . IsEmpty ( ) & & authority ! = GetIdentity ( ) )
return ;
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr params = make_shared < Dictionary > ( ) ;
2013-08-28 16:08:22 +02:00
params - > Set ( " service " , service - > GetName ( ) ) ;
2013-11-09 22:08:26 +01:00
params - > Set ( " id " , downtime - > GetId ( ) ) ;
2013-08-28 16:08:22 +02:00
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr message = make_shared < Dictionary > ( ) ;
2013-08-28 16:08:22 +02:00
message - > Set ( " jsonrpc " , " 2.0 " ) ;
message - > Set ( " method " , " cluster::RemoveDowntime " ) ;
message - > Set ( " params " , params ) ;
2013-09-17 13:18:26 +02:00
SetSecurityInfo ( message , service , DomainPrivRead ) ;
2013-09-18 09:16:29 +02:00
AsyncRelayMessage ( Endpoint : : Ptr ( ) , message , true ) ;
2013-08-28 16:08:22 +02:00
}
2013-09-25 09:36:55 +02:00
void ClusterListener : : AcknowledgementSetHandler ( const Service : : Ptr & service , const String & author , const String & comment , AcknowledgementType type , double expiry , const String & authority )
2013-08-29 13:48:18 +02:00
{
if ( ! authority . IsEmpty ( ) & & authority ! = GetIdentity ( ) )
return ;
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr params = make_shared < Dictionary > ( ) ;
2013-08-29 13:48:18 +02:00
params - > Set ( " service " , service - > GetName ( ) ) ;
params - > Set ( " author " , author ) ;
params - > Set ( " comment " , comment ) ;
params - > Set ( " type " , type ) ;
params - > Set ( " expiry " , expiry ) ;
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr message = make_shared < Dictionary > ( ) ;
2013-08-29 13:48:18 +02:00
message - > Set ( " jsonrpc " , " 2.0 " ) ;
message - > Set ( " method " , " cluster::SetAcknowledgement " ) ;
message - > Set ( " params " , params ) ;
2013-09-17 13:18:26 +02:00
SetSecurityInfo ( message , service , DomainPrivRead ) ;
2013-09-18 09:16:29 +02:00
AsyncRelayMessage ( Endpoint : : Ptr ( ) , message , true ) ;
2013-08-29 13:48:18 +02:00
}
2013-09-25 09:36:55 +02:00
void ClusterListener : : AcknowledgementClearedHandler ( const Service : : Ptr & service , const String & authority )
2013-08-29 13:48:18 +02:00
{
if ( ! authority . IsEmpty ( ) & & authority ! = GetIdentity ( ) )
return ;
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr params = make_shared < Dictionary > ( ) ;
2013-08-29 13:48:18 +02:00
params - > Set ( " service " , service - > GetName ( ) ) ;
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr message = make_shared < Dictionary > ( ) ;
2013-08-29 13:48:18 +02:00
message - > Set ( " jsonrpc " , " 2.0 " ) ;
message - > Set ( " method " , " cluster::ClearAcknowledgement " ) ;
message - > Set ( " params " , params ) ;
2013-09-17 13:18:26 +02:00
SetSecurityInfo ( message , service , DomainPrivRead ) ;
2013-09-18 09:16:29 +02:00
AsyncRelayMessage ( Endpoint : : Ptr ( ) , message , true ) ;
2013-08-29 13:48:18 +02:00
}
2013-09-25 09:36:55 +02:00
void ClusterListener : : AsyncMessageHandler ( const Endpoint : : Ptr & sender , const Dictionary : : Ptr & message )
2013-09-18 09:09:16 +02:00
{
2013-09-25 09:36:55 +02:00
m_MessageQueue . Enqueue ( boost : : bind ( & ClusterListener : : MessageHandler , this , sender , message ) ) ;
2013-09-18 09:09:16 +02:00
}
2013-11-20 15:33:04 +01:00
void ClusterListener : : MessageExceptionHandler ( boost : : exception_ptr exp )
{
2013-11-20 21:55:14 +01:00
Log ( LogCritical , " cluster " , " Exception while processing cluster message: " + DiagnosticInformation ( exp ) ) ;
2013-11-20 15:33:04 +01:00
}
2013-09-25 09:36:55 +02:00
void ClusterListener : : MessageHandler ( const Endpoint : : Ptr & sender , const Dictionary : : Ptr & message )
2013-08-27 12:21:41 +02:00
{
2013-11-19 07:49:41 +01:00
CONTEXT ( " Processing cluster message of type ' " + message - > Get ( " method " ) + " ' " ) ;
2013-09-11 17:07:07 +02:00
sender - > SetSeen ( Utility : : GetTime ( ) ) ;
2013-09-06 14:05:50 +02:00
if ( message - > Contains ( " ts " ) ) {
double ts = message - > Get ( " ts " ) ;
2013-09-03 10:08:02 +02:00
2013-09-06 14:05:50 +02:00
/* ignore old messages */
if ( ts < sender - > GetRemoteLogPosition ( ) )
return ;
if ( sender - > GetRemoteLogPosition ( ) + 10 < ts ) {
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr lparams = make_shared < Dictionary > ( ) ;
2013-09-06 14:05:50 +02:00
lparams - > Set ( " log_position " , message - > Get ( " ts " ) ) ;
2013-09-03 10:08:02 +02:00
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr lmessage = make_shared < Dictionary > ( ) ;
2013-09-06 14:05:50 +02:00
lmessage - > Set ( " jsonrpc " , " 2.0 " ) ;
lmessage - > Set ( " method " , " cluster::SetLogPosition " ) ;
lmessage - > Set ( " params " , lparams ) ;
2013-09-03 10:12:07 +02:00
2013-09-06 14:05:50 +02:00
sender - > SendMessage ( lmessage ) ;
2013-09-06 08:48:15 +02:00
2013-09-06 14:05:50 +02:00
sender - > SetRemoteLogPosition ( message - > Get ( " ts " ) ) ;
Log ( LogInformation , " cluster " , " Acknowledging log position for identity ' " + sender - > GetName ( ) + " ': " + Utility : : FormatDateTime ( " %Y/%m/%d %H:%M:%S " , message - > Get ( " ts " ) ) ) ;
}
2013-09-03 10:08:02 +02:00
}
2013-08-28 11:12:20 +02:00
Dictionary : : Ptr params = message - > Get ( " params " ) ;
2013-09-11 09:09:04 +02:00
if ( message - > Get ( " method " ) = = " cluster::HeartBeat " ) {
2013-09-17 13:18:26 +02:00
if ( ! params )
return ;
2013-09-11 09:09:04 +02:00
String identity = params - > Get ( " identity " ) ;
Endpoint : : Ptr endpoint = Endpoint : : GetByName ( identity ) ;
2013-09-12 10:22:03 +02:00
if ( endpoint ) {
2013-09-11 09:09:04 +02:00
endpoint - > SetSeen ( Utility : : GetTime ( ) ) ;
2013-09-12 10:22:03 +02:00
endpoint - > SetFeatures ( params - > Get ( " features " ) ) ;
}
2013-09-17 13:18:26 +02:00
2013-09-18 09:16:29 +02:00
AsyncRelayMessage ( sender , message , true ) ;
2013-09-11 09:09:04 +02:00
} else if ( message - > Get ( " method " ) = = " cluster::CheckResult " ) {
2013-09-17 13:18:26 +02:00
if ( ! params )
return ;
2013-08-28 11:12:20 +02:00
String svc = params - > Get ( " service " ) ;
2013-08-27 12:21:41 +02:00
2013-08-28 11:12:20 +02:00
Service : : Ptr service = Service : : GetByName ( svc ) ;
2013-09-17 14:58:52 +02:00
if ( ! service )
2013-08-27 12:21:41 +02:00
return ;
2014-03-07 14:49:08 +01:00
if ( ! service - > HasPrivileges ( sender - > GetName ( ) , DomainPrivCheckResult ) ) {
2013-09-19 14:26:57 +02:00
Log ( LogDebug , " cluster " , " Not accepting cluster::CheckResult message from endpoint ' " + sender - > GetName ( ) + " ' for service ' " + service - > GetName ( ) + " ': Insufficient privileges. " ) ;
2013-09-17 14:58:52 +02:00
return ;
}
2013-12-18 10:18:57 +01:00
CheckResult : : Ptr cr = Deserialize ( params - > Get ( " check_result " ) , true ) ;
2013-08-27 12:21:41 +02:00
if ( ! cr )
return ;
2013-08-28 11:12:20 +02:00
service - > ProcessCheckResult ( cr , sender - > GetName ( ) ) ;
2013-09-17 13:18:26 +02:00
2013-09-18 09:16:29 +02:00
AsyncRelayMessage ( sender , message , true ) ;
2013-08-28 11:12:20 +02:00
} else if ( message - > Get ( " method " ) = = " cluster::SetNextCheck " ) {
2013-09-17 13:18:26 +02:00
if ( ! params )
return ;
2013-08-28 11:12:20 +02:00
String svc = params - > Get ( " service " ) ;
Service : : Ptr service = Service : : GetByName ( svc ) ;
2013-09-17 14:58:52 +02:00
if ( ! service )
2013-08-28 11:12:20 +02:00
return ;
2013-09-17 14:58:52 +02:00
if ( ! service - > HasPrivileges ( sender - > GetName ( ) , DomainPrivCommand ) ) {
2013-09-19 14:26:57 +02:00
Log ( LogDebug , " cluster " , " Not accepting cluster::SetNextCheck message from endpoint ' " + sender - > GetName ( ) + " ' for service ' " + service - > GetName ( ) + " ': Insufficient privileges. " ) ;
2013-09-17 14:58:52 +02:00
return ;
}
2013-08-28 11:12:20 +02:00
double nextCheck = params - > Get ( " next_check " ) ;
service - > SetNextCheck ( nextCheck , sender - > GetName ( ) ) ;
2013-09-17 13:18:26 +02:00
2013-09-18 09:16:29 +02:00
AsyncRelayMessage ( sender , message , true ) ;
2013-08-28 11:12:20 +02:00
} else if ( message - > Get ( " method " ) = = " cluster::SetForceNextCheck " ) {
2013-09-17 13:18:26 +02:00
if ( ! params )
return ;
2013-08-28 11:12:20 +02:00
String svc = params - > Get ( " service " ) ;
Service : : Ptr service = Service : : GetByName ( svc ) ;
2013-09-17 14:58:52 +02:00
if ( ! service )
2013-08-28 11:12:20 +02:00
return ;
2013-09-17 14:58:52 +02:00
if ( ! service - > HasPrivileges ( sender - > GetName ( ) , DomainPrivCommand ) ) {
2013-09-19 14:26:57 +02:00
Log ( LogDebug , " cluster " , " Not accepting cluster::SetForceNextCheck message from endpoint ' " + sender - > GetName ( ) + " ' for service ' " + service - > GetName ( ) + " ': Insufficient privileges. " ) ;
2013-09-17 14:58:52 +02:00
return ;
}
2013-08-28 11:12:20 +02:00
bool forced = params - > Get ( " forced " ) ;
service - > SetForceNextCheck ( forced , sender - > GetName ( ) ) ;
2013-09-17 13:18:26 +02:00
2013-09-18 09:16:29 +02:00
AsyncRelayMessage ( sender , message , true ) ;
2013-08-29 11:37:51 +02:00
} else if ( message - > Get ( " method " ) = = " cluster::SetForceNextNotification " ) {
2013-09-17 13:18:26 +02:00
if ( ! params )
return ;
2013-08-29 11:37:51 +02:00
String svc = params - > Get ( " service " ) ;
Service : : Ptr service = Service : : GetByName ( svc ) ;
2013-09-17 14:58:52 +02:00
if ( ! service )
2013-08-29 11:37:51 +02:00
return ;
2013-09-17 14:58:52 +02:00
if ( ! service - > HasPrivileges ( sender - > GetName ( ) , DomainPrivCommand ) ) {
2013-09-19 14:26:57 +02:00
Log ( LogDebug , " cluster " , " Not accepting cluster::SetForceNextNotification message from endpoint ' " + sender - > GetName ( ) + " ' for service ' " + service - > GetName ( ) + " ': Insufficient privileges. " ) ;
2013-09-17 14:58:52 +02:00
return ;
}
2013-08-29 11:37:51 +02:00
bool forced = params - > Get ( " forced " ) ;
service - > SetForceNextNotification ( forced , sender - > GetName ( ) ) ;
2013-09-17 13:18:26 +02:00
2013-09-18 09:16:29 +02:00
AsyncRelayMessage ( sender , message , true ) ;
2013-08-28 11:12:20 +02:00
} else if ( message - > Get ( " method " ) = = " cluster::SetEnableActiveChecks " ) {
2013-09-17 13:18:26 +02:00
if ( ! params )
return ;
2013-08-28 11:12:20 +02:00
String svc = params - > Get ( " service " ) ;
Service : : Ptr service = Service : : GetByName ( svc ) ;
2013-09-17 14:58:52 +02:00
if ( ! service )
2013-08-28 11:12:20 +02:00
return ;
2013-09-17 14:58:52 +02:00
if ( ! service - > HasPrivileges ( sender - > GetName ( ) , DomainPrivCommand ) ) {
2013-09-19 14:26:57 +02:00
Log ( LogDebug , " cluster " , " Not accepting cluster::SetEnableActiveChecks message from endpoint ' " + sender - > GetName ( ) + " ' for service ' " + service - > GetName ( ) + " ': Insufficient privileges. " ) ;
2013-09-17 14:58:52 +02:00
return ;
}
2013-08-28 11:12:20 +02:00
bool enabled = params - > Get ( " enabled " ) ;
service - > SetEnableActiveChecks ( enabled , sender - > GetName ( ) ) ;
2013-09-17 13:18:26 +02:00
2013-09-18 09:16:29 +02:00
AsyncRelayMessage ( sender , message , true ) ;
2013-08-28 11:12:20 +02:00
} else if ( message - > Get ( " method " ) = = " cluster::SetEnablePassiveChecks " ) {
2013-09-17 13:18:26 +02:00
if ( ! params )
return ;
2013-08-27 12:21:41 +02:00
String svc = params - > Get ( " service " ) ;
Service : : Ptr service = Service : : GetByName ( svc ) ;
2013-09-17 14:58:52 +02:00
if ( ! service )
2013-08-27 12:21:41 +02:00
return ;
2013-09-17 14:58:52 +02:00
if ( ! service - > HasPrivileges ( sender - > GetName ( ) , DomainPrivCommand ) ) {
2013-09-19 14:26:57 +02:00
Log ( LogDebug , " cluster " , " Not accepting cluster::SetEnablePassiveChecks message from endpoint ' " + sender - > GetName ( ) + " ' for service ' " + service - > GetName ( ) + " ': Insufficient privileges. " ) ;
2013-09-17 14:58:52 +02:00
return ;
}
2013-08-28 11:12:20 +02:00
bool enabled = params - > Get ( " enabled " ) ;
2013-08-27 15:57:00 +02:00
2013-08-28 11:12:20 +02:00
service - > SetEnablePassiveChecks ( enabled , sender - > GetName ( ) ) ;
2013-09-17 13:18:26 +02:00
2013-09-18 09:16:29 +02:00
AsyncRelayMessage ( sender , message , true ) ;
2013-08-29 13:06:36 +02:00
} else if ( message - > Get ( " method " ) = = " cluster::SetEnableNotifications " ) {
2013-09-17 13:18:26 +02:00
if ( ! params )
return ;
2013-08-29 13:06:36 +02:00
String svc = params - > Get ( " service " ) ;
Service : : Ptr service = Service : : GetByName ( svc ) ;
2013-09-17 14:58:52 +02:00
if ( ! service )
2013-08-29 13:06:36 +02:00
return ;
2013-09-17 14:58:52 +02:00
if ( ! service - > HasPrivileges ( sender - > GetName ( ) , DomainPrivCommand ) ) {
2013-09-19 14:26:57 +02:00
Log ( LogDebug , " cluster " , " Not accepting cluster::SetEnableNotifications message from endpoint ' " + sender - > GetName ( ) + " ' for service ' " + service - > GetName ( ) + " ': Insufficient privileges. " ) ;
2013-09-17 14:58:52 +02:00
return ;
}
2013-08-29 13:06:36 +02:00
bool enabled = params - > Get ( " enabled " ) ;
service - > SetEnableNotifications ( enabled , sender - > GetName ( ) ) ;
2013-09-17 13:18:26 +02:00
2013-09-18 09:16:29 +02:00
AsyncRelayMessage ( sender , message , true ) ;
2013-08-29 13:06:36 +02:00
} else if ( message - > Get ( " method " ) = = " cluster::SetEnableFlapping " ) {
2013-09-17 13:18:26 +02:00
if ( ! params )
return ;
2013-08-29 13:06:36 +02:00
String svc = params - > Get ( " service " ) ;
Service : : Ptr service = Service : : GetByName ( svc ) ;
2013-09-17 14:58:52 +02:00
if ( ! service )
return ;
if ( ! service - > HasPrivileges ( sender - > GetName ( ) , DomainPrivCommand ) ) {
2013-09-19 14:26:57 +02:00
Log ( LogDebug , " cluster " , " Not accepting cluster::SetEnableFlapping message from endpoint ' " + sender - > GetName ( ) + " ' for service ' " + service - > GetName ( ) + " ': Insufficient privileges. " ) ;
2013-08-29 13:06:36 +02:00
return ;
2013-09-17 14:58:52 +02:00
}
2013-08-29 13:06:36 +02:00
bool enabled = params - > Get ( " enabled " ) ;
service - > SetEnableFlapping ( enabled , sender - > GetName ( ) ) ;
2013-09-17 13:18:26 +02:00
2013-09-18 09:16:29 +02:00
AsyncRelayMessage ( sender , message , true ) ;
2013-08-28 14:59:41 +02:00
} else if ( message - > Get ( " method " ) = = " cluster::SetNextNotification " ) {
2013-09-17 13:18:26 +02:00
if ( ! params )
return ;
2013-08-28 14:59:41 +02:00
String nfc = params - > Get ( " notification " ) ;
Notification : : Ptr notification = Notification : : GetByName ( nfc ) ;
2013-09-17 14:58:52 +02:00
if ( ! notification )
return ;
Service : : Ptr service = notification - > GetService ( ) ;
if ( ! service - > HasPrivileges ( sender - > GetName ( ) , DomainPrivCommand ) ) {
2013-09-19 14:26:57 +02:00
Log ( LogDebug , " cluster " , " Not accepting cluster::SetNextNotification message from endpoint ' " + sender - > GetName ( ) + " ' for service ' " + service - > GetName ( ) + " ': Insufficient privileges. " ) ;
2013-08-28 14:59:41 +02:00
return ;
2013-09-17 14:58:52 +02:00
}
2013-08-28 14:59:41 +02:00
bool nextNotification = params - > Get ( " next_notification " ) ;
notification - > SetNextNotification ( nextNotification , sender - > GetName ( ) ) ;
2013-09-17 13:18:26 +02:00
2013-09-18 09:16:29 +02:00
AsyncRelayMessage ( sender , message , true ) ;
2013-08-28 14:59:41 +02:00
} else if ( message - > Get ( " method " ) = = " cluster::AddComment " ) {
2013-09-17 13:18:26 +02:00
if ( ! params )
return ;
2013-08-28 14:59:41 +02:00
String svc = params - > Get ( " service " ) ;
Service : : Ptr service = Service : : GetByName ( svc ) ;
2013-09-17 14:58:52 +02:00
if ( ! service )
return ;
if ( ! service - > HasPrivileges ( sender - > GetName ( ) , DomainPrivCommand ) ) {
2013-09-19 14:26:57 +02:00
Log ( LogDebug , " cluster " , " Not accepting cluster::AddComment message from endpoint ' " + sender - > GetName ( ) + " ' for service ' " + service - > GetName ( ) + " ': Insufficient privileges. " ) ;
2013-08-28 14:59:41 +02:00
return ;
2013-09-17 14:58:52 +02:00
}
2013-08-28 14:59:41 +02:00
2013-12-18 10:18:57 +01:00
Comment : : Ptr comment = Deserialize ( params - > Get ( " comment " ) , true ) ;
2013-08-28 14:59:41 +02:00
2013-11-09 21:19:52 +01:00
service - > AddComment ( comment - > GetEntryType ( ) , comment - > GetAuthor ( ) ,
comment - > GetText ( ) , comment - > GetExpireTime ( ) , comment - > GetId ( ) , sender - > GetName ( ) ) ;
2013-09-17 13:18:26 +02:00
2013-09-18 09:16:29 +02:00
AsyncRelayMessage ( sender , message , true ) ;
2013-08-28 14:59:41 +02:00
} else if ( message - > Get ( " method " ) = = " cluster::RemoveComment " ) {
2013-09-17 13:18:26 +02:00
if ( ! params )
return ;
2013-08-28 14:59:41 +02:00
String svc = params - > Get ( " service " ) ;
Service : : Ptr service = Service : : GetByName ( svc ) ;
2013-09-17 14:58:52 +02:00
if ( ! service )
return ;
if ( ! service - > HasPrivileges ( sender - > GetName ( ) , DomainPrivCommand ) ) {
2013-09-19 14:26:57 +02:00
Log ( LogDebug , " cluster " , " Not accepting cluster::RemoveComment message from endpoint ' " + sender - > GetName ( ) + " ' for service ' " + service - > GetName ( ) + " ': Insufficient privileges. " ) ;
2013-08-28 14:59:41 +02:00
return ;
2013-09-17 14:58:52 +02:00
}
2013-08-28 14:59:41 +02:00
String id = params - > Get ( " id " ) ;
service - > RemoveComment ( id , sender - > GetName ( ) ) ;
2013-09-17 13:18:26 +02:00
2013-09-18 09:16:29 +02:00
AsyncRelayMessage ( sender , message , true ) ;
2013-08-28 16:08:22 +02:00
} else if ( message - > Get ( " method " ) = = " cluster::AddDowntime " ) {
2013-09-17 13:18:26 +02:00
if ( ! params )
return ;
2013-08-28 16:08:22 +02:00
String svc = params - > Get ( " service " ) ;
Service : : Ptr service = Service : : GetByName ( svc ) ;
2013-09-17 14:58:52 +02:00
if ( ! service )
2013-08-28 16:08:22 +02:00
return ;
2013-09-17 14:58:52 +02:00
if ( ! service - > HasPrivileges ( sender - > GetName ( ) , DomainPrivCommand ) ) {
2013-09-19 14:26:57 +02:00
Log ( LogDebug , " cluster " , " Not accepting cluster::AddDowntime message from endpoint ' " + sender - > GetName ( ) + " ' for service ' " + service - > GetName ( ) + " ': Insufficient privileges. " ) ;
2013-09-17 14:58:52 +02:00
return ;
}
2013-12-18 10:18:57 +01:00
Downtime : : Ptr downtime = Deserialize ( params - > Get ( " downtime " ) , true ) ;
2013-08-28 16:08:22 +02:00
2013-11-09 22:08:26 +01:00
service - > AddDowntime ( downtime - > GetAuthor ( ) , downtime - > GetComment ( ) ,
downtime - > GetStartTime ( ) , downtime - > GetEndTime ( ) ,
downtime - > GetFixed ( ) , downtime - > GetTriggeredBy ( ) ,
2013-11-13 14:56:31 +01:00
downtime - > GetDuration ( ) , downtime - > GetScheduledBy ( ) ,
downtime - > GetId ( ) , sender - > GetName ( ) ) ;
2013-09-17 13:18:26 +02:00
2013-09-18 09:16:29 +02:00
AsyncRelayMessage ( sender , message , true ) ;
2013-08-28 16:08:22 +02:00
} else if ( message - > Get ( " method " ) = = " cluster::RemoveDowntime " ) {
2013-09-17 13:18:26 +02:00
if ( ! params )
return ;
2013-08-28 16:08:22 +02:00
String svc = params - > Get ( " service " ) ;
Service : : Ptr service = Service : : GetByName ( svc ) ;
2013-09-17 14:58:52 +02:00
if ( ! service )
return ;
if ( ! service - > HasPrivileges ( sender - > GetName ( ) , DomainPrivCommand ) ) {
2013-09-19 14:26:57 +02:00
Log ( LogDebug , " cluster " , " Not accepting cluster::RemoveDowntime message from endpoint ' " + sender - > GetName ( ) + " ' for service ' " + service - > GetName ( ) + " ': Insufficient privileges. " ) ;
2013-08-28 16:08:22 +02:00
return ;
2013-09-17 14:58:52 +02:00
}
2013-08-28 16:08:22 +02:00
String id = params - > Get ( " id " ) ;
2013-09-17 19:37:10 +02:00
service - > RemoveDowntime ( id , false , sender - > GetName ( ) ) ;
2013-09-17 13:18:26 +02:00
2013-09-18 09:16:29 +02:00
AsyncRelayMessage ( sender , message , true ) ;
2013-08-29 13:48:18 +02:00
} else if ( message - > Get ( " method " ) = = " cluster::SetAcknowledgement " ) {
2013-09-17 13:18:26 +02:00
if ( ! params )
return ;
2013-08-29 13:48:18 +02:00
String svc = params - > Get ( " service " ) ;
Service : : Ptr service = Service : : GetByName ( svc ) ;
2013-09-17 14:58:52 +02:00
if ( ! service )
2013-08-29 13:48:18 +02:00
return ;
2013-09-17 14:58:52 +02:00
if ( ! service - > HasPrivileges ( sender - > GetName ( ) , DomainPrivCommand ) ) {
2013-09-19 14:26:57 +02:00
Log ( LogDebug , " cluster " , " Not accepting cluster::SetAcknowledgement message from endpoint ' " + sender - > GetName ( ) + " ' for service ' " + service - > GetName ( ) + " ': Insufficient privileges. " ) ;
2013-09-17 14:58:52 +02:00
return ;
}
2013-08-29 13:48:18 +02:00
String author = params - > Get ( " author " ) ;
String comment = params - > Get ( " comment " ) ;
int type = params - > Get ( " type " ) ;
double expiry = params - > Get ( " expiry " ) ;
service - > AcknowledgeProblem ( author , comment , static_cast < AcknowledgementType > ( type ) , expiry , sender - > GetName ( ) ) ;
2013-09-17 13:18:26 +02:00
2013-09-18 09:16:29 +02:00
AsyncRelayMessage ( sender , message , true ) ;
2013-08-29 13:48:18 +02:00
} else if ( message - > Get ( " method " ) = = " cluster::ClearAcknowledgement " ) {
2013-09-17 13:18:26 +02:00
if ( ! params )
return ;
2013-08-29 13:48:18 +02:00
String svc = params - > Get ( " service " ) ;
Service : : Ptr service = Service : : GetByName ( svc ) ;
2013-09-17 14:58:52 +02:00
if ( ! service )
2013-08-29 13:48:18 +02:00
return ;
2013-09-17 14:58:52 +02:00
if ( ! service - > HasPrivileges ( sender - > GetName ( ) , DomainPrivCommand ) ) {
2013-09-19 14:26:57 +02:00
Log ( LogDebug , " cluster " , " Not accepting cluster::ClearAcknowledgement message from endpoint ' " + sender - > GetName ( ) + " ' for service ' " + service - > GetName ( ) + " ': Insufficient privileges. " ) ;
2013-09-17 14:58:52 +02:00
return ;
}
2013-09-17 13:18:26 +02:00
{
ObjectLock olock ( service ) ;
service - > ClearAcknowledgement ( sender - > GetName ( ) ) ;
}
2013-09-18 09:16:29 +02:00
AsyncRelayMessage ( sender , message , true ) ;
2013-09-03 10:08:02 +02:00
} else if ( message - > Get ( " method " ) = = " cluster::SetLogPosition " ) {
2013-09-17 13:18:26 +02:00
if ( ! params )
return ;
2013-09-03 10:12:07 +02:00
sender - > SetLocalLogPosition ( params - > Get ( " log_position " ) ) ;
2013-09-04 15:47:15 +02:00
} else if ( message - > Get ( " method " ) = = " cluster::Config " ) {
2013-09-17 13:18:26 +02:00
if ( ! params )
return ;
2013-09-05 12:09:09 +02:00
Dictionary : : Ptr remoteConfig = params - > Get ( " config_files " ) ;
2013-09-04 15:47:15 +02:00
2013-09-05 12:09:09 +02:00
if ( ! remoteConfig )
2013-09-04 15:47:15 +02:00
return ;
Endpoint : : Ptr self = Endpoint : : GetByName ( GetIdentity ( ) ) ;
Array : : Ptr acceptConfig = self - > GetAcceptConfig ( ) ;
bool accept = false ;
if ( acceptConfig ) {
2013-09-09 10:06:49 +02:00
ObjectLock olock ( acceptConfig ) ;
2013-09-04 15:47:15 +02:00
BOOST_FOREACH ( const String & pattern , acceptConfig ) {
2013-09-25 08:52:57 +02:00
if ( pattern = = sender - > GetName ( ) ) {
2013-09-04 15:47:15 +02:00
accept = true ;
break ;
}
}
}
2013-09-06 08:48:15 +02:00
String identity = params - > Get ( " identity " ) ;
2013-09-04 15:47:15 +02:00
if ( ! accept ) {
2013-09-06 09:01:34 +02:00
Log ( LogWarning , " cluster " , " Ignoring config update from endpoint ' " + sender - > GetName ( ) + " ' for identity ' " + identity + " '. " ) ;
2013-09-04 15:47:15 +02:00
return ;
}
2013-09-06 09:01:34 +02:00
Log ( LogInformation , " cluster " , " Processing config update for identity ' " + identity + " '. " ) ;
2013-09-06 08:48:15 +02:00
String dir = GetClusterDir ( ) + " config/ " + SHA256 ( identity ) ;
2013-10-10 23:06:28 +02:00
# ifndef _WIN32
2013-09-04 15:47:15 +02:00
if ( mkdir ( dir . CStr ( ) , 0700 ) < 0 & & errno ! = EEXIST ) {
2013-10-10 23:06:28 +02:00
# else /*_ WIN32 */
if ( mkdir ( dir . CStr ( ) ) < 0 & & errno ! = EEXIST ) {
# endif /* _WIN32 */
2013-09-04 15:47:15 +02:00
BOOST_THROW_EXCEPTION ( posix_error ( )
< < boost : : errinfo_api_function ( " localtime " )
< < boost : : errinfo_errno ( errno ) ) ;
}
2013-11-06 08:51:56 +01:00
Dictionary : : Ptr localConfig = make_shared < Dictionary > ( ) ;
2013-11-22 09:03:52 +01:00
Utility : : Glob ( dir + " /* " , boost : : bind ( & ClusterListener : : ConfigGlobHandler , boost : : cref ( localConfig ) , _1 , true ) , GlobFile ) ;
2013-09-05 12:09:09 +02:00
bool configChange = false ;
/* figure out whether config files were removed */
if ( localConfig - > GetLength ( ) ! = remoteConfig - > GetLength ( ) )
configChange = true ;
2013-09-09 10:06:49 +02:00
ObjectLock olock ( remoteConfig ) ;
2013-11-30 17:42:50 +01:00
BOOST_FOREACH ( const Dictionary : : Pair & kv , remoteConfig ) {
Dictionary : : Ptr remoteFile = kv . second ;
2013-09-05 12:09:09 +02:00
bool writeFile = false ;
2013-11-30 17:42:50 +01:00
String hash = SHA256 ( kv . first ) ;
2013-09-05 12:09:09 +02:00
String path = dir + " / " + hash ;
if ( ! localConfig - > Contains ( hash ) )
writeFile = true ;
else {
Dictionary : : Ptr localFile = localConfig - > Get ( hash ) ;
String localContent = localFile - > Get ( " content " ) ;
String remoteContent = remoteFile - > Get ( " content " ) ;
if ( localContent ! = remoteContent )
writeFile = true ;
}
if ( writeFile ) {
configChange = true ;
Log ( LogInformation , " cluster " , " Updating configuration file: " + path ) ;
std : : ofstream fp ( path . CStr ( ) , std : : ofstream : : out | std : : ostream : : trunc ) ;
fp < < remoteFile - > Get ( " content " ) ;
fp . close ( ) ;
}
localConfig - > Remove ( hash ) ;
}
2013-09-17 14:32:37 +02:00
olock . Unlock ( ) ;
2013-09-05 12:09:09 +02:00
2013-09-09 10:06:49 +02:00
ObjectLock olock2 ( localConfig ) ;
2013-11-30 17:42:50 +01:00
BOOST_FOREACH ( const Dictionary : : Pair & kv , localConfig ) {
String path = dir + " / " + kv . first ;
2013-09-05 12:09:09 +02:00
Log ( LogInformation , " cluster " , " Removing obsolete config file: " + path ) ;
( void ) unlink ( path . CStr ( ) ) ;
configChange = true ;
}
2013-09-17 14:32:37 +02:00
olock2 . Unlock ( ) ;
2013-09-05 12:09:09 +02:00
if ( configChange ) {
Log ( LogInformation , " cluster " , " Restarting after configuration change. " ) ;
Application : : RequestRestart ( ) ;
}
2013-09-17 13:18:26 +02:00
2013-09-18 09:16:29 +02:00
AsyncRelayMessage ( sender , message , true ) ;
2013-08-27 12:21:41 +02:00
}
}
2013-09-25 09:36:55 +02:00
bool ClusterListener : : IsAuthority ( const DynamicObject : : Ptr & object , const String & type )
2013-09-12 10:03:48 +02:00
{
Array : : Ptr authorities = object - > GetAuthorities ( ) ;
std : : vector < String > endpoints ;
BOOST_FOREACH ( const Endpoint : : Ptr & endpoint , DynamicType : : GetObjects < Endpoint > ( ) ) {
bool match = false ;
2013-09-12 15:38:01 +02:00
if ( ( ! endpoint - > IsConnected ( ) & & endpoint - > GetName ( ) ! = GetIdentity ( ) ) | | ! endpoint - > HasFeature ( type ) )
2013-09-12 10:03:48 +02:00
continue ;
if ( authorities ) {
2013-09-12 15:07:37 +02:00
ObjectLock olock ( authorities ) ;
2013-09-12 10:03:48 +02:00
BOOST_FOREACH ( const String & authority , authorities ) {
2013-09-25 08:52:57 +02:00
if ( authority = = endpoint - > GetName ( ) ) {
2013-09-12 10:03:48 +02:00
match = true ;
break ;
}
}
} else {
match = true ;
}
if ( match )
endpoints . push_back ( endpoint - > GetName ( ) ) ;
}
2013-09-13 07:49:12 +02:00
if ( endpoints . empty ( ) )
return false ;
2013-09-12 15:22:21 +02:00
2013-09-12 10:03:48 +02:00
std : : sort ( endpoints . begin ( ) , endpoints . end ( ) ) ;
String key = object - > GetType ( ) - > GetName ( ) + " \t " + object - > GetName ( ) ;
unsigned long hash = Utility : : SDBM ( key ) ;
unsigned long index = hash % endpoints . size ( ) ;
2013-09-21 09:00:40 +02:00
// Log(LogDebug, "cluster", "Authority for object '" + object->GetName() + "' of type '" + object->GetType()->GetName() + "' is '" + endpoints[index] + "'.");
2013-09-12 10:03:48 +02:00
2013-09-13 07:49:12 +02:00
return ( endpoints [ index ] = = GetIdentity ( ) ) ;
}
2013-09-25 09:36:55 +02:00
void ClusterListener : : UpdateAuthority ( void )
2013-09-13 07:49:12 +02:00
{
2013-09-16 12:27:25 +02:00
Log ( LogDebug , " cluster " , " Updating authority for objects. " ) ;
2013-09-16 11:06:21 +02:00
2013-10-17 09:28:49 +02:00
int checker_count = 0 , notifications_count = 0 ;
2013-09-13 07:49:12 +02:00
BOOST_FOREACH ( const DynamicType : : Ptr & type , DynamicType : : GetTypes ( ) ) {
BOOST_FOREACH ( const DynamicObject : : Ptr & object , type - > GetObjects ( ) ) {
2013-10-17 09:28:49 +02:00
bool checkerAuthority = IsAuthority ( object , " checker " ) ;
if ( checkerAuthority )
checker_count + + ;
object - > SetAuthority ( " checker " , checkerAuthority ) ;
bool notificationAuthority = IsAuthority ( object , " notifications " ) ;
if ( notificationAuthority )
notifications_count + + ;
object - > SetAuthority ( " notifications " , notificationAuthority ) ;
2013-09-13 07:49:12 +02:00
}
}
2013-10-17 09:28:49 +02:00
2013-10-17 16:03:44 +02:00
Log ( LogDebug , " cluster " , " Cluster authority: " + Convert : : ToString ( checker_count ) + " x checker, " + Convert : : ToString ( notifications_count ) + " x notifications " ) ;
2013-09-12 10:03:48 +02:00
}
2013-09-25 09:36:55 +02:00
bool ClusterListener : : SupportsChecks ( void )
2013-09-12 10:17:14 +02:00
{
2013-09-13 09:58:16 +02:00
DynamicType : : Ptr type = DynamicType : : GetByName ( " CheckerComponent " ) ;
if ( ! type )
return false ;
2013-10-08 11:57:35 +02:00
return ! type - > GetObjects ( ) . empty ( ) & & IcingaApplication : : GetInstance ( ) - > GetEnableChecks ( ) ;
2013-09-12 10:17:14 +02:00
}
2013-09-25 09:36:55 +02:00
bool ClusterListener : : SupportsNotifications ( void )
2013-09-12 10:17:14 +02:00
{
2013-09-13 09:58:16 +02:00
DynamicType : : Ptr type = DynamicType : : GetByName ( " NotificationComponent " ) ;
if ( ! type )
return false ;
2013-10-08 11:57:35 +02:00
return ! type - > GetObjects ( ) . empty ( ) & & IcingaApplication : : GetInstance ( ) - > GetEnableNotifications ( ) ;
2013-09-12 10:17:14 +02:00
}
2014-02-13 15:15:16 +01:00
bool ClusterListener : : SupportsFeature ( const String & name )
{
DynamicType : : Ptr type = DynamicType : : GetByName ( name ) ;
if ( ! type )
return false ;
return ! type - > GetObjects ( ) . empty ( ) ;
}
2014-02-18 10:53:44 +01:00
std : : pair < Dictionary : : Ptr , Dictionary : : Ptr > ClusterListener : : GetClusterStatus ( void )
2014-02-13 15:15:16 +01:00
{
2014-02-18 10:53:44 +01:00
Dictionary : : Ptr status = make_shared < Dictionary > ( ) ;
Dictionary : : Ptr perfdata = make_shared < Dictionary > ( ) ;
2014-02-13 15:15:16 +01:00
/* cluster stats */
2014-02-18 10:53:44 +01:00
status - > Set ( " node " , IcingaApplication : : GetInstance ( ) - > GetNodeName ( ) ) ;
status - > Set ( " identity " , GetIdentity ( ) ) ;
2014-02-13 15:15:16 +01:00
double count_endpoints = 0 ;
Array : : Ptr not_connected_endpoints = make_shared < Array > ( ) ;
Array : : Ptr connected_endpoints = make_shared < Array > ( ) ;
BOOST_FOREACH ( const Endpoint : : Ptr & endpoint , DynamicType : : GetObjects < Endpoint > ( ) ) {
count_endpoints + + ;
if ( ! endpoint - > IsConnected ( ) & & endpoint - > GetName ( ) ! = GetIdentity ( ) )
not_connected_endpoints - > Add ( endpoint - > GetName ( ) ) ;
else if ( endpoint - > IsConnected ( ) & & endpoint - > GetName ( ) ! = GetIdentity ( ) )
connected_endpoints - > Add ( endpoint - > GetName ( ) ) ;
}
std : : sort ( not_connected_endpoints - > Begin ( ) , not_connected_endpoints - > End ( ) ) ;
std : : sort ( connected_endpoints - > Begin ( ) , connected_endpoints - > End ( ) ) ;
2014-02-18 10:53:44 +01:00
status - > Set ( " num_endpoints " , count_endpoints ) ;
status - > Set ( " num_conn_endpoints " , connected_endpoints - > GetLength ( ) ) ;
status - > Set ( " num_not_conn_endpoints " , not_connected_endpoints - > GetLength ( ) ) ;
status - > Set ( " conn_endpoints " , connected_endpoints ) ;
status - > Set ( " not_conn_endpoints " , not_connected_endpoints ) ;
perfdata - > Set ( " num_endpoints " , count_endpoints ) ;
perfdata - > Set ( " num_conn_endpoints " , connected_endpoints - > GetLength ( ) ) ;
perfdata - > Set ( " num_not_conn_endpoints " , not_connected_endpoints - > GetLength ( ) ) ;
2014-02-13 15:15:16 +01:00
2014-02-18 10:53:44 +01:00
return std : : make_pair ( status , perfdata ) ;
2014-02-13 15:15:16 +01:00
}