mirror of https://github.com/Icinga/icinga2.git
ApiListener: listen(2) via Boost ASIO
This commit is contained in:
parent
c547e9a863
commit
e4f3422b3a
|
@ -7,6 +7,7 @@
|
||||||
#include "remote/jsonrpc.hpp"
|
#include "remote/jsonrpc.hpp"
|
||||||
#include "remote/apifunction.hpp"
|
#include "remote/apifunction.hpp"
|
||||||
#include "base/convert.hpp"
|
#include "base/convert.hpp"
|
||||||
|
#include "base/io-engine.hpp"
|
||||||
#include "base/netstring.hpp"
|
#include "base/netstring.hpp"
|
||||||
#include "base/json.hpp"
|
#include "base/json.hpp"
|
||||||
#include "base/configtype.hpp"
|
#include "base/configtype.hpp"
|
||||||
|
@ -18,7 +19,12 @@
|
||||||
#include "base/context.hpp"
|
#include "base/context.hpp"
|
||||||
#include "base/statsfunction.hpp"
|
#include "base/statsfunction.hpp"
|
||||||
#include "base/exception.hpp"
|
#include "base/exception.hpp"
|
||||||
|
#include <boost/asio/ip/tcp.hpp>
|
||||||
|
#include <boost/asio/ip/v6_only.hpp>
|
||||||
|
#include <boost/asio/spawn.hpp>
|
||||||
|
#include <climits>
|
||||||
#include <fstream>
|
#include <fstream>
|
||||||
|
#include <memory>
|
||||||
|
|
||||||
using namespace icinga;
|
using namespace icinga;
|
||||||
|
|
||||||
|
@ -326,6 +332,10 @@ bool ApiListener::IsMaster() const
|
||||||
*/
|
*/
|
||||||
bool ApiListener::AddListener(const String& node, const String& service)
|
bool ApiListener::AddListener(const String& node, const String& service)
|
||||||
{
|
{
|
||||||
|
namespace asio = boost::asio;
|
||||||
|
namespace ip = asio::ip;
|
||||||
|
using ip::tcp;
|
||||||
|
|
||||||
ObjectLock olock(this);
|
ObjectLock olock(this);
|
||||||
|
|
||||||
std::shared_ptr<SSL_CTX> sslContext = m_SSLContext;
|
std::shared_ptr<SSL_CTX> sslContext = m_SSLContext;
|
||||||
|
@ -335,47 +345,40 @@ bool ApiListener::AddListener(const String& node, const String& service)
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
TcpSocket::Ptr server = new TcpSocket();
|
auto& io (IoEngine::Get().GetIoService());
|
||||||
|
auto acceptor (std::make_shared<tcp::acceptor>(io));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
server->Bind(node, service, AF_UNSPEC);
|
tcp::resolver resolver (io);
|
||||||
|
tcp::resolver::query query (node, service, tcp::resolver::query::passive);
|
||||||
|
auto endpoint (resolver.resolve(query)->endpoint());
|
||||||
|
|
||||||
|
acceptor->open(endpoint.protocol());
|
||||||
|
acceptor->set_option(ip::v6_only(false));
|
||||||
|
acceptor->set_option(tcp::acceptor::reuse_address(true));
|
||||||
|
acceptor->bind(endpoint);
|
||||||
} catch (const std::exception&) {
|
} catch (const std::exception&) {
|
||||||
Log(LogCritical, "ApiListener")
|
Log(LogCritical, "ApiListener")
|
||||||
<< "Cannot bind TCP socket for host '" << node << "' on port '" << service << "'.";
|
<< "Cannot bind TCP socket for host '" << node << "' on port '" << service << "'.";
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
acceptor->listen(INT_MAX);
|
||||||
|
|
||||||
|
auto localEndpoint (acceptor->local_endpoint());
|
||||||
|
|
||||||
Log(LogInformation, "ApiListener")
|
Log(LogInformation, "ApiListener")
|
||||||
<< "Started new listener on '" << server->GetClientAddress() << "'";
|
<< "Started new listener on '[" << localEndpoint.address() << "]:" << localEndpoint.port() << "'";
|
||||||
|
|
||||||
std::thread thread(std::bind(&ApiListener::ListenerThreadProc, this, server));
|
asio::spawn(io, [acceptor](asio::yield_context yc) {
|
||||||
thread.detach();
|
// TODO
|
||||||
|
});
|
||||||
|
|
||||||
m_Servers.insert(server);
|
UpdateStatusFile(localEndpoint);
|
||||||
|
|
||||||
UpdateStatusFile(server);
|
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ApiListener::ListenerThreadProc(const Socket::Ptr& server)
|
|
||||||
{
|
|
||||||
Utility::SetThreadName("API Listener");
|
|
||||||
|
|
||||||
server->Listen();
|
|
||||||
|
|
||||||
for (;;) {
|
|
||||||
try {
|
|
||||||
Socket::Ptr client = server->Accept();
|
|
||||||
|
|
||||||
/* Use dynamic thread pool with additional on demand resources with fast throughput. */
|
|
||||||
EnqueueAsyncCallback(std::bind(&ApiListener::NewClientHandler, this, client, String(), RoleServer), LowLatencyScheduler);
|
|
||||||
} catch (const std::exception&) {
|
|
||||||
Log(LogCritical, "ApiListener", "Cannot accept new connection.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new JSON-RPC client and connects to the specified endpoint.
|
* Creates a new JSON-RPC client and connects to the specified endpoint.
|
||||||
*
|
*
|
||||||
|
@ -1513,14 +1516,13 @@ String ApiListener::GetFromZoneName(const Zone::Ptr& fromZone)
|
||||||
return fromZoneName;
|
return fromZoneName;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ApiListener::UpdateStatusFile(TcpSocket::Ptr socket)
|
void ApiListener::UpdateStatusFile(boost::asio::ip::tcp::endpoint localEndpoint)
|
||||||
{
|
{
|
||||||
String path = Configuration::CacheDir + "/api-state.json";
|
String path = Configuration::CacheDir + "/api-state.json";
|
||||||
std::pair<String, String> details = socket->GetClientAddressDetails();
|
|
||||||
|
|
||||||
Utility::SaveJsonFile(path, 0644, new Dictionary({
|
Utility::SaveJsonFile(path, 0644, new Dictionary({
|
||||||
{"host", details.first},
|
{"host", String(localEndpoint.address().to_string())},
|
||||||
{"port", Convert::ToLong(details.second)}
|
{"port", localEndpoint.port()}
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
#include "base/tcpsocket.hpp"
|
#include "base/tcpsocket.hpp"
|
||||||
#include "base/tlsstream.hpp"
|
#include "base/tlsstream.hpp"
|
||||||
#include "base/threadpool.hpp"
|
#include "base/threadpool.hpp"
|
||||||
|
#include <boost/asio/ip/tcp.hpp>
|
||||||
#include <set>
|
#include <set>
|
||||||
|
|
||||||
namespace icinga
|
namespace icinga
|
||||||
|
@ -106,7 +107,6 @@ protected:
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::shared_ptr<SSL_CTX> m_SSLContext;
|
std::shared_ptr<SSL_CTX> m_SSLContext;
|
||||||
std::set<TcpSocket::Ptr> m_Servers;
|
|
||||||
|
|
||||||
mutable boost::mutex m_AnonymousClientsLock;
|
mutable boost::mutex m_AnonymousClientsLock;
|
||||||
mutable boost::mutex m_HttpClientsLock;
|
mutable boost::mutex m_HttpClientsLock;
|
||||||
|
@ -130,7 +130,6 @@ private:
|
||||||
|
|
||||||
void NewClientHandler(const Socket::Ptr& client, const String& hostname, ConnectionRole role);
|
void NewClientHandler(const Socket::Ptr& client, const String& hostname, ConnectionRole role);
|
||||||
void NewClientHandlerInternal(const Socket::Ptr& client, const String& hostname, ConnectionRole role);
|
void NewClientHandlerInternal(const Socket::Ptr& client, const String& hostname, ConnectionRole role);
|
||||||
void ListenerThreadProc(const Socket::Ptr& server);
|
|
||||||
|
|
||||||
static ThreadPool& GetTP();
|
static ThreadPool& GetTP();
|
||||||
static void EnqueueAsyncCallback(const std::function<void ()>& callback, SchedulerPolicy policy = DefaultScheduler);
|
static void EnqueueAsyncCallback(const std::function<void ()>& callback, SchedulerPolicy policy = DefaultScheduler);
|
||||||
|
@ -154,7 +153,7 @@ private:
|
||||||
|
|
||||||
static void CopyCertificateFile(const String& oldCertPath, const String& newCertPath);
|
static void CopyCertificateFile(const String& oldCertPath, const String& newCertPath);
|
||||||
|
|
||||||
void UpdateStatusFile(TcpSocket::Ptr socket);
|
void UpdateStatusFile(boost::asio::ip::tcp::endpoint localEndpoint);
|
||||||
void RemoveStatusFile();
|
void RemoveStatusFile();
|
||||||
|
|
||||||
/* filesync */
|
/* filesync */
|
||||||
|
|
Loading…
Reference in New Issue