Bugfixes for the replication component.

This commit is contained in:
Gunnar Beutner 2013-04-05 12:09:26 +02:00
parent 09f395a7de
commit d9730f5b83
8 changed files with 100 additions and 61 deletions

View File

@ -119,7 +119,7 @@ RequestMessage ReplicationComponent::MakeObjectMessage(const DynamicObject::Ptr&
msg.SetParams(params); msg.SetParams(params);
params.Set("name", object->GetName()); params.Set("name", object->GetName());
params.Set("type", object->GetType()); params.Set("type", object->GetType()->GetName());
String source = object->GetSource(); String source = object->GetSource();
@ -164,7 +164,7 @@ void ReplicationComponent::TransactionClosingHandler(double tx, const std::set<D
std::ostringstream msgbuf; std::ostringstream msgbuf;
msgbuf << "Sending " << modifiedObjects.size() << " replication updates."; msgbuf << "Sending " << modifiedObjects.size() << " replication updates.";
Log(LogDebug, "replication", msgbuf.str()); Log(LogInformation, "replication", msgbuf.str());
BOOST_FOREACH(const DynamicObject::WeakPtr& wobject, modifiedObjects) { BOOST_FOREACH(const DynamicObject::WeakPtr& wobject, modifiedObjects) {
DynamicObject::Ptr object = wobject.lock(); DynamicObject::Ptr object = wobject.lock();

View File

@ -27,7 +27,8 @@
using namespace icinga; using namespace icinga;
BufferedStream::BufferedStream(const Stream::Ptr& innerStream) BufferedStream::BufferedStream(const Stream::Ptr& innerStream)
: m_InnerStream(innerStream), m_RecvQ(boost::make_shared<FIFO>()), m_SendQ(boost::make_shared<FIFO>()) : m_InnerStream(innerStream), m_RecvQ(boost::make_shared<FIFO>()), m_SendQ(boost::make_shared<FIFO>()),
m_Exception(), m_Blocking(true)
{ {
boost::thread readThread(boost::bind(&BufferedStream::ReadThreadProc, this)); boost::thread readThread(boost::bind(&BufferedStream::ReadThreadProc, this));
readThread.detach(); readThread.detach();
@ -52,11 +53,14 @@ void BufferedStream::ReadThreadProc(void)
m_ReadCV.notify_all(); m_ReadCV.notify_all();
} }
} catch (const std::exception& ex) { } catch (const std::exception& ex) {
std::ostringstream msgbuf; {
msgbuf << "Error for buffered stream (Read): " << boost::diagnostic_information(ex); boost::mutex::scoped_lock lock(m_Mutex);
Log(LogWarning, "base", msgbuf.str());
Close(); if (!m_Exception)
m_Exception = boost::current_exception();
m_ReadCV.notify_all();
}
} }
} }
@ -80,11 +84,14 @@ void BufferedStream::WriteThreadProc(void)
m_InnerStream->Write(buffer, rc); m_InnerStream->Write(buffer, rc);
} }
} catch (const std::exception& ex) { } catch (const std::exception& ex) {
std::ostringstream msgbuf; {
msgbuf << "Error for buffered stream (Write): " << boost::diagnostic_information(ex); boost::mutex::scoped_lock lock(m_Mutex);
Log(LogWarning, "base", msgbuf.str());
Close(); if (!m_Exception)
m_Exception = boost::current_exception();
m_WriteCV.notify_all();
}
} }
} }
@ -104,6 +111,13 @@ void BufferedStream::Close(void)
size_t BufferedStream::Read(void *buffer, size_t count) size_t BufferedStream::Read(void *buffer, size_t count)
{ {
boost::mutex::scoped_lock lock(m_Mutex); boost::mutex::scoped_lock lock(m_Mutex);
if (m_Blocking)
InternalWaitReadable(count, lock);
if (m_Exception)
boost::rethrow_exception(m_Exception);
return m_RecvQ->Read(buffer, count); return m_RecvQ->Read(buffer, count);
} }
@ -117,6 +131,10 @@ size_t BufferedStream::Read(void *buffer, size_t count)
void BufferedStream::Write(const void *buffer, size_t count) void BufferedStream::Write(const void *buffer, size_t count)
{ {
boost::mutex::scoped_lock lock(m_Mutex); boost::mutex::scoped_lock lock(m_Mutex);
if (m_Exception)
boost::rethrow_exception(m_Exception);
m_SendQ->Write(buffer, count); m_SendQ->Write(buffer, count);
m_WriteCV.notify_all(); m_WriteCV.notify_all();
} }
@ -125,9 +143,22 @@ void BufferedStream::WaitReadable(size_t count)
{ {
boost::mutex::scoped_lock lock(m_Mutex); boost::mutex::scoped_lock lock(m_Mutex);
while (m_RecvQ->GetAvailableBytes() < count) InternalWaitReadable(count, lock);
}
void BufferedStream::InternalWaitReadable(size_t count, boost::mutex::scoped_lock& lock)
{
while (m_RecvQ->GetAvailableBytes() < count && !m_Exception)
m_ReadCV.wait(lock); m_ReadCV.wait(lock);
} }
void BufferedStream::WaitWritable(size_t count) void BufferedStream::WaitWritable(size_t count)
{ /* Nothing to do here. */ } { /* Nothing to do here. */ }
void BufferedStream::MakeNonBlocking(void)
{
boost::mutex::scoped_lock lock(m_Mutex);
m_Blocking = false;
}

