Use a dynamic thread pool for API connections

The full analysis is located in #6517.

fixes #6517
This commit is contained in:
Michael Friedrich 2018-09-24 16:38:48 +02:00
parent 4695e620d0
commit cbde35ff22
2 changed files with 30 additions and 5 deletions

View File

@ -112,6 +112,22 @@ void ApiListener::CopyCertificateFile(const String& oldCertPath, const String& n
}
}
/**
* Returns the API thread pool.
*
* @returns The API thread pool.
*/
ThreadPool& ApiListener::GetTP()
{
static ThreadPool tp;
return tp;
}
void ApiListener::EnqueueAsyncCallback(const std::function<void ()>& callback, SchedulerPolicy policy)
{
GetTP().Post(callback, policy);
}
void ApiListener::OnConfigLoaded()
{
if (m_Instance)
@ -364,8 +380,9 @@ void ApiListener::ListenerThreadProc(const Socket::Ptr& server)
for (;;) {
try {
Socket::Ptr client = server->Accept();
std::thread thread(std::bind(&ApiListener::NewClientHandler, this, client, String(), RoleServer));
thread.detach();
/* Use dynamic thread pool with additional on demand resources with fast throughput. */
EnqueueAsyncCallback(std::bind(&ApiListener::NewClientHandler, this, client, String(), RoleServer), LowLatencyScheduler);
} catch (const std::exception&) {
Log(LogCritical, "ApiListener", "Cannot accept new connection.");
}
@ -399,9 +416,10 @@ void ApiListener::AddConnection(const Endpoint::Ptr& endpoint)
TcpSocket::Ptr client = new TcpSocket();
try {
endpoint->SetConnecting(true);
client->Connect(host, port);
NewClientHandler(client, endpoint->GetName(), RoleClient);
endpoint->SetConnecting(false);
} catch (const std::exception& ex) {
endpoint->SetConnecting(false);
@ -784,8 +802,11 @@ void ApiListener::ApiReconnectTimerHandler()
continue;
}
std::thread thread(std::bind(&ApiListener::AddConnection, this, endpoint));
thread.detach();
/* Set connecting state to prevent duplicated queue inserts later. */
endpoint->SetConnecting(true);
/* Use dynamic thread pool with additional on demand resources with fast throughput. */
EnqueueAsyncCallback(std::bind(&ApiListener::AddConnection, this, endpoint), LowLatencyScheduler);
}
}

View File

@ -30,6 +30,7 @@
#include "base/workqueue.hpp"
#include "base/tcpsocket.hpp"
#include "base/tlsstream.hpp"
#include "base/threadpool.hpp"
#include <set>
namespace icinga
@ -148,6 +149,9 @@ private:
void NewClientHandlerInternal(const Socket::Ptr& client, const String& hostname, ConnectionRole role);
void ListenerThreadProc(const Socket::Ptr& server);
static ThreadPool& GetTP();
static void EnqueueAsyncCallback(const std::function<void ()>& callback, SchedulerPolicy policy = DefaultScheduler);
WorkQueue m_RelayQueue;
WorkQueue m_SyncQueue{0, 4};