mirror of https://github.com/Icinga/icinga2.git
parent
9134c7fab6
commit
37179cdf32
|
@ -181,7 +181,7 @@ void Application::RunEventLoop(void) const
|
|||
GetTP().Stop();
|
||||
m_ShuttingDown = false;
|
||||
|
||||
GetTP().Join();
|
||||
GetTP().Join(true);
|
||||
|
||||
Timer::Uninitialize();
|
||||
#endif /* _DEBUG */
|
||||
|
|
|
@ -33,79 +33,95 @@ using namespace icinga;
|
|||
|
||||
int ThreadPool::m_NextID = 1;
|
||||
|
||||
ThreadPool::ThreadPool(void)
|
||||
: m_ID(m_NextID++), m_WaitTime(0), m_ServiceTime(0),
|
||||
m_TaskCount(0), m_Stopped(false)
|
||||
ThreadPool::ThreadPool(int max_threads)
|
||||
: m_ID(m_NextID++), m_Stopped(false), m_MaxThreads(max_threads)
|
||||
{
|
||||
for (int i = 0; i < 2; i++)
|
||||
SpawnWorker();
|
||||
if (m_MaxThreads != -1 && m_MaxThreads < sizeof(m_Queues) / sizeof(m_Queues[0]))
|
||||
m_MaxThreads = sizeof(m_Queues) / sizeof(m_Queues[0]);
|
||||
|
||||
m_ThreadGroup.create_thread(boost::bind(&ThreadPool::ManagerThreadProc, this));
|
||||
m_ThreadGroup.create_thread(boost::bind(&ThreadPool::StatsThreadProc, this));
|
||||
Start();
|
||||
}
|
||||
|
||||
ThreadPool::~ThreadPool(void)
|
||||
{
|
||||
Stop();
|
||||
Join();
|
||||
Join(true);
|
||||
}
|
||||
|
||||
void ThreadPool::Start(void)
|
||||
{
|
||||
for (int i = 0; i < sizeof(m_Queues) / sizeof(m_Queues[0]); i++)
|
||||
m_Queues[i].SpawnWorker(m_ThreadGroup);
|
||||
|
||||
m_ThreadGroup.create_thread(boost::bind(&ThreadPool::ManagerThreadProc, this));
|
||||
m_ThreadGroup.create_thread(boost::bind(&ThreadPool::StatsThreadProc, this));
|
||||
}
|
||||
|
||||
void ThreadPool::Stop(void)
|
||||
{
|
||||
boost::mutex::scoped_lock lock(m_Mutex);
|
||||
for (int i = 0; i < sizeof(m_Queues) / sizeof(m_Queues[0]); i++) {
|
||||
boost::mutex::scoped_lock lock(m_Queues[i].Mutex);
|
||||
m_Queues[i].Stopped = true;
|
||||
m_Queues[i].CV.notify_all();
|
||||
}
|
||||
|
||||
boost::mutex::scoped_lock lock(m_MgmtMutex);
|
||||
m_Stopped = true;
|
||||
m_WorkCV.notify_all();
|
||||
m_MgmtCV.notify_all();
|
||||
}
|
||||
|
||||
/**
|
||||
* Waits for all worker threads to finish.
|
||||
*/
|
||||
void ThreadPool::Join(void)
|
||||
void ThreadPool::Join(bool wait_for_stop)
|
||||
{
|
||||
{
|
||||
boost::mutex::scoped_lock lock(m_Mutex);
|
||||
|
||||
while (!m_Stopped || !m_WorkItems.empty()) {
|
||||
lock.unlock();
|
||||
Utility::Sleep(0.5);
|
||||
lock.lock();
|
||||
}
|
||||
if (wait_for_stop) {
|
||||
m_ThreadGroup.join_all();
|
||||
return;
|
||||
}
|
||||
|
||||
m_ThreadGroup.join_all();
|
||||
for (int i = 0; i < sizeof(m_Queues) / sizeof(m_Queues[0]); i++) {
|
||||
boost::mutex::scoped_lock lock(m_Queues[i].Mutex);
|
||||
|
||||
while (!m_Queues[i].Items.empty())
|
||||
m_Queues[i].CVStarved.wait(lock);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Waits for work items and processes them.
|
||||
*/
|
||||
void ThreadPool::QueueThreadProc(int tid)
|
||||
void ThreadPool::WorkerThread::ThreadProc(Queue& queue)
|
||||
{
|
||||
std::ostringstream idbuf;
|
||||
idbuf << "TP #" << m_ID << " W #" << tid;
|
||||
idbuf << "Q #" << &queue << " W #" << this;
|
||||
Utility::SetThreadName(idbuf.str());
|
||||
|
||||
for (;;) {
|
||||
WorkItem wi;
|
||||
|
||||
{
|
||||
boost::mutex::scoped_lock lock(m_Mutex);
|
||||
boost::mutex::scoped_lock lock(queue.Mutex);
|
||||
|
||||
UpdateThreadUtilization(tid, ThreadIdle);
|
||||
UpdateUtilization(ThreadIdle);
|
||||
|
||||
while (m_WorkItems.empty() && !m_Stopped && !m_Threads[tid].Zombie)
|
||||
m_WorkCV.wait(lock);
|
||||
while (queue.Items.empty() && !queue.Stopped && !Zombie) {
|
||||
if (queue.Items.empty())
|
||||
queue.CVStarved.notify_all();
|
||||
|
||||
if (m_Threads[tid].Zombie)
|
||||
queue.CV.wait(lock);
|
||||
}
|
||||
|
||||
if (Zombie)
|
||||
break;
|
||||
|
||||
if (m_WorkItems.empty() && m_Stopped)
|
||||
if (queue.Items.empty() && queue.Stopped)
|
||||
break;
|
||||
|
||||
wi = m_WorkItems.front();
|
||||
m_WorkItems.pop_front();
|
||||
wi = queue.Items.front();
|
||||
queue.Items.pop_front();
|
||||
|
||||
UpdateThreadUtilization(tid, ThreadBusy);
|
||||
UpdateUtilization(ThreadBusy);
|
||||
}
|
||||
|
||||
double st = Utility::GetTime();;
|
||||
|
@ -134,14 +150,11 @@ void ThreadPool::QueueThreadProc(int tid)
|
|||
double latency = st - wi.Timestamp;
|
||||
|
||||
{
|
||||
boost::mutex::scoped_lock lock(m_Mutex);
|
||||
boost::mutex::scoped_lock lock(queue.Mutex);
|
||||
|
||||
m_WaitTime += latency;
|
||||
m_ServiceTime += et - st;
|
||||
m_TaskCount++;
|
||||
|
||||
if (latency > m_MaxLatency)
|
||||
m_MaxLatency = latency;
|
||||
queue.WaitTime += latency;
|
||||
queue.ServiceTime += et - st;
|
||||
queue.TaskCount++;
|
||||
}
|
||||
|
||||
#ifdef _DEBUG
|
||||
|
@ -175,9 +188,9 @@ void ThreadPool::QueueThreadProc(int tid)
|
|||
#endif /* _DEBUG */
|
||||
}
|
||||
|
||||
boost::mutex::scoped_lock lock(m_Mutex);
|
||||
UpdateThreadUtilization(tid, ThreadDead);
|
||||
m_Threads[tid].Zombie = false;
|
||||
boost::mutex::scoped_lock lock(queue.Mutex);
|
||||
UpdateUtilization(ThreadDead);
|
||||
Zombie = false;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -192,14 +205,16 @@ bool ThreadPool::Post(const ThreadPool::WorkFunction& callback)
|
|||
wi.Callback = callback;
|
||||
wi.Timestamp = Utility::GetTime();
|
||||
|
||||
{
|
||||
boost::mutex::scoped_lock lock(m_Mutex);
|
||||
Queue& queue = m_Queues[Utility::Random() % (sizeof(m_Queues) / sizeof(m_Queues[0]))];
|
||||
|
||||
if (m_Stopped)
|
||||
{
|
||||
boost::mutex::scoped_lock lock(queue.Mutex);
|
||||
|
||||
if (queue.Stopped)
|
||||
return false;
|
||||
|
||||
m_WorkItems.push_back(wi);
|
||||
m_WorkCV.notify_one();
|
||||
queue.Items.push_back(wi);
|
||||
queue.CV.notify_one();
|
||||
}
|
||||
|
||||
return true;
|
||||
|
@ -212,34 +227,42 @@ void ThreadPool::ManagerThreadProc(void)
|
|||
Utility::SetThreadName(idbuf.str());
|
||||
|
||||
for (;;) {
|
||||
size_t pending, alive;
|
||||
double avg_latency, max_latency;
|
||||
double utilization = 0;
|
||||
size_t total_pending = 0, total_alive = 0;
|
||||
double total_avg_latency = 0;
|
||||
double total_utilization = 0;
|
||||
|
||||
{
|
||||
boost::mutex::scoped_lock lock(m_Mutex);
|
||||
boost::mutex::scoped_lock lock(m_MgmtMutex);
|
||||
|
||||
if (!m_Stopped)
|
||||
m_MgmtCV.timed_wait(lock, boost::posix_time::seconds(5));
|
||||
|
||||
if (m_Stopped)
|
||||
break;
|
||||
}
|
||||
|
||||
pending = m_WorkItems.size();
|
||||
for (int i = 0; i < sizeof(m_Queues) / sizeof(m_Queues[0]); i++) {
|
||||
size_t pending, alive = 0;
|
||||
double avg_latency;
|
||||
double utilization = 0;
|
||||
|
||||
alive = 0;
|
||||
Queue& queue = m_Queues[i];
|
||||
|
||||
for (size_t i = 0; i < sizeof(m_Threads) / sizeof(m_Threads[0]); i++) {
|
||||
if (m_Threads[i].State != ThreadDead && !m_Threads[i].Zombie) {
|
||||
boost::mutex::scoped_lock lock(queue.Mutex);
|
||||
|
||||
pending = queue.Items.size();
|
||||
|
||||
for (size_t i = 0; i < sizeof(queue.Threads) / sizeof(queue.Threads[0]); i++) {
|
||||
if (queue.Threads[i].State != ThreadDead && !queue.Threads[i].Zombie) {
|
||||
alive++;
|
||||
utilization += m_Threads[i].Utilization * 100;
|
||||
utilization += queue.Threads[i].Utilization * 100;
|
||||
}
|
||||
}
|
||||
|
||||
utilization /= alive;
|
||||
|
||||
if (m_TaskCount > 0)
|
||||
avg_latency = m_WaitTime / (m_TaskCount * 1.0);
|
||||
if (queue.TaskCount > 0)
|
||||
avg_latency = queue.WaitTime / (queue.TaskCount * 1.0);
|
||||
else
|
||||
avg_latency = 0;
|
||||
|
||||
|
@ -248,9 +271,9 @@ void ThreadPool::ManagerThreadProc(void)
|
|||
|
||||
int tthreads = wthreads - alive;
|
||||
|
||||
/* Don't ever kill the last 8 threads. */
|
||||
if (alive + tthreads < 8)
|
||||
tthreads = 8 - alive;
|
||||
/* Don't ever kill the last threads. */
|
||||
if (alive + tthreads < 2)
|
||||
tthreads = 2 - alive;
|
||||
|
||||
/* Don't kill more than 8 threads at once. */
|
||||
if (tthreads < -8)
|
||||
|
@ -258,33 +281,37 @@ void ThreadPool::ManagerThreadProc(void)
|
|||
|
||||
/* Spawn more workers if there are outstanding work items. */
|
||||
if (tthreads > 0 && pending > 0)
|
||||
tthreads = (Utility::GetTime() - Application::GetStartTime() < 300) ? 128 : 8;
|
||||
tthreads = 8;
|
||||
|
||||
if (m_MaxThreads != -1 && (alive + tthreads) * (sizeof(m_Queues) / sizeof(m_Queues[0])) > m_MaxThreads)
|
||||
tthreads = m_MaxThreads / (sizeof(m_Queues) / sizeof(m_Queues[0])) - alive;
|
||||
|
||||
std::ostringstream msgbuf;
|
||||
msgbuf << "Thread pool; current: " << alive << "; adjustment: " << tthreads;
|
||||
Log(LogDebug, "base", msgbuf.str());
|
||||
|
||||
for (int i = 0; i < -tthreads; i++)
|
||||
KillWorker();
|
||||
queue.KillWorker(m_ThreadGroup);
|
||||
|
||||
for (int i = 0; i < tthreads; i++)
|
||||
SpawnWorker();
|
||||
queue.SpawnWorker(m_ThreadGroup);
|
||||
}
|
||||
|
||||
m_WaitTime = 0;
|
||||
m_ServiceTime = 0;
|
||||
m_TaskCount = 0;
|
||||
queue.WaitTime = 0;
|
||||
queue.ServiceTime = 0;
|
||||
queue.TaskCount = 0;
|
||||
|
||||
max_latency = m_MaxLatency;
|
||||
m_MaxLatency = 0;
|
||||
total_pending += pending;
|
||||
total_alive += alive;
|
||||
total_avg_latency += avg_latency;
|
||||
total_utilization += utilization;
|
||||
}
|
||||
|
||||
std::ostringstream msgbuf;
|
||||
msgbuf << "Pool #" << m_ID << ": Pending tasks: " << pending << "; Average latency: "
|
||||
<< (long)(avg_latency * 1000) << "ms"
|
||||
<< "; Max latency: " << (long)(max_latency * 1000) << "ms"
|
||||
<< "; Threads: " << alive
|
||||
<< "; Pool utilization: " << utilization << "%";
|
||||
msgbuf << "Pool #" << m_ID << ": Pending tasks: " << total_pending << "; Average latency: "
|
||||
<< (long)(total_avg_latency * 1000 / (sizeof(m_Queues) / sizeof(m_Queues[0]))) << "ms"
|
||||
<< "; Threads: " << total_alive
|
||||
<< "; Pool utilization: " << (total_utilization / (sizeof(m_Queues) / sizeof(m_Queues[0]))) << "%";
|
||||
Log(LogInformation, "base", msgbuf.str());
|
||||
}
|
||||
}
|
||||
|
@ -292,14 +319,14 @@ void ThreadPool::ManagerThreadProc(void)
|
|||
/**
|
||||
* Note: Caller must hold m_Mutex
|
||||
*/
|
||||
void ThreadPool::SpawnWorker(void)
|
||||
void ThreadPool::Queue::SpawnWorker(boost::thread_group& group)
|
||||
{
|
||||
for (size_t i = 0; i < sizeof(m_Threads) / sizeof(m_Threads[0]); i++) {
|
||||
if (m_Threads[i].State == ThreadDead) {
|
||||
for (size_t i = 0; i < sizeof(Threads) / sizeof(Threads[0]); i++) {
|
||||
if (Threads[i].State == ThreadDead) {
|
||||
Log(LogDebug, "debug", "Spawning worker thread.");
|
||||
|
||||
m_Threads[i] = WorkerThread(ThreadIdle);
|
||||
m_Threads[i].Thread = m_ThreadGroup.create_thread(boost::bind(&ThreadPool::QueueThreadProc, this, i));
|
||||
Threads[i] = WorkerThread(ThreadIdle);
|
||||
Threads[i].Thread = group.create_thread(boost::bind(&ThreadPool::WorkerThread::ThreadProc, boost::ref(Threads[i]), boost::ref(*this)));
|
||||
|
||||
break;
|
||||
}
|
||||
|
@ -307,20 +334,20 @@ void ThreadPool::SpawnWorker(void)
|
|||
}
|
||||
|
||||
/**
|
||||
* Note: Caller must hold m_Mutex.
|
||||
* Note: Caller must hold Mutex.
|
||||
*/
|
||||
void ThreadPool::KillWorker(void)
|
||||
void ThreadPool::Queue::KillWorker(boost::thread_group& group)
|
||||
{
|
||||
for (size_t i = 0; i < sizeof(m_Threads) / sizeof(m_Threads[0]); i++) {
|
||||
if (m_Threads[i].State == ThreadIdle && !m_Threads[i].Zombie) {
|
||||
for (size_t i = 0; i < sizeof(Threads) / sizeof(Threads[0]); i++) {
|
||||
if (Threads[i].State == ThreadIdle && !Threads[i].Zombie) {
|
||||
Log(LogDebug, "base", "Killing worker thread.");
|
||||
|
||||
m_ThreadGroup.remove_thread(m_Threads[i].Thread);
|
||||
m_Threads[i].Thread->detach();
|
||||
delete m_Threads[i].Thread;
|
||||
group.remove_thread(Threads[i].Thread);
|
||||
Threads[i].Thread->detach();
|
||||
delete Threads[i].Thread;
|
||||
|
||||
m_Threads[i].Zombie = true;
|
||||
m_WorkCV.notify_all();
|
||||
Threads[i].Zombie = true;
|
||||
CV.notify_all();
|
||||
|
||||
break;
|
||||
}
|
||||
|
@ -334,27 +361,35 @@ void ThreadPool::StatsThreadProc(void)
|
|||
Utility::SetThreadName(idbuf.str());
|
||||
|
||||
for (;;) {
|
||||
boost::mutex::scoped_lock lock(m_Mutex);
|
||||
{
|
||||
boost::mutex::scoped_lock lock(m_MgmtMutex);
|
||||
|
||||
if (!m_Stopped)
|
||||
m_MgmtCV.timed_wait(lock, boost::posix_time::milliseconds(250));
|
||||
if (!m_Stopped)
|
||||
m_MgmtCV.timed_wait(lock, boost::posix_time::milliseconds(250));
|
||||
|
||||
if (m_Stopped)
|
||||
break;
|
||||
if (m_Stopped)
|
||||
break;
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < sizeof(m_Threads) / sizeof(m_Threads[0]); i++)
|
||||
UpdateThreadUtilization(i);
|
||||
for (int i = 0; i < sizeof(m_Queues) / sizeof(m_Queues[0]); i++) {
|
||||
Queue& queue = m_Queues[i];
|
||||
|
||||
boost::mutex::scoped_lock lock(queue.Mutex);
|
||||
|
||||
for (size_t i = 0; i < sizeof(queue.Threads) / sizeof(queue.Threads[0]); i++)
|
||||
queue.Threads[i].UpdateUtilization();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Note: Caller must hold m_Mutex.
|
||||
* Note: Caller must hold queue Mutex.
|
||||
*/
|
||||
void ThreadPool::UpdateThreadUtilization(int tid, ThreadState state)
|
||||
void ThreadPool::WorkerThread::UpdateUtilization(ThreadState state)
|
||||
{
|
||||
double utilization;
|
||||
|
||||
switch (m_Threads[tid].State) {
|
||||
switch (State) {
|
||||
case ThreadDead:
|
||||
return;
|
||||
case ThreadIdle:
|
||||
|
@ -368,16 +403,16 @@ void ThreadPool::UpdateThreadUtilization(int tid, ThreadState state)
|
|||
}
|
||||
|
||||
double now = Utility::GetTime();
|
||||
double time = now - m_Threads[tid].LastUpdate;
|
||||
double time = now - LastUpdate;
|
||||
|
||||
const double avg_time = 5.0;
|
||||
|
||||
if (time > avg_time)
|
||||
time = avg_time;
|
||||
|
||||
m_Threads[tid].Utilization = (m_Threads[tid].Utilization * (avg_time - time) + utilization * time) / avg_time;
|
||||
m_Threads[tid].LastUpdate = now;
|
||||
Utilization = (Utilization * (avg_time - time) + utilization * time) / avg_time;
|
||||
LastUpdate = now;
|
||||
|
||||
if (state != ThreadUnspecified)
|
||||
m_Threads[tid].State = state;
|
||||
State = state;
|
||||
}
|
||||
|
|
|
@ -40,11 +40,12 @@ class I2_BASE_API ThreadPool
|
|||
public:
|
||||
typedef boost::function<void ()> WorkFunction;
|
||||
|
||||
ThreadPool(void);
|
||||
ThreadPool(int max_threads = -1);
|
||||
~ThreadPool(void);
|
||||
|
||||
void Start(void);
|
||||
void Stop(void);
|
||||
void Join(void);
|
||||
void Join(bool wait_for_stop = false);
|
||||
|
||||
bool Post(const WorkFunction& callback);
|
||||
|
||||
|
@ -57,6 +58,14 @@ private:
|
|||
ThreadBusy
|
||||
};
|
||||
|
||||
struct WorkItem
|
||||
{
|
||||
WorkFunction Callback;
|
||||
double Timestamp;
|
||||
};
|
||||
|
||||
struct Queue;
|
||||
|
||||
struct WorkerThread
|
||||
{
|
||||
ThreadState State;
|
||||
|
@ -68,43 +77,51 @@ private:
|
|||
WorkerThread(ThreadState state = ThreadDead)
|
||||
: State(state), Zombie(false), Utilization(0), LastUpdate(0), Thread(NULL)
|
||||
{ }
|
||||
|
||||
void UpdateUtilization(ThreadState state = ThreadUnspecified);
|
||||
|
||||
void ThreadProc(Queue& queue);
|
||||
};
|
||||
|
||||
struct Queue
|
||||
{
|
||||
boost::mutex Mutex;
|
||||
boost::condition_variable CV;
|
||||
boost::condition_variable CVStarved;
|
||||
|
||||
std::deque<WorkItem> Items;
|
||||
|
||||
double WaitTime;
|
||||
double ServiceTime;
|
||||
int TaskCount;
|
||||
|
||||
bool Stopped;
|
||||
|
||||
WorkerThread Threads[256];
|
||||
|
||||
Queue(void)
|
||||
: WaitTime(0), ServiceTime(0), TaskCount(0), Stopped(false)
|
||||
{ }
|
||||
|
||||
void SpawnWorker(boost::thread_group& group);
|
||||
void KillWorker(boost::thread_group& group);
|
||||
};
|
||||
|
||||
int m_ID;
|
||||
static int m_NextID;
|
||||
|
||||
int m_MaxThreads;
|
||||
|
||||
boost::thread_group m_ThreadGroup;
|
||||
WorkerThread m_Threads[4096];
|
||||
|
||||
double m_WaitTime;
|
||||
double m_ServiceTime;
|
||||
int m_TaskCount;
|
||||
|
||||
double m_MaxLatency;
|
||||
|
||||
boost::mutex m_Mutex;
|
||||
boost::condition_variable m_WorkCV;
|
||||
boost::mutex m_MgmtMutex;
|
||||
boost::condition_variable m_MgmtCV;
|
||||
|
||||
bool m_Stopped;
|
||||
|
||||
struct WorkItem
|
||||
{
|
||||
WorkFunction Callback;
|
||||
double Timestamp;
|
||||
};
|
||||
Queue m_Queues[16];
|
||||
|
||||
|
||||
std::deque<WorkItem> m_WorkItems;
|
||||
|
||||
void QueueThreadProc(int tid);
|
||||
void ManagerThreadProc(void);
|
||||
void StatsThreadProc(void);
|
||||
|
||||
void SpawnWorker(void);
|
||||
void KillWorker(void);
|
||||
|
||||
void UpdateThreadUtilization(int tid, ThreadState state = ThreadUnspecified);
|
||||
};
|
||||
|
||||
}
|
||||
|
|
|
@ -40,6 +40,7 @@
|
|||
using namespace icinga;
|
||||
|
||||
boost::thread_specific_ptr<String> Utility::m_ThreadName;
|
||||
boost::thread_specific_ptr<unsigned int> Utility::m_RandSeed;
|
||||
|
||||
/**
|
||||
* Demangles a symbol name.
|
||||
|
@ -739,10 +740,18 @@ int Utility::CompareVersion(const String& v1, const String& v2)
|
|||
|
||||
int Utility::Random(void)
|
||||
{
|
||||
static boost::mutex mtx;
|
||||
boost::mutex::scoped_lock lock(mtx);
|
||||
|
||||
#ifdef _WIN32
|
||||
return rand();
|
||||
#else /* _WIN32 */
|
||||
unsigned int *seed = m_RandSeed.get();
|
||||
|
||||
if (!seed) {
|
||||
seed = new unsigned int(Utility::GetTime());
|
||||
m_RandSeed.reset(seed);
|
||||
}
|
||||
|
||||
return rand_r(seed);
|
||||
#endif /* _WIN32 */
|
||||
}
|
||||
|
||||
tm Utility::LocalTime(time_t ts)
|
||||
|
|
|
@ -114,6 +114,7 @@ private:
|
|||
Utility(void);
|
||||
|
||||
static boost::thread_specific_ptr<String> m_ThreadName;
|
||||
static boost::thread_specific_ptr<unsigned int> m_RandSeed;
|
||||
};
|
||||
|
||||
}
|
||||
|
|
|
@ -178,21 +178,24 @@ DynamicObject::Ptr ConfigItem::Commit(void)
|
|||
DynamicObject::Ptr dobj = dtype->CreateObject(properties);
|
||||
dobj->Register();
|
||||
|
||||
m_Object = dobj;
|
||||
|
||||
return dobj;
|
||||
}
|
||||
|
||||
DynamicObject::Ptr ConfigItem::GetObject(void) const
|
||||
{
|
||||
return m_Object;
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers the configuration item.
|
||||
*/
|
||||
void ConfigItem::Register(void)
|
||||
{
|
||||
ASSERT(!OwnsLock());
|
||||
boost::mutex::scoped_lock lock(m_Mutex);
|
||||
|
||||
{
|
||||
ObjectLock olock(this);
|
||||
|
||||
m_Items[std::make_pair(m_Type, m_Name)] = GetSelf();
|
||||
}
|
||||
m_Items[std::make_pair(m_Type, m_Name)] = GetSelf();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -244,34 +247,49 @@ bool ConfigItem::ActivateItems(bool validateOnly)
|
|||
|
||||
Log(LogInformation, "config", "Validating config items (step 1)...");
|
||||
|
||||
ThreadPool tp(32);
|
||||
|
||||
BOOST_FOREACH(const ItemMap::value_type& kv, m_Items) {
|
||||
kv.second->ValidateItem();
|
||||
tp.Post(boost::bind(&ConfigItem::ValidateItem, kv.second));
|
||||
}
|
||||
|
||||
tp.Join();
|
||||
|
||||
if (ConfigCompilerContext::GetInstance()->HasErrors())
|
||||
return false;
|
||||
|
||||
Log(LogInformation, "config", "Activating config items");
|
||||
|
||||
std::vector<DynamicObject::Ptr> objects;
|
||||
Log(LogInformation, "config", "Comitting config items");
|
||||
|
||||
BOOST_FOREACH(const ItemMap::value_type& kv, m_Items) {
|
||||
DynamicObject::Ptr object = kv.second->Commit();
|
||||
tp.Post(boost::bind(&ConfigItem::Commit, kv.second));
|
||||
}
|
||||
|
||||
tp.Join();
|
||||
|
||||
std::vector<DynamicObject::Ptr> objects;
|
||||
BOOST_FOREACH(const ItemMap::value_type& kv, m_Items) {
|
||||
DynamicObject::Ptr object = kv.second->GetObject();
|
||||
|
||||
if (object)
|
||||
objects.push_back(object);
|
||||
}
|
||||
|
||||
|
||||
Log(LogInformation, "config", "Triggering OnConfigLoaded signal for config items");
|
||||
|
||||
BOOST_FOREACH(const DynamicObject::Ptr& object, objects) {
|
||||
object->OnConfigLoaded();
|
||||
tp.Post(boost::bind(&DynamicObject::OnConfigLoaded, object));
|
||||
}
|
||||
|
||||
tp.Join();
|
||||
|
||||
Log(LogInformation, "config", "Validating config items (step 2)...");
|
||||
|
||||
BOOST_FOREACH(const ItemMap::value_type& kv, m_Items) {
|
||||
kv.second->ValidateItem();
|
||||
tp.Post(boost::bind(&ConfigItem::ValidateItem, kv.second));
|
||||
}
|
||||
|
||||
tp.Join();
|
||||
|
||||
if (ConfigCompilerContext::GetInstance()->HasErrors())
|
||||
return false;
|
||||
|
||||
|
@ -281,6 +299,8 @@ bool ConfigItem::ActivateItems(bool validateOnly)
|
|||
/* restore the previous program state */
|
||||
DynamicObject::RestoreObjects(Application::GetStatePath());
|
||||
|
||||
Log(LogInformation, "config", "Triggering Start signal for config items");
|
||||
|
||||
BOOST_FOREACH(const DynamicType::Ptr& type, DynamicType::GetTypes()) {
|
||||
BOOST_FOREACH(const DynamicObject::Ptr& object, type->GetObjects()) {
|
||||
if (object->IsActive())
|
||||
|
@ -289,11 +309,21 @@ bool ConfigItem::ActivateItems(bool validateOnly)
|
|||
#ifdef _DEBUG
|
||||
Log(LogDebug, "config", "Activating object '" + object->GetName() + "' of type '" + object->GetType()->GetName() + "'");
|
||||
#endif /* _DEBUG */
|
||||
object->Start();
|
||||
|
||||
tp.Post(boost::bind(&DynamicObject::Start, object));
|
||||
}
|
||||
}
|
||||
|
||||
tp.Join();
|
||||
|
||||
#ifdef _DEBUG
|
||||
BOOST_FOREACH(const DynamicType::Ptr& type, DynamicType::GetTypes()) {
|
||||
BOOST_FOREACH(const DynamicObject::Ptr& object, type->GetObjects()) {
|
||||
ASSERT(object->IsActive());
|
||||
}
|
||||
}
|
||||
#endif /* _DEBUG */
|
||||
|
||||
Log(LogInformation, "config", "Activated all objects.");
|
||||
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -61,6 +61,8 @@ public:
|
|||
|
||||
void ValidateItem(void);
|
||||
|
||||
DynamicObject::Ptr GetObject(void) const;
|
||||
|
||||
static bool ActivateItems(bool validateOnly);
|
||||
static void DiscardItems(void);
|
||||
|
||||
|
@ -79,6 +81,8 @@ private:
|
|||
|
||||
ExpressionList::Ptr m_LinkedExpressionList;
|
||||
Dictionary::Ptr m_Properties;
|
||||
|
||||
DynamicObject::Ptr m_Object;
|
||||
|
||||
static boost::mutex m_Mutex;
|
||||
|
||||
|
|
Loading…
Reference in New Issue