Stop ApiListener::ListenerCoroutineProc() when Stop() is called

This commit is contained in:
Johannes Schmidt 2025-05-26 10:01:05 +02:00
parent 157e3750e3
commit 00802ed9fa
2 changed files with 51 additions and 2 deletions

View File

@ -368,6 +368,8 @@ void ApiListener::Stop(bool runtimeDeleted)
m_Timer->Stop(true); m_Timer->Stop(true);
m_RenewOwnCertTimer->Stop(true); m_RenewOwnCertTimer->Stop(true);
StopListener();
m_WaitGroup->Join(); m_WaitGroup->Join();
ObjectImpl<ApiListener>::Stop(runtimeDeleted); ObjectImpl<ApiListener>::Stop(runtimeDeleted);
@ -486,13 +488,38 @@ bool ApiListener::AddListener(const String& node, const String& service)
Log(LogInformation, "ApiListener") Log(LogInformation, "ApiListener")
<< "Started new listener on '[" << localEndpoint.address() << "]:" << localEndpoint.port() << "'"; << "Started new listener on '[" << localEndpoint.address() << "]:" << localEndpoint.port() << "'";
IoEngine::SpawnCoroutine(io, [this, acceptor](asio::yield_context yc) { ListenerCoroutineProc(yc, acceptor); }); auto strand = Shared<asio::io_context::strand>::Make(io);
boost::signals2::scoped_connection closeSignal = m_OnListenerShutdown.connect([strand, acceptor]() {
boost::asio::post(*strand, [acceptor] {
try {
acceptor->close();
} catch (const std::exception& ex) {
Log(LogCritical, "ApiListener")
<< "Failed to close acceptor socket: " << ex.what();
}
});
});
IoEngine::SpawnCoroutine(*strand, [this, acceptor, closeSignal = std::move(closeSignal)](asio::yield_context yc) {
ListenerCoroutineProc(yc, acceptor);
});
UpdateStatusFile(localEndpoint); UpdateStatusFile(localEndpoint);
return true; return true;
} }
/**
* Stops the listener(s).
*/
void ApiListener::StopListener()
{
m_OnListenerShutdown();
m_ListenerWaitGroup->Join();
}
void ApiListener::ListenerCoroutineProc(boost::asio::yield_context yc, const Shared<boost::asio::ip::tcp::acceptor>::Ptr& server) void ApiListener::ListenerCoroutineProc(boost::asio::yield_context yc, const Shared<boost::asio::ip::tcp::acceptor>::Ptr& server)
{ {
namespace asio = boost::asio; namespace asio = boost::asio;
@ -506,7 +533,14 @@ void ApiListener::ListenerCoroutineProc(boost::asio::yield_context yc, const Sha
lastModified = Utility::GetFileCreationTime(crlPath); lastModified = Utility::GetFileCreationTime(crlPath);
} }
for (;;) { std::shared_lock wgLock(*m_ListenerWaitGroup, std::try_to_lock);
if (!wgLock) {
Log(LogCritical, "ApiListener")
<< "Could not lock the listener wait group.";
return;
}
while (server->is_open()) {
try { try {
asio::ip::tcp::socket socket (io); asio::ip::tcp::socket socket (io);
@ -546,6 +580,12 @@ void ApiListener::ListenerCoroutineProc(boost::asio::yield_context yc, const Sha
NewClientHandler(yc, strand, sslConn, String(), RoleServer); NewClientHandler(yc, strand, sslConn, String(), RoleServer);
}); });
} catch (const std::exception& ex) { } catch (const std::exception& ex) {
auto se (dynamic_cast<const boost::system::system_error*>(&ex));
if (se && se->code() == boost::asio::error::operation_aborted) {
return;
}
Log(LogCritical, "ApiListener") Log(LogCritical, "ApiListener")
<< "Cannot accept new connection: " << ex.what(); << "Cannot accept new connection: " << ex.what();
} }
@ -828,6 +868,11 @@ void ApiListener::NewClientHandlerInternal(
throw; throw;
} }
std::shared_lock wgLock(*m_ListenerWaitGroup, std::try_to_lock);
if (!wgLock) {
return;
}
if (ctype == ClientJsonRpc) { if (ctype == ClientJsonRpc) {
Log(LogNotice, "ApiListener", "New JSON-RPC client"); Log(LogNotice, "ApiListener", "New JSON-RPC client");

View File

@ -191,12 +191,16 @@ private:
static ApiListener::Ptr m_Instance; static ApiListener::Ptr m_Instance;
static std::atomic<bool> m_UpdatedObjectAuthority; static std::atomic<bool> m_UpdatedObjectAuthority;
boost::signals2::signal<void()> m_OnListenerShutdown;
StoppableWaitGroup::Ptr m_ListenerWaitGroup = new StoppableWaitGroup();
void ApiTimerHandler(); void ApiTimerHandler();
void ApiReconnectTimerHandler(); void ApiReconnectTimerHandler();
void CleanupCertificateRequestsTimerHandler(); void CleanupCertificateRequestsTimerHandler();
void CheckApiPackageIntegrity(); void CheckApiPackageIntegrity();
bool AddListener(const String& node, const String& service); bool AddListener(const String& node, const String& service);
void StopListener();
void AddConnection(const Endpoint::Ptr& endpoint); void AddConnection(const Endpoint::Ptr& endpoint);
void NewClientHandler( void NewClientHandler(