Merge pull request #10080 from Icinga/net-stack-2.14.3

Fix network stack stability issues
This commit is contained in:
Yonas Habteab 2024-11-14 11:02:36 +01:00 committed by GitHub
commit d5cd5aff2c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 121 additions and 201 deletions

View File

@ -109,8 +109,7 @@ public:
// https://github.com/boostorg/coroutine/issues/39
throw;
} catch (const std::exception& ex) {
Log(LogCritical, "IoEngine", "Exception in coroutine!");
Log(LogDebug, "IoEngine") << "Exception in coroutine: " << DiagnosticInformation(ex);
Log(LogCritical, "IoEngine") << "Exception in coroutine: " << DiagnosticInformation(ex);
} catch (...) {
Log(LogCritical, "IoEngine", "Exception in coroutine!");
}

View File

@ -33,55 +33,6 @@ using namespace icinga;
REGISTER_FUNCTION_NONCONST(Internal, IfwApiCheck, &IfwApiCheckTask::ScriptFunc, "checkable:cr:resolvedMacros:useResolvedMacros");
static void ReportIfwCheckResult(
const Checkable::Ptr& checkable, const Value& cmdLine, const CheckResult::Ptr& cr,
const String& output, double start, double end, int exitcode = 3, const Array::Ptr& perfdata = nullptr
)
{
if (Checkable::ExecuteCommandProcessFinishedHandler) {
ProcessResult pr;
pr.PID = -1;
pr.Output = perfdata ? output + " |" + String(perfdata->Join(" ")) : output;
pr.ExecutionStart = start;
pr.ExecutionEnd = end;
pr.ExitStatus = exitcode;
Checkable::ExecuteCommandProcessFinishedHandler(cmdLine, pr);
} else {
auto splittedPerfdata (perfdata);
if (perfdata) {
splittedPerfdata = new Array();
ObjectLock oLock (perfdata);
for (String pv : perfdata) {
PluginUtility::SplitPerfdata(pv)->CopyTo(splittedPerfdata);
}
}
cr->SetOutput(output);
cr->SetPerformanceData(splittedPerfdata);
cr->SetState((ServiceState)exitcode);
cr->SetExitStatus(exitcode);
cr->SetExecutionStart(start);
cr->SetExecutionEnd(end);
cr->SetCommand(cmdLine);
checkable->ProcessCheckResult(cr);
}
}
static void ReportIfwCheckResult(
boost::asio::yield_context yc, const Checkable::Ptr& checkable, const Value& cmdLine,
const CheckResult::Ptr& cr, const String& output, double start
)
{
double end = Utility::GetTime();
CpuBoundWork cbw (yc);
ReportIfwCheckResult(checkable, cmdLine, cr, output, start, end);
}
static const char* GetUnderstandableError(const std::exception& ex)
{
auto se (dynamic_cast<const boost::system::system_error*>(&ex));
@ -93,10 +44,12 @@ static const char* GetUnderstandableError(const std::exception& ex)
return ex.what();
}
// Note: If DoIfwNetIo returns due to an error, the plugin output of the specified CheckResult (cr) will always be set,
// and if it was successful, the cr exit status, plugin state and performance data (if any) will also be overridden.
// Therefore, you have to take care yourself of setting all the other necessary fields for the check result.
static void DoIfwNetIo(
boost::asio::yield_context yc, const Checkable::Ptr& checkable, const Array::Ptr& cmdLine,
const CheckResult::Ptr& cr, const String& psCommand, const String& psHost, const String& san, const String& psPort,
AsioTlsStream& conn, boost::beast::http::request<boost::beast::http::string_body>& req, double start
boost::asio::yield_context yc, const CheckResult::Ptr& cr, const String& psCommand, const String& psHost, const String& san,
const String& psPort, AsioTlsStream& conn, boost::beast::http::request<boost::beast::http::string_body>& req
)
{
namespace http = boost::beast::http;
@ -107,11 +60,7 @@ static void DoIfwNetIo(
try {
Connect(conn.lowest_layer(), psHost, psPort, yc);
} catch (const std::exception& ex) {
ReportIfwCheckResult(
yc, checkable, cmdLine, cr,
"Can't connect to IfW API on host '" + psHost + "' port '" + psPort + "': " + GetUnderstandableError(ex),
start
);
cr->SetOutput("Can't connect to IfW API on host '" + psHost + "' port '" + psPort + "': " + GetUnderstandableError(ex));
return;
}
@ -120,12 +69,7 @@ static void DoIfwNetIo(
try {
sslConn.async_handshake(conn.next_layer().client, yc);
} catch (const std::exception& ex) {
ReportIfwCheckResult(
yc, checkable, cmdLine, cr,
"TLS handshake with IfW API on host '" + psHost + "' (SNI: '" + san
+ "') port '" + psPort + "' failed: " + GetUnderstandableError(ex),
start
);
cr->SetOutput("TLS handshake with IfW API on host '" + psHost + "' (SNI: '" + san+ "') port '" + psPort + "' failed: " + GetUnderstandableError(ex));
return;
}
@ -135,15 +79,10 @@ static void DoIfwNetIo(
try {
cn = GetCertificateCN(cert);
} catch (const std::exception&) {
}
} catch (const std::exception&) { }
ReportIfwCheckResult(
yc, checkable, cmdLine, cr,
"Certificate validation failed for IfW API on host '" + psHost + "' (SNI: '" + san + "'; CN: "
+ (cn.IsString() ? "'" + cn + "'" : "N/A") + ") port '" + psPort + "': " + sslConn.GetVerifyError(),
start
);
cr->SetOutput("Certificate validation failed for IfW API on host '" + psHost + "' (SNI: '" + san + "'; CN: "
+ (cn.IsString() ? "'" + cn + "'" : "N/A") + ") port '" + psPort + "': " + sslConn.GetVerifyError());
return;
}
@ -151,87 +90,56 @@ static void DoIfwNetIo(
http::async_write(conn, req, yc);
conn.async_flush(yc);
} catch (const std::exception& ex) {
ReportIfwCheckResult(
yc, checkable, cmdLine, cr,
"Can't send HTTP request to IfW API on host '" + psHost + "' port '" + psPort + "': " + GetUnderstandableError(ex),
start
);
cr->SetOutput("Can't send HTTP request to IfW API on host '" + psHost + "' port '" + psPort + "': " + GetUnderstandableError(ex));
return;
}
try {
http::async_read(conn, buf, resp, yc);
} catch (const std::exception& ex) {
ReportIfwCheckResult(
yc, checkable, cmdLine, cr,
"Can't read HTTP response from IfW API on host '" + psHost + "' port '" + psPort + "': " + GetUnderstandableError(ex),
start
);
cr->SetOutput("Can't read HTTP response from IfW API on host '" + psHost + "' port '" + psPort + "': " + GetUnderstandableError(ex));
return;
}
double end = Utility::GetTime();
{
boost::system::error_code ec;
sslConn.async_shutdown(yc[ec]);
}
CpuBoundWork cbw (yc);
Value jsonRoot;
try {
jsonRoot = JsonDecode(resp.body());
} catch (const std::exception& ex) {
ReportIfwCheckResult(
checkable, cmdLine, cr,
"Got bad JSON from IfW API on host '" + psHost + "' port '" + psPort + "': " + ex.what(), start, end
);
cr->SetOutput("Got bad JSON from IfW API on host '" + psHost + "' port '" + psPort + "': " + ex.what());
return;
}
if (!jsonRoot.IsObjectType<Dictionary>()) {
ReportIfwCheckResult(
checkable, cmdLine, cr,
"Got JSON, but not an object, from IfW API on host '"
+ psHost + "' port '" + psPort + "': " + JsonEncode(jsonRoot),
start, end
);
cr->SetOutput("Got JSON, but not an object, from IfW API on host '"+ psHost + "' port '" + psPort + "': " + JsonEncode(jsonRoot));
return;
}
Value jsonBranch;
if (!Dictionary::Ptr(jsonRoot)->Get(psCommand, &jsonBranch)) {
ReportIfwCheckResult(
checkable, cmdLine, cr,
"Missing ." + psCommand + " in JSON object from IfW API on host '"
+ psHost + "' port '" + psPort + "': " + JsonEncode(jsonRoot),
start, end
);
cr->SetOutput("Missing ." + psCommand + " in JSON object from IfW API on host '" + psHost + "' port '" + psPort + "': " + JsonEncode(jsonRoot));
return;
}
if (!jsonBranch.IsObjectType<Dictionary>()) {
ReportIfwCheckResult(
checkable, cmdLine, cr,
"." + psCommand + " in JSON from IfW API on host '"
+ psHost + "' port '" + psPort + "' is not an object: " + JsonEncode(jsonBranch),
start, end
);
cr->SetOutput("." + psCommand + " in JSON from IfW API on host '" + psHost + "' port '" + psPort + "' is not an object: " + JsonEncode(jsonBranch));
return;
}
Dictionary::Ptr result = jsonBranch;
Value exitcode;
Value rawExitcode;
if (!result->Get("exitcode", &exitcode)) {
ReportIfwCheckResult(
checkable, cmdLine, cr,
if (!result->Get("exitcode", &rawExitcode)) {
cr->SetOutput(
"Missing ." + psCommand + ".exitcode in JSON object from IfW API on host '"
+ psHost + "' port '" + psPort + "': " + JsonEncode(result),
start, end
+ psHost + "' port '" + psPort + "': " + JsonEncode(result)
);
return;
}
@ -239,27 +147,25 @@ static void DoIfwNetIo(
static const std::set<double> exitcodes {ServiceOK, ServiceWarning, ServiceCritical, ServiceUnknown};
static const auto exitcodeList (Array::FromSet(exitcodes)->Join(", "));
if (!exitcode.IsNumber() || exitcodes.find(exitcode) == exitcodes.end()) {
ReportIfwCheckResult(
checkable, cmdLine, cr,
"Got bad exitcode " + JsonEncode(exitcode) + " from IfW API on host '" + psHost + "' port '" + psPort
+ "', expected one of: " + exitcodeList,
start, end
if (!rawExitcode.IsNumber() || exitcodes.find(rawExitcode) == exitcodes.end()) {
cr->SetOutput(
"Got bad exitcode " + JsonEncode(rawExitcode) + " from IfW API on host '" + psHost + "' port '" + psPort
+ "', expected one of: " + exitcodeList
);
return;
}
auto exitcode (static_cast<ServiceState>(rawExitcode.Get<double>()));
auto perfdataVal (result->Get("perfdata"));
Array::Ptr perfdata;
try {
perfdata = perfdataVal;
} catch (const std::exception&) {
ReportIfwCheckResult(
checkable, cmdLine, cr,
cr->SetOutput(
"Got bad perfdata " + JsonEncode(perfdataVal) + " from IfW API on host '"
+ psHost + "' port '" + psPort + "', expected an array",
start, end
+ psHost + "' port '" + psPort + "', expected an array"
);
return;
}
@ -269,18 +175,20 @@ static void DoIfwNetIo(
for (auto& pv : perfdata) {
if (!pv.IsString()) {
ReportIfwCheckResult(
checkable, cmdLine, cr,
cr->SetOutput(
"Got bad perfdata value " + JsonEncode(perfdata) + " from IfW API on host '"
+ psHost + "' port '" + psPort + "', expected an array of strings",
start, end
+ psHost + "' port '" + psPort + "', expected an array of strings"
);
return;
}
}
cr->SetPerformanceData(PluginUtility::SplitPerfdata(perfdata->Join(" ")));
}
ReportIfwCheckResult(checkable, cmdLine, cr, result->Get("checkresult"), start, end, exitcode, perfdata);
cr->SetState(exitcode);
cr->SetExitStatus(exitcode);
cr->SetOutput(result->Get("checkresult"));
}
void IfwApiCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr,
@ -344,6 +252,34 @@ void IfwApiCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckRes
String username = resolveMacros("$ifw_api_username$");
String password = resolveMacros("$ifw_api_password$");
// Use this lambda to process the final Ifw check result. Callers don't need to pass the check result
// as an argument, as the lambda already captures the `cr` and notices all the `cr` changes made across
// the code. You just need to set the necessary cr fields when appropriated and then call this closure.
std::function<void()> reportResult;
if (auto callback = Checkable::ExecuteCommandProcessFinishedHandler; callback) {
reportResult = [cr, callback = std::move(callback)]() {
ProcessResult pr;
pr.PID = -1;
if (auto pd = cr->GetPerformanceData(); pd) {
pr.Output = cr->GetOutput() +" |" + String(pd->Join(" "));
} else {
pr.Output = cr->GetOutput();
}
pr.ExecutionStart = cr->GetExecutionStart();
pr.ExecutionEnd = cr->GetExecutionEnd();
pr.ExitStatus = cr->GetExitStatus();
callback(cr->GetCommand(), pr);
};
} else {
reportResult = [checkable, cr]() { checkable->ProcessCheckResult(cr); };
}
// Set the default check result state and exit code to unknown for the moment!
cr->SetExitStatus(ServiceUnknown);
cr->SetState(ServiceUnknown);
Dictionary::Ptr params = new Dictionary();
if (arguments) {
@ -369,11 +305,12 @@ void IfwApiCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckRes
if (kv.second.GetType() == ValueObject) {
auto now (Utility::GetTime());
ReportIfwCheckResult(
checkable, command->GetName(), cr,
"$ifw_api_arguments$ may not directly contain objects (especially functions).", now, now
);
cr->SetCommand(command->GetName());
cr->SetExecutionStart(now);
cr->SetExecutionEnd(now);
cr->SetOutput("$ifw_api_arguments$ may not directly contain objects (especially functions).");
reportResult();
return;
}
}
@ -498,12 +435,17 @@ void IfwApiCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckRes
auto& io (IoEngine::Get().GetIoContext());
auto strand (Shared<asio::io_context::strand>::Make(io));
Shared<asio::ssl::context>::Ptr ctx;
double start = Utility::GetTime();
cr->SetExecutionStart(Utility::GetTime());
cr->SetCommand(cmdLine);
try {
ctx = SetupSslContext(cert, key, ca, crl, DEFAULT_TLS_CIPHERS, DEFAULT_TLS_PROTOCOLMIN, DebugInfo());
} catch (const std::exception& ex) {
ReportIfwCheckResult(checkable, cmdLine, cr, ex.what(), start, Utility::GetTime());
cr->SetOutput(ex.what());
cr->SetExecutionEnd(Utility::GetTime());
reportResult();
return;
}
@ -511,7 +453,7 @@ void IfwApiCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckRes
IoEngine::SpawnCoroutine(
*strand,
[strand, checkable, cmdLine, cr, psCommand, psHost, expectedSan, psPort, conn, req, start, checkTimeout](asio::yield_context yc) {
[strand, checkable, cr, psCommand, psHost, expectedSan, psPort, conn, req, checkTimeout, reportResult = std::move(reportResult)](asio::yield_context yc) {
Timeout::Ptr timeout = new Timeout(strand->context(), *strand, boost::posix_time::microseconds(int64_t(checkTimeout * 1e6)),
[&conn, &checkable](boost::asio::yield_context yc) {
Log(LogNotice, "IfwApiCheckTask")
@ -525,7 +467,13 @@ void IfwApiCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckRes
Defer cancelTimeout ([&timeout]() { timeout->Cancel(); });
DoIfwNetIo(yc, checkable, cmdLine, cr, psCommand, psHost, expectedSan, psPort, *conn, *req, start);
DoIfwNetIo(yc, cr, psCommand, psHost, expectedSan, psPort, *conn, *req);
cr->SetExecutionEnd(Utility::GetTime());
// Post the check result processing to the global pool not to block the I/O threads,
// which could affect processing important RPC messages and HTTP connections.
Utility::QueueAsyncCallback(reportResult);
}
);
}

View File

@ -513,6 +513,8 @@ void ApiListener::ListenerCoroutineProc(boost::asio::yield_context yc, const Sha
server->async_accept(socket.lowest_layer(), yc);
auto remoteEndpoint (socket.lowest_layer().remote_endpoint());
if (!crlPath.IsEmpty()) {
time_t currentCreationTime = Utility::GetFileCreationTime(crlPath);
@ -531,12 +533,11 @@ void ApiListener::ListenerCoroutineProc(boost::asio::yield_context yc, const Sha
auto strand (Shared<asio::io_context::strand>::Make(io));
IoEngine::SpawnCoroutine(*strand, [this, strand, sslConn](asio::yield_context yc) {
IoEngine::SpawnCoroutine(*strand, [this, strand, sslConn, remoteEndpoint](asio::yield_context yc) {
Timeout::Ptr timeout(new Timeout(strand->context(), *strand, boost::posix_time::microseconds(int64_t(GetConnectTimeout() * 1e6)),
[sslConn](asio::yield_context yc) {
[sslConn, remoteEndpoint](asio::yield_context yc) {
Log(LogWarning, "ApiListener")
<< "Timeout while processing incoming connection from "
<< sslConn->lowest_layer().remote_endpoint();
<< "Timeout while processing incoming connection from " << remoteEndpoint;
boost::system::error_code ec;
sslConn->lowest_layer().cancel(ec);
@ -1385,7 +1386,6 @@ void ApiListener::OpenLogFile()
}
m_LogFile = new StdioStream(fp.release(), true);
m_LogMessageCount = 0;
SetLogMessageTimestamp(Utility::GetTime());
}
@ -1415,6 +1415,9 @@ void ApiListener::RotateLogFile()
if (!Utility::PathExists(newpath)) {
try {
Utility::RenameFile(oldpath, newpath);
// We're rotating the current log file, so reset the log message counter as well.
m_LogMessageCount = 0;
} catch (const std::exception& ex) {
Log(LogCritical, "ApiListener")
<< "Cannot rotate replay log file from '" << oldpath << "' to '"

View File

@ -116,16 +116,12 @@ bool EventsHandler::HandleRequest(
auto event (subscriber.GetInbox()->Shift(yc));
if (event) {
CpuBoundWork buildingResponse (yc);
String body = JsonEncode(event);
boost::algorithm::replace_all(body, "\n", "");
asio::const_buffer payload (body.CStr(), body.GetLength());
buildingResponse.Done();
asio::async_write(stream, payload, yc);
asio::async_write(stream, newLine, yc);
stream.async_flush(yc);

View File

@ -102,8 +102,6 @@ void HttpServerConnection::Disconnect()
auto listener (ApiListener::GetInstance());
if (listener) {
CpuBoundWork removeHttpClient (yc);
listener->RemoveHttpClient(this);
}
}
@ -192,10 +190,8 @@ bool EnsureValidHeaders(
response.set(http::field::connection, "close");
boost::system::error_code ec;
http::async_write(stream, response, yc[ec]);
stream.async_flush(yc[ec]);
http::async_write(stream, response, yc);
stream.async_flush(yc);
return false;
}
@ -217,10 +213,8 @@ void HandleExpect100(
response.result(http::status::continue_);
boost::system::error_code ec;
http::async_write(stream, response, yc[ec]);
stream.async_flush(yc[ec]);
http::async_write(stream, response, yc);
stream.async_flush(yc);
}
}
@ -240,8 +234,6 @@ bool HandleAccessControl(
auto headerAllowOrigin (listener->GetAccessControlAllowOrigin());
if (headerAllowOrigin) {
CpuBoundWork allowOriginHeader (yc);
auto allowedOrigins (headerAllowOrigin->ToSet<String>());
if (!allowedOrigins.empty()) {
@ -251,8 +243,6 @@ bool HandleAccessControl(
response.set(http::field::access_control_allow_origin, origin);
}
allowOriginHeader.Done();
response.set(http::field::access_control_allow_credentials, "true");
if (request.method() == http::verb::options && !request[http::field::access_control_request_method].empty()) {
@ -263,10 +253,8 @@ bool HandleAccessControl(
response.content_length(response.body().size());
response.set(http::field::connection, "close");
boost::system::error_code ec;
http::async_write(stream, response, yc[ec]);
stream.async_flush(yc[ec]);
http::async_write(stream, response, yc);
stream.async_flush(yc);
return false;
}
@ -294,10 +282,8 @@ bool EnsureAcceptHeader(
response.content_length(response.body().size());
response.set(http::field::connection, "close");
boost::system::error_code ec;
http::async_write(stream, response, yc[ec]);
stream.async_flush(yc[ec]);
http::async_write(stream, response, yc);
stream.async_flush(yc);
return false;
}
@ -335,10 +321,8 @@ bool EnsureAuthenticatedUser(
response.content_length(response.body().size());
}
boost::system::error_code ec;
http::async_write(stream, response, yc[ec]);
stream.async_flush(yc[ec]);
http::async_write(stream, response, yc);
stream.async_flush(yc);
return false;
}
@ -364,8 +348,6 @@ bool EnsureValidBody(
Array::Ptr permissions = authenticatedUser->GetPermissions();
if (permissions) {
CpuBoundWork evalPermissions (yc);
ObjectLock olock(permissions);
for (const Value& permissionInfo : permissions) {
@ -429,8 +411,8 @@ bool EnsureValidBody(
response.set(http::field::connection, "close");
http::async_write(stream, response, yc[ec]);
stream.async_flush(yc[ec]);
http::async_write(stream, response, yc);
stream.async_flush(yc);
return false;
}
@ -470,10 +452,8 @@ bool ProcessRequest(
HttpUtility::SendJsonError(response, nullptr, 500, "Unhandled exception" , DiagnosticInformation(ex));
boost::system::error_code ec;
http::async_write(stream, response, yc[ec]);
stream.async_flush(yc[ec]);
http::async_write(stream, response, yc);
stream.async_flush(yc);
return true;
}
@ -482,10 +462,8 @@ bool ProcessRequest(
return false;
}
boost::system::error_code ec;
http::async_write(stream, response, yc[ec]);
stream.async_flush(yc[ec]);
http::async_write(stream, response, yc);
stream.async_flush(yc);
return true;
}
@ -537,8 +515,6 @@ void HttpServerConnection::ProcessMessages(boost::asio::yield_context yc)
auto authenticatedUser (m_ApiUser);
if (!authenticatedUser) {
CpuBoundWork fetchingAuthenticatedUser (yc);
authenticatedUser = ApiUser::GetByAuthHeader(std::string(request[http::field::authorization]));
}
@ -582,8 +558,8 @@ void HttpServerConnection::ProcessMessages(boost::asio::yield_context yc)
}
} catch (const std::exception& ex) {
if (!m_ShuttingDown) {
Log(LogCritical, "HttpServerConnection")
<< "Unhandled exception while processing HTTP request: " << ex.what();
Log(LogWarning, "HttpServerConnection")
<< "Exception while processing HTTP request from " << m_PeerAddress << ": " << ex.what();
}
}

View File

@ -62,7 +62,7 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
{
m_Stream->next_layer().SetSeen(&m_Seen);
for (;;) {
while (!m_ShuttingDown) {
String message;
try {
@ -81,6 +81,8 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
CpuBoundWork handleMessage (yc);
MessageHandler(message);
l_TaskStats.InsertValue(Utility::GetTime(), 1);
} catch (const std::exception& ex) {
Log(m_ShuttingDown ? LogDebug : LogWarning, "JsonRpcConnection")
<< "Error while processing JSON-RPC message for identity '" << m_Identity
@ -88,10 +90,6 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
break;
}
CpuBoundWork taskStats (yc);
l_TaskStats.InsertValue(Utility::GetTime(), 1);
}
Disconnect();
@ -213,14 +211,14 @@ void JsonRpcConnection::Disconnect()
Log(LogWarning, "JsonRpcConnection")
<< "API client disconnected for identity '" << m_Identity << "'";
{
CpuBoundWork removeClient (yc);
if (m_Endpoint) {
m_Endpoint->RemoveClient(this);
} else {
ApiListener::GetInstance()->RemoveAnonymousClient(this);
}
// We need to unregister the endpoint client as soon as possible not to confuse Icinga 2,
// given that Endpoint::GetConnected() is just performing a check that the endpoint's client
// cache is not empty, which could result in an already disconnected endpoint never trying to
// reconnect again. See #7444.
if (m_Endpoint) {
m_Endpoint->RemoveClient(this);
} else {
ApiListener::GetInstance()->RemoveAnonymousClient(this);
}
m_OutgoingMessagesQueued.Set();