Implement Livestatus support for the "repl" command

refs #7805
This commit is contained in:
Gunnar Beutner 2014-12-16 19:28:46 +01:00
parent 9011c9fa67
commit f800630c10
6 changed files with 190 additions and 33 deletions

View File

@ -102,13 +102,15 @@ static void Encode(yajl_gen handle, const Value& value)
} }
} }
String icinga::JsonEncode(const Value& value) String icinga::JsonEncode(const Value& value, bool pretty_print)
{ {
#if YAJL_MAJOR < 2 #if YAJL_MAJOR < 2
yajl_gen_config conf = { 0, "" }; yajl_gen_config conf = { pretty_print, "" };
yajl_gen handle = yajl_gen_alloc(&conf, NULL); yajl_gen handle = yajl_gen_alloc(&conf, NULL);
#else /* YAJL_MAJOR */ #else /* YAJL_MAJOR */
yajl_gen handle = yajl_gen_alloc(NULL); yajl_gen handle = yajl_gen_alloc(NULL);
if (pretty_print)
yajl_gen_config(handle, yajl_gen_beautify, 1);
#endif /* YAJL_MAJOR */ #endif /* YAJL_MAJOR */
Encode(handle, value); Encode(handle, value);

View File

@ -28,7 +28,7 @@ namespace icinga
class String; class String;
class Value; class Value;
I2_BASE_API String JsonEncode(const Value& value); I2_BASE_API String JsonEncode(const Value& value, bool pretty_print = false);
I2_BASE_API Value JsonDecode(const String& data); I2_BASE_API Value JsonDecode(const String& data);
} }

View File

