Merge pull request #6633 from Icinga/feature/apilistener-conn-thread-pool

Use a dynamic thread pool for API connections
This commit is contained in:
Michael Friedrich 2018-09-25 17:06:40 +02:00 committed by GitHub
commit 2e33ca6fc4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 30 additions and 5 deletions

View File

@ -112,6 +112,22 @@ void ApiListener::CopyCertificateFile(const String& oldCertPath, const String& n
} }
} }
/**
* Returns the API thread pool.
*
* @returns The API thread pool.
*/
ThreadPool& ApiListener::GetTP()
{
static ThreadPool tp;
return tp;
}
void ApiListener::EnqueueAsyncCallback(const std::function<void ()>& callback, SchedulerPolicy policy)
{
GetTP().Post(callback, policy);
}
void ApiListener::OnConfigLoaded() void ApiListener::OnConfigLoaded()
{ {
if (m_Instance) if (m_Instance)
@ -364,8 +380,9 @@ void ApiListener::ListenerThreadProc(const Socket::Ptr& server)
for (;;) { for (;;) {
try { try {
Socket::Ptr client = server->Accept(); Socket::Ptr client = server->Accept();
std::thread thread(std::bind(&ApiListener::NewClientHandler, this, client, String(), RoleServer));
thread.detach(); /* Use dynamic thread pool with additional on demand resources with fast throughput. */
EnqueueAsyncCallback(std::bind(&ApiListener::NewClientHandler, this, client, String(), RoleServer), LowLatencyScheduler);
} catch (const std::exception&) { } catch (const std::exception&) {
Log(LogCritical, "ApiListener", "Cannot accept new connection."); Log(LogCritical, "ApiListener", "Cannot accept new connection.");
} }
@ -399,9 +416,10 @@ void ApiListener::AddConnection(const Endpoint::Ptr& endpoint)
TcpSocket::Ptr client = new TcpSocket(); TcpSocket::Ptr client = new TcpSocket();
try { try {
endpoint->SetConnecting(true);
client->Connect(host, port); client->Connect(host, port);
NewClientHandler(client, endpoint->GetName(), RoleClient); NewClientHandler(client, endpoint->GetName(), RoleClient);
endpoint->SetConnecting(false); endpoint->SetConnecting(false);
} catch (const std::exception& ex) { } catch (const std::exception& ex) {
endpoint->SetConnecting(false); endpoint->SetConnecting(false);
@ -784,8 +802,11 @@ void ApiListener::ApiReconnectTimerHandler()
continue; continue;
} }
std::thread thread(std::bind(&ApiListener::AddConnection, this, endpoint)); /* Set connecting state to prevent duplicated queue inserts later. */
thread.detach(); endpoint->SetConnecting(true);
/* Use dynamic thread pool with additional on demand resources with fast throughput. */
EnqueueAsyncCallback(std::bind(&ApiListener::AddConnection, this, endpoint), LowLatencyScheduler);
} }
} }

View File

@ -30,6 +30,7 @@
#include "base/workqueue.hpp" #include "base/workqueue.hpp"
#include "base/tcpsocket.hpp" #include "base/tcpsocket.hpp"
#include "base/tlsstream.hpp" #include "base/tlsstream.hpp"
#include "base/threadpool.hpp"
#include <set> #include <set>
namespace icinga namespace icinga
@ -148,6 +149,9 @@ private:
void NewClientHandlerInternal(const Socket::Ptr& client, const String& hostname, ConnectionRole role); void NewClientHandlerInternal(const Socket::Ptr& client, const String& hostname, ConnectionRole role);
void ListenerThreadProc(const Socket::Ptr& server); void ListenerThreadProc(const Socket::Ptr& server);
static ThreadPool& GetTP();
static void EnqueueAsyncCallback(const std::function<void ()>& callback, SchedulerPolicy policy = DefaultScheduler);
WorkQueue m_RelayQueue; WorkQueue m_RelayQueue;
WorkQueue m_SyncQueue{0, 4}; WorkQueue m_SyncQueue{0, 4};