2014-05-03 20:02:22 +02:00
/******************************************************************************
* Icinga 2 *
2015-01-22 12:00:23 +01:00
* Copyright ( C ) 2012 - 2015 Icinga Development Team ( http : //www.icinga.org) *
2014-05-03 20:02:22 +02:00
* *
* This program is free software ; you can redistribute it and / or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation ; either version 2 *
* of the License , or ( at your option ) any later version . *
* *
* This program is distributed in the hope that it will be useful , *
* but WITHOUT ANY WARRANTY ; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE . See the *
* GNU General Public License for more details . *
* *
* You should have received a copy of the GNU General Public License *
* along with this program ; if not , write to the Free Software Foundation *
* Inc . , 51 Franklin St , Fifth Floor , Boston , MA 02110 - 1301 , USA . *
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
2014-05-25 16:23:35 +02:00
# include "remote/apilistener.hpp"
2015-03-28 11:04:42 +01:00
# include "remote/apilistener.tcpp"
2014-05-25 16:23:35 +02:00
# include "remote/apiclient.hpp"
# include "remote/endpoint.hpp"
2015-02-26 14:59:39 +01:00
# include "remote/jsonrpc.hpp"
2014-05-25 16:23:35 +02:00
# include "base/convert.hpp"
# include "base/netstring.hpp"
2014-10-26 19:59:49 +01:00
# include "base/json.hpp"
2014-05-25 16:23:35 +02:00
# include "base/dynamictype.hpp"
2014-10-19 14:21:12 +02:00
# include "base/logger.hpp"
2014-05-25 16:23:35 +02:00
# include "base/objectlock.hpp"
# include "base/stdiostream.hpp"
# include "base/application.hpp"
# include "base/context.hpp"
# include "base/statsfunction.hpp"
2014-12-15 10:16:06 +01:00
# include "base/exception.hpp"
2014-05-03 20:02:22 +02:00
# include <fstream>
using namespace icinga ;
REGISTER_TYPE ( ApiListener ) ;
boost : : signals2 : : signal < void ( bool ) > ApiListener : : OnMasterChanged ;
REGISTER_STATSFUNCTION ( ApiListenerStats , & ApiListener : : StatsFunc ) ;
2015-03-02 09:56:09 +01:00
ApiListener : : ApiListener ( void )
: m_LogMessageCount ( 0 )
{ }
2014-05-03 20:02:22 +02:00
void ApiListener : : OnConfigLoaded ( void )
{
/* set up SSL context */
2014-11-08 21:17:16 +01:00
boost : : shared_ptr < X509 > cert ;
2014-06-05 15:03:56 +02:00
try {
cert = GetX509Certificate ( GetCertPath ( ) ) ;
2014-08-25 08:35:35 +02:00
} catch ( const std : : exception & ) {
2015-02-26 17:09:45 +01:00
BOOST_THROW_EXCEPTION ( ScriptError ( " Cannot get certificate from cert path: ' " + GetCertPath ( ) + " '. " , GetDebugInfo ( ) ) ) ;
2014-06-05 15:03:56 +02:00
}
try {
SetIdentity ( GetCertificateCN ( cert ) ) ;
2014-08-25 08:35:35 +02:00
} catch ( const std : : exception & ) {
2015-02-26 17:09:45 +01:00
BOOST_THROW_EXCEPTION ( ScriptError ( " Cannot get certificate common name from cert path: ' " + GetCertPath ( ) + " '. " , GetDebugInfo ( ) ) ) ;
2014-06-05 15:03:56 +02:00
}
2014-10-20 10:09:57 +02:00
Log ( LogInformation , " ApiListener " )
< < " My API identity: " < < GetIdentity ( ) ;
2014-05-03 20:02:22 +02:00
2014-06-05 15:03:56 +02:00
try {
m_SSLContext = MakeSSLContext ( GetCertPath ( ) , GetKeyPath ( ) , GetCaPath ( ) ) ;
2014-08-25 08:35:35 +02:00
} catch ( const std : : exception & ) {
2015-02-26 17:09:45 +01:00
BOOST_THROW_EXCEPTION ( ScriptError ( " Cannot make SSL context for cert path: ' " + GetCertPath ( ) + " ' key path: ' " + GetKeyPath ( ) + " ' ca path: ' " + GetCaPath ( ) + " '. " , GetDebugInfo ( ) ) ) ;
2014-06-05 15:03:56 +02:00
}
2014-05-03 20:02:22 +02:00
2014-06-05 15:03:56 +02:00
if ( ! GetCrlPath ( ) . IsEmpty ( ) ) {
try {
AddCRLToSSLContext ( m_SSLContext , GetCrlPath ( ) ) ;
2014-08-25 08:35:35 +02:00
} catch ( const std : : exception & ) {
2015-02-26 17:09:45 +01:00
BOOST_THROW_EXCEPTION ( ScriptError ( " Cannot add certificate revocation list to SSL context for crl path: ' " + GetCrlPath ( ) + " '. " , GetDebugInfo ( ) ) ) ;
2014-06-05 15:03:56 +02:00
}
}
2014-11-21 23:23:31 +01:00
}
2014-05-03 20:02:22 +02:00
2014-11-21 23:23:31 +01:00
void ApiListener : : OnAllConfigLoaded ( void )
{
2015-02-26 17:09:45 +01:00
if ( ! Endpoint : : GetByName ( GetIdentity ( ) ) )
BOOST_THROW_EXCEPTION ( ScriptError ( " Endpoint object for ' " + GetIdentity ( ) + " ' is missing. " , GetDebugInfo ( ) ) ) ;
2014-05-03 20:02:22 +02:00
}
/**
* Starts the component .
*/
void ApiListener : : Start ( void )
{
2014-11-21 23:23:31 +01:00
SyncZoneDirs ( ) ;
2014-09-02 13:02:22 +02:00
if ( std : : distance ( DynamicType : : GetObjectsByType < ApiListener > ( ) . first , DynamicType : : GetObjectsByType < ApiListener > ( ) . second ) > 1 ) {
2014-06-05 14:36:57 +02:00
Log ( LogCritical , " ApiListener " , " Only one ApiListener object is allowed. " ) ;
return ;
}
2014-05-03 20:02:22 +02:00
DynamicObject : : Start ( ) ;
{
boost : : mutex : : scoped_lock ( m_LogLock ) ;
RotateLogFile ( ) ;
OpenLogFile ( ) ;
}
/* create the primary JSON-RPC listener */
2014-08-25 08:27:19 +02:00
if ( ! AddListener ( GetBindHost ( ) , GetBindPort ( ) ) ) {
2014-10-20 10:09:57 +02:00
Log ( LogCritical , " ApiListener " )
< < " Cannot add listener on host ' " < < GetBindHost ( ) < < " ' for port ' " < < GetBindPort ( ) < < " '. " ;
2015-03-12 11:43:04 +01:00
Application : : Exit ( EXIT_FAILURE ) ;
2014-08-04 16:34:17 +02:00
}
2014-05-03 20:02:22 +02:00
2014-11-08 21:17:16 +01:00
m_Timer = new Timer ( ) ;
2014-05-03 20:02:22 +02:00
m_Timer - > OnTimerExpired . connect ( boost : : bind ( & ApiListener : : ApiTimerHandler , this ) ) ;
m_Timer - > SetInterval ( 5 ) ;
m_Timer - > Start ( ) ;
m_Timer - > Reschedule ( 0 ) ;
OnMasterChanged ( true ) ;
}
ApiListener : : Ptr ApiListener : : GetInstance ( void )
{
2014-09-02 13:02:22 +02:00
BOOST_FOREACH ( const ApiListener : : Ptr & listener , DynamicType : : GetObjectsByType < ApiListener > ( ) )
2014-05-03 20:02:22 +02:00
return listener ;
return ApiListener : : Ptr ( ) ;
}
2014-11-08 21:17:16 +01:00
boost : : shared_ptr < SSL_CTX > ApiListener : : GetSSLContext ( void ) const
2014-05-03 20:02:22 +02:00
{
return m_SSLContext ;
}
Endpoint : : Ptr ApiListener : : GetMaster ( void ) const
{
Zone : : Ptr zone = Zone : : GetLocalZone ( ) ;
2014-08-04 16:34:17 +02:00
if ( ! zone )
return Endpoint : : Ptr ( ) ;
2014-05-03 20:02:22 +02:00
std : : vector < String > names ;
BOOST_FOREACH ( const Endpoint : : Ptr & endpoint , zone - > GetEndpoints ( ) )
if ( endpoint - > IsConnected ( ) | | endpoint - > GetName ( ) = = GetIdentity ( ) )
names . push_back ( endpoint - > GetName ( ) ) ;
std : : sort ( names . begin ( ) , names . end ( ) ) ;
return Endpoint : : GetByName ( * names . begin ( ) ) ;
}
bool ApiListener : : IsMaster ( void ) const
{
2014-08-04 16:34:17 +02:00
Endpoint : : Ptr master = GetMaster ( ) ;
if ( ! master )
return false ;
return master - > GetName ( ) = = GetIdentity ( ) ;
2014-05-03 20:02:22 +02:00
}
/**
* Creates a new JSON - RPC listener on the specified port .
*
2014-08-25 08:27:19 +02:00
* @ param node The host the listener should be bound to .
2014-05-03 20:02:22 +02:00
* @ param service The port to listen on .
*/
2014-08-25 08:27:19 +02:00
bool ApiListener : : AddListener ( const String & node , const String & service )
2014-05-03 20:02:22 +02:00
{
ObjectLock olock ( this ) ;
2014-11-08 21:17:16 +01:00
boost : : shared_ptr < SSL_CTX > sslContext = m_SSLContext ;
2014-05-03 20:02:22 +02:00
2014-06-05 14:36:57 +02:00
if ( ! sslContext ) {
Log ( LogCritical , " ApiListener " , " SSL context is required for AddListener() " ) ;
2014-08-04 16:34:17 +02:00
return false ;
2014-06-05 14:36:57 +02:00
}
2014-05-03 20:02:22 +02:00
2014-10-20 10:09:57 +02:00
Log ( LogInformation , " ApiListener " )
< < " Adding new listener on port ' " < < service < < " ' " ;
2014-05-03 20:02:22 +02:00
2014-11-08 21:17:16 +01:00
TcpSocket : : Ptr server = new TcpSocket ( ) ;
2014-06-05 14:36:57 +02:00
try {
2014-08-25 08:27:19 +02:00
server - > Bind ( node , service , AF_UNSPEC ) ;
} catch ( const std : : exception & ) {
2014-10-20 10:09:57 +02:00
Log ( LogCritical , " ApiListener " )
< < " Cannot bind TCP socket for host ' " < < node < < " ' on port ' " < < service < < " '. " ;
2014-08-04 16:34:17 +02:00
return false ;
2014-06-05 14:36:57 +02:00
}
2014-05-03 20:02:22 +02:00
boost : : thread thread ( boost : : bind ( & ApiListener : : ListenerThreadProc , this , server ) ) ;
thread . detach ( ) ;
m_Servers . insert ( server ) ;
2014-08-04 16:34:17 +02:00
return true ;
2014-05-03 20:02:22 +02:00
}
void ApiListener : : ListenerThreadProc ( const Socket : : Ptr & server )
{
Utility : : SetThreadName ( " API Listener " ) ;
server - > Listen ( ) ;
for ( ; ; ) {
2014-06-05 16:17:53 +02:00
try {
Socket : : Ptr client = server - > Accept ( ) ;
2015-03-05 14:15:42 +01:00
Utility : : QueueAsyncCallback ( boost : : bind ( & ApiListener : : NewClientHandler , this , client , String ( ) , RoleServer ) , LowLatencyScheduler ) ;
2014-08-25 08:35:35 +02:00
} catch ( const std : : exception & ) {
2014-06-05 16:17:53 +02:00
Log ( LogCritical , " ApiListener " , " Cannot accept new connection. " ) ;
}
2014-05-03 20:02:22 +02:00
}
}
/**
2014-06-23 10:00:02 +02:00
* Creates a new JSON - RPC client and connects to the specified endpoint .
2014-05-03 20:02:22 +02:00
*
2014-06-23 10:00:02 +02:00
* @ param endpoint The endpoint .
2014-05-03 20:02:22 +02:00
*/
2014-06-23 10:00:02 +02:00
void ApiListener : : AddConnection ( const Endpoint : : Ptr & endpoint )
2014-05-22 13:45:42 +02:00
{
2014-05-03 20:02:22 +02:00
{
ObjectLock olock ( this ) ;
2014-11-08 21:17:16 +01:00
boost : : shared_ptr < SSL_CTX > sslContext = m_SSLContext ;
2014-05-03 20:02:22 +02:00
2014-06-05 14:36:57 +02:00
if ( ! sslContext ) {
2014-08-04 16:34:17 +02:00
Log ( LogCritical , " ApiListener " , " SSL context is required for AddConnection() " ) ;
2014-06-05 14:36:57 +02:00
return ;
}
2014-05-03 20:02:22 +02:00
}
2014-06-23 10:00:02 +02:00
String host = endpoint - > GetHost ( ) ;
String port = endpoint - > GetPort ( ) ;
2014-10-20 10:09:57 +02:00
Log ( LogInformation , " ApiClient " )
< < " Reconnecting to API endpoint ' " < < endpoint - > GetName ( ) < < " ' via host ' " < < host < < " ' and port ' " < < port < < " ' " ;
2014-08-22 15:39:34 +02:00
2014-11-08 21:17:16 +01:00
TcpSocket : : Ptr client = new TcpSocket ( ) ;
2014-05-03 20:02:22 +02:00
2014-05-22 13:45:42 +02:00
try {
2014-06-23 10:00:02 +02:00
endpoint - > SetConnecting ( true ) ;
client - > Connect ( host , port ) ;
2015-03-05 14:15:42 +01:00
NewClientHandler ( client , endpoint - > GetName ( ) , RoleClient ) ;
2014-06-23 10:00:02 +02:00
endpoint - > SetConnecting ( false ) ;
2014-05-22 13:45:42 +02:00
} catch ( const std : : exception & ex ) {
2014-06-23 10:00:02 +02:00
endpoint - > SetConnecting ( false ) ;
2014-08-04 13:35:12 +02:00
client - > Close ( ) ;
2014-06-23 10:00:02 +02:00
2014-10-20 10:09:57 +02:00
std : : ostringstream info ;
2014-06-23 10:00:02 +02:00
info < < " Cannot connect to host ' " < < host < < " ' on port ' " < < port < < " ' " ;
2014-06-05 14:36:57 +02:00
Log ( LogCritical , " ApiListener " , info . str ( ) ) ;
2014-10-20 10:09:57 +02:00
Log ( LogDebug , " ApiListener " )
< < info . str ( ) < < " \n " < < DiagnosticInformation ( ex ) ;
2014-05-22 13:45:42 +02:00
}
2014-05-03 20:02:22 +02:00
}
/**
* Processes a new client connection .
*
* @ param client The new client .
*/
2015-03-05 14:15:42 +01:00
void ApiListener : : NewClientHandler ( const Socket : : Ptr & client , const String & hostname , ConnectionRole role )
2014-05-03 20:02:22 +02:00
{
CONTEXT ( " Handling new API client connection " ) ;
TlsStream : : Ptr tlsStream ;
{
ObjectLock olock ( this ) ;
2014-06-05 15:03:56 +02:00
try {
2015-03-05 14:15:42 +01:00
tlsStream = new TlsStream ( client , hostname , role , m_SSLContext ) ;
2014-08-25 08:35:35 +02:00
} catch ( const std : : exception & ) {
2014-10-20 10:09:57 +02:00
Log ( LogCritical , " ApiListener " , " Cannot create TLS stream from client connection. " ) ;
2014-06-05 15:03:56 +02:00
return ;
}
2014-05-03 20:02:22 +02:00
}
2014-06-05 15:03:56 +02:00
try {
tlsStream - > Handshake ( ) ;
2014-08-25 08:35:35 +02:00
} catch ( const std : : exception & ) {
2014-06-05 15:03:56 +02:00
Log ( LogCritical , " ApiListener " , " Client TLS handshake failed. " ) ;
return ;
}
2014-05-03 20:02:22 +02:00
2014-11-08 21:17:16 +01:00
boost : : shared_ptr < X509 > cert = tlsStream - > GetPeerCertificate ( ) ;
2014-06-05 15:03:56 +02:00
String identity ;
try {
identity = GetCertificateCN ( cert ) ;
2014-08-25 08:35:35 +02:00
} catch ( const std : : exception & ) {
2014-10-20 10:09:57 +02:00
Log ( LogCritical , " ApiListener " )
< < " Cannot get certificate common name from cert path: ' " < < GetCertPath ( ) < < " '. " ;
2014-06-05 15:03:56 +02:00
return ;
}
2014-05-03 20:02:22 +02:00
2014-10-16 09:01:18 +02:00
bool verify_ok = tlsStream - > IsVerifyOK ( ) ;
2014-05-08 15:00:09 +02:00
2014-10-19 17:52:17 +02:00
Log ( LogInformation , " ApiListener " )
2014-11-13 11:23:57 +01:00
< < " New client connection for identity ' " < < identity < < " ' " < < ( verify_ok ? " " : " (unauthenticated) " ) ;
2014-10-16 09:01:18 +02:00
Endpoint : : Ptr endpoint ;
if ( verify_ok )
endpoint = Endpoint : : GetByName ( identity ) ;
2014-05-03 20:02:22 +02:00
2014-05-22 09:02:44 +02:00
bool need_sync = false ;
2014-05-08 15:12:56 +02:00
if ( endpoint )
need_sync = ! endpoint - > IsConnected ( ) ;
2014-05-03 20:02:22 +02:00
2014-11-08 21:17:16 +01:00
ApiClient : : Ptr aclient = new ApiClient ( identity , verify_ok , tlsStream , role ) ;
2014-05-08 15:12:56 +02:00
aclient - > Start ( ) ;
2014-05-03 20:02:22 +02:00
2014-05-08 15:12:56 +02:00
if ( endpoint ) {
2014-08-05 09:10:59 +02:00
endpoint - > AddClient ( aclient ) ;
2014-05-08 15:00:09 +02:00
if ( need_sync ) {
{
ObjectLock olock ( endpoint ) ;
2014-05-03 20:02:22 +02:00
2014-05-08 15:00:09 +02:00
endpoint - > SetSyncing ( true ) ;
}
2014-05-03 20:02:22 +02:00
2014-05-08 15:00:09 +02:00
ReplayLog ( aclient ) ;
2014-05-03 20:02:22 +02:00
}
2014-05-13 15:57:02 +02:00
SendConfigUpdate ( aclient ) ;
2014-05-08 15:12:56 +02:00
} else
AddAnonymousClient ( aclient ) ;
2014-05-03 20:02:22 +02:00
}
void ApiListener : : ApiTimerHandler ( void )
{
double now = Utility : : GetTime ( ) ;
std : : vector < int > files ;
Utility : : Glob ( GetApiDir ( ) + " log/* " , boost : : bind ( & ApiListener : : LogGlobHandler , boost : : ref ( files ) , _1 ) , GlobFile ) ;
std : : sort ( files . begin ( ) , files . end ( ) ) ;
BOOST_FOREACH ( int ts , files ) {
bool need = false ;
2014-09-02 13:02:22 +02:00
BOOST_FOREACH ( const Endpoint : : Ptr & endpoint , DynamicType : : GetObjectsByType < Endpoint > ( ) ) {
2014-05-03 20:02:22 +02:00
if ( endpoint - > GetName ( ) = = GetIdentity ( ) )
continue ;
if ( endpoint - > GetLogDuration ( ) > = 0 & & ts < now - endpoint - > GetLogDuration ( ) )
continue ;
if ( ts > endpoint - > GetLocalLogPosition ( ) ) {
need = true ;
break ;
}
}
if ( ! need ) {
String path = GetApiDir ( ) + " log/ " + Convert : : ToString ( ts ) ;
2014-10-20 10:09:57 +02:00
Log ( LogNotice , " ApiListener " )
< < " Removing old log file: " < < path ;
2014-05-03 20:02:22 +02:00
( void ) unlink ( path . CStr ( ) ) ;
}
}
if ( IsMaster ( ) ) {
Zone : : Ptr my_zone = Zone : : GetLocalZone ( ) ;
2014-09-02 13:02:22 +02:00
BOOST_FOREACH ( const Zone : : Ptr & zone , DynamicType : : GetObjectsByType < Zone > ( ) ) {
2014-05-08 15:00:09 +02:00
/* only connect to endpoints in a) the same zone b) our parent zone c) immediate child zones */
2015-02-17 15:46:03 +01:00
if ( my_zone ! = zone & & my_zone ! = zone - > GetParent ( ) & & zone ! = my_zone - > GetParent ( ) ) {
Log ( LogDebug , " ApiListener " )
2015-02-20 15:11:44 +01:00
< < " Not connecting to Zone ' " < < zone - > GetName ( ) < < " ' because it's not in the same zone, a parent or a child zone. " ;
2014-05-03 20:02:22 +02:00
continue ;
2015-02-17 15:46:03 +01:00
}
2014-05-03 20:02:22 +02:00
2014-05-08 15:00:09 +02:00
bool connected = false ;
2014-05-03 20:02:22 +02:00
2014-05-08 15:00:09 +02:00
BOOST_FOREACH ( const Endpoint : : Ptr & endpoint , zone - > GetEndpoints ( ) ) {
if ( endpoint - > IsConnected ( ) ) {
connected = true ;
break ;
}
}
2014-05-03 20:02:22 +02:00
2014-05-08 15:00:09 +02:00
/* don't connect to an endpoint if we already have a connection to the zone */
2015-02-17 15:46:03 +01:00
if ( connected ) {
Log ( LogDebug , " ApiListener " )
2015-02-20 15:11:44 +01:00
< < " Not connecting to Zone ' " < < zone - > GetName ( ) < < " ' because we're already connected to it. " ;
2014-05-03 20:02:22 +02:00
continue ;
2015-02-17 15:46:03 +01:00
}
2014-05-03 20:02:22 +02:00
2014-05-08 15:00:09 +02:00
BOOST_FOREACH ( const Endpoint : : Ptr & endpoint , zone - > GetEndpoints ( ) ) {
/* don't connect to ourselves */
2015-02-17 15:46:03 +01:00
if ( endpoint - > GetName ( ) = = GetIdentity ( ) ) {
Log ( LogDebug , " ApiListener " )
2015-02-20 15:11:44 +01:00
< < " Not connecting to Endpoint ' " < < endpoint - > GetName ( ) < < " ' because that's us. " ;
2014-05-08 15:00:09 +02:00
continue ;
2015-02-17 15:46:03 +01:00
}
2014-05-08 15:00:09 +02:00
/* don't try to connect to endpoints which don't have a host and port */
2015-02-17 15:46:03 +01:00
if ( endpoint - > GetHost ( ) . IsEmpty ( ) | | endpoint - > GetPort ( ) . IsEmpty ( ) ) {
Log ( LogDebug , " ApiListener " )
2015-02-20 15:11:44 +01:00
< < " Not connecting to Endpoint ' " < < endpoint - > GetName ( ) < < " ' because the host/port attributes are missing. " ;
2014-05-08 15:00:09 +02:00
continue ;
2015-02-17 15:46:03 +01:00
}
2014-05-08 15:00:09 +02:00
2014-07-01 09:38:22 +02:00
/* don't try to connect if there's already a connection attempt */
2015-02-17 15:46:03 +01:00
if ( endpoint - > GetConnecting ( ) ) {
Log ( LogDebug , " ApiListener " )
2015-02-20 15:11:44 +01:00
< < " Not connecting to Endpoint ' " < < endpoint - > GetName ( ) < < " ' because we're already trying to connect to it. " ;
2014-07-01 09:38:22 +02:00
continue ;
2015-02-17 15:46:03 +01:00
}
2014-07-01 09:38:22 +02:00
2015-01-28 08:35:36 +01:00
boost : : thread thread ( boost : : bind ( & ApiListener : : AddConnection , this , endpoint ) ) ;
thread . detach ( ) ;
2014-05-08 15:00:09 +02:00
}
2014-05-03 20:02:22 +02:00
}
}
2014-09-02 13:02:22 +02:00
BOOST_FOREACH ( const Endpoint : : Ptr & endpoint , DynamicType : : GetObjectsByType < Endpoint > ( ) ) {
2014-05-03 20:02:22 +02:00
if ( ! endpoint - > IsConnected ( ) )
continue ;
double ts = endpoint - > GetRemoteLogPosition ( ) ;
if ( ts = = 0 )
continue ;
2014-11-08 21:17:16 +01:00
Dictionary : : Ptr lparams = new Dictionary ( ) ;
2014-05-03 20:02:22 +02:00
lparams - > Set ( " log_position " , ts ) ;
2014-11-08 21:17:16 +01:00
Dictionary : : Ptr lmessage = new Dictionary ( ) ;
2014-05-03 20:02:22 +02:00
lmessage - > Set ( " jsonrpc " , " 2.0 " ) ;
lmessage - > Set ( " method " , " log::SetLogPosition " ) ;
lmessage - > Set ( " params " , lparams ) ;
BOOST_FOREACH ( const ApiClient : : Ptr & client , endpoint - > GetClients ( ) )
client - > SendMessage ( lmessage ) ;
2014-10-20 10:09:57 +02:00
Log ( LogNotice , " ApiListener " )
< < " Setting log position for identity ' " < < endpoint - > GetName ( ) < < " ': "
< < Utility : : FormatDateTime ( " %Y/%m/%d %H:%M:%S " , ts ) ;
2014-05-03 20:02:22 +02:00
}
2014-08-04 16:34:17 +02:00
Endpoint : : Ptr master = GetMaster ( ) ;
if ( master )
2014-10-20 10:09:57 +02:00
Log ( LogNotice , " ApiListener " )
< < " Current zone master: " < < master - > GetName ( ) ;
2014-05-03 20:02:22 +02:00
std : : vector < String > names ;
2014-09-02 13:02:22 +02:00
BOOST_FOREACH ( const Endpoint : : Ptr & endpoint , DynamicType : : GetObjectsByType < Endpoint > ( ) )
2014-05-03 20:02:22 +02:00
if ( endpoint - > IsConnected ( ) )
names . push_back ( endpoint - > GetName ( ) + " ( " + Convert : : ToString ( endpoint - > GetClients ( ) . size ( ) ) + " ) " ) ;
2014-10-20 10:09:57 +02:00
Log ( LogNotice , " ApiListener " )
< < " Connected endpoints: " < < Utility : : NaturalJoin ( names ) ;
2014-05-03 20:02:22 +02:00
}
void ApiListener : : RelayMessage ( const MessageOrigin & origin , const DynamicObject : : Ptr & secobj , const Dictionary : : Ptr & message , bool log )
{
m_RelayQueue . Enqueue ( boost : : bind ( & ApiListener : : SyncRelayMessage , this , origin , secobj , message , log ) ) ;
}
2014-08-26 10:24:04 +02:00
void ApiListener : : PersistMessage ( const Dictionary : : Ptr & message , const DynamicObject : : Ptr & secobj )
2014-05-03 20:02:22 +02:00
{
double ts = message - > Get ( " ts " ) ;
ASSERT ( ts ! = 0 ) ;
2014-11-08 21:17:16 +01:00
Dictionary : : Ptr pmessage = new Dictionary ( ) ;
2014-05-03 20:02:22 +02:00
pmessage - > Set ( " timestamp " , ts ) ;
2014-10-26 19:59:49 +01:00
pmessage - > Set ( " message " , JsonEncode ( message ) ) ;
2014-08-26 10:24:04 +02:00
2014-11-08 21:17:16 +01:00
Dictionary : : Ptr secname = new Dictionary ( ) ;
2014-08-26 10:24:04 +02:00
secname - > Set ( " type " , secobj - > GetType ( ) - > GetName ( ) ) ;
secname - > Set ( " name " , secobj - > GetName ( ) ) ;
pmessage - > Set ( " secobj " , secname ) ;
2014-05-03 20:02:22 +02:00
boost : : mutex : : scoped_lock lock ( m_LogLock ) ;
if ( m_LogFile ) {
2014-10-26 19:59:49 +01:00
NetString : : WriteStringToStream ( m_LogFile , JsonEncode ( pmessage ) ) ;
2014-05-03 20:02:22 +02:00
m_LogMessageCount + + ;
SetLogMessageTimestamp ( ts ) ;
if ( m_LogMessageCount > 50000 ) {
CloseLogFile ( ) ;
RotateLogFile ( ) ;
OpenLogFile ( ) ;
}
}
}
2014-11-13 11:23:57 +01:00
void ApiListener : : SyncSendMessage ( const Endpoint : : Ptr & endpoint , const Dictionary : : Ptr & message )
{
ObjectLock olock ( endpoint ) ;
if ( ! endpoint - > GetSyncing ( ) ) {
Log ( LogNotice , " ApiListener " )
< < " Sending message to ' " < < endpoint - > GetName ( ) < < " ' " ;
BOOST_FOREACH ( const ApiClient : : Ptr & client , endpoint - > GetClients ( ) )
client - > SendMessage ( message ) ;
}
}
2014-05-03 20:02:22 +02:00
void ApiListener : : SyncRelayMessage ( const MessageOrigin & origin , const DynamicObject : : Ptr & secobj , const Dictionary : : Ptr & message , bool log )
{
double ts = Utility : : GetTime ( ) ;
message - > Set ( " ts " , ts ) ;
2014-10-20 10:09:57 +02:00
Log ( LogNotice , " ApiListener " )
< < " Relaying ' " < < message - > Get ( " method " ) < < " ' message " ;
2014-05-03 20:02:22 +02:00
if ( log )
2014-08-26 10:24:04 +02:00
PersistMessage ( message , secobj ) ;
2014-05-03 20:02:22 +02:00
if ( origin . FromZone )
message - > Set ( " originZone " , origin . FromZone - > GetName ( ) ) ;
bool is_master = IsMaster ( ) ;
Endpoint : : Ptr master = GetMaster ( ) ;
Zone : : Ptr my_zone = Zone : : GetLocalZone ( ) ;
std : : vector < Endpoint : : Ptr > skippedEndpoints ;
std : : set < Zone : : Ptr > finishedZones ;
2014-09-02 13:02:22 +02:00
BOOST_FOREACH ( const Endpoint : : Ptr & endpoint , DynamicType : : GetObjectsByType < Endpoint > ( ) ) {
2014-05-03 20:02:22 +02:00
/* don't relay messages to ourselves or disconnected endpoints */
if ( endpoint - > GetName ( ) = = GetIdentity ( ) | | ! endpoint - > IsConnected ( ) )
continue ;
Zone : : Ptr target_zone = endpoint - > GetZone ( ) ;
/* don't relay the message to the zone through more than one endpoint */
if ( finishedZones . find ( target_zone ) ! = finishedZones . end ( ) ) {
skippedEndpoints . push_back ( endpoint ) ;
continue ;
}
/* don't relay messages back to the endpoint which we got the message from */
if ( origin . FromClient & & endpoint = = origin . FromClient - > GetEndpoint ( ) ) {
skippedEndpoints . push_back ( endpoint ) ;
continue ;
}
/* don't relay messages back to the zone which we got the message from */
if ( origin . FromZone & & target_zone = = origin . FromZone ) {
skippedEndpoints . push_back ( endpoint ) ;
continue ;
}
/* only relay message to the master if we're not currently the master */
if ( ! is_master & & master ! = endpoint ) {
skippedEndpoints . push_back ( endpoint ) ;
continue ;
}
/* only relay the message to a) the same zone, b) the parent zone and c) direct child zones */
if ( target_zone ! = my_zone & & target_zone ! = my_zone - > GetParent ( ) & &
2015-02-09 08:50:17 +01:00
secobj - > GetZoneName ( ) ! = target_zone - > GetName ( ) ) {
2014-05-03 20:02:22 +02:00
skippedEndpoints . push_back ( endpoint ) ;
continue ;
}
/* only relay messages to zones which have access to the object */
if ( ! target_zone - > CanAccessObject ( secobj ) )
continue ;
finishedZones . insert ( target_zone ) ;
2014-11-13 11:23:57 +01:00
SyncSendMessage ( endpoint , message ) ;
2014-05-03 20:02:22 +02:00
}
BOOST_FOREACH ( const Endpoint : : Ptr & endpoint , skippedEndpoints )
endpoint - > SetLocalLogPosition ( ts ) ;
}
String ApiListener : : GetApiDir ( void )
{
return Application : : GetLocalStateDir ( ) + " /lib/icinga2/api/ " ;
}
/* must hold m_LogLock */
void ApiListener : : OpenLogFile ( void )
{
String path = GetApiDir ( ) + " log/current " ;
std : : fstream * fp = new std : : fstream ( path . CStr ( ) , std : : fstream : : out | std : : ofstream : : app ) ;
if ( ! fp - > good ( ) ) {
2014-10-20 10:09:57 +02:00
Log ( LogWarning , " ApiListener " )
< < " Could not open spool file: " < < path ;
2014-05-03 20:02:22 +02:00
return ;
}
2014-11-08 21:17:16 +01:00
m_LogFile = new StdioStream ( fp , true ) ;
2014-05-03 20:02:22 +02:00
m_LogMessageCount = 0 ;
SetLogMessageTimestamp ( Utility : : GetTime ( ) ) ;
}
/* must hold m_LogLock */
void ApiListener : : CloseLogFile ( void )
{
if ( ! m_LogFile )
return ;
m_LogFile - > Close ( ) ;
m_LogFile . reset ( ) ;
}
/* must hold m_LogLock */
void ApiListener : : RotateLogFile ( void )
{
double ts = GetLogMessageTimestamp ( ) ;
if ( ts = = 0 )
ts = Utility : : GetTime ( ) ;
String oldpath = GetApiDir ( ) + " log/current " ;
String newpath = GetApiDir ( ) + " log/ " + Convert : : ToString ( static_cast < int > ( ts ) + 1 ) ;
( void ) rename ( oldpath . CStr ( ) , newpath . CStr ( ) ) ;
}
void ApiListener : : LogGlobHandler ( std : : vector < int > & files , const String & file )
{
String name = Utility : : BaseName ( file ) ;
int ts ;
try {
ts = Convert : : ToLong ( name ) ;
}
catch ( const std : : exception & ) {
return ;
}
files . push_back ( ts ) ;
}
void ApiListener : : ReplayLog ( const ApiClient : : Ptr & client )
{
Endpoint : : Ptr endpoint = client - > GetEndpoint ( ) ;
CONTEXT ( " Replaying log for Endpoint ' " + endpoint - > GetName ( ) + " ' " ) ;
int count = - 1 ;
double peer_ts = endpoint - > GetLocalLogPosition ( ) ;
2015-02-26 14:59:39 +01:00
double logpos_ts = peer_ts ;
2014-05-03 20:02:22 +02:00
bool last_sync = false ;
2014-08-26 10:24:04 +02:00
Endpoint : : Ptr target_endpoint = client - > GetEndpoint ( ) ;
ASSERT ( target_endpoint ) ;
Zone : : Ptr target_zone = target_endpoint - > GetZone ( ) ;
if ( ! target_zone )
return ;
2014-05-03 20:02:22 +02:00
for ( ; ; ) {
boost : : mutex : : scoped_lock lock ( m_LogLock ) ;
CloseLogFile ( ) ;
RotateLogFile ( ) ;
if ( count = = - 1 | | count > 50000 ) {
OpenLogFile ( ) ;
lock . unlock ( ) ;
} else {
last_sync = true ;
}
count = 0 ;
std : : vector < int > files ;
Utility : : Glob ( GetApiDir ( ) + " log/* " , boost : : bind ( & ApiListener : : LogGlobHandler , boost : : ref ( files ) , _1 ) , GlobFile ) ;
std : : sort ( files . begin ( ) , files . end ( ) ) ;
BOOST_FOREACH ( int ts , files ) {
String path = GetApiDir ( ) + " log/ " + Convert : : ToString ( ts ) ;
if ( ts < peer_ts )
continue ;
2014-10-20 10:09:57 +02:00
Log ( LogNotice , " ApiListener " )
< < " Replaying log: " < < path ;
2014-05-03 20:02:22 +02:00
2015-02-26 14:59:39 +01:00
std : : fstream * fp = new std : : fstream ( path . CStr ( ) , std : : fstream : : in | std : : fstream : : binary ) ;
2014-11-08 21:17:16 +01:00
StdioStream : : Ptr logStream = new StdioStream ( fp , true ) ;
2014-05-03 20:02:22 +02:00
String message ;
2015-02-14 16:34:36 +01:00
StreamReadContext src ;
2014-05-03 20:02:22 +02:00
while ( true ) {
Dictionary : : Ptr pmessage ;
try {
2015-02-14 18:48:33 +01:00
StreamReadStatus srs = NetString : : ReadStringFromStream ( logStream , & message , src ) ;
if ( srs = = StatusEof )
2014-05-03 20:02:22 +02:00
break ;
2015-02-14 18:48:33 +01:00
if ( srs ! = StatusNewItem )
continue ;
2014-10-26 19:59:49 +01:00
pmessage = JsonDecode ( message ) ;
2014-05-03 20:02:22 +02:00
} catch ( const std : : exception & ) {
2014-10-20 10:09:57 +02:00
Log ( LogWarning , " ApiListener " )
< < " Unexpected end-of-file for cluster log: " < < path ;
2014-05-03 20:02:22 +02:00
/* Log files may be incomplete or corrupted. This is perfectly OK. */
break ;
}
if ( pmessage - > Get ( " timestamp " ) < = peer_ts )
continue ;
2014-10-24 17:48:02 +02:00
Dictionary : : Ptr secname = pmessage - > Get ( " secobj " ) ;
2014-08-26 10:24:04 +02:00
if ( secname ) {
DynamicType : : Ptr dtype = DynamicType : : GetByName ( secname - > Get ( " type " ) ) ;
if ( ! dtype )
continue ;
DynamicObject : : Ptr secobj = dtype - > GetObject ( secname - > Get ( " name " ) ) ;
if ( ! secobj )
continue ;
if ( ! target_zone - > CanAccessObject ( secobj ) )
continue ;
}
2014-05-03 20:02:22 +02:00
NetString : : WriteStringToStream ( client - > GetStream ( ) , pmessage - > Get ( " message " ) ) ;
count + + ;
peer_ts = pmessage - > Get ( " timestamp " ) ;
2015-02-26 14:59:39 +01:00
if ( ts > logpos_ts + 10 ) {
logpos_ts = ts ;
Dictionary : : Ptr lparams = new Dictionary ( ) ;
lparams - > Set ( " log_position " , logpos_ts ) ;
Dictionary : : Ptr lmessage = new Dictionary ( ) ;
lmessage - > Set ( " jsonrpc " , " 2.0 " ) ;
lmessage - > Set ( " method " , " log::SetLogPosition " ) ;
lmessage - > Set ( " params " , lparams ) ;
JsonRpc : : SendMessage ( client - > GetStream ( ) , lmessage ) ;
}
2014-05-03 20:02:22 +02:00
}
2014-05-09 10:23:54 +02:00
logStream - > Close ( ) ;
2014-05-03 20:02:22 +02:00
}
2014-10-20 10:09:57 +02:00
Log ( LogNotice , " ApiListener " )
< < " Replayed " < < count < < " messages. " ;
2014-05-03 20:02:22 +02:00
if ( last_sync ) {
{
ObjectLock olock2 ( endpoint ) ;
endpoint - > SetSyncing ( false ) ;
}
OpenLogFile ( ) ;
break ;
}
}
}
2015-02-13 11:28:43 +01:00
void ApiListener : : StatsFunc ( const Dictionary : : Ptr & status , const Array : : Ptr & perfdata )
2014-05-03 20:02:22 +02:00
{
2014-11-08 21:17:16 +01:00
Dictionary : : Ptr nodes = new Dictionary ( ) ;
2014-05-03 20:02:22 +02:00
std : : pair < Dictionary : : Ptr , Dictionary : : Ptr > stats ;
ApiListener : : Ptr listener = ApiListener : : GetInstance ( ) ;
if ( ! listener )
2015-02-07 22:36:17 +01:00
return ;
2014-05-03 20:02:22 +02:00
stats = listener - > GetStatus ( ) ;
2014-11-13 11:23:57 +01:00
ObjectLock olock ( stats . second ) ;
2014-09-17 15:38:39 +02:00
BOOST_FOREACH ( const Dictionary : : Pair & kv , stats . second )
perfdata - > Add ( " 'api_ " + kv . first + " '= " + Convert : : ToString ( kv . second ) ) ;
2014-05-03 20:02:22 +02:00
status - > Set ( " api " , stats . first ) ;
}
std : : pair < Dictionary : : Ptr , Dictionary : : Ptr > ApiListener : : GetStatus ( void )
{
2014-11-08 21:17:16 +01:00
Dictionary : : Ptr status = new Dictionary ( ) ;
Dictionary : : Ptr perfdata = new Dictionary ( ) ;
2014-05-03 20:02:22 +02:00
/* cluster stats */
status - > Set ( " identity " , GetIdentity ( ) ) ;
double count_endpoints = 0 ;
2014-11-08 21:17:16 +01:00
Array : : Ptr not_connected_endpoints = new Array ( ) ;
Array : : Ptr connected_endpoints = new Array ( ) ;
2014-05-03 20:02:22 +02:00
2014-09-02 13:02:22 +02:00
BOOST_FOREACH ( const Endpoint : : Ptr & endpoint , DynamicType : : GetObjectsByType < Endpoint > ( ) ) {
2014-05-03 20:02:22 +02:00
if ( endpoint - > GetName ( ) = = GetIdentity ( ) )
continue ;
count_endpoints + + ;
if ( ! endpoint - > IsConnected ( ) )
not_connected_endpoints - > Add ( endpoint - > GetName ( ) ) ;
else
connected_endpoints - > Add ( endpoint - > GetName ( ) ) ;
}
status - > Set ( " num_endpoints " , count_endpoints ) ;
status - > Set ( " num_conn_endpoints " , connected_endpoints - > GetLength ( ) ) ;
status - > Set ( " num_not_conn_endpoints " , not_connected_endpoints - > GetLength ( ) ) ;
status - > Set ( " conn_endpoints " , connected_endpoints ) ;
status - > Set ( " not_conn_endpoints " , not_connected_endpoints ) ;
perfdata - > Set ( " num_endpoints " , count_endpoints ) ;
perfdata - > Set ( " num_conn_endpoints " , Convert : : ToDouble ( connected_endpoints - > GetLength ( ) ) ) ;
perfdata - > Set ( " num_not_conn_endpoints " , Convert : : ToDouble ( not_connected_endpoints - > GetLength ( ) ) ) ;
return std : : make_pair ( status , perfdata ) ;
}
2014-05-08 15:12:56 +02:00
void ApiListener : : AddAnonymousClient ( const ApiClient : : Ptr & aclient )
{
ObjectLock olock ( this ) ;
m_AnonymousClients . insert ( aclient ) ;
}
void ApiListener : : RemoveAnonymousClient ( const ApiClient : : Ptr & aclient )
{
ObjectLock olock ( this ) ;
m_AnonymousClients . erase ( aclient ) ;
}
std : : set < ApiClient : : Ptr > ApiListener : : GetAnonymousClients ( void ) const
{
ObjectLock olock ( this ) ;
return m_AnonymousClients ;
2014-05-09 10:23:54 +02:00
}