@ -350,7 +350,8 @@ int DaemonCommand::Run(const po::variables_map& vm, const std::vector<std::strin
if (!vm.count("validate")) if (!vm.count("validate"))
Logger::DisableTimestamp(false); Logger::DisableTimestamp(false);
ScriptGlobal::Set("UseVfork", true); if (!ScriptGlobal::Exists("UseVfork"))
ScriptGlobal::Set("UseVfork", true);
Log(LogInformation, "cli") Log(LogInformation, "cli")
<< "Icinga application loader (version: " << Application::GetVersion() << "Icinga application loader (version: " << Application::GetVersion()

View File

@ -23,6 +23,9 @@
#include "base/json.hpp" #include "base/json.hpp"
#include "base/console.hpp" #include "base/console.hpp"
#include "base/application.hpp" #include "base/application.hpp"
#include "base/unixsocket.hpp"
#include "base/utility.hpp"
#include "base/networkstream.hpp"
#include <iostream> #include <iostream>
#ifdef HAVE_LIBREADLINE #ifdef HAVE_LIBREADLINE
@ -60,6 +63,9 @@ ImpersonationLevel ReplCommand::GetImpersonationLevel(void) const
void ReplCommand::InitParameters(boost::program_options::options_description& visibleDesc, void ReplCommand::InitParameters(boost::program_options::options_description& visibleDesc,
boost::program_options::options_description& hiddenDesc) const boost::program_options::options_description& hiddenDesc) const
{ {
visibleDesc.add_options()
("connect,c", po::value<std::string>(), "connect to an Icinga 2 instance")
;
} }
/** /**
@ -73,6 +79,13 @@ int ReplCommand::Run(const po::variables_map& vm, const std::vector<std::string>
std::map<String, String> lines; std::map<String, String> lines;
int next_line = 1; int next_line = 1;
String addr, session;
if (vm.count("connect")) {
addr = vm["connect"].as<std::string>();
session = Utility::NewUniqueID();
}
std::cout << "Icinga (version: " << Application::GetVersion() << ")\n"; std::cout << "Icinga (version: " << Application::GetVersion() << ")\n";
while (std::cin.good()) { while (std::cin.good()) {
@ -115,46 +128,79 @@ int ReplCommand::Run(const po::variables_map& vm, const std::vector<std::string>
std::getline(std::cin, line); std::getline(std::cin, line);
#endif /* HAVE_LIBREADLINE */ #endif /* HAVE_LIBREADLINE */
Expression *expr; if (addr.IsEmpty()) {
Expression *expr;
try { try {
ConfigCompilerContext::GetInstance()->Reset(); ConfigCompilerContext::GetInstance()->Reset();
lines[fileName] = line; lines[fileName] = line;
expr = ConfigCompiler::CompileText(fileName, line); expr = ConfigCompiler::CompileText(fileName, line);
bool has_errors = false; bool has_errors = false;
BOOST_FOREACH(const ConfigCompilerMessage& message, ConfigCompilerContext::GetInstance()->GetMessages()) { BOOST_FOREACH(const ConfigCompilerMessage& message, ConfigCompilerContext::GetInstance()->GetMessages()) {
if (message.Error) if (message.Error)
has_errors = true; has_errors = true;
std::cout << (message.Error ? "Error" : "Warning") << ": " << message.Text << "\n"; std::cout << (message.Error ? "Error" : "Warning") << ": " << message.Text << "\n";
}
if (expr && !has_errors) {
Value result = expr->Evaluate(frame);
std::cout << ConsoleColorTag(Console_ForegroundCyan);
if (!result.IsObject() || result.IsObjectType<Array>() || result.IsObjectType<Dictionary>())
std::cout << JsonEncode(result);
else
std::cout << result;
std::cout << ConsoleColorTag(Console_Normal) << "\n";
}
} catch (const ScriptError& ex) {
DebugInfo di = ex.GetDebugInfo();
std::cout << di.Path << ": " << lines[di.Path] << "\n";
std::cout << String(di.Path.GetLength() + 2, ' ');
std::cout << String(di.FirstColumn, ' ') << String(di.LastColumn - di.FirstColumn + 1, '^') << "\n";
std::cout << ex.what() << "\n";
} catch (const std::exception& ex) {
std::cout << "Error: " << DiagnosticInformation(ex) << "\n";
} }
if (expr && !has_errors) { delete expr;
Value result = expr->Evaluate(frame); } else {
std::cout << ConsoleColorTag(Console_ForegroundCyan); Socket::Ptr socket;
if (!result.IsObject() || result.IsObjectType<Array>() || result.IsObjectType<Dictionary>())
std::cout << JsonEncode(result); if (addr[0] == '/') {
else UnixSocket::Ptr usocket = new UnixSocket();
std::cout << result; usocket->Connect(addr);
std::cout << ConsoleColorTag(Console_Normal) << "\n"; socket = usocket;
} else {
Log(LogCritical, "ReplCommand", "Sorry, TCP sockets aren't supported yet.");
return 1;
} }
} catch (const ScriptError& ex) {
DebugInfo di = ex.GetDebugInfo();
std::cout << di.Path << ": " << lines[di.Path] << "\n"; String query = "SCRIPT " + session + "\n" + line + "\n\n";
std::cout << String(di.Path.GetLength() + 2, ' ');
std::cout << String(di.FirstColumn, ' ') << String(di.LastColumn - di.FirstColumn + 1, '^') << "\n";
std::cout << ex.what() << "\n"; NetworkStream::Ptr ns = new NetworkStream(socket);
} catch (const std::exception& ex) { ns->Write(query.CStr(), query.GetLength());
std::cout << "Error: " << DiagnosticInformation(ex) << "\n";
String result;
char buf[1024];
while (!ns->IsEof()) {
size_t rc = ns->Read(buf, sizeof(buf));
result += String(buf, buf + rc);
}
if (result.GetLength() < 16) {
Log(LogCritical, "ReplCommand", "Received invalid response from Livestatus.");
continue;
}
std::cout << result.SubStr(16) << "\n";
} }
delete expr;
} }
return 0; return 0;

View File

