Implement HTTP support

refs #9447
This commit is contained in:
Gunnar Beutner 2015-06-22 11:11:21 +02:00
parent 84d83a4453
commit b357012ded
32 changed files with 1295 additions and 134 deletions

View File

@ -78,8 +78,10 @@ void FIFO::Optimize(void)
}
}
size_t FIFO::Peek(void *buffer, size_t count)
size_t FIFO::Peek(void *buffer, size_t count, bool allow_partial)
{
ASSERT(allow_partial);
if (count > m_DataSize)
count = m_DataSize;
@ -143,4 +145,4 @@ bool FIFO::SupportsWaiting(void) const
bool FIFO::IsDataAvailable(void) const
{
return m_DataSize > 0;
}
}

View File

@ -41,7 +41,7 @@ public:
FIFO(void);
~FIFO(void);
size_t Peek(void *buffer, size_t count);
virtual size_t Peek(void *buffer, size_t count, bool allow_partial = false);
virtual size_t Read(void *buffer, size_t count, bool allow_partial = false);
virtual void Write(const void *buffer, size_t count);
virtual void Close(void);

View File

@ -40,6 +40,16 @@ bool Stream::IsDataAvailable(void) const
return false;
}
void Stream::Shutdown(void)
{
BOOST_THROW_EXCEPTION(std::runtime_error("Stream does not support Shutdown()."));
}
size_t Stream::Peek(void *buffer, size_t count, bool allow_partial)
{
BOOST_THROW_EXCEPTION(std::runtime_error("Stream does not support Peek()."));
}
void Stream::SignalDataAvailable(void)
{
OnDataAvailable();
@ -50,15 +60,20 @@ void Stream::SignalDataAvailable(void)
}
}
void Stream::WaitForData(void)
bool Stream::WaitForData(int timeout)
{
if (!SupportsWaiting())
BOOST_THROW_EXCEPTION(std::runtime_error("Stream does not support waiting."));
boost::mutex::scoped_lock lock(m_Mutex);
while (!IsDataAvailable())
m_CV.wait(lock);
while (!IsDataAvailable() && !IsEof())
if (timeout < 0)
m_CV.wait(lock);
else
m_CV.timed_wait(lock, boost::posix_time::milliseconds(timeout * 1000));
return IsDataAvailable() || IsEof();
}
StreamReadStatus Stream::ReadLine(String *line, StreamReadContext& context, bool may_wait)
@ -132,6 +147,7 @@ bool StreamReadContext::FillFromStream(const Stream::Ptr& stream, bool may_wait)
void StreamReadContext::DropData(size_t count)
{
ASSERT(count <= Size);
memmove(Buffer, Buffer + count, Size - count);
Size -= count;
}

View File

@ -73,6 +73,17 @@ class I2_BASE_API Stream : public Object
public:
DECLARE_PTR_TYPEDEFS(Stream);
/**
* Reads data from the stream without removing it from the stream buffer.
*
* @param buffer The buffer where data should be stored. May be NULL if you're
* not actually interested in the data.
* @param count The number of bytes to read from the queue.
* @param allow_partial Whether to allow partial reads.
* @returns The number of bytes actually read.
*/
virtual size_t Peek(void *buffer, size_t count, bool allow_partial = false);
/**
* Reads data from the stream.
*
@ -93,6 +104,12 @@ public:
*/
virtual void Write(const void *buffer, size_t count) = 0;
/**
* Causes the stream to be closed (via Close()) once all pending data has been
* written.
*/
virtual void Shutdown(void);
/**
* Closes the stream and releases resources.
*/
@ -108,7 +125,7 @@ public:
/**
* Waits until data can be read from the stream.
*/
void WaitForData(void);
bool WaitForData(int timeout = -1);
virtual bool SupportsWaiting(void) const;

View File

