diff --git a/lib/remote/apilistener.cpp b/lib/remote/apilistener.cpp index c0b75aecf..b5fc8e18b 100644 --- a/lib/remote/apilistener.cpp +++ b/lib/remote/apilistener.cpp @@ -430,210 +430,70 @@ void ApiListener::ListenerCoroutineProc(boost::asio::yield_context yc, const std */ void ApiListener::AddConnection(const Endpoint::Ptr& endpoint) { - { - ObjectLock olock(this); + namespace asio = boost::asio; + using asio::ip::tcp; - auto sslContext (m_SSLContext); + auto sslContext (m_SSLContext); - if (!sslContext) { - Log(LogCritical, "ApiListener", "SSL context is required for AddConnection()"); - return; - } - } - - String host = endpoint->GetHost(); - String port = endpoint->GetPort(); - - Log(LogInformation, "ApiListener") - << "Reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host << "' and port '" << port << "'"; - - TcpSocket::Ptr client = new TcpSocket(); - - try { - client->Connect(host, port); - - NewClientHandler(client, endpoint->GetName(), RoleClient); - - endpoint->SetConnecting(false); - Log(LogInformation, "ApiListener") - << "Finished reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host << "' and port '" << port << "'"; - } catch (const std::exception& ex) { - endpoint->SetConnecting(false); - client->Close(); - - std::ostringstream info; - info << "Cannot connect to host '" << host << "' on port '" << port << "'"; - Log(LogCritical, "ApiListener", info.str()); - Log(LogDebug, "ApiListener") - << info.str() << "\n" << DiagnosticInformation(ex); - } -} - -void ApiListener::NewClientHandler(const Socket::Ptr& client, const String& hostname, ConnectionRole role) -{ - try { - NewClientHandlerInternal(client, hostname, role); - } catch (const std::exception& ex) { - Log(LogCritical, "ApiListener") - << "Exception while handling new API client connection: " << DiagnosticInformation(ex, false); - - Log(LogDebug, "ApiListener") - << "Exception while handling new API client connection: " << DiagnosticInformation(ex); - } -} - -/** - * Processes a new client connection. - * - * @param client The new client. - */ -void ApiListener::NewClientHandlerInternal(const Socket::Ptr& client, const String& hostname, ConnectionRole role) -{ - CONTEXT("Handling new API client connection"); - - String conninfo; - - if (role == RoleClient) - conninfo = "to"; - else - conninfo = "from"; - - conninfo += " " + client->GetPeerAddress(); - - TlsStream::Ptr tlsStream; - - String environmentName = Application::GetAppEnvironment(); - - String serverName = hostname; - - if (!environmentName.IsEmpty()) - serverName += ":" + environmentName; - - { - ObjectLock olock(this); - try { - tlsStream = new TlsStream(client, serverName, role, m_SSLContext); - } catch (const std::exception&) { - Log(LogCritical, "ApiListener") - << "Cannot create TLS stream from client connection (" << conninfo << ")"; - return; - } - } - - try { - tlsStream->Handshake(); - } catch (const std::exception& ex) { - Log(LogCritical, "ApiListener") - << "Client TLS handshake failed (" << conninfo << "): " << DiagnosticInformation(ex, false); - tlsStream->Close(); + if (!sslContext) { + Log(LogCritical, "ApiListener", "SSL context is required for AddConnection()"); return; } - std::shared_ptr cert = tlsStream->GetPeerCertificate(); - String identity; - Endpoint::Ptr endpoint; - bool verify_ok = false; + auto& io (IoEngine::Get().GetIoService()); - if (cert) { - try { - identity = GetCertificateCN(cert); - } catch (const std::exception&) { - Log(LogCritical, "ApiListener") - << "Cannot get certificate common name from cert path: '" << GetDefaultCertPath() << "'."; - tlsStream->Close(); - return; - } + asio::spawn(io, [this, endpoint, &io, sslContext](asio::yield_context yc) { + String host = endpoint->GetHost(); + String port = endpoint->GetPort(); - verify_ok = tlsStream->IsVerifyOK(); - if (!hostname.IsEmpty()) { - if (identity != hostname) { - Log(LogWarning, "ApiListener") - << "Unexpected certificate common name while connecting to endpoint '" - << hostname << "': got '" << identity << "'"; - tlsStream->Close(); - return; - } else if (!verify_ok) { - Log(LogWarning, "ApiListener") - << "Certificate validation failed for endpoint '" << hostname - << "': " << tlsStream->GetVerifyError(); - } - } - - if (verify_ok) - endpoint = Endpoint::GetByName(identity); - - { - Log log(LogInformation, "ApiListener"); - - log << "New client connection for identity '" << identity << "' " << conninfo; - - if (!verify_ok) - log << " (certificate validation failed: " << tlsStream->GetVerifyError() << ")"; - else if (!endpoint) - log << " (no Endpoint object found for identity)"; - } - } else { Log(LogInformation, "ApiListener") - << "New client connection " << conninfo << " (no client certificate)"; - } + << "Reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host << "' and port '" << port << "'"; - ClientType ctype; + try { + auto sslConn (std::make_shared(io, *sslContext)); - if (role == RoleClient) { - Dictionary::Ptr message = new Dictionary({ - { "jsonrpc", "2.0" }, - { "method", "icinga::Hello" }, - { "params", new Dictionary() } - }); + { + tcp::resolver resolver (io); + tcp::resolver::query query (host, port); + auto result (resolver.async_resolve(query, yc)); + auto current (result.begin()); - JsonRpc::SendMessage(tlsStream, message); - ctype = ClientJsonRpc; - } else { - tlsStream->WaitForData(10); + for (;;) { + auto& tcpConn (sslConn->lowest_layer()); - if (!tlsStream->IsDataAvailable()) { - if (identity.IsEmpty()) - Log(LogInformation, "ApiListener") - << "No data received on new API connection. " - << "Ensure that the remote endpoints are properly configured in a cluster setup."; - else - Log(LogWarning, "ApiListener") - << "No data received on new API connection for identity '" << identity << "'. " - << "Ensure that the remote endpoints are properly configured in a cluster setup."; - tlsStream->Close(); - return; - } + try { + tcpConn.open(current->endpoint().protocol()); + tcpConn.set_option(tcp::socket::keep_alive(true)); + tcpConn.async_connect(current->endpoint(), yc); - char firstByte; - tlsStream->Peek(&firstByte, 1, false); + break; + } catch (const std::exception&) { + if (++current == result.end()) { + throw; + } - if (firstByte >= '0' && firstByte <= '9') - ctype = ClientJsonRpc; - else - ctype = ClientHttp; - } - - if (ctype == ClientJsonRpc) { - Log(LogNotice, "ApiListener", "New JSON-RPC client"); - - JsonRpcConnection::Ptr aclient = new JsonRpcConnection(identity, verify_ok, tlsStream, role); - aclient->Start(); - - if (endpoint) { - bool needSync = !endpoint->GetConnected(); - - endpoint->AddClient(aclient); - - m_SyncQueue.Enqueue(std::bind(&ApiListener::SyncClient, this, aclient, endpoint, needSync)); - } else { - if (!AddAnonymousClient(aclient)) { - Log(LogNotice, "ApiListener") - << "Ignoring anonymous JSON-RPC connection " << conninfo - << ". Max connections (" << GetMaxAnonymousClients() << ") exceeded."; - aclient->Disconnect(); + if (tcpConn.is_open()) { + tcpConn.close(); + } + } + } } + + NewClientHandler(yc, sslConn, endpoint->GetName(), RoleClient); + + endpoint->SetConnecting(false); + Log(LogInformation, "ApiListener") + << "Finished reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host << "' and port '" << port << "'"; + } catch (const std::exception& ex) { + endpoint->SetConnecting(false); + + std::ostringstream info; + info << "Cannot connect to host '" << host << "' on port '" << port << "'"; + Log(LogCritical, "ApiListener", info.str()); + Log(LogDebug, "ApiListener") + << info.str() << "\n" << DiagnosticInformation(ex); } - } + }); } void ApiListener::NewClientHandler(boost::asio::yield_context yc, const std::shared_ptr& client, const String& hostname, ConnectionRole role) @@ -1004,8 +864,7 @@ void ApiListener::ApiReconnectTimerHandler() /* Set connecting state to prevent duplicated queue inserts later. */ endpoint->SetConnecting(true); - /* Use dynamic thread pool with additional on demand resources with fast throughput. */ - EnqueueAsyncCallback(std::bind(&ApiListener::AddConnection, this, endpoint), LowLatencyScheduler); + AddConnection(endpoint); } } diff --git a/lib/remote/apilistener.hpp b/lib/remote/apilistener.hpp index 093837493..b88f1de1e 100644 --- a/lib/remote/apilistener.hpp +++ b/lib/remote/apilistener.hpp @@ -130,9 +130,6 @@ private: bool AddListener(const String& node, const String& service); void AddConnection(const Endpoint::Ptr& endpoint); - void NewClientHandler(const Socket::Ptr& client, const String& hostname, ConnectionRole role); - void NewClientHandlerInternal(const Socket::Ptr& client, const String& hostname, ConnectionRole role); - void NewClientHandler(boost::asio::yield_context yc, const std::shared_ptr& client, const String& hostname, ConnectionRole role); void NewClientHandlerInternal(boost::asio::yield_context yc, const std::shared_ptr& client, const String& hostname, ConnectionRole role); void ListenerCoroutineProc(boost::asio::yield_context yc, const std::shared_ptr& server, const std::shared_ptr& sslContext);