@ -31,6 +31,7 @@
#include "livestatus/orfilter.hpp" #include "livestatus/orfilter.hpp"
#include "livestatus/andfilter.hpp" #include "livestatus/andfilter.hpp"
#include "icinga/externalcommandprocessor.hpp" #include "icinga/externalcommandprocessor.hpp"
#include "config/configcompiler.hpp"
#include "base/debug.hpp" #include "base/debug.hpp"
#include "base/convert.hpp" #include "base/convert.hpp"
#include "base/objectlock.hpp" #include "base/objectlock.hpp"
@ -38,15 +39,49 @@
#include "base/exception.hpp" #include "base/exception.hpp"
#include "base/utility.hpp" #include "base/utility.hpp"
#include "base/json.hpp" #include "base/json.hpp"
#include "base/serializer.hpp"
#include "base/timer.hpp"
#include "base/initialize.hpp"
#include <boost/algorithm/string/classification.hpp> #include <boost/algorithm/string/classification.hpp>
#include <boost/foreach.hpp> #include <boost/foreach.hpp>
#include <boost/algorithm/string/replace.hpp> #include <boost/algorithm/string/replace.hpp>
#include <boost/algorithm/string/split.hpp> #include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/join.hpp>
using namespace icinga; using namespace icinga;
static int l_ExternalCommands = 0; static int l_ExternalCommands = 0;
static boost::mutex l_QueryMutex; static boost::mutex l_QueryMutex;
static std::map<String, LivestatusScriptFrame> l_LivestatusScriptFrames;
static Timer::Ptr l_FrameCleanupTimer;
static boost::mutex l_LivestatusScriptMutex;
static void ScriptFrameCleanupHandler(void)
{
boost::mutex::scoped_lock lock(l_LivestatusScriptMutex);
std::vector<String> cleanup_keys;
typedef std::pair<String, LivestatusScriptFrame> KVPair;
BOOST_FOREACH(const KVPair& kv, l_LivestatusScriptFrames) {
if (kv.second.Seen < Utility::GetTime() - 60)
cleanup_keys.push_back(kv.first);
}
BOOST_FOREACH(const String& key, cleanup_keys)
l_LivestatusScriptFrames.erase(key);
}
static void InitScriptFrameCleanup(void)
{
l_FrameCleanupTimer = new Timer();
l_FrameCleanupTimer->OnTimerExpired.connect(boost::bind(ScriptFrameCleanupHandler));
l_FrameCleanupTimer->SetInterval(30);
l_FrameCleanupTimer->Start();
}
INITIALIZE_ONCE(InitScriptFrameCleanup);
LivestatusQuery::LivestatusQuery(const std::vector<String>& lines, const String& compat_log_path) LivestatusQuery::LivestatusQuery(const std::vector<String>& lines, const String& compat_log_path)
: m_KeepAlive(false), m_OutputFormat("csv"), m_ColumnHeaders(true), : m_KeepAlive(false), m_OutputFormat("csv"), m_ColumnHeaders(true),
@ -88,6 +123,16 @@ LivestatusQuery::LivestatusQuery(const std::vector<String>& lines, const String&
if (m_Verb == "COMMAND") { if (m_Verb == "COMMAND") {
m_KeepAlive = true; m_KeepAlive = true;
m_Command = target; m_Command = target;
} else if (m_Verb == "SCRIPT") {
m_Session = target;
for (unsigned int i = 1; i < lines.size(); i++) {
if (m_Command != "")
m_Command += "\n";
m_Command += lines[i];
}
return;
} else if (m_Verb == "GET") { } else if (m_Verb == "GET") {
m_Table = target; m_Table = target;
} else { } else {
@ -549,6 +594,52 @@ void LivestatusQuery::ExecuteCommandHelper(const Stream::Ptr& stream)
SendResponse(stream, LivestatusErrorOK, ""); SendResponse(stream, LivestatusErrorOK, "");
} }
void LivestatusQuery::ExecuteScriptHelper(const Stream::Ptr& stream)
{
Log(LogInformation, "LivestatusQuery")
<< "Executing expression: " << m_Command;
m_ResponseHeader = "fixed16";
LivestatusScriptFrame& lsf = l_LivestatusScriptFrames[m_Session];
lsf.Seen = Utility::GetTime();
if (!lsf.Locals)
lsf.Locals = new Dictionary();
String fileName = "<" + Convert::ToString(lsf.NextLine) + ">";
lsf.NextLine++;
lsf.Lines[fileName] = m_Command;
Expression *expr = ConfigCompiler::CompileText(fileName, m_Command);
Value result;
try {
ScriptFrame frame;
frame.Locals = lsf.Locals;
result = expr->Evaluate(frame);
} catch (const ScriptError& ex) {
delete expr;
DebugInfo di = ex.GetDebugInfo();
std::ostringstream msgbuf;
msgbuf << di.Path << ": " << lsf.Lines[di.Path] << "\n"
<< String(di.Path.GetLength() + 2, ' ')
<< String(di.FirstColumn, ' ') << String(di.LastColumn - di.FirstColumn + 1, '^') << "\n"
<< ex.what() << "\n";
SendResponse(stream, LivestatusErrorQuery, msgbuf.str());
return;
} catch (...) {
delete expr;
throw;
}
delete expr;
SendResponse(stream, LivestatusErrorOK, JsonEncode(Serialize(result, FAState | FAConfig), true));
}
void LivestatusQuery::ExecuteErrorHelper(const Stream::Ptr& stream) void LivestatusQuery::ExecuteErrorHelper(const Stream::Ptr& stream)
{ {
Log(LogDebug, "LivestatusQuery") Log(LogDebug, "LivestatusQuery")
@ -596,6 +687,8 @@ bool LivestatusQuery::Execute(const Stream::Ptr& stream)
ExecuteGetHelper(stream); ExecuteGetHelper(stream);
else if (m_Verb == "COMMAND") else if (m_Verb == "COMMAND")
ExecuteCommandHelper(stream); ExecuteCommandHelper(stream);
else if (m_Verb == "SCRIPT")
ExecuteScriptHelper(stream);
else if (m_Verb == "ERROR") else if (m_Verb == "ERROR")
ExecuteErrorHelper(stream); ExecuteErrorHelper(stream);
else else

View File

@ -25,6 +25,7 @@
#include "base/object.hpp" #include "base/object.hpp"
#include "base/array.hpp" #include "base/array.hpp"
#include "base/stream.hpp" #include "base/stream.hpp"
#include "base/scriptframe.hpp"
#include <deque> #include <deque>
using namespace icinga; using namespace icinga;
@ -39,6 +40,18 @@ enum LivestatusError
LivestatusErrorQuery = 452 LivestatusErrorQuery = 452
}; };
struct LivestatusScriptFrame
{
double Seen;
int NextLine;
std::map<String, String> Lines;
Dictionary::Ptr Locals;
LivestatusScriptFrame(void)
: NextLine(1)
{ }
};
/** /**
* @ingroup livestatus * @ingroup livestatus
*/ */
@ -71,8 +84,9 @@ private:
String m_ResponseHeader; String m_ResponseHeader;
/* Parameters for COMMAND queries. */ /* Parameters for COMMAND/SCRIPT queries. */
String m_Command; String m_Command;
String m_Session;
/* Parameters for invalid queries. */ /* Parameters for invalid queries. */
int m_ErrorCode; int m_ErrorCode;
@ -89,6 +103,7 @@ private:
void ExecuteGetHelper(const Stream::Ptr& stream); void ExecuteGetHelper(const Stream::Ptr& stream);
void ExecuteCommandHelper(const Stream::Ptr& stream); void ExecuteCommandHelper(const Stream::Ptr& stream);
void ExecuteScriptHelper(const Stream::Ptr& stream);
void ExecuteErrorHelper(const Stream::Ptr& stream); void ExecuteErrorHelper(const Stream::Ptr& stream);
void SendResponse(const Stream::Ptr& stream, int code, const String& data); void SendResponse(const Stream::Ptr& stream, int code, const String& data);