Fix deadlock in TlsStream::Close

fixes #10235
This commit is contained in:
Gunnar Beutner 2015-09-29 10:31:16 +02:00
parent 8dec953829
commit 0e40c3ee1d
9 changed files with 97 additions and 72 deletions

View File

@ -54,33 +54,32 @@ void ApiClient::TypesHttpCompletionCallback(HttpRequest& request, HttpResponse&
while ((count = response.ReadBody(buffer, sizeof(buffer))) > 0) while ((count = response.ReadBody(buffer, sizeof(buffer))) > 0)
body += String(buffer, buffer + count); body += String(buffer, buffer + count);
std::vector<ApiType::Ptr> types;
if (response.StatusCode < 200 || response.StatusCode > 299) { if (response.StatusCode < 200 || response.StatusCode > 299) {
Log(LogCritical, "ApiClient") Log(LogCritical, "ApiClient")
<< "Failed HTTP request; Code: " << response.StatusCode << "; Body: " << body; << "Failed HTTP request; Code: " << response.StatusCode << "; Body: " << body;
return; } else {
} try {
result = JsonDecode(body);
std::vector<ApiType::Ptr> types; Array::Ptr results = result->Get("results");
try { ObjectLock olock(results);
result = JsonDecode(body); BOOST_FOREACH(const Dictionary::Ptr typeInfo, results)
{
Array::Ptr results = result->Get("results"); ApiType::Ptr type = new ApiType();;
type->Abstract = typeInfo->Get("abstract");
ObjectLock olock(results); type->BaseName = typeInfo->Get("base");
BOOST_FOREACH(const Dictionary::Ptr typeInfo, results) type->Name = typeInfo->Get("name");
{ type->PluralName = typeInfo->Get("plural_name");
ApiType::Ptr type = new ApiType();; // TODO: attributes
type->Abstract = typeInfo->Get("abstract"); types.push_back(type);
type->BaseName = typeInfo->Get("base"); }
type->Name = typeInfo->Get("name"); } catch (const std::exception& ex) {
type->PluralName = typeInfo->Get("plural_name"); Log(LogCritical, "ApiClient")
// TODO: attributes << "Error while decoding response: " << DiagnosticInformation(ex);
types.push_back(type);
} }
} catch (const std::exception& ex) {
Log(LogCritical, "ApiClient")
<< "Error while decoding response: " << DiagnosticInformation(ex);
} }
callback(types); callback(types);
@ -125,54 +124,54 @@ void ApiClient::ObjectsHttpCompletionCallback(HttpRequest& request,
while ((count = response.ReadBody(buffer, sizeof(buffer))) > 0) while ((count = response.ReadBody(buffer, sizeof(buffer))) > 0)
body += String(buffer, buffer + count); body += String(buffer, buffer + count);
std::vector<ApiObject::Ptr> objects;
if (response.StatusCode < 200 || response.StatusCode > 299) { if (response.StatusCode < 200 || response.StatusCode > 299) {
Log(LogCritical, "ApiClient") Log(LogCritical, "ApiClient")
<< "Failed HTTP request; Code: " << response.StatusCode << "; Body: " << body; << "Failed HTTP request; Code: " << response.StatusCode << "; Body: " << body;
return; return;
} } else {
try {
result = JsonDecode(body);
std::vector<ApiObject::Ptr> objects; Array::Ptr results = result->Get("results");
try {
result = JsonDecode(body);
Array::Ptr results = result->Get("results");
if (results) {
ObjectLock olock(results);
BOOST_FOREACH(const Dictionary::Ptr objectInfo, results)
{
ApiObject::Ptr object = new ApiObject();
Dictionary::Ptr attrs = objectInfo->Get("attrs");
if (results) {
ObjectLock olock(results);
BOOST_FOREACH(const Dictionary::Ptr objectInfo, results)
{ {
ObjectLock olock(attrs); ApiObject::Ptr object = new ApiObject();
BOOST_FOREACH(const Dictionary::Pair& kv, attrs)
Dictionary::Ptr attrs = objectInfo->Get("attrs");
{ {
object->Attrs[kv.first] = kv.second; ObjectLock olock(attrs);
BOOST_FOREACH(const Dictionary::Pair& kv, attrs)
{
object->Attrs[kv.first] = kv.second;
}
} }
}
Array::Ptr used_by = objectInfo->Get("used_by"); Array::Ptr used_by = objectInfo->Get("used_by");
{
ObjectLock olock(used_by);
BOOST_FOREACH(const Dictionary::Ptr& refInfo, used_by)
{ {
ApiObjectReference ref; ObjectLock olock(used_by);
ref.Name = refInfo->Get("name"); BOOST_FOREACH(const Dictionary::Ptr& refInfo, used_by)
ref.Type = refInfo->Get("type"); {
object->UsedBy.push_back(ref); ApiObjectReference ref;
ref.Name = refInfo->Get("name");
ref.Type = refInfo->Get("type");
object->UsedBy.push_back(ref);
}
} }
}
objects.push_back(object); objects.push_back(object);
}
} }
} catch (const std::exception& ex) {
Log(LogCritical, "ApiClient")
<< "Error while decoding response: " << DiagnosticInformation(ex);
} }
} catch (const std::exception& ex) {
Log(LogCritical, "ApiClient")
<< "Error while decoding response: " << DiagnosticInformation(ex);
} }
callback(objects); callback(objects);

View File

@ -166,10 +166,12 @@ void SocketEvents::WakeUpThread(bool wait)
l_SocketIOFDChanged = true; l_SocketIOFDChanged = true;
(void) send(l_SocketIOEventFDs[1], "T", 1, 0); while (l_SocketIOFDChanged) {
(void) send(l_SocketIOEventFDs[1], "T", 1, 0);
while (l_SocketIOFDChanged) boost::system_time const timeout = boost::get_system_time() + boost::posix_time::milliseconds(50);
l_SocketIOCV.wait(lock); l_SocketIOCV.timed_wait(lock, timeout);
}
} }
} else { } else {
(void) send(l_SocketIOEventFDs[1], "T", 1, 0); (void) send(l_SocketIOEventFDs[1], "T", 1, 0);

View File

@ -22,7 +22,7 @@
using namespace icinga; using namespace icinga;
void Stream::RegisterDataHandler(const boost::function<void(void)>& handler) void Stream::RegisterDataHandler(const boost::function<void(const Stream::Ptr&)>& handler)
{ {
if (SupportsWaiting()) if (SupportsWaiting())
OnDataAvailable.connect(handler); OnDataAvailable.connect(handler);
@ -52,7 +52,7 @@ size_t Stream::Peek(void *buffer, size_t count, bool allow_partial)
void Stream::SignalDataAvailable(void) void Stream::SignalDataAvailable(void)
{ {
OnDataAvailable(); OnDataAvailable(this);
{ {
boost::mutex::scoped_lock lock(m_Mutex); boost::mutex::scoped_lock lock(m_Mutex);

View File

@ -131,7 +131,7 @@ public:
virtual bool IsDataAvailable(void) const; virtual bool IsDataAvailable(void) const;
void RegisterDataHandler(const boost::function<void(void)>& handler); void RegisterDataHandler(const boost::function<void(const Stream::Ptr&)>& handler);
StreamReadStatus ReadLine(String *line, StreamReadContext& context, bool may_wait = false); StreamReadStatus ReadLine(String *line, StreamReadContext& context, bool may_wait = false);
@ -139,7 +139,7 @@ protected:
void SignalDataAvailable(void); void SignalDataAvailable(void);
private: private:
boost::signals2::signal<void(void)> OnDataAvailable; boost::signals2::signal<void(const Stream::Ptr&)> OnDataAvailable;
boost::mutex m_Mutex; boost::mutex m_Mutex;
boost::condition_variable m_CV; boost::condition_variable m_CV;

View File

@ -184,10 +184,12 @@ void TlsStream::OnEvent(int revents)
if (rc > 0) { if (rc > 0) {
m_CurrentAction = TlsActionNone; m_CurrentAction = TlsActionNone;
if (m_SendQ->GetAvailableBytes() > 0) if (!m_Eof) {
ChangeEvents(POLLIN|POLLOUT); if (m_SendQ->GetAvailableBytes() > 0)
else ChangeEvents(POLLIN|POLLOUT);
ChangeEvents(POLLIN); else
ChangeEvents(POLLIN);
}
lock.unlock(); lock.unlock();

View File

@ -49,6 +49,9 @@ void HttpClientConnection::Reconnect(void)
m_Context.~StreamReadContext(); m_Context.~StreamReadContext();
new (&m_Context) StreamReadContext(); new (&m_Context) StreamReadContext();
m_Requests.clear();
m_CurrentResponse.reset();
TcpSocket::Ptr socket = new TcpSocket(); TcpSocket::Ptr socket = new TcpSocket();
socket->Connect(m_Host, m_Port); socket->Connect(m_Host, m_Port);
@ -59,9 +62,9 @@ void HttpClientConnection::Reconnect(void)
/* m_Stream = new NetworkStream(socket); /* m_Stream = new NetworkStream(socket);
-- does not currently work because the NetworkStream class doesn't support async I/O */ -- does not currently work because the NetworkStream class doesn't support async I/O */
m_Stream->RegisterDataHandler(boost::bind(&HttpClientConnection::DataAvailableHandler, this)); m_Stream->RegisterDataHandler(boost::bind(&HttpClientConnection::DataAvailableHandler, this, _1));
if (m_Stream->IsDataAvailable()) if (m_Stream->IsDataAvailable())
DataAvailableHandler(); DataAvailableHandler(m_Stream);
} }
Stream::Ptr HttpClientConnection::GetStream(void) const Stream::Ptr HttpClientConnection::GetStream(void) const
@ -95,8 +98,10 @@ bool HttpClientConnection::ProcessMessage(void)
{ {
bool res; bool res;
if (m_Requests.empty()) if (m_Requests.empty()) {
m_Stream->Close();
return false; return false;
}
const std::pair<boost::shared_ptr<HttpRequest>, HttpCompletionCallback>& currentRequest = *m_Requests.begin(); const std::pair<boost::shared_ptr<HttpRequest>, HttpCompletionCallback>& currentRequest = *m_Requests.begin();
HttpRequest& request = *currentRequest.first.get(); HttpRequest& request = *currentRequest.first.get();
@ -129,19 +134,26 @@ bool HttpClientConnection::ProcessMessage(void)
return res; return res;
} }
void HttpClientConnection::DataAvailableHandler(void) void HttpClientConnection::DataAvailableHandler(const Stream::Ptr& stream)
{ {
boost::mutex::scoped_lock lock(m_DataHandlerMutex); boost::mutex::scoped_lock lock(m_DataHandlerMutex);
ASSERT(stream == m_Stream);
try { try {
while (ProcessMessage()) while (ProcessMessage())
; /* empty loop body */ ; /* empty loop body */
} catch (const std::exception& ex) { } catch (const std::exception& ex) {
Log(LogWarning, "HttpClientConnection") Log(LogWarning, "HttpClientConnection")
<< "Error while reading Http request: " << DiagnosticInformation(ex); << "Error while reading Http response: " << DiagnosticInformation(ex);
Disconnect(); Disconnect();
} }
if (m_Context.Eof) {
Log(LogWarning, "HttpClientConnection", "Encountered unexpected EOF while reading Http response.");
m_Stream->Close();
}
} }
boost::shared_ptr<HttpRequest> HttpClientConnection::NewRequest(void) boost::shared_ptr<HttpRequest> HttpClientConnection::NewRequest(void)

View File

@ -68,7 +68,7 @@ private:
void Reconnect(void); void Reconnect(void);
bool ProcessMessage(void); bool ProcessMessage(void);
void DataAvailableHandler(void); void DataAvailableHandler(const Stream::Ptr& stream);
void ProcessMessageAsync(HttpRequest& request); void ProcessMessageAsync(HttpRequest& request);
}; };

View File

@ -34,10 +34,14 @@ HttpResponse::HttpResponse(const Stream::Ptr& stream, const HttpRequest& request
void HttpResponse::SetStatus(int code, const String& message) void HttpResponse::SetStatus(int code, const String& message)
{ {
ASSERT(m_State == HttpResponseStart);
ASSERT(code >= 100 && code <= 599); ASSERT(code >= 100 && code <= 599);
ASSERT(!message.IsEmpty()); ASSERT(!message.IsEmpty());
if (m_State != HttpResponseStart) {
Log(LogWarning, "HttpResponse", "Tried to set Http response status after headers had already been sent.");
return;
}
String status = "HTTP/"; String status = "HTTP/";
if (m_Request.ProtocolVersion == HttpVersion10) if (m_Request.ProtocolVersion == HttpVersion10)
@ -54,7 +58,11 @@ void HttpResponse::SetStatus(int code, const String& message)
void HttpResponse::AddHeader(const String& key, const String& value) void HttpResponse::AddHeader(const String& key, const String& value)
{ {
ASSERT(m_State = HttpResponseHeaders); if (m_State != HttpResponseHeaders) {
Log(LogWarning, "HttpResponse", "Tried to add header after headers had already been sent.");
return;
}
String header = key + ": " + value + "\r\n"; String header = key + ": " + value + "\r\n";
m_Stream->Write(header.CStr(), header.GetLength()); m_Stream->Write(header.CStr(), header.GetLength());
} }

View File

@ -157,6 +157,8 @@ void HttpServerConnection::ProcessMessageAsync(HttpRequest& request)
try { try {
HttpHandler::ProcessRequest(user, request, response); HttpHandler::ProcessRequest(user, request, response);
} catch (const std::exception& ex) { } catch (const std::exception& ex) {
Log(LogCritical, "HttpServerConnection")
<< "Unhandled exception while processing Http request: " << DiagnosticInformation(ex);
response.SetStatus(503, "Unhandled exception"); response.SetStatus(503, "Unhandled exception");
response.AddHeader("Content-Type", "text/plain"); response.AddHeader("Content-Type", "text/plain");
String errorInfo = DiagnosticInformation(ex); String errorInfo = DiagnosticInformation(ex);