View File

@ -48,12 +48,16 @@ public:
void WaitReadable(size_t count); void WaitReadable(size_t count);
void WaitWritable(size_t count); void WaitWritable(size_t count);
void MakeNonBlocking(void);
private: private:
Stream::Ptr m_InnerStream; Stream::Ptr m_InnerStream;
FIFO::Ptr m_RecvQ; FIFO::Ptr m_RecvQ;
FIFO::Ptr m_SendQ; FIFO::Ptr m_SendQ;
bool m_Blocking;
boost::exception_ptr m_Exception; boost::exception_ptr m_Exception;
boost::mutex m_Mutex; boost::mutex m_Mutex;
@ -62,6 +66,8 @@ private:
void ReadThreadProc(void); void ReadThreadProc(void);
void WriteThreadProc(void); void WriteThreadProc(void);
void InternalWaitReadable(size_t count, boost::mutex::scoped_lock& lock);
}; };
} }

View File

@ -44,6 +44,8 @@ TlsStream::TlsStream(const Stream::Ptr& innerStream, TlsRole role, shared_ptr<SS
if (!m_InnerStream) if (!m_InnerStream)
m_InnerStream = boost::make_shared<BufferedStream>(innerStream); m_InnerStream = boost::make_shared<BufferedStream>(innerStream);
m_InnerStream->MakeNonBlocking();
m_SSL = shared_ptr<SSL>(SSL_new(m_SSLContext.get()), SSL_free); m_SSL = shared_ptr<SSL>(SSL_new(m_SSLContext.get()), SSL_free);
m_SSLContext.reset(); m_SSLContext.reset();

View File

@ -52,7 +52,7 @@ shared_ptr<SSL_CTX> MakeSSLContext(const String& pubkey, const String& privkey,
shared_ptr<SSL_CTX> sslContext = shared_ptr<SSL_CTX>(SSL_CTX_new(TLSv1_method()), SSL_CTX_free); shared_ptr<SSL_CTX> sslContext = shared_ptr<SSL_CTX>(SSL_CTX_new(TLSv1_method()), SSL_CTX_free);
SSL_CTX_set_mode(sslContext.get(), SSL_MODE_ENABLE_PARTIAL_WRITE | SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER); SSL_CTX_set_mode(sslContext.get(), 0);
if (!SSL_CTX_use_certificate_chain_file(sslContext.get(), pubkey.CStr())) { if (!SSL_CTX_use_certificate_chain_file(sslContext.get(), pubkey.CStr())) {
BOOST_THROW_EXCEPTION(openssl_error() BOOST_THROW_EXCEPTION(openssl_error()

View File

@ -33,7 +33,6 @@ using namespace icinga;
REGISTER_TYPE(Endpoint); REGISTER_TYPE(Endpoint);
boost::signals2::signal<void (const Endpoint::Ptr&)> Endpoint::OnConnected; boost::signals2::signal<void (const Endpoint::Ptr&)> Endpoint::OnConnected;
boost::signals2::signal<void (const Endpoint::Ptr&)> Endpoint::OnDisconnected;
/** /**
* Constructor for the Endpoint class. * Constructor for the Endpoint class.
@ -229,15 +228,6 @@ void Endpoint::RegisterTopicHandler(const String& topic, const boost::function<E
RegisterSubscription(topic); RegisterSubscription(topic);
} }
void Endpoint::UnregisterTopicHandler(const String&, const boost::function<Endpoint::Callback>&)
{
// TODO: implement
//m_TopicHandlers[method] -= callback;
//UnregisterSubscription(method);
BOOST_THROW_EXCEPTION(std::runtime_error("Not implemented."));
}
void Endpoint::ProcessRequest(const Endpoint::Ptr& sender, const RequestMessage& request) void Endpoint::ProcessRequest(const Endpoint::Ptr& sender, const RequestMessage& request)
{ {
if (!IsConnected()) { if (!IsConnected()) {
@ -260,7 +250,15 @@ void Endpoint::ProcessRequest(const Endpoint::Ptr& sender, const RequestMessage&
Utility::QueueAsyncCallback(boost::bind(boost::ref(*it->second), GetSelf(), sender, request)); Utility::QueueAsyncCallback(boost::bind(boost::ref(*it->second), GetSelf(), sender, request));
} else { } else {
try {
JsonRpc::SendMessage(GetClient(), request); JsonRpc::SendMessage(GetClient(), request);
} catch (const std::exception& ex) {
std::ostringstream msgbuf;
msgbuf << "Error while sending JSON-RPC message for endpoint '" << GetName() << "': " << boost::diagnostic_information(ex);
Log(LogWarning, "remoting", msgbuf.str());
m_Client.reset();
}
} }
} }
@ -272,15 +270,33 @@ void Endpoint::ProcessResponse(const Endpoint::Ptr& sender, const ResponseMessag
if (IsLocalEndpoint()) if (IsLocalEndpoint())
EndpointManager::GetInstance()->ProcessResponseMessage(sender, response); EndpointManager::GetInstance()->ProcessResponseMessage(sender, response);
else { else {
try {
JsonRpc::SendMessage(GetClient(), response); JsonRpc::SendMessage(GetClient(), response);
} catch (const std::exception& ex) {
std::ostringstream msgbuf;
msgbuf << "Error while sending JSON-RPC message for endpoint '" << GetName() << "': " << boost::diagnostic_information(ex);
Log(LogWarning, "remoting", msgbuf.str());
m_Client.reset();
}
} }
} }
void Endpoint::MessageThreadProc(const Stream::Ptr& stream) void Endpoint::MessageThreadProc(const Stream::Ptr& stream)
{ {
try {
for (;;) { for (;;) {
MessagePart message = JsonRpc::ReadMessage(stream); MessagePart message;
try {
message = JsonRpc::ReadMessage(stream);
} catch (const std::exception& ex) {
Log(LogWarning, "jsonrpc", "Error while reading JSON-RPC message for endpoint '" + GetName() + "': " + boost::diagnostic_information(ex));
GetClient()->Close();
m_Client.reset();
}
Endpoint::Ptr sender = GetSelf(); Endpoint::Ptr sender = GetSelf();
if (ResponseMessage::IsResponseMessage(message)) { if (ResponseMessage::IsResponseMessage(message)) {
@ -302,22 +318,6 @@ void Endpoint::MessageThreadProc(const Stream::Ptr& stream)
else else
EndpointManager::GetInstance()->SendMulticastMessage(sender, request); EndpointManager::GetInstance()->SendMulticastMessage(sender, request);
} }
} catch (const std::exception& ex) {
Log(LogWarning, "jsonrpc", "Lost connection to endpoint '" + GetName() + "': " + boost::diagnostic_information(ex));
{
ObjectLock olock(this);
// TODO: _only_ clear non-persistent subscriptions
// unregister ourselves if no persistent subscriptions are left (use a
// timer for that, once we have a TTL property for the topics)
ClearSubscriptions();
m_Client.reset();
}
OnDisconnected(GetSelf());
}
} }
/** /**

View File

@ -69,7 +69,6 @@ public:
void ClearSubscriptions(void); void ClearSubscriptions(void);
void RegisterTopicHandler(const String& topic, const boost::function<Callback>& callback); void RegisterTopicHandler(const String& topic, const boost::function<Callback>& callback);
void UnregisterTopicHandler(const String& topic, const boost::function<Callback>& callback);
String GetNode(void) const; String GetNode(void) const;
String GetService(void) const; String GetService(void) const;
@ -77,7 +76,6 @@ public:
static Endpoint::Ptr MakeEndpoint(const String& name, bool replicated, bool local = true); static Endpoint::Ptr MakeEndpoint(const String& name, bool replicated, bool local = true);
static boost::signals2::signal<void (const Endpoint::Ptr&)> OnConnected; static boost::signals2::signal<void (const Endpoint::Ptr&)> OnConnected;
static boost::signals2::signal<void (const Endpoint::Ptr&)> OnDisconnected;
private: private:
Attribute<bool> m_Local; Attribute<bool> m_Local;

View File

@ -203,7 +203,9 @@ void EndpointManager::NewClientHandler(const Socket::Ptr& client, TlsRole role)
if (!endpoint) if (!endpoint)
endpoint = Endpoint::MakeEndpoint(identity, true); endpoint = Endpoint::MakeEndpoint(identity, true);
endpoint->SetClient(tlsStream); BufferedStream::Ptr bufferedStream = boost::make_shared<BufferedStream>(tlsStream);
endpoint->SetClient(bufferedStream);
} }
/** /**