ApiListener: connect(2) via Boost ASIO

This commit is contained in:
Alexander A. Klimov 2019-02-18 14:56:45 +01:00
parent e9a64abd09
commit 832365195d
2 changed files with 49 additions and 193 deletions

View File

@ -430,210 +430,70 @@ void ApiListener::ListenerCoroutineProc(boost::asio::yield_context yc, const std
*/ */
void ApiListener::AddConnection(const Endpoint::Ptr& endpoint) void ApiListener::AddConnection(const Endpoint::Ptr& endpoint)
{ {
{ namespace asio = boost::asio;
ObjectLock olock(this); using asio::ip::tcp;
auto sslContext (m_SSLContext); auto sslContext (m_SSLContext);
if (!sslContext) { if (!sslContext) {
Log(LogCritical, "ApiListener", "SSL context is required for AddConnection()"); Log(LogCritical, "ApiListener", "SSL context is required for AddConnection()");
return;
}
}
String host = endpoint->GetHost();
String port = endpoint->GetPort();
Log(LogInformation, "ApiListener")
<< "Reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host << "' and port '" << port << "'";
TcpSocket::Ptr client = new TcpSocket();
try {
client->Connect(host, port);
NewClientHandler(client, endpoint->GetName(), RoleClient);
endpoint->SetConnecting(false);
Log(LogInformation, "ApiListener")
<< "Finished reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host << "' and port '" << port << "'";
} catch (const std::exception& ex) {
endpoint->SetConnecting(false);
client->Close();
std::ostringstream info;
info << "Cannot connect to host '" << host << "' on port '" << port << "'";
Log(LogCritical, "ApiListener", info.str());
Log(LogDebug, "ApiListener")
<< info.str() << "\n" << DiagnosticInformation(ex);
}
}
void ApiListener::NewClientHandler(const Socket::Ptr& client, const String& hostname, ConnectionRole role)
{
try {
NewClientHandlerInternal(client, hostname, role);
} catch (const std::exception& ex) {
Log(LogCritical, "ApiListener")
<< "Exception while handling new API client connection: " << DiagnosticInformation(ex, false);
Log(LogDebug, "ApiListener")
<< "Exception while handling new API client connection: " << DiagnosticInformation(ex);
}
}
/**
* Processes a new client connection.
*
* @param client The new client.
*/
void ApiListener::NewClientHandlerInternal(const Socket::Ptr& client, const String& hostname, ConnectionRole role)
{
CONTEXT("Handling new API client connection");
String conninfo;
if (role == RoleClient)
conninfo = "to";
else
conninfo = "from";
conninfo += " " + client->GetPeerAddress();
TlsStream::Ptr tlsStream;
String environmentName = Application::GetAppEnvironment();
String serverName = hostname;
if (!environmentName.IsEmpty())
serverName += ":" + environmentName;
{
ObjectLock olock(this);
try {
tlsStream = new TlsStream(client, serverName, role, m_SSLContext);
} catch (const std::exception&) {
Log(LogCritical, "ApiListener")
<< "Cannot create TLS stream from client connection (" << conninfo << ")";
return;
}
}
try {
tlsStream->Handshake();
} catch (const std::exception& ex) {
Log(LogCritical, "ApiListener")
<< "Client TLS handshake failed (" << conninfo << "): " << DiagnosticInformation(ex, false);
tlsStream->Close();
return; return;
} }
std::shared_ptr<X509> cert = tlsStream->GetPeerCertificate(); auto& io (IoEngine::Get().GetIoService());
String identity;
Endpoint::Ptr endpoint;
bool verify_ok = false;
if (cert) { asio::spawn(io, [this, endpoint, &io, sslContext](asio::yield_context yc) {
try { String host = endpoint->GetHost();
identity = GetCertificateCN(cert); String port = endpoint->GetPort();
} catch (const std::exception&) {
Log(LogCritical, "ApiListener")
<< "Cannot get certificate common name from cert path: '" << GetDefaultCertPath() << "'.";
tlsStream->Close();
return;
}
verify_ok = tlsStream->IsVerifyOK();
if (!hostname.IsEmpty()) {
if (identity != hostname) {
Log(LogWarning, "ApiListener")
<< "Unexpected certificate common name while connecting to endpoint '"
<< hostname << "': got '" << identity << "'";
tlsStream->Close();
return;
} else if (!verify_ok) {
Log(LogWarning, "ApiListener")
<< "Certificate validation failed for endpoint '" << hostname
<< "': " << tlsStream->GetVerifyError();
}
}
if (verify_ok)
endpoint = Endpoint::GetByName(identity);
{
Log log(LogInformation, "ApiListener");
log << "New client connection for identity '" << identity << "' " << conninfo;
if (!verify_ok)
log << " (certificate validation failed: " << tlsStream->GetVerifyError() << ")";
else if (!endpoint)
log << " (no Endpoint object found for identity)";
}
} else {
Log(LogInformation, "ApiListener") Log(LogInformation, "ApiListener")
<< "New client connection " << conninfo << " (no client certificate)"; << "Reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host << "' and port '" << port << "'";
}
ClientType ctype; try {
auto sslConn (std::make_shared<AsioTlsStream>(io, *sslContext));
if (role == RoleClient) { {
Dictionary::Ptr message = new Dictionary({ tcp::resolver resolver (io);
{ "jsonrpc", "2.0" }, tcp::resolver::query query (host, port);
{ "method", "icinga::Hello" }, auto result (resolver.async_resolve(query, yc));
{ "params", new Dictionary() } auto current (result.begin());
});
JsonRpc::SendMessage(tlsStream, message); for (;;) {
ctype = ClientJsonRpc; auto& tcpConn (sslConn->lowest_layer());
} else {
tlsStream->WaitForData(10);
if (!tlsStream->IsDataAvailable()) { try {
if (identity.IsEmpty()) tcpConn.open(current->endpoint().protocol());
Log(LogInformation, "ApiListener") tcpConn.set_option(tcp::socket::keep_alive(true));
<< "No data received on new API connection. " tcpConn.async_connect(current->endpoint(), yc);
<< "Ensure that the remote endpoints are properly configured in a cluster setup.";
else
Log(LogWarning, "ApiListener")
<< "No data received on new API connection for identity '" << identity << "'. "
<< "Ensure that the remote endpoints are properly configured in a cluster setup.";
tlsStream->Close();
return;
}
char firstByte; break;
tlsStream->Peek(&firstByte, 1, false); } catch (const std::exception&) {
if (++current == result.end()) {
throw;
}
if (firstByte >= '0' && firstByte <= '9') if (tcpConn.is_open()) {
ctype = ClientJsonRpc; tcpConn.close();
else }
ctype = ClientHttp; }
} }
if (ctype == ClientJsonRpc) {
Log(LogNotice, "ApiListener", "New JSON-RPC client");
JsonRpcConnection::Ptr aclient = new JsonRpcConnection(identity, verify_ok, tlsStream, role);
aclient->Start();
if (endpoint) {
bool needSync = !endpoint->GetConnected();
endpoint->AddClient(aclient);
m_SyncQueue.Enqueue(std::bind(&ApiListener::SyncClient, this, aclient, endpoint, needSync));
} else {
if (!AddAnonymousClient(aclient)) {
Log(LogNotice, "ApiListener")
<< "Ignoring anonymous JSON-RPC connection " << conninfo
<< ". Max connections (" << GetMaxAnonymousClients() << ") exceeded.";
aclient->Disconnect();
} }
NewClientHandler(yc, sslConn, endpoint->GetName(), RoleClient);
endpoint->SetConnecting(false);
Log(LogInformation, "ApiListener")
<< "Finished reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host << "' and port '" << port << "'";
} catch (const std::exception& ex) {
endpoint->SetConnecting(false);
std::ostringstream info;
info << "Cannot connect to host '" << host << "' on port '" << port << "'";
Log(LogCritical, "ApiListener", info.str());
Log(LogDebug, "ApiListener")
<< info.str() << "\n" << DiagnosticInformation(ex);
} }
} });
} }
void ApiListener::NewClientHandler(boost::asio::yield_context yc, const std::shared_ptr<AsioTlsStream>& client, const String& hostname, ConnectionRole role) void ApiListener::NewClientHandler(boost::asio::yield_context yc, const std::shared_ptr<AsioTlsStream>& client, const String& hostname, ConnectionRole role)
@ -1004,8 +864,7 @@ void ApiListener::ApiReconnectTimerHandler()
/* Set connecting state to prevent duplicated queue inserts later. */ /* Set connecting state to prevent duplicated queue inserts later. */
endpoint->SetConnecting(true); endpoint->SetConnecting(true);
/* Use dynamic thread pool with additional on demand resources with fast throughput. */ AddConnection(endpoint);
EnqueueAsyncCallback(std::bind(&ApiListener::AddConnection, this, endpoint), LowLatencyScheduler);
} }
} }