@ -42,7 +42,7 @@ bool I2_EXPORT TlsStream::m_SSLIndexInitialized = false;
TlsStream::TlsStream(const Socket::Ptr& socket, const String& hostname, ConnectionRole role, const boost::shared_ptr<SSL_CTX>& sslContext)
: SocketEvents(socket, this), m_Eof(false), m_HandshakeOK(false), m_VerifyOK(true), m_ErrorCode(0),
m_ErrorOccurred(false), m_Socket(socket), m_Role(role), m_SendQ(new FIFO()), m_RecvQ(new FIFO()),
m_CurrentAction(TlsActionNone), m_Retry(false)
m_CurrentAction(TlsActionNone), m_Retry(false), m_Shutdown(false)
{
std::ostringstream msgbuf;
char errbuf[120];
@ -65,7 +65,7 @@ TlsStream::TlsStream(const Socket::Ptr& socket, const String& hostname, Connecti
SSL_set_ex_data(m_SSL.get(), m_SSLIndex, this);
SSL_set_verify(m_SSL.get(), SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT, &TlsStream::ValidateCertificate);
SSL_set_verify(m_SSL.get(), SSL_VERIFY_PEER | SSL_VERIFY_CLIENT_ONCE, &TlsStream::ValidateCertificate);
socket->MakeNonBlocking();
@ -160,7 +160,7 @@ void TlsStream::OnEvent(int revents)
break;
case TlsActionWrite:
count = m_SendQ->Peek(buffer, sizeof(buffer));
count = m_SendQ->Peek(buffer, sizeof(buffer), true);
rc = SSL_write(m_SSL.get(), buffer, count);
@ -191,9 +191,12 @@ void TlsStream::OnEvent(int revents)
lock.unlock();
if (m_RecvQ->IsDataAvailable())
while (m_RecvQ->IsDataAvailable())
SignalDataAvailable();
if (m_Shutdown && !m_SendQ->IsDataAvailable())
Close();
return;
}
@ -232,6 +235,9 @@ void TlsStream::OnEvent(int revents)
m_ErrorCode = ERR_peek_error();
m_ErrorOccurred = true;
Log(LogWarning, "TlsStream")
<< "OpenSSL error: " << ERR_error_string(m_ErrorCode, NULL);
m_CV.notify_all();
break;
@ -263,6 +269,19 @@ void TlsStream::Handshake(void)
/**
* Processes data for the stream.
*/
size_t TlsStream::Peek(void *buffer, size_t count, bool allow_partial)
{
boost::mutex::scoped_lock lock(m_Mutex);
if (!allow_partial)
while (m_RecvQ->GetAvailableBytes() < count && !m_ErrorOccurred && !m_Eof)
m_CV.wait(lock);
HandleError();
return m_RecvQ->Peek(buffer, count, true);
}
size_t TlsStream::Read(void *buffer, size_t count, bool allow_partial)
{
boost::mutex::scoped_lock lock(m_Mutex);
@ -285,6 +304,11 @@ void TlsStream::Write(const void *buffer, size_t count)
ChangeEvents(POLLIN|POLLOUT);
}
void TlsStream::Shutdown(void)
{
m_Shutdown = true;
}
/**
* Closes the stream.
*/

View File

@ -57,7 +57,9 @@ public:
void Handshake(void);
virtual void Close(void);
virtual void Shutdown(void);
virtual size_t Peek(void *buffer, size_t count, bool allow_partial = false);
virtual size_t Read(void *buffer, size_t count, bool allow_partial = false);
virtual void Write(const void *buffer, size_t count);
@ -86,6 +88,7 @@ private:
TlsAction m_CurrentAction;
bool m_Retry;
bool m_Shutdown;
static int m_SSLIndex;
static bool m_SSLIndexInitialized;

View File

@ -85,6 +85,7 @@ boost::shared_ptr<SSL_CTX> MakeSSLContext(const String& pubkey, const String& pr
boost::shared_ptr<SSL_CTX> sslContext = boost::shared_ptr<SSL_CTX>(SSL_CTX_new(TLSv1_method()), SSL_CTX_free);
SSL_CTX_set_mode(sslContext.get(), SSL_MODE_ENABLE_PARTIAL_WRITE | SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER);
SSL_CTX_set_session_id_context(sslContext.get(), (const unsigned char *)"Icinga 2", 8);
if (!SSL_CTX_use_certificate_chain_file(sslContext.get(), pubkey.CStr())) {
Log(LogCritical, "SSL")

View File

@ -21,8 +21,10 @@ mkclass_target(endpoint.ti endpoint.tcpp endpoint.thpp)
mkclass_target(zone.ti zone.tcpp zone.thpp)
set(remote_SOURCES
apiclient.cpp apiclient-heartbeat.cpp apifunction.cpp apilistener.cpp apilistener.thpp apilistener-sync.cpp
apiuser.cpp apiuser.thpp authority.cpp endpoint.cpp endpoint.thpp jsonrpc.cpp
apifunction.cpp apilistener.cpp apilistener.thpp apilistener-sync.cpp
apiuser.cpp apiuser.thpp authority.cpp endpoint.cpp endpoint.thpp
httpchunkedencoding.cpp httpconnection.cpp httpdemohandler.cpp httphandler.cpp httprequest.cpp httpresponse.cpp
jsonrpc.cpp jsonrpcconnection.cpp jsonrpcconnection-heartbeat.cpp
messageorigin.cpp zone.cpp zone.thpp
)

View File

@ -157,7 +157,7 @@ void ApiListener::SyncZoneDirs(void) const
}
}
void ApiListener::SendConfigUpdate(const ApiClient::Ptr& aclient)
void ApiListener::SendConfigUpdate(const JsonRpcConnection::Ptr& aclient)
{
Endpoint::Ptr endpoint = aclient->GetEndpoint();
ASSERT(endpoint);

View File

@ -19,7 +19,7 @@
#include "remote/apilistener.hpp"
#include "remote/apilistener.tcpp"
#include "remote/apiclient.hpp"
#include "remote/jsonrpcconnection.hpp"
#include "remote/endpoint.hpp"
#include "remote/jsonrpc.hpp"
#include "base/convert.hpp"
@ -211,7 +211,8 @@ void ApiListener::ListenerThreadProc(const Socket::Ptr& server)
for (;;) {
try {
Socket::Ptr client = server->Accept();
Utility::QueueAsyncCallback(boost::bind(&ApiListener::NewClientHandler, this, client, String(), RoleServer), LowLatencyScheduler);
boost::thread thread(boost::bind(&ApiListener::NewClientHandler, this, client, String(), RoleServer));
thread.detach();
} catch (const std::exception&) {
Log(LogCritical, "ApiListener", "Cannot accept new connection.");
}
@ -239,7 +240,7 @@ void ApiListener::AddConnection(const Endpoint::Ptr& endpoint)
String host = endpoint->GetHost();
String port = endpoint->GetPort();
Log(LogInformation, "ApiClient")
Log(LogInformation, "JsonRpcConnection")
<< "Reconnecting to API endpoint '" << endpoint->GetName() << "' via host '" << host << "' and port '" << port << "'";
TcpSocket::Ptr client = new TcpSocket();
@ -284,56 +285,98 @@ void ApiListener::NewClientHandler(const Socket::Ptr& client, const String& host
try {
tlsStream->Handshake();
} catch (const std::exception&) {
Log(LogCritical, "ApiListener", "Client TLS handshake failed.");
} catch (const std::exception& ex) {
Log(LogCritical, "ApiListener", "Client TLS handshake failed");
return;
}
boost::shared_ptr<X509> cert = tlsStream->GetPeerCertificate();
String identity;
try {
identity = GetCertificateCN(cert);
} catch (const std::exception&) {
Log(LogCritical, "ApiListener")
<< "Cannot get certificate common name from cert path: '" << GetCertPath() << "'.";
return;
}
bool verify_ok = tlsStream->IsVerifyOK();
Log(LogInformation, "ApiListener")
<< "New client connection for identity '" << identity << "'" << (verify_ok ? "" : " (unauthenticated)");
Endpoint::Ptr endpoint;
bool verify_ok = false;
if (verify_ok)
endpoint = Endpoint::GetByName(identity);
if (cert) {
try {
identity = GetCertificateCN(cert);
} catch (const std::exception&) {
Log(LogCritical, "ApiListener")
<< "Cannot get certificate common name from cert path: '" << GetCertPath() << "'.";
return;
}
verify_ok = tlsStream->IsVerifyOK();
Log(LogInformation, "ApiListener")
<< "New client connection for identity '" << identity << "'" << (verify_ok ? "" : " (unauthenticated)");
if (verify_ok)
endpoint = Endpoint::GetByName(identity);
} else {
Log(LogInformation, "ApiListener")
<< "New client connection (no client certificate)";
}
bool need_sync = false;
if (endpoint)
need_sync = !endpoint->IsConnected();
ApiClient::Ptr aclient = new ApiClient(identity, verify_ok, tlsStream, role);
aclient->Start();
ClientType ctype;
if (endpoint) {
endpoint->AddClient(aclient);
if (role == RoleClient) {
Dictionary::Ptr message = new Dictionary();
message->Set("jsonrpc", "2.0");
message->Set("method", "icinga::Hello");
message->Set("params", new Dictionary());
JsonRpc::SendMessage(tlsStream, message);
ctype = ClientJsonRpc;
} else {
tlsStream->WaitForData(5);
if (need_sync) {
{
ObjectLock olock(endpoint);
endpoint->SetSyncing(true);
}
ReplayLog(aclient);
if (!tlsStream->IsDataAvailable()) {
Log(LogWarning, "ApiListener", "No data received on new API connection.");
return;
}
SendConfigUpdate(aclient);
} else
AddAnonymousClient(aclient);
char firstByte;
tlsStream->Peek(&firstByte, 1, false);
if (firstByte >= '0' && firstByte <= '9')
ctype = ClientJsonRpc;
else
ctype = ClientHttp;
}
if (ctype == ClientJsonRpc) {
Log(LogInformation, "ApiListener", "New JSON-RPC client");
JsonRpcConnection::Ptr aclient = new JsonRpcConnection(identity, verify_ok, tlsStream, role);
aclient->Start();
if (endpoint) {
endpoint->AddClient(aclient);
if (need_sync) {
{
ObjectLock olock(endpoint);
endpoint->SetSyncing(true);
}
ReplayLog(aclient);
}
SendConfigUpdate(aclient);
} else
AddAnonymousClient(aclient);
} else {
Log(LogInformation, "ApiListener", "New HTTP client");
HttpConnection::Ptr aclient = new HttpConnection(identity, verify_ok, tlsStream);
aclient->Start();
AddHttpClient(aclient);
}
}
void ApiListener::ApiTimerHandler(void)
@ -429,7 +472,7 @@ void ApiListener::ApiTimerHandler(void)
lmessage->Set("method", "log::SetLogPosition");
lmessage->Set("params", lparams);
BOOST_FOREACH(const ApiClient::Ptr& client, endpoint->GetClients())
BOOST_FOREACH(const JsonRpcConnection::Ptr& client, endpoint->GetClients())
client->SendMessage(lmessage);
Log(LogNotice, "ApiListener")
@ -495,7 +538,7 @@ void ApiListener::SyncSendMessage(const Endpoint::Ptr& endpoint, const Dictionar
Log(LogNotice, "ApiListener")
<< "Sending message to '" << endpoint->GetName() << "'";
BOOST_FOREACH(const ApiClient::Ptr& client, endpoint->GetClients())
BOOST_FOREACH(const JsonRpcConnection::Ptr& client, endpoint->GetClients())
client->SendMessage(message);
}
}
@ -635,7 +678,7 @@ void ApiListener::LogGlobHandler(std::vector<int>& files, const String& file)
files.push_back(ts);
}
void ApiListener::ReplayLog(const ApiClient::Ptr& client)
void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client)
{
Endpoint::Ptr endpoint = client->GetEndpoint();
@ -823,20 +866,38 @@ std::pair<Dictionary::Ptr, Dictionary::Ptr> ApiListener::GetStatus(void)
return std::make_pair(status, perfdata);
}
void ApiListener::AddAnonymousClient(const ApiClient::Ptr& aclient)
void ApiListener::AddAnonymousClient(const JsonRpcConnection::Ptr& aclient)
{
ObjectLock olock(this);
m_AnonymousClients.insert(aclient);
}
void ApiListener::RemoveAnonymousClient(const ApiClient::Ptr& aclient)
void ApiListener::RemoveAnonymousClient(const JsonRpcConnection::Ptr& aclient)
{
ObjectLock olock(this);
m_AnonymousClients.erase(aclient);
}
std::set<ApiClient::Ptr> ApiListener::GetAnonymousClients(void) const
std::set<JsonRpcConnection::Ptr> ApiListener::GetAnonymousClients(void) const
{
ObjectLock olock(this);
return m_AnonymousClients;
}
void ApiListener::AddHttpClient(const HttpConnection::Ptr& aclient)
{
ObjectLock olock(this);
m_HttpClients.insert(aclient);
}
void ApiListener::RemoveHttpClient(const HttpConnection::Ptr& aclient)
{
ObjectLock olock(this);
m_HttpClients.erase(aclient);
}
std::set<HttpConnection::Ptr> ApiListener::GetHttpClients(void) const
{
ObjectLock olock(this);
return m_HttpClients;
}

View File

@ -21,7 +21,8 @@
#define APILISTENER_H
#include "remote/apilistener.thpp"
#include "remote/apiclient.hpp"
#include "remote/jsonrpcconnection.hpp"
#include "remote/httpconnection.hpp"
#include "remote/endpoint.hpp"
#include "remote/messageorigin.hpp"
#include "base/dynamicobject.hpp"
@ -34,7 +35,7 @@
namespace icinga
{
class ApiClient;
class JsonRpcConnection;
/**
* @ingroup remote
@ -64,9 +65,13 @@ public:
static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
std::pair<Dictionary::Ptr, Dictionary::Ptr> GetStatus(void);
void AddAnonymousClient(const ApiClient::Ptr& aclient);
void RemoveAnonymousClient(const ApiClient::Ptr& aclient);
std::set<ApiClient::Ptr> GetAnonymousClients(void) const;
void AddAnonymousClient(const JsonRpcConnection::Ptr& aclient);
void RemoveAnonymousClient(const JsonRpcConnection::Ptr& aclient);
std::set<JsonRpcConnection::Ptr> GetAnonymousClients(void) const;
void AddHttpClient(const HttpConnection::Ptr& aclient);
void RemoveHttpClient(const HttpConnection::Ptr& aclient);
std::set<HttpConnection::Ptr> GetHttpClients(void) const;
static Value ConfigUpdateHandler(const MessageOrigin& origin, const Dictionary::Ptr& params);
@ -78,7 +83,8 @@ protected:
private:
boost::shared_ptr<SSL_CTX> m_SSLContext;
std::set<TcpSocket::Ptr> m_Servers;
std::set<ApiClient::Ptr> m_AnonymousClients;
std::set<JsonRpcConnection::Ptr> m_AnonymousClients;
std::set<HttpConnection::Ptr> m_HttpClients;
Timer::Ptr m_Timer;
void ApiTimerHandler(void);
@ -102,7 +108,7 @@ private:
void RotateLogFile(void);
void CloseLogFile(void);
static void LogGlobHandler(std::vector<int>& files, const String& file);
void ReplayLog(const ApiClient::Ptr& client);
void ReplayLog(const JsonRpcConnection::Ptr& client);
static Dictionary::Ptr LoadConfigDir(const String& dir);
static bool UpdateConfigDir(const Dictionary::Ptr& oldConfig, const Dictionary::Ptr& newConfig, const String& configDir, bool authoritative);
@ -112,7 +118,7 @@ private:
static bool IsConfigMaster(const Zone::Ptr& zone);
static void ConfigGlobHandler(Dictionary::Ptr& config, const String& path, const String& file);
void SendConfigUpdate(const ApiClient::Ptr& aclient);
void SendConfigUpdate(const JsonRpcConnection::Ptr& aclient);
};
}

View File

@ -20,7 +20,7 @@
#include "remote/endpoint.hpp"
#include "remote/endpoint.tcpp"
#include "remote/apilistener.hpp"
#include "remote/apiclient.hpp"
#include "remote/jsonrpcconnection.hpp"
#include "remote/zone.hpp"
#include "base/dynamictype.hpp"
#include "base/utility.hpp"
@ -32,8 +32,8 @@ using namespace icinga;
REGISTER_TYPE(Endpoint);
boost::signals2::signal<void(const Endpoint::Ptr&, const ApiClient::Ptr&)> Endpoint::OnConnected;
boost::signals2::signal<void(const Endpoint::Ptr&, const ApiClient::Ptr&)> Endpoint::OnDisconnected;
boost::signals2::signal<void(const Endpoint::Ptr&, const JsonRpcConnection::Ptr&)> Endpoint::OnConnected;
boost::signals2::signal<void(const Endpoint::Ptr&, const JsonRpcConnection::Ptr&)> Endpoint::OnDisconnected;
void Endpoint::OnAllConfigLoaded(void)
{
@ -57,7 +57,7 @@ void Endpoint::OnAllConfigLoaded(void)
BOOST_THROW_EXCEPTION(ScriptError("Endpoint '" + GetName() + "' does not belong to a zone.", GetDebugInfo()));
}
void Endpoint::AddClient(const ApiClient::Ptr& client)
void Endpoint::AddClient(const JsonRpcConnection::Ptr& client)
{
bool was_master = ApiListener::GetInstance()->IsMaster();
@ -74,7 +74,7 @@ void Endpoint::AddClient(const ApiClient::Ptr& client)
OnConnected(this, client);
}
void Endpoint::RemoveClient(const ApiClient::Ptr& client)
void Endpoint::RemoveClient(const JsonRpcConnection::Ptr& client)
{
bool was_master = ApiListener::GetInstance()->IsMaster();
@ -96,7 +96,7 @@ void Endpoint::RemoveClient(const ApiClient::Ptr& client)
OnDisconnected(this, client);
}
std::set<ApiClient::Ptr> Endpoint::GetClients(void) const
std::set<JsonRpcConnection::Ptr> Endpoint::GetClients(void) const
{
boost::mutex::scoped_lock lock(m_ClientsLock);
return m_Clients;

View File

@ -27,7 +27,7 @@
namespace icinga
{
class ApiClient;
class JsonRpcConnection;
class Zone;
/**
@ -41,12 +41,12 @@ public:
DECLARE_OBJECT(Endpoint);
DECLARE_OBJECTNAME(Endpoint);
static boost::signals2::signal<void(const Endpoint::Ptr&, const intrusive_ptr<ApiClient>&)> OnConnected;
static boost::signals2::signal<void(const Endpoint::Ptr&, const intrusive_ptr<ApiClient>&)> OnDisconnected;
static boost::signals2::signal<void(const Endpoint::Ptr&, const intrusive_ptr<JsonRpcConnection>&)> OnConnected;
static boost::signals2::signal<void(const Endpoint::Ptr&, const intrusive_ptr<JsonRpcConnection>&)> OnDisconnected;
void AddClient(const intrusive_ptr<ApiClient>& client);
void RemoveClient(const intrusive_ptr<ApiClient>& client);
std::set<intrusive_ptr<ApiClient> > GetClients(void) const;
void AddClient(const intrusive_ptr<JsonRpcConnection>& client);
void RemoveClient(const intrusive_ptr<JsonRpcConnection>& client);
std::set<intrusive_ptr<JsonRpcConnection> > GetClients(void) const;
intrusive_ptr<Zone> GetZone(void) const;
@ -59,7 +59,7 @@ protected:
private:
mutable boost::mutex m_ClientsLock;
std::set<intrusive_ptr<ApiClient> > m_Clients;
std::set<intrusive_ptr<JsonRpcConnection> > m_Clients;
intrusive_ptr<Zone> m_Zone;
};

View File

@ -0,0 +1,79 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012-2015 Icinga Development Team (http://www.icinga.org) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "remote/httpchunkedencoding.hpp"
#include <sstream>
using namespace icinga;
StreamReadStatus HttpChunkedEncoding::ReadChunkFromStream(const Stream::Ptr& stream,
char **data, size_t *size, ChunkReadContext& context, bool may_wait)
{
if (context.LengthIndicator == -1) {
String line;
StreamReadStatus status = stream->ReadLine(&line, context.StreamContext, may_wait);
if (status != StatusNewItem)
return status;
std::stringstream msgbuf;
msgbuf << std::hex << line;
msgbuf >> context.LengthIndicator;
return StatusNeedData;
} else {
StreamReadContext& scontext = context.StreamContext;
if (scontext.Eof)
return StatusEof;
if (scontext.MustRead) {
if (!scontext.FillFromStream(stream, may_wait)) {
scontext.Eof = true;
return StatusEof;
}
scontext.MustRead = false;
}
if (scontext.Size < context.LengthIndicator) {
scontext.MustRead = true;
return StatusNeedData;
}
*data = new char[context.LengthIndicator];
*size = context.LengthIndicator;
memcpy(data, scontext.Buffer, context.LengthIndicator);
scontext.DropData(context.LengthIndicator);
context.LengthIndicator = -1;
return StatusNewItem;
}
}
void HttpChunkedEncoding::WriteChunkToStream(const Stream::Ptr& stream, const char *data, size_t count)
{
std::ostringstream msgbuf;
msgbuf << std::hex << count << "\r\n";
String lengthIndicator = msgbuf.str();
stream->Write(lengthIndicator.CStr(), lengthIndicator.GetLength());
stream->Write(data, count);
if (count > 0)
stream->Write("\r\n", 2);
}

View File

@ -0,0 +1,54 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012-2015 Icinga Development Team (http://www.icinga.org) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#ifndef HTTPCHUNKEDENCODING_H
#define HTTPCHUNKEDENCODING_H
#include "remote/i2-remote.hpp"
#include "base/stream.hpp"
namespace icinga
{
struct ChunkReadContext
{
StreamReadContext& StreamContext;
int LengthIndicator;
ChunkReadContext(StreamReadContext& scontext)
: StreamContext(scontext), LengthIndicator(-1)
{ }
};
/**
* HTTP chunked encoding.
*
* @ingroup remote
*/
struct I2_REMOTE_API HttpChunkedEncoding
{
static StreamReadStatus ReadChunkFromStream(const Stream::Ptr& stream,
char **data, size_t *size, ChunkReadContext& ccontext, bool may_wait = false);
static void WriteChunkToStream(const Stream::Ptr& stream, const char *data, size_t count);
};
}
#endif /* HTTPCHUNKEDENCODING_H */

View File

@ -0,0 +1,157 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012-2015 Icinga Development Team (http://www.icinga.org) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "remote/httpconnection.hpp"
#include "remote/httphandler.hpp"
#include "remote/apilistener.hpp"
#include "remote/apifunction.hpp"
#include "remote/jsonrpc.hpp"
#include "base/dynamictype.hpp"
#include "base/objectlock.hpp"
#include "base/utility.hpp"
#include "base/logger.hpp"
#include "base/exception.hpp"
#include "base/convert.hpp"
#include <boost/thread/once.hpp>
using namespace icinga;
static boost::once_flag l_HttpConnectionOnceFlag = BOOST_ONCE_INIT;
static Timer::Ptr l_HttpConnectionTimeoutTimer;
HttpConnection::HttpConnection(const String& identity, bool authenticated, const TlsStream::Ptr& stream)
: m_Stream(stream), m_Seen(Utility::GetTime()),
m_CurrentRequest(m_Context), m_PendingRequests(0)
{
boost::call_once(l_HttpConnectionOnceFlag, &HttpConnection::StaticInitialize);
if (authenticated)
m_ApiUser = ApiUser::GetByName(identity);
}
void HttpConnection::StaticInitialize(void)
{
l_HttpConnectionTimeoutTimer = new Timer();
l_HttpConnectionTimeoutTimer->OnTimerExpired.connect(boost::bind(&HttpConnection::TimeoutTimerHandler));
l_HttpConnectionTimeoutTimer->SetInterval(15);
l_HttpConnectionTimeoutTimer->Start();
}
void HttpConnection::Start(void)
{
m_Stream->RegisterDataHandler(boost::bind(&HttpConnection::DataAvailableHandler, this));
if (m_Stream->IsDataAvailable())
DataAvailableHandler();
}
ApiUser::Ptr HttpConnection::GetApiUser(void) const
{
return m_ApiUser;
}
TlsStream::Ptr HttpConnection::GetStream(void) const
{
return m_Stream;
}
void HttpConnection::Disconnect(void)
{
Log(LogDebug, "HttpConnection", "Http client disconnected");
ApiListener::Ptr listener = ApiListener::GetInstance();
listener->RemoveHttpClient(this);
m_Stream->Shutdown();
}
bool HttpConnection::ProcessMessage(void)
{
bool res;
try {
res = m_CurrentRequest.Parse(m_Stream, m_Context, false);
} catch (const std::exception& ex) {
HttpResponse response(m_Stream, m_CurrentRequest);
response.SetStatus(400, "Bad request");
String msg = "<h1>Bad request</h1>";
response.WriteBody(msg.CStr(), msg.GetLength());
response.FinishBody();
m_Stream->Shutdown();
return false;
}
if (m_CurrentRequest.Complete) {
m_RequestQueue.Enqueue(boost::bind(&HttpConnection::ProcessMessageAsync, HttpConnection::Ptr(this), m_CurrentRequest));
m_Seen = Utility::GetTime();
m_PendingRequests++;
m_CurrentRequest.~HttpRequest();
new (&m_CurrentRequest) HttpRequest(m_Context);
return true;
}
return res;
}
void HttpConnection::ProcessMessageAsync(HttpRequest& request)
{
Log(LogInformation, "HttpConnection", "Processing Http message");
HttpResponse response(m_Stream, request);
HttpHandler::ProcessRequest(request, response);
response.Finish();
m_PendingRequests--;
}
void HttpConnection::DataAvailableHandler(void)
{
boost::mutex::scoped_lock lock(m_DataHandlerMutex);
try {
while (ProcessMessage())
; /* empty loop body */
} catch (const std::exception& ex) {
Log(LogWarning, "HttpConnection")
<< "Error while reading Http request: " << DiagnosticInformation(ex);
Disconnect();
}
}
void HttpConnection::CheckLiveness(void)
{
if (m_Seen < Utility::GetTime() - 10 && m_PendingRequests == 0) {
Log(LogInformation, "HttpConnection")
<< "No messages for Http connection have been received in the last 10 seconds.";
Disconnect();
}
}
void HttpConnection::TimeoutTimerHandler(void)
{
ApiListener::Ptr listener = ApiListener::GetInstance();
BOOST_FOREACH(const HttpConnection::Ptr& client, listener->GetHttpClients()) {
client->CheckLiveness();
}
}

View File

@ -0,0 +1,75 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012-2015 Icinga Development Team (http://www.icinga.org) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#ifndef HTTPCONNECTION_H
#define HTTPCONNECTION_H
#include "remote/httprequest.hpp"
#include "remote/apiuser.hpp"
#include "base/tlsstream.hpp"
#include "base/timer.hpp"
#include "base/workqueue.hpp"
namespace icinga
{
/**
* An API client connection.
*
* @ingroup remote
*/
class I2_REMOTE_API HttpConnection : public Object
{
public:
DECLARE_PTR_TYPEDEFS(HttpConnection);
HttpConnection(const String& identity, bool authenticated, const TlsStream::Ptr& stream);
void Start(void);
ApiUser::Ptr GetApiUser(void) const;
bool IsAuthenticated(void) const;
TlsStream::Ptr GetStream(void) const;
void Disconnect(void);
private:
ApiUser::Ptr m_ApiUser;
TlsStream::Ptr m_Stream;
double m_Seen;
HttpRequest m_CurrentRequest;
boost::mutex m_DataHandlerMutex;
WorkQueue m_RequestQueue;
int m_PendingRequests;
StreamReadContext m_Context;
bool ProcessMessage(void);
void DataAvailableHandler(void);
static void StaticInitialize(void);
static void TimeoutTimerHandler(void);
void CheckLiveness(void);
void ProcessMessageAsync(HttpRequest& request);
};
}
#endif /* HTTPCONNECTION_H */

View File

@ -0,0 +1,46 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012-2015 Icinga Development Team (http://www.icinga.org) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "remote/httpdemohandler.hpp"
using namespace icinga;
REGISTER_URLHANDLER("/demo", HttpDemoHandler);
void HttpDemoHandler::HandleRequest(HttpRequest& request, HttpResponse& response)
{
if (request.RequestMethod == "GET") {
String form = "<form action=\"/demo\" method=\"post\"><input type=\"text\" name=\"msg\"><input type=\"submit\"></form>";
response.SetStatus(200, "OK");
response.AddHeader("Content-Type", "text/html");
response.WriteBody(form.CStr(), form.GetLength());
} else if (request.RequestMethod == "POST") {
response.SetStatus(200, "OK");
String msg = "You sent: ";
char buffer[512];
size_t count;
while ((count = request.ReadBody(buffer, sizeof(buffer))) > 0)
msg += String(buffer, buffer + count);
response.WriteBody(msg.CStr(), msg.GetLength());
} else {
response.SetStatus(400, "Bad request");
}
}

View File

@ -0,0 +1,38 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012-2015 Icinga Development Team (http://www.icinga.org) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#ifndef HTTPDEMOHANDLER_H
#define HTTPDEMOHANDLER_H
#include "remote/httphandler.hpp"
namespace icinga
{
class I2_REMOTE_API HttpDemoHandler : public HttpHandler
{
public:
DECLARE_PTR_TYPEDEFS(HttpDemoHandler);
virtual void HandleRequest(HttpRequest& request, HttpResponse& response);
};
}
#endif /* HTTPDEMOHANDLER_H */

View File

@ -0,0 +1,98 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012-2015 Icinga Development Team (http://www.icinga.org) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "remote/httphandler.hpp"
#include "base/singleton.hpp"
using namespace icinga;
Dictionary::Ptr HttpHandler::m_UrlTree;
void HttpHandler::Register(const Url::Ptr& url, const HttpHandler::Ptr& handler)
{
if (!m_UrlTree)
m_UrlTree = new Dictionary();
Dictionary::Ptr node = m_UrlTree;
BOOST_FOREACH(const String& elem, url->GetPath()) {
Dictionary::Ptr children = node->Get("children");
if (!children) {
children = new Dictionary();
node->Set("children", children);
}
Dictionary::Ptr sub_node = new Dictionary();
children->Set(elem, sub_node);
node = sub_node;
}
node->Set("handler", handler);
}
bool HttpHandler::CanAlsoHandleUrl(const Url::Ptr& url) const
{
return false;
}
void HttpHandler::ProcessRequest(HttpRequest& request, HttpResponse& response)
{
Dictionary::Ptr node = m_UrlTree;
HttpHandler::Ptr current_handler, handler;
bool exact_match = true;
BOOST_FOREACH(const String& elem, request.Url->GetPath()) {
current_handler = node->Get("handler");
if (current_handler)
handler = current_handler;
Dictionary::Ptr children = node->Get("children");
if (!children) {
exact_match = false;
node.reset();
break;
}
node = children->Get(elem);
if (!node) {
exact_match = false;
break;
}
}
if (node) {
current_handler = node->Get("handler");
if (current_handler)
handler = current_handler;
}
if (!handler || (!exact_match && !handler->CanAlsoHandleUrl(request.Url))) {
response.SetStatus(404, "Not found");
String msg = "<h1>Not found</h1>";
response.WriteBody(msg.CStr(), msg.GetLength());
response.FinishBody();
return;
}
handler->HandleRequest(request, response);
}

View File

@ -0,0 +1,76 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012-2015 Icinga Development Team (http://www.icinga.org) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#ifndef HTTPHANDLER_H
#define HTTPHANDLER_H
#include "remote/i2-remote.hpp"
#include "remote/httpresponse.hpp"
#include "base/registry.hpp"
#include <vector>
#include <boost/function.hpp>
namespace icinga
{
/**
* HTTP handler.
*
* @ingroup remote
*/
class I2_REMOTE_API HttpHandler : public Object
{
public:
DECLARE_PTR_TYPEDEFS(HttpHandler);
virtual bool CanAlsoHandleUrl(const Url::Ptr& url) const;
virtual void HandleRequest(HttpRequest& request, HttpResponse& response) = 0;
static void Register(const Url::Ptr& url, const HttpHandler::Ptr& handler);
static void ProcessRequest(HttpRequest& request, HttpResponse& response);
private:
static Dictionary::Ptr m_UrlTree;
};
/**
* Helper class for registering HTTP handlers.
*
* @ingroup remote
*/
class I2_REMOTE_API RegisterHttpHandler
{
public:
RegisterHttpHandler(const String& url, const HttpHandler& function);
};
#define REGISTER_URLHANDLER(url, klass) \
namespace { namespace UNIQUE_NAME(apif) { namespace apif ## name { \
void RegisterHandler(void) \
{ \
Url::Ptr uurl = new Url(url); \
HttpHandler::Ptr handler = new klass(); \
HttpHandler::Register(uurl, handler); \
} \
INITIALIZE_ONCE(RegisterHandler); \
} } }
}
#endif /* HTTPHANDLER_H */

154
lib/remote/httprequest.cpp Normal file
View File

@ -0,0 +1,154 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012-2015 Icinga Development Team (http://www.icinga.org) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "remote/httprequest.hpp"
#include "base/logger.hpp"
#include "base/convert.hpp"
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/classification.hpp>
using namespace icinga;
HttpRequest::HttpRequest(StreamReadContext& src)
: m_State(HttpRequestStart), m_Context(src),
m_ChunkContext(m_Context),
ProtocolVersion(HttpVersion10),
Complete(false),
Headers(new Dictionary())
{ }
bool HttpRequest::Parse(const Stream::Ptr& stream, StreamReadContext& src, bool may_wait)
{
if (m_State != HttpRequestBody) {
String line;
StreamReadStatus srs = stream->ReadLine(&line, src, may_wait);
if (srs != StatusNewItem)
return false;
if (m_State == HttpRequestStart) {
/* ignore trailing new-lines */
if (line == "")
return true;
std::vector<String> tokens;
boost::algorithm::split(tokens, line, boost::is_any_of(" "));
Log(LogWarning, "HttpRequest")
<< "line: " << line << ", tokens: " << tokens.size();
if (tokens.size() != 3)
BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid HTTP request"));
RequestMethod = tokens[0];
Url = new class Url(tokens[1]);
if (tokens[2] == "HTTP/1.0")
ProtocolVersion = HttpVersion10;
else if (tokens[2] == "HTTP/1.1") {
ProtocolVersion = HttpVersion11;
} else
BOOST_THROW_EXCEPTION(std::invalid_argument("Unsupported HTTP version"));
m_State = HttpRequestHeaders;
Log(LogWarning, "HttpRequest")
<< "Method: " << RequestMethod << ", Url: " << Url;
} else if (m_State == HttpRequestHeaders) {
if (line == "") {
m_State = HttpRequestBody;
/* we're done if the request doesn't contain a message body */
if (!Headers->Contains("content-length") && !Headers->Contains("transfer-encoding"))
Complete = true;
else
m_Body = new FIFO();
Log(LogWarning, "HttpRequest", "Waiting for message body");
return true;
} else {
String::SizeType pos = line.FindFirstOf(":");
if (pos == String::NPos)
BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid HTTP request"));
String key = line.SubStr(0, pos);
boost::algorithm::to_lower(key);
key.Trim();
String value = line.SubStr(pos + 1);
value.Trim();
Headers->Set(key, value);
}
} else {
VERIFY(!"Invalid HTTP request state.");
}
} else if (m_State == HttpRequestBody) {
if (Headers->Get("transfer-encoding") == "chunked") {
char *data;
size_t size;
StreamReadStatus srs = HttpChunkedEncoding::ReadChunkFromStream(stream, &data, &size, m_ChunkContext, false);
if (srs != StatusNewItem)
return false;
Log(LogInformation, "HttpRequest")
<< "Read " << size << " bytes";
m_Body->Write(data, size);
delete [] data;
if (size == 0) {
Complete = true;
return true;
}
} else {
if (m_Context.Eof)
BOOST_THROW_EXCEPTION(std::invalid_argument("Unexpected EOF in HTTP body"));
if (m_Context.MustRead) {
if (!m_Context.FillFromStream(stream, false)) {
m_Context.Eof = true;
BOOST_THROW_EXCEPTION(std::invalid_argument("Unexpected EOF in HTTP body"));
}
m_Context.MustRead = false;
}
size_t length_indicator = Convert::ToLong(Headers->Get("content-length"));
if (m_Context.Size < length_indicator) {
m_Context.MustRead = true;
return false;
}
m_Body->Write(m_Context.Buffer, length_indicator);
m_Context.DropData(length_indicator);
Complete = true;
return true;
}
}
return true;
}
size_t HttpRequest::ReadBody(char *data, size_t count)
{
if (!m_Body)
return 0;
else
return m_Body->Read(data, count, true);
}

View File

@ -0,0 +1,77 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012-2015 Icinga Development Team (http://www.icinga.org) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#ifndef HTTPREQUEST_H
#define HTTPREQUEST_H
#include "remote/i2-remote.hpp"
#include "remote/httpchunkedencoding.hpp"
#include "base/stream.hpp"
#include "base/fifo.hpp"
#include "base/dictionary.hpp"
#include "base/url.hpp"
namespace icinga
{
enum HttpVersion
{
HttpVersion10,
HttpVersion11
};
enum HttpRequestState
{
HttpRequestStart,
HttpRequestHeaders,
HttpRequestBody
};
/**
* An HTTP request.
*
* @ingroup remote
*/
struct I2_REMOTE_API HttpRequest
{
public:
bool Complete;
String RequestMethod;
Url::Ptr Url;
HttpVersion ProtocolVersion;
Dictionary::Ptr Headers;
HttpRequest(StreamReadContext& ctx);
bool Parse(const Stream::Ptr& stream, StreamReadContext& src, bool may_wait);
size_t ReadBody(char *data, size_t count);
private:
StreamReadContext& m_Context;
ChunkReadContext m_ChunkContext;
HttpRequestState m_State;
FIFO::Ptr m_Body;
};
}
#endif /* HTTPREQUEST_H */

107
lib/remote/httpresponse.cpp Normal file
View File

@ -0,0 +1,107 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012-2015 Icinga Development Team (http://www.icinga.org) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "remote/httpresponse.hpp"
#include "remote/httpchunkedencoding.hpp"
#include "base/logger.hpp"
#include "base/application.hpp"
#include "base/convert.hpp"
using namespace icinga;
HttpResponse::HttpResponse(const Stream::Ptr& stream, const HttpRequest& request)
: m_Stream(stream), m_Request(request), m_State(HttpResponseStart)
{ }
void HttpResponse::SetStatus(int code, const String& message)
{
ASSERT(m_State == HttpResponseStart);
ASSERT(code >= 100 && code <= 599);
ASSERT(!message.IsEmpty());
String status = "HTTP/";
if (m_Request.ProtocolVersion == HttpVersion10)
status += "1.0";
else
status += "1.1";
status += " " + Convert::ToString(code) + " " + message + "\r\n";
m_Stream->Write(status.CStr(), status.GetLength());
m_State = HttpResponseHeaders;
}
void HttpResponse::AddHeader(const String& key, const String& value)
{
ASSERT(m_State = HttpResponseHeaders);
String header = key + ": " + value + "\r\n";
m_Stream->Write(header.CStr(), header.GetLength());
}
void HttpResponse::FinishHeaders(void)
{
if (m_State == HttpResponseHeaders) {
if (m_Request.ProtocolVersion == HttpVersion11)
AddHeader("Transfer-Encoding", "chunked");
AddHeader("Server", "Icinga/" + Application::GetVersion());
m_Stream->Write("\r\n", 2);
m_State = HttpResponseBody;
}
}
void HttpResponse::WriteBody(const char *data, size_t count)
{
ASSERT(m_State == HttpResponseHeaders || m_State == HttpResponseBody);
if (m_Request.ProtocolVersion == HttpVersion10) {
if (!m_Body)
m_Body = new FIFO();
m_Body->Write(data, count);
} else {
FinishHeaders();
HttpChunkedEncoding::WriteChunkToStream(m_Stream, data, count);
}
}
void HttpResponse::Finish(void)
{
if (m_Request.ProtocolVersion == HttpVersion10) {
if (m_Body)
AddHeader("Content-Length", Convert::ToString(m_Body->GetAvailableBytes()));
FinishHeaders();
while (m_Body && m_Body->IsDataAvailable()) {
char buffer[1024];
size_t rc = m_Body->Read(buffer, sizeof(buffer), true);
m_Stream->Write(buffer, rc);
}
} else {
WriteBody(NULL, 0);
m_Stream->Write("\r\n", 2);
}
if (m_Request.ProtocolVersion == HttpVersion10 || m_Request.Headers->Get("connection") == "close")
m_Stream->Shutdown();
}

View File

@ -0,0 +1,63 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012-2015 Icinga Development Team (http://www.icinga.org) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#ifndef HTTPRESPONSE_H
#define HTTPRESPONSE_H
#include "remote/httprequest.hpp"
#include "base/stream.hpp"
#include "base/fifo.hpp"
namespace icinga
{
enum HttpResponseState
{
HttpResponseStart,
HttpResponseHeaders,
HttpResponseBody
};
/**
* An HTTP response.
*
* @ingroup remote
*/
struct I2_REMOTE_API HttpResponse
{
public:
HttpResponse(const Stream::Ptr& stream, const HttpRequest& request);
void SetStatus(int code, const String& message);
void AddHeader(const String& key, const String& value);
void WriteBody(const char *data, size_t count);
void Finish(void);
private:
HttpResponseState m_State;
const HttpRequest& m_Request;
Stream::Ptr m_Stream;
FIFO::Ptr m_Body;
void FinishHeaders(void);
};
}
#endif /* HTTPRESPONSE_H */

View File

@ -36,10 +36,10 @@ void JsonRpc::SendMessage(const Stream::Ptr& stream, const Dictionary::Ptr& mess
NetString::WriteStringToStream(stream, json);
}
StreamReadStatus JsonRpc::ReadMessage(const Stream::Ptr& stream, Dictionary::Ptr *message, StreamReadContext& src)
StreamReadStatus JsonRpc::ReadMessage(const Stream::Ptr& stream, Dictionary::Ptr *message, StreamReadContext& src, bool may_wait)
{
String jsonString;
StreamReadStatus srs = NetString::ReadStringFromStream(stream, &jsonString, src);
StreamReadStatus srs = NetString::ReadStringFromStream(stream, &jsonString, src, may_wait);
if (srs != StatusNewItem)
return srs;

View File

@ -36,7 +36,7 @@ class I2_REMOTE_API JsonRpc
{
public:
static void SendMessage(const Stream::Ptr& stream, const Dictionary::Ptr& message);
static StreamReadStatus ReadMessage(const Stream::Ptr& stream, Dictionary::Ptr *message, StreamReadContext& src);
static StreamReadStatus ReadMessage(const Stream::Ptr& stream, Dictionary::Ptr *message, StreamReadContext& src, bool may_wait = false);
private:
JsonRpc(void);

View File

@ -17,7 +17,7 @@
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "remote/apiclient.hpp"
#include "remote/jsonrpcconnection.hpp"
#include "remote/messageorigin.hpp"
#include "remote/apifunction.hpp"
#include "base/initialize.hpp"
@ -28,32 +28,32 @@
using namespace icinga;
REGISTER_APIFUNCTION(Heartbeat, event, &ApiClient::HeartbeatAPIHandler);
REGISTER_APIFUNCTION(Heartbeat, event, &JsonRpcConnection::HeartbeatAPIHandler);
static Timer::Ptr l_HeartbeatTimer;
static void StartHeartbeatTimer(void)
{
l_HeartbeatTimer = new Timer();
l_HeartbeatTimer->OnTimerExpired.connect(boost::bind(&ApiClient::HeartbeatTimerHandler));
l_HeartbeatTimer->OnTimerExpired.connect(boost::bind(&JsonRpcConnection::HeartbeatTimerHandler));
l_HeartbeatTimer->SetInterval(10);
l_HeartbeatTimer->Start();
}
INITIALIZE_ONCE(StartHeartbeatTimer);
void ApiClient::HeartbeatTimerHandler(void)
void JsonRpcConnection::HeartbeatTimerHandler(void)
{
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjectsByType<Endpoint>()) {
BOOST_FOREACH(const ApiClient::Ptr& client, endpoint->GetClients()) {
BOOST_FOREACH(const JsonRpcConnection::Ptr& client, endpoint->GetClients()) {
if (endpoint->GetSyncing()) {
Log(LogInformation, "ApiClient")
Log(LogInformation, "JsonRpcConnection")
<< "Not sending heartbeat for endpoint '" << endpoint->GetName() << "' because we're replaying the log for it.";
continue;
}
if (client->m_NextHeartbeat != 0 && client->m_NextHeartbeat < Utility::GetTime()) {
Log(LogWarning, "ApiClient")
Log(LogWarning, "JsonRpcConnection")
<< "Client for endpoint '" << endpoint->GetName() << "' has requested "
<< "heartbeat message but hasn't responded in time. Closing connection.";
@ -75,7 +75,7 @@ void ApiClient::HeartbeatTimerHandler(void)
}
}
Value ApiClient::HeartbeatAPIHandler(const MessageOrigin& origin, const Dictionary::Ptr& params)
Value JsonRpcConnection::HeartbeatAPIHandler(const MessageOrigin& origin, const Dictionary::Ptr& params)
{
Value vtimeout = params->Get("timeout");

View File

@ -17,7 +17,7 @@
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "remote/apiclient.hpp"
#include "remote/jsonrpcconnection.hpp"
#include "remote/apilistener.hpp"
#include "remote/apifunction.hpp"
#include "remote/jsonrpc.hpp"
@ -35,65 +35,65 @@ REGISTER_APIFUNCTION(SetLogPosition, log, &SetLogPositionHandler);
static Value RequestCertificateHandler(const MessageOrigin& origin, const Dictionary::Ptr& params);
REGISTER_APIFUNCTION(RequestCertificate, pki, &RequestCertificateHandler);
static boost::once_flag l_ApiClientOnceFlag = BOOST_ONCE_INIT;
static Timer::Ptr l_ApiClientTimeoutTimer;
static boost::once_flag l_JsonRpcConnectionOnceFlag = BOOST_ONCE_INIT;
static Timer::Ptr l_JsonRpcConnectionTimeoutTimer;
ApiClient::ApiClient(const String& identity, bool authenticated, const TlsStream::Ptr& stream, ConnectionRole role)
JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated, const TlsStream::Ptr& stream, ConnectionRole role)
: m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream), m_Role(role), m_Seen(Utility::GetTime()),
m_NextHeartbeat(0), m_HeartbeatTimeout(0)
{
boost::call_once(l_ApiClientOnceFlag, &ApiClient::StaticInitialize);
boost::call_once(l_JsonRpcConnectionOnceFlag, &JsonRpcConnection::StaticInitialize);
if (authenticated)
m_Endpoint = Endpoint::GetByName(identity);
}
void ApiClient::StaticInitialize(void)
void JsonRpcConnection::StaticInitialize(void)
{
l_ApiClientTimeoutTimer = new Timer();
l_ApiClientTimeoutTimer->OnTimerExpired.connect(boost::bind(&ApiClient::TimeoutTimerHandler));
l_ApiClientTimeoutTimer->SetInterval(15);
l_ApiClientTimeoutTimer->Start();
l_JsonRpcConnectionTimeoutTimer = new Timer();
l_JsonRpcConnectionTimeoutTimer->OnTimerExpired.connect(boost::bind(&JsonRpcConnection::TimeoutTimerHandler));
l_JsonRpcConnectionTimeoutTimer->SetInterval(15);
l_JsonRpcConnectionTimeoutTimer->Start();
}
void ApiClient::Start(void)
void JsonRpcConnection::Start(void)
{
m_Stream->RegisterDataHandler(boost::bind(&ApiClient::DataAvailableHandler, this));
m_Stream->RegisterDataHandler(boost::bind(&JsonRpcConnection::DataAvailableHandler, this));
if (m_Stream->IsDataAvailable())
DataAvailableHandler();
}
String ApiClient::GetIdentity(void) const
String JsonRpcConnection::GetIdentity(void) const
{
return m_Identity;
}
bool ApiClient::IsAuthenticated(void) const
bool JsonRpcConnection::IsAuthenticated(void) const
{
return m_Authenticated;
}
Endpoint::Ptr ApiClient::GetEndpoint(void) const
Endpoint::Ptr JsonRpcConnection::GetEndpoint(void) const
{
return m_Endpoint;
}
TlsStream::Ptr ApiClient::GetStream(void) const
TlsStream::Ptr JsonRpcConnection::GetStream(void) const
{
return m_Stream;
}
ConnectionRole ApiClient::GetRole(void) const
ConnectionRole JsonRpcConnection::GetRole(void) const
{
return m_Role;
}
void ApiClient::SendMessage(const Dictionary::Ptr& message)
void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message)
{
m_WriteQueue.Enqueue(boost::bind(&ApiClient::SendMessageSync, ApiClient::Ptr(this), message));
m_WriteQueue.Enqueue(boost::bind(&JsonRpcConnection::SendMessageSync, JsonRpcConnection::Ptr(this), message));
}
void ApiClient::SendMessageSync(const Dictionary::Ptr& message)
void JsonRpcConnection::SendMessageSync(const Dictionary::Ptr& message)
{
try {
ObjectLock olock(m_Stream);
@ -103,18 +103,18 @@ void ApiClient::SendMessageSync(const Dictionary::Ptr& message)
} catch (const std::exception& ex) {
std::ostringstream info;
info << "Error while sending JSON-RPC message for identity '" << m_Identity << "'";
Log(LogWarning, "ApiClient")
Log(LogWarning, "JsonRpcConnection")
<< info.str();
Log(LogDebug, "ApiClient")
Log(LogDebug, "JsonRpcConnection")
<< info.str() << "\n" << DiagnosticInformation(ex);
Disconnect();
}
}
void ApiClient::Disconnect(void)
void JsonRpcConnection::Disconnect(void)
{
Log(LogWarning, "ApiClient")
Log(LogWarning, "JsonRpcConnection")
<< "API client disconnected for identity '" << m_Identity << "'";
if (m_Endpoint)
@ -127,11 +127,11 @@ void ApiClient::Disconnect(void)
m_Stream->Close();
}
bool ApiClient::ProcessMessage(void)
bool JsonRpcConnection::ProcessMessage(void)
{
Dictionary::Ptr message;
StreamReadStatus srs = JsonRpc::ReadMessage(m_Stream, &message, m_Context);
StreamReadStatus srs = JsonRpc::ReadMessage(m_Stream, &message, m_Context, false);
if (srs != StatusNewItem)
return false;
@ -163,7 +163,7 @@ bool ApiClient::ProcessMessage(void)
String method = message->Get("method");
Log(LogNotice, "ApiClient")
Log(LogNotice, "JsonRpcConnection")
<< "Received '" << method << "' message from '" << m_Identity << "'";
Dictionary::Ptr resultMessage = new Dictionary();
@ -180,9 +180,9 @@ bool ApiClient::ProcessMessage(void)
resultMessage->Set("error", DiagnosticInformation(ex));
std::ostringstream info;
info << "Error while processing message for identity '" << m_Identity << "'";
Log(LogWarning, "ApiClient")
Log(LogWarning, "JsonRpcConnection")
<< info.str();
Log(LogDebug, "ApiClient")
Log(LogDebug, "JsonRpcConnection")
<< info.str() << "\n" << DiagnosticInformation(ex);
}
@ -195,7 +195,7 @@ bool ApiClient::ProcessMessage(void)
return true;
}
void ApiClient::DataAvailableHandler(void)
void JsonRpcConnection::DataAvailableHandler(void)
{
boost::mutex::scoped_lock lock(m_DataHandlerMutex);
@ -203,7 +203,7 @@ void ApiClient::DataAvailableHandler(void)
while (ProcessMessage())
; /* empty loop body */
} catch (const std::exception& ex) {
Log(LogWarning, "ApiClient")
Log(LogWarning, "JsonRpcConnection")
<< "Error while reading JSON-RPC message for identity '" << m_Identity << "': " << DiagnosticInformation(ex);
Disconnect();
@ -267,25 +267,25 @@ Value RequestCertificateHandler(const MessageOrigin& origin, const Dictionary::P
return result;
}
void ApiClient::CheckLiveness(void)
void JsonRpcConnection::CheckLiveness(void)
{
if (m_Seen < Utility::GetTime() - 60 && (!m_Endpoint || !m_Endpoint->GetSyncing())) {
Log(LogInformation, "ApiClient")
Log(LogInformation, "JsonRpcConnection")
<< "No messages for identity '" << m_Identity << "' have been received in the last 60 seconds.";
Disconnect();
}
}
void ApiClient::TimeoutTimerHandler(void)
void JsonRpcConnection::TimeoutTimerHandler(void)
{
ApiListener::Ptr listener = ApiListener::GetInstance();
BOOST_FOREACH(const ApiClient::Ptr& client, listener->GetAnonymousClients()) {
BOOST_FOREACH(const JsonRpcConnection::Ptr& client, listener->GetAnonymousClients()) {
client->CheckLiveness();
}
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjectsByType<Endpoint>()) {
BOOST_FOREACH(const ApiClient::Ptr& client, endpoint->GetClients()) {
BOOST_FOREACH(const JsonRpcConnection::Ptr& client, endpoint->GetClients()) {
client->CheckLiveness();
}
}

View File

@ -17,8 +17,8 @@
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#ifndef APICLIENT_H
#define APICLIENT_H
#ifndef JSONRPCCONNECTION_H
#define JSONRPCCONNECTION_H
#include "remote/endpoint.hpp"
#include "base/tlsstream.hpp"
@ -35,6 +35,12 @@ enum ClientRole
ClientOutbound
};
enum ClientType
{
ClientJsonRpc,
ClientHttp
};
struct MessageOrigin;
/**
@ -42,12 +48,12 @@ struct MessageOrigin;
*
* @ingroup remote
*/
class I2_REMOTE_API ApiClient : public Object
class I2_REMOTE_API JsonRpcConnection : public Object
{
public:
DECLARE_PTR_TYPEDEFS(ApiClient);
DECLARE_PTR_TYPEDEFS(JsonRpcConnection);
ApiClient(const String& identity, bool authenticated, const TlsStream::Ptr& stream, ConnectionRole role);
JsonRpcConnection(const String& identity, bool authenticated, const TlsStream::Ptr& stream, ConnectionRole role);
void Start(void);
@ -73,7 +79,6 @@ private:
double m_Seen;
double m_NextHeartbeat;
double m_HeartbeatTimeout;
Timer::Ptr m_TimeoutTimer;
boost::mutex m_DataHandlerMutex;
StreamReadContext m_Context;
@ -91,4 +96,4 @@ private:
}
#endif /* APICLIENT_H */
#endif /* JSONRPCCONNECTION_H */

View File

@ -21,7 +21,7 @@
#define MESSAGEORIGIN_H
#include "remote/zone.hpp"
#include "remote/apiclient.hpp"
#include "remote/jsonrpcconnection.hpp"
namespace icinga
{
@ -32,7 +32,7 @@ namespace icinga
struct I2_REMOTE_API MessageOrigin
{
Zone::Ptr FromZone;
ApiClient::Ptr FromClient;
JsonRpcConnection::Ptr FromClient;
bool IsLocal(void) const;
};

View File

@ -19,7 +19,7 @@
#include "remote/zone.hpp"
#include "remote/zone.tcpp"
#include "remote/apiclient.hpp"
#include "remote/jsonrpcconnection.hpp"
#include "base/objectlock.hpp"
#include <boost/foreach.hpp>