Bugfixes for the JSON-RPC sub-system.

This commit is contained in:
Gunnar Beutner 2013-04-01 16:25:23 +02:00
parent b0c8f3f626
commit 49c6c358b1
4 changed files with 78 additions and 61 deletions

View File

@ -48,43 +48,37 @@ TlsStream::TlsStream(const Stream::Ptr& innerStream, TlsRole role, shared_ptr<SS
void TlsStream::Start(void) void TlsStream::Start(void)
{ {
m_SSL = shared_ptr<SSL>(SSL_new(m_SSLContext.get()), SSL_free); {
boost::mutex::scoped_lock lock(m_SSLMutex);
m_SSLContext.reset(); m_SSL = shared_ptr<SSL>(SSL_new(m_SSLContext.get()), SSL_free);
if (!m_SSL) { m_SSLContext.reset();
BOOST_THROW_EXCEPTION(openssl_error()
<< boost::errinfo_api_function("SSL_new") if (!m_SSL) {
<< errinfo_openssl_error(ERR_get_error())); BOOST_THROW_EXCEPTION(openssl_error()
<< boost::errinfo_api_function("SSL_new")
<< errinfo_openssl_error(ERR_get_error()));
}
if (!m_SSLIndexInitialized) {
m_SSLIndex = SSL_get_ex_new_index(0, const_cast<char *>("TlsStream"), NULL, NULL, NULL);
m_SSLIndexInitialized = true;
}
SSL_set_ex_data(m_SSL.get(), m_SSLIndex, this);
SSL_set_verify(m_SSL.get(), SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT, NULL);
m_BIO = BIO_new_I2Stream(m_InnerStream);
SSL_set_bio(m_SSL.get(), m_BIO, m_BIO);
if (m_Role == TlsRoleServer)
SSL_set_accept_state(m_SSL.get());
else
SSL_set_connect_state(m_SSL.get());
} }
if (!m_SSL)
BOOST_THROW_EXCEPTION(std::logic_error("No X509 client certificate was specified."));
if (!m_SSLIndexInitialized) {
m_SSLIndex = SSL_get_ex_new_index(0, const_cast<char *>("TlsStream"), NULL, NULL, NULL);
m_SSLIndexInitialized = true;
}
SSL_set_ex_data(m_SSL.get(), m_SSLIndex, this);
SSL_set_verify(m_SSL.get(), SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT, NULL);
m_BIO = BIO_new_I2Stream(m_InnerStream);
SSL_set_bio(m_SSL.get(), m_BIO, m_BIO);
if (m_Role == TlsRoleServer)
SSL_set_accept_state(m_SSL.get());
else
SSL_set_connect_state(m_SSL.get());
/*int rc = SSL_do_handshake(m_SSL.get());
if (rc == 1) {
SetConnected(true);
OnConnected(GetSelf());
}*/
Stream::Start(); Stream::Start();
HandleIO(); HandleIO();
@ -97,7 +91,7 @@ void TlsStream::Start(void)
*/ */
shared_ptr<X509> TlsStream::GetClientCertificate(void) const shared_ptr<X509> TlsStream::GetClientCertificate(void) const
{ {
ObjectLock olock(this); boost::mutex::scoped_lock lock(m_SSLMutex);
return shared_ptr<X509>(SSL_get_certificate(m_SSL.get()), &Utility::NullDeleter); return shared_ptr<X509>(SSL_get_certificate(m_SSL.get()), &Utility::NullDeleter);
} }
@ -109,7 +103,7 @@ shared_ptr<X509> TlsStream::GetClientCertificate(void) const
*/ */
shared_ptr<X509> TlsStream::GetPeerCertificate(void) const shared_ptr<X509> TlsStream::GetPeerCertificate(void) const
{ {
ObjectLock olock(this); boost::mutex::scoped_lock lock(m_SSLMutex);
return shared_ptr<X509>(SSL_get_peer_certificate(m_SSL.get()), X509_free); return shared_ptr<X509>(SSL_get_peer_certificate(m_SSL.get()), X509_free);
} }
@ -139,15 +133,18 @@ void TlsStream::ClosedHandler(void)
void TlsStream::HandleIO(void) void TlsStream::HandleIO(void)
{ {
ASSERT(!OwnsLock()); ASSERT(!OwnsLock());
ObjectLock olock(this);
char data[16 * 1024]; char data[16 * 1024];
int rc; int rc;
if (!IsConnected()) { if (!IsConnected()) {
boost::mutex::scoped_lock lock(m_SSLMutex);
rc = SSL_do_handshake(m_SSL.get()); rc = SSL_do_handshake(m_SSL.get());
if (rc == 1) { if (rc == 1) {
lock.unlock();
SetConnected(true); SetConnected(true);
} else { } else {
switch (SSL_get_error(m_SSL.get(), rc)) { switch (SSL_get_error(m_SSL.get(), rc)) {
@ -170,9 +167,14 @@ void TlsStream::HandleIO(void)
bool new_data = false, read_ok = true; bool new_data = false, read_ok = true;
while (read_ok) { while (read_ok) {
boost::mutex::scoped_lock lock(m_SSLMutex);
rc = SSL_read(m_SSL.get(), data, sizeof(data)); rc = SSL_read(m_SSL.get(), data, sizeof(data));
if (rc > 0) { if (rc > 0) {
lock.unlock();
ObjectLock olock(this);
m_RecvQueue->Write(data, rc); m_RecvQueue->Write(data, rc);
new_data = true; new_data = true;
} else { } else {
@ -194,11 +196,10 @@ void TlsStream::HandleIO(void)
} }
} }
if (new_data) { if (new_data)
olock.Unlock();
OnDataAvailable(GetSelf()); OnDataAvailable(GetSelf());
olock.Lock();
} ObjectLock olock(this);
while (m_SendQueue->GetAvailableBytes() > 0) { while (m_SendQueue->GetAvailableBytes() > 0) {
size_t count = m_SendQueue->GetAvailableBytes(); size_t count = m_SendQueue->GetAvailableBytes();
@ -211,9 +212,16 @@ void TlsStream::HandleIO(void)
m_SendQueue->Peek(data, count); m_SendQueue->Peek(data, count);
olock.Unlock();
boost::mutex::scoped_lock lock(m_SSLMutex);
rc = SSL_write(m_SSL.get(), (const char *)data, count); rc = SSL_write(m_SSL.get(), (const char *)data, count);
if (rc > 0) { if (rc > 0) {
lock.unlock();
olock.Lock();
m_SendQueue->Read(NULL, rc); m_SendQueue->Read(NULL, rc);
} else { } else {
switch (SSL_get_error(m_SSL.get(), rc)) { switch (SSL_get_error(m_SSL.get(), rc)) {
@ -239,13 +247,19 @@ void TlsStream::HandleIO(void)
*/ */
void TlsStream::Close(void) void TlsStream::Close(void)
{ {
ObjectLock olock(this); {
boost::mutex::scoped_lock lock(m_SSLMutex);
if (m_SSL)
SSL_shutdown(m_SSL.get());
}
if (m_SSL) {
SSL_shutdown(m_SSL.get()); ObjectLock olock(this);
m_SendQueue->Close(); m_SendQueue->Close();
m_RecvQueue->Close(); m_RecvQueue->Close();
}
Stream::Close(); Stream::Close();
} }

View File

@ -61,6 +61,7 @@ public:
private: private:
shared_ptr<SSL_CTX> m_SSLContext; shared_ptr<SSL_CTX> m_SSLContext;
shared_ptr<SSL> m_SSL; shared_ptr<SSL> m_SSL;
mutable boost::mutex m_SSLMutex;
BIO *m_BIO; BIO *m_BIO;
FIFO::Ptr m_SendQueue; FIFO::Ptr m_SendQueue;

View File

@ -165,32 +165,30 @@ void EndpointManager::AddConnection(const String& node, const String& service) {
*/ */
void EndpointManager::NewClientHandler(const Socket::Ptr& client, TlsRole role) void EndpointManager::NewClientHandler(const Socket::Ptr& client, TlsRole role)
{ {
ObjectLock olock(this);
String peerAddress = client->GetPeerAddress();
TlsStream::Ptr tlsStream = boost::make_shared<TlsStream>(client, role, m_SSLContext); TlsStream::Ptr tlsStream = boost::make_shared<TlsStream>(client, role, m_SSLContext);
tlsStream->Start();
m_PendingClients.insert(tlsStream); m_PendingClients.insert(tlsStream);
tlsStream->OnConnected.connect(boost::bind(&EndpointManager::ClientConnectedHandler, this, _1, peerAddress)); tlsStream->OnConnected.connect(boost::bind(&EndpointManager::ClientConnectedHandler, this, _1));
tlsStream->OnClosed.connect(boost::bind(&EndpointManager::ClientClosedHandler, this, _1)); tlsStream->OnClosed.connect(boost::bind(&EndpointManager::ClientClosedHandler, this, _1));
client->Start(); client->Start();
tlsStream->Start();
} }
void EndpointManager::ClientConnectedHandler(const Stream::Ptr& client, const String& peerAddress) void EndpointManager::ClientConnectedHandler(const Stream::Ptr& client)
{ {
ObjectLock olock(this);
TlsStream::Ptr tlsStream = static_pointer_cast<TlsStream>(client); TlsStream::Ptr tlsStream = static_pointer_cast<TlsStream>(client);
JsonRpcConnection::Ptr jclient = boost::make_shared<JsonRpcConnection>(tlsStream); JsonRpcConnection::Ptr jclient = boost::make_shared<JsonRpcConnection>(tlsStream);
m_PendingClients.erase(tlsStream); {
ObjectLock olock(this);
m_PendingClients.erase(tlsStream);
}
shared_ptr<X509> cert = tlsStream->GetPeerCertificate(); shared_ptr<X509> cert = tlsStream->GetPeerCertificate();
String identity = GetCertificateCN(cert); String identity = GetCertificateCN(cert);
Log(LogInformation, "icinga", "New client connection at " + peerAddress + " for identity '" + identity + "'"); Log(LogInformation, "icinga", "New client connection for identity '" + identity + "'");
Endpoint::Ptr endpoint = Endpoint::GetByName(identity); Endpoint::Ptr endpoint = Endpoint::GetByName(identity);
@ -202,10 +200,12 @@ void EndpointManager::ClientConnectedHandler(const Stream::Ptr& client, const St
void EndpointManager::ClientClosedHandler(const Stream::Ptr& client) void EndpointManager::ClientClosedHandler(const Stream::Ptr& client)
{ {
ObjectLock olock(this);
TlsStream::Ptr tlsStream = static_pointer_cast<TlsStream>(client); TlsStream::Ptr tlsStream = static_pointer_cast<TlsStream>(client);
m_PendingClients.erase(tlsStream);
{
ObjectLock olock(this);
m_PendingClients.erase(tlsStream);
}
} }
/** /**
@ -370,8 +370,10 @@ void EndpointManager::SubscriptionTimerHandler(void)
subscriptions->Seal(); subscriptions->Seal();
if (m_Endpoint) if (m_Endpoint) {
ObjectLock olock(m_Endpoint);
m_Endpoint->SetSubscriptions(subscriptions); m_Endpoint->SetSubscriptions(subscriptions);
}
} }
void EndpointManager::ReconnectTimerHandler(void) void EndpointManager::ReconnectTimerHandler(void)

View File

@ -111,8 +111,8 @@ private:
void ReconnectTimerHandler(void); void ReconnectTimerHandler(void);
void NewClientHandler(const Socket::Ptr& client, TlsRole rol); void NewClientHandler(const Socket::Ptr& client, TlsRole role);
void ClientConnectedHandler(const Stream::Ptr& client, const String& peerAddress); void ClientConnectedHandler(const Stream::Ptr& client);
void ClientClosedHandler(const Stream::Ptr& client); void ClientClosedHandler(const Stream::Ptr& client);
}; };