View File

@ -130,9 +130,6 @@ private:
bool AddListener(const String& node, const String& service); bool AddListener(const String& node, const String& service);
void AddConnection(const Endpoint::Ptr& endpoint); void AddConnection(const Endpoint::Ptr& endpoint);
void NewClientHandler(const Socket::Ptr& client, const String& hostname, ConnectionRole role);
void NewClientHandlerInternal(const Socket::Ptr& client, const String& hostname, ConnectionRole role);
void NewClientHandler(boost::asio::yield_context yc, const std::shared_ptr<AsioTlsStream>& client, const String& hostname, ConnectionRole role); void NewClientHandler(boost::asio::yield_context yc, const std::shared_ptr<AsioTlsStream>& client, const String& hostname, ConnectionRole role);
void NewClientHandlerInternal(boost::asio::yield_context yc, const std::shared_ptr<AsioTlsStream>& client, const String& hostname, ConnectionRole role); void NewClientHandlerInternal(boost::asio::yield_context yc, const std::shared_ptr<AsioTlsStream>& client, const String& hostname, ConnectionRole role);
void ListenerCoroutineProc(boost::asio::yield_context yc, const std::shared_ptr<boost::asio::ip::tcp::acceptor>& server, const std::shared_ptr<boost::asio::ssl::context>& sslContext); void ListenerCoroutineProc(boost::asio::yield_context yc, const std::shared_ptr<boost::asio::ip::tcp::acceptor>& server, const std::shared_ptr<boost::asio::ssl::context>& sslContext);