[asterisk-scf-commits] asterisk-scf/integration/ice-util-cpp.git branch "workqueue" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Mon Apr 18 09:04:56 CDT 2011
branch "workqueue" has been updated
via e62d3ce2544ac6f072e0be683656f486eed41252 (commit)
from abb9e7e5a234470e1591218a514b16d93a5b8f64 (commit)
Summary of changes:
ThreadPool/src/CMakeLists.txt | 2 +
ThreadPool/src/ThreadPool.cpp | 266 +++++++++++++-------
ThreadPool/test/TestThreadPool.cpp | 40 +++-
.../include/AsteriskSCF/DefaultQueueListener.h | 43 ++++
WorkQueue/src/CMakeLists.txt | 3 +
WorkQueue/src/DefaultQueueListener.cpp | 105 ++++++++
WorkQueue/test/TestWorkQueue.cpp | 44 ++++
7 files changed, 414 insertions(+), 89 deletions(-)
create mode 100644 WorkQueue/include/AsteriskSCF/DefaultQueueListener.h
create mode 100644 WorkQueue/src/DefaultQueueListener.cpp
- Log -----------------------------------------------------------------
commit e62d3ce2544ac6f072e0be683656f486eed41252
Author: Mark Michelson <mmichelson at digium.com>
Date: Mon Apr 18 08:53:53 2011 -0500
Thread pool code is now complete, at least with regard to code correctness.
* In WorkQueue code, I implemented a "DefaultQueueListener" class. This class
is a simple implementation of the WorkQueue QueueListener interface. The class creates a thread
that will execute tasks when the WorkQueue has tasks to execute and that will
go idle when there are no tasks to execute. Since I imagine that this type of
construct may be used often, it is included in the WorkQueue source so that
it may be used as desired.
* The ThreadPool now has all its operations queued onto a "Control" queue. The
main reason is so that PoolListener methods may be called without a lock being
held while the execution order of the PoolListener operations is still guaranteed.
Though the code is correct and tests pass, I want to make at least one more run
through the code for cleanup/documentation before opening a code review.
diff --git a/ThreadPool/src/CMakeLists.txt b/ThreadPool/src/CMakeLists.txt
index b7e1aee..110e410 100644
--- a/ThreadPool/src/CMakeLists.txt
+++ b/ThreadPool/src/CMakeLists.txt
@@ -11,6 +11,7 @@ asterisk_scf_slice_include_directories(${API_SLICE_DIR})
asterisk_scf_component_init(ThreadPool CXX)
include_directories(../include)
+include_directories(../../WorkQueue/include)
include_directories(${API_INCLUDE_DIR})
asterisk_scf_component_add_file(ThreadPool
@@ -21,5 +22,6 @@ asterisk_scf_component_add_file(ThreadPool ThreadPool.cpp)
asterisk_scf_component_add_boost_libraries(ThreadPool thread)
asterisk_scf_component_build_library(ThreadPool)
+target_link_libraries(ThreadPool WorkQueue)
asterisk_scf_headers_install(../include/)
diff --git a/ThreadPool/src/ThreadPool.cpp b/ThreadPool/src/ThreadPool.cpp
index e5a5d99..9de49e3 100644
--- a/ThreadPool/src/ThreadPool.cpp
+++ b/ThreadPool/src/ThreadPool.cpp
@@ -18,6 +18,8 @@
#include <AsteriskSCF/ThreadPool.h>
#include <AsteriskSCF/WorkerThread.h>
+#include <AsteriskSCF/WorkQueue.h>
+#include <AsteriskSCF/DefaultQueueListener.h>
namespace AsteriskSCF
{
@@ -98,136 +100,227 @@ class ThreadPoolPriv : public WorkerThreadListener
{
public:
ThreadPoolPriv(const PoolListenerPtr& listener, const QueuePtr& queue, Pool *pool)
- : mListener(listener), mQueue(queue), mPool(pool)
+ : mControlQueue(new AsteriskSCF::WorkQueue::WorkQueue), mListener(listener), mQueue(queue), mPool(pool), mShuttingDown(false)
{
+ mControlQueue->setListener(new AsteriskSCF::WorkQueue::DefaultQueueListener(mControlQueue));
mQueue->setListener(new ThreadQueueListener(this, mPool));
}
~ThreadPoolPriv()
{
- resize(0);
- }
+ //The tricky thing when destroying the thread pool is that
+ //we have to be sure that no queued operations from the worker
+ //threads interfere with destruction.
+ //
+ //First, we make it known we are shutting down, and kill off
+ //the control queue so that no more operations that might
+ //move WorkerThreads from their current containers may run.
- void activeThreadIdle(WorkerThread *thread)
- {
- int activeSize;
- int idleSize;
- int zombieSize;
{
- boost::unique_lock<boost::mutex> lock(mLock);
- std::vector<WorkerThread*>::iterator iter =
- std::find(mActiveThreads.begin(), mActiveThreads.end(), thread);
-
- if (iter != mActiveThreads.end())
- {
- mIdleThreads.push_back(*iter);
- mActiveThreads.erase(iter);
- }
- activeSize = mActiveThreads.size();
- idleSize = mIdleThreads.size();
- zombieSize = mZombieThreads.size();
- //XXX I don't like calling listener operations without a lock held since it makes
- //it much more difficult to call pool operations from the listener. The problem is
- //that by NOT calling with the lock held, state changes can arrive out of order...believe
- //me, I've seen my tests fail as a result of this.
- mListener->stateChanged(mPool, activeSize, idleSize, zombieSize);
+ boost::lock_guard<boost::mutex> lock(mQueueLock);
+ mShuttingDown = true;
+ mControlQueue = 0;
+ }
+
+ //Now WorkerThreads are stuck in their current containers. All
+ //we need to do now is to kill them off. Poking them into the
+ //Dead state, joining, and then deleting will do the trick.
+ while (!mIdleThreads.empty())
+ {
+ std::vector<WorkerThread*>::iterator i = mIdleThreads.begin();
+ WorkerThread *doomed = *i;
+ mIdleThreads.erase(i);
+ doomed->poke(Dead);
+ doomed->join();
+ delete doomed;
+ }
+ while (!mActiveThreads.empty())
+ {
+ std::vector<WorkerThread*>::iterator i = mActiveThreads.begin();
+ WorkerThread *doomed = *i;
+ mActiveThreads.erase(i);
+ doomed->poke(Dead);
+ doomed->join();
+ delete doomed;
+ }
+ while (!mZombieThreads.empty())
+ {
+ std::vector<WorkerThread*>::iterator i = mZombieThreads.begin();
+ WorkerThread *doomed = *i;
+ mZombieThreads.erase(i);
+ doomed->poke(Dead);
+ doomed->join();
+ delete doomed;
}
}
-
- void zombieThreadDead(WorkerThread *thread)
+
+ class ActiveThreadIdle : public Work
{
- int activeSize;
- int idleSize;
- int zombieSize;
+ public:
+ ActiveThreadIdle(WorkerThread *thread, ThreadPoolPriv *priv)
+ : mPriv(priv), mWorkerThread(thread) { }
+
+ void execute()
{
- boost::unique_lock<boost::mutex> lock(mLock);
- std::vector<WorkerThread*>::iterator i = std::find(mZombieThreads.begin(), mZombieThreads.end(), thread);
- if (i != mZombieThreads.end())
+ std::vector<WorkerThread*>::iterator iter =
+ std::find(mPriv->mActiveThreads.begin(),
+ mPriv->mActiveThreads.end(), mWorkerThread);
+
+ if (iter != mPriv->mActiveThreads.end())
{
- WorkerThread *doomed = *i;
- mZombieThreads.erase(i);
- delete doomed;
+ mPriv->mIdleThreads.push_back(*iter);
+ mPriv->mActiveThreads.erase(iter);
}
- activeSize = mActiveThreads.size();
- idleSize = mIdleThreads.size();
- zombieSize = mZombieThreads.size();
- mListener->stateChanged(mPool, activeSize, idleSize, zombieSize);
+ int activeSize = mPriv->mActiveThreads.size();
+ int idleSize = mPriv->mIdleThreads.size();
+ int zombieSize = mPriv->mZombieThreads.size();
+ mPriv->mListener->stateChanged(mPriv->mPool, activeSize, idleSize, zombieSize);
}
- }
-
- void grow(int numNewThreads)
+ private:
+ ThreadPoolPriv *mPriv;
+ WorkerThread *mWorkerThread;
+ };
+
+ typedef IceUtil::Handle<ActiveThreadIdle> ActiveThreadIdlePtr;
+
+ void activeThreadIdle(WorkerThread *thread)
{
- for (int i = 0; i < numNewThreads; ++i)
+ boost::lock_guard<boost::mutex> lock(mQueueLock);
+ if (!mShuttingDown)
{
- WorkerThread *newThread(new WorkerThread(mQueue, this));
- mActiveThreads.push_back(newThread);
+ ActiveThreadIdlePtr task(new ActiveThreadIdle(thread, this));
+ mControlQueue->enqueueWork(task);
}
}
-
- void shrink(int threadsToKill)
+
+ class ZombieThreadDead : public Work
{
- while (threadsToKill > 0)
+ public:
+ ZombieThreadDead(WorkerThread *thread, ThreadPoolPriv *priv)
+ : mPriv(priv), mWorkerThread(thread) { }
+
+ void execute()
{
- std::vector<WorkerThread*>::iterator i = mIdleThreads.begin();
- if (i == mIdleThreads.end())
+ std::vector<WorkerThread*>::iterator i =
+ std::find(mPriv->mZombieThreads.begin(),
+ mPriv->mZombieThreads.end(), mWorkerThread);
+ if (i != mPriv->mZombieThreads.end())
{
- //We're out of idle threads to kill.
- break;
+ WorkerThread *doomed = *i;
+ mPriv->mZombieThreads.erase(i);
+ delete doomed;
}
- WorkerThread *doomed = *i;
- mIdleThreads.erase(i);
- doomed->poke(Dead);
- doomed->join();
- delete doomed;
-
- --threadsToKill;
+ int activeSize = mPriv->mActiveThreads.size();
+ int idleSize = mPriv->mIdleThreads.size();
+ int zombieSize = mPriv->mZombieThreads.size();
+ mPriv->mListener->stateChanged(mPriv->mPool, activeSize, idleSize, zombieSize);
}
+ private:
+ ThreadPoolPriv *mPriv;
+ WorkerThread *mWorkerThread;
+ };
+
+ typedef IceUtil::Handle<ZombieThreadDead> ZombieThreadDeadPtr;
- while (threadsToKill > 0)
+ void zombieThreadDead(WorkerThread *thread)
+ {
+ boost::lock_guard<boost::mutex> lock(mQueueLock);
+ if (!mShuttingDown)
{
- // If we've made it here, then it means that there weren't enough idle
- // threads to kill. We'll need to zombify some active threads then.
- std::vector<WorkerThread*>::iterator i = mActiveThreads.begin();
- if (i != mActiveThreads.end())
- {
- mZombieThreads.push_back(*i);
- (*i)->poke(Zombie);
- mActiveThreads.erase(i);
- }
- --threadsToKill;
+ ZombieThreadDeadPtr task(new ZombieThreadDead(thread, this));
+ mControlQueue->enqueueWork(task);
}
}
- void resize(int numThreads)
+ class Resize : public Work
{
- int activeSize;
- int idleSize;
- int zombieSize;
+ public:
+ Resize(int numThreads, ThreadPoolPriv *priv)
+ : mNumThreads(numThreads), mPriv(priv) { }
+ void execute()
{
- boost::unique_lock<boost::mutex> lock(mLock);
//We don't count zombie threads as being "live" when potentially resizing
- int currentSize = mActiveThreads.size() + mIdleThreads.size();
+ int currentSize = mPriv->mActiveThreads.size() + mPriv->mIdleThreads.size();
- if (currentSize == numThreads)
+ if (currentSize == mNumThreads)
{
return;
}
- if (currentSize < numThreads)
+ if (currentSize < mNumThreads)
{
- grow(numThreads - currentSize);
+ grow(mNumThreads - currentSize);
}
else
{
- shrink(currentSize - numThreads);
+ shrink(currentSize - mNumThreads);
+ }
+ int activeSize = mPriv->mActiveThreads.size();
+ int idleSize = mPriv->mIdleThreads.size();
+ int zombieSize = mPriv->mZombieThreads.size();
+ mPriv->mListener->stateChanged(mPriv->mPool, activeSize, idleSize, zombieSize);
+ }
+ private:
+ void grow(int numNewThreads)
+ {
+ for (int i = 0; i < numNewThreads; ++i)
+ {
+ WorkerThread *newThread(new WorkerThread(mPriv->mQueue, mPriv));
+ mPriv->mActiveThreads.push_back(newThread);
+ }
+ }
+
+ void shrink(int threadsToKill)
+ {
+ while (threadsToKill > 0)
+ {
+ std::vector<WorkerThread*>::iterator i = mPriv->mIdleThreads.begin();
+ if (i == mPriv->mIdleThreads.end())
+ {
+ //We're out of idle threads to kill.
+ break;
+ }
+ WorkerThread *doomed = *i;
+ mPriv->mIdleThreads.erase(i);
+ doomed->poke(Dead);
+ doomed->join();
+ delete doomed;
+
+ --threadsToKill;
+ }
+
+ while (threadsToKill > 0)
+ {
+ // If we've made it here, then it means that there weren't enough idle
+ // threads to kill. We'll need to zombify some active threads then.
+ std::vector<WorkerThread*>::iterator i = mPriv->mActiveThreads.begin();
+ if (i != mPriv->mActiveThreads.end())
+ {
+ mPriv->mZombieThreads.push_back(*i);
+ (*i)->poke(Zombie);
+ mPriv->mActiveThreads.erase(i);
+ }
+ --threadsToKill;
}
- activeSize = mActiveThreads.size();
- idleSize = mIdleThreads.size();
- zombieSize = mZombieThreads.size();
- mListener->stateChanged(mPool, activeSize, idleSize, zombieSize);
+ }
+
+ const int mNumThreads;
+ ThreadPoolPriv *mPriv;
+ };
+
+ typedef IceUtil::Handle<Resize> ResizePtr;
+
+ void resize(int numThreads)
+ {
+ boost::lock_guard<boost::mutex> lock(mQueueLock);
+ if (!mShuttingDown)
+ {
+ ResizePtr task(new Resize(numThreads, this));
+ mControlQueue->enqueueWork(task);
}
}
+ QueuePtr mControlQueue;
AsteriskSCF::System::ThreadPool::V1::PoolListenerPtr mListener;
AsteriskSCF::System::WorkQueue::V1::QueuePtr mQueue;
AsteriskSCF::System::ThreadPool::V1::Pool *mPool;
@@ -236,7 +329,8 @@ public:
std::vector<WorkerThread*> mIdleThreads;
std::vector<WorkerThread*> mZombieThreads;
- boost::mutex mLock;
+ bool mShuttingDown;
+ boost::mutex mQueueLock;
};
ThreadQueueListener::ThreadQueueListener(ThreadPoolPriv *priv, Pool *pool)
diff --git a/ThreadPool/test/TestThreadPool.cpp b/ThreadPool/test/TestThreadPool.cpp
index 966515b..5ff1920 100644
--- a/ThreadPool/test/TestThreadPool.cpp
+++ b/ThreadPool/test/TestThreadPool.cpp
@@ -169,12 +169,15 @@ BOOST_AUTO_TEST_CASE(threadDestruction)
{
BOOST_TEST_MESSAGE("Running threadDestruction test");
+ std::cout << "Really early failure?" << std::endl;
TestListenerPtr listener(new TestListener);
QueuePtr queue(new WorkQueue());
PoolPtr pool(new ThreadPool(listener, queue));
pool->setSize(3);
+ std::cout << "Early failure?" << std::endl;
+
{
boost::unique_lock<boost::mutex> lock(listener->mLock);
while (listener->mIdle != 3)
@@ -183,12 +186,16 @@ BOOST_AUTO_TEST_CASE(threadDestruction)
}
}
+ std::cout << "Everything cool?" << std::endl;
+
BOOST_CHECK(listener->mIdle == 3);
BOOST_CHECK(listener->mActive == 0);
BOOST_CHECK(listener->mZombie == 0);
+ std::cout << "Everything still cool?" << std::endl;
pool->setSize(2);
+ std::cout << "How about now?" << std::endl;
{
boost::unique_lock<boost::mutex> lock(listener->mLock);
while (listener->mIdle != 2)
@@ -196,6 +203,7 @@ BOOST_AUTO_TEST_CASE(threadDestruction)
listener->mCond.wait(lock);
}
}
+ std::cout << "And now?" << std::endl;
BOOST_CHECK(listener->mIdle == 2);
BOOST_CHECK(listener->mActive == 0);
@@ -360,6 +368,19 @@ BOOST_AUTO_TEST_CASE(taskDistribution)
//Since these tasks halt until they are poked,
//The two tasks should be evenly divided amongst
//the threads.
+ //
+ //Pool operations are asynchronous, so we need to
+ //wait until the listener is informed that there are
+ //two active threads
+
+ {
+ boost::unique_lock<boost::mutex> lock(listener->mLock);
+ while (listener->mActive < 2)
+ {
+ listener->mCond.wait(lock);
+ }
+ }
+ std::cout << "how many active? " << listener->mActive << std::endl;
BOOST_CHECK(listener->mActive == 2);
BOOST_CHECK(listener->mIdle == 0);
BOOST_CHECK(listener->mZombie == 0);
@@ -423,9 +444,13 @@ BOOST_AUTO_TEST_CASE(zombies)
pool->setSize(2);
- //Since these tasks halt until they are poked,
- //The two tasks should be evenly divided among
- //the threads.
+ {
+ boost::unique_lock<boost::mutex> lock(listener->mLock);
+ while (listener->mActive < 2)
+ {
+ listener->mCond.wait(lock);
+ }
+ }
BOOST_CHECK(listener->mActive == 2);
BOOST_CHECK(listener->mIdle == 0);
BOOST_CHECK(listener->mZombie == 0);
@@ -434,6 +459,15 @@ BOOST_AUTO_TEST_CASE(zombies)
//result in the active threads immediately becoming
//zombies.
pool->setSize(0);
+
+ {
+ boost::unique_lock<boost::mutex> lock(listener->mLock);
+ while (listener->mZombie < 2)
+ {
+ listener->mCond.wait(lock);
+ }
+ }
+ std::cout << "Number of threads. Active: " << listener->mActive << " Idle: " << listener->mIdle << " Zombie: " << listener->mZombie;
BOOST_CHECK(listener->mActive == 0);
BOOST_CHECK(listener->mIdle == 0);
BOOST_CHECK(listener->mZombie == 2);
diff --git a/WorkQueue/include/AsteriskSCF/DefaultQueueListener.h b/WorkQueue/include/AsteriskSCF/DefaultQueueListener.h
new file mode 100644
index 0000000..9557b05
--- /dev/null
+++ b/WorkQueue/include/AsteriskSCF/DefaultQueueListener.h
@@ -0,0 +1,43 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#pragma once
+
+#include <boost/shared_ptr.hpp>
+#include <AsteriskSCF/System/WorkQueue/WorkQueueIf.h>
+
+namespace AsteriskSCF
+{
+namespace WorkQueue
+{
+
+class DefaultQueueListenerPriv;
+
+class DefaultQueueListener : public AsteriskSCF::System::WorkQueue::V1::QueueListener // QueueListener whose state lives in a private implementation object (mPriv)
+{
+public:
+ DefaultQueueListener(const AsteriskSCF::System::WorkQueue::V1::QueuePtr& queue); // queue: the work queue handed to the private implementation
+ ~DefaultQueueListener(); // defined in DefaultQueueListener.cpp
+ void workAdded(int numNewWork, bool wasEmpty); // QueueListener notification; see DefaultQueueListener.cpp for the handling
+ void workResumable(); // QueueListener notification; see DefaultQueueListener.cpp for the handling
+ void emptied(); // QueueListener notification; see DefaultQueueListener.cpp for the handling
+private:
+ boost::shared_ptr<DefaultQueueListenerPriv> mPriv; // pimpl: only a forward declaration of DefaultQueueListenerPriv is visible in this header
+};
+
+typedef IceUtil::Handle<DefaultQueueListener> DefaultQueueListenerPtr;
+
+}; // end namespace WorkQueue
+}; // end namespace AsteriskSCF
diff --git a/WorkQueue/src/CMakeLists.txt b/WorkQueue/src/CMakeLists.txt
index c570188..c8e34e4 100644
--- a/WorkQueue/src/CMakeLists.txt
+++ b/WorkQueue/src/CMakeLists.txt
@@ -17,8 +17,11 @@ asterisk_scf_component_add_file(WorkQueue
../include/AsteriskSCF/WorkQueue.h)
asterisk_scf_component_add_file(WorkQueue
../include/AsteriskSCF/SuspendableWorkQueue.h)
+asterisk_scf_component_add_file(WorkQueue
+ ../include/AsteriskSCF/DefaultQueueListener.h)
asterisk_scf_component_add_file(WorkQueue WorkQueue.cpp)
asterisk_scf_component_add_file(WorkQueue SuspendableWorkQueue.cpp)
+asterisk_scf_component_add_file(WorkQueue DefaultQueueListener.cpp)
asterisk_scf_component_add_boost_libraries(WorkQueue thread)
asterisk_scf_component_build_library(WorkQueue)
diff --git a/WorkQueue/src/DefaultQueueListener.cpp b/WorkQueue/src/DefaultQueueListener.cpp
new file mode 100644
index 0000000..651ff94
--- /dev/null
+++ b/WorkQueue/src/DefaultQueueListener.cpp
@@ -0,0 +1,105 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#include <boost/thread.hpp>
+#include <AsteriskSCF/DefaultQueueListener.h>
+
+namespace AsteriskSCF
+{
+namespace WorkQueue
+{
+
+enum ListenerState // states of the listener's worker thread
+{
+ Active, // initial state; the thread keeps calling executeWork() on the queue
+ Idle, // set by idle(); the thread waits on the condition variable while in this state
+ Dead // makes run() leave its loop and idle() return immediately
+};
+
+using namespace AsteriskSCF::System::WorkQueue::V1;
+
+class DefaultQueueListenerPriv // Private state for DefaultQueueListener: a worker thread plus the lock/condition used to park and wake it
+{
+public:
+ DefaultQueueListenerPriv(const QueuePtr& queue) // queue: the queue whose work the thread executes
+ : mState(Active), mQueue(queue.get()), // only the raw Queue* from queue.get() is stored, not the QueuePtr handle
+ mThread(boost::bind(&DefaultQueueListenerPriv::run, this)) { } // the thread runs run() on this object
+
+ void idle() // Blocks the calling thread until poke() moves the state away from Idle
+ {
+ boost::unique_lock<boost::mutex> lock(mLock);
+ if (mState == Dead) // never overwrite Dead with Idle; return so run() can observe Dead and exit
+ {
+ return;
+ }
+
+ mState = Idle; // NOTE(review): a poke(Active) landing after executeWork() returned false but before this assignment is overwritten here - possible missed wakeup, confirm
+ while (mState == Idle) // re-check the state after every wakeup
+ {
+ mCond.wait(lock);
+ }
+ }
+
+ void run() // Thread body: keeps executing queue work until the state becomes Dead
+ {
+ while (mState != Dead) // NOTE(review): mState is read here without holding mLock
+ {
+ if (!mQueue->executeWork()) // presumably false means there was no work to execute - confirm against Queue::executeWork
+ {
+ idle();
+ }
+ }
+ }
+
+ void poke(ListenerState newState) // Sets the state under mLock and notifies one waiter on mCond
+ {
+ boost::unique_lock<boost::mutex> lock(mLock);
+ mState = newState; // NOTE(review): unconditional, so a poke(Active) after poke(Dead) would clear Dead
+ mCond.notify_one();
+ }
+
+ ListenerState mState; // current thread state; written under mLock by idle() and poke()
+ Queue *mQueue; // raw pointer to the serviced queue (taken from queue.get() in the constructor)
+ boost::mutex mLock; // locked by idle() and poke()
+ boost::condition_variable mCond; // waited on in idle(), notified in poke()
+ boost::thread mThread; // declared last, so the members above are initialized before the thread is constructed
+};
+
+DefaultQueueListener::DefaultQueueListener(const QueuePtr& queue) // Builds the private implementation for the given queue
+ : mPriv(new DefaultQueueListenerPriv(queue)) { } // the worker thread is created as a member of DefaultQueueListenerPriv
+
+DefaultQueueListener::~DefaultQueueListener() // Stops the worker thread before the listener goes away
+{
+ mPriv->poke(Dead); // set the Dead state and wake the thread if it is waiting in idle()
+ mPriv->mThread.join(); // block until run() has returned
+}
+
+void DefaultQueueListener::workAdded(int numNewWork, bool wasEmpty) // Wakes the worker; numNewWork and wasEmpty are not used
+{
+ mPriv->poke(Active);
+}
+
+void DefaultQueueListener::workResumable() // Wakes the worker, exactly as workAdded() does
+{
+ mPriv->poke(Active);
+}
+
+void DefaultQueueListener::emptied() // Empty body: nothing is done on this notification
+{
+}
+
+}; //end namespace WorkQueue
+}; //end namespace AsteriskSCF
diff --git a/WorkQueue/test/TestWorkQueue.cpp b/WorkQueue/test/TestWorkQueue.cpp
index 7cbf09b..3244eb0 100644
--- a/WorkQueue/test/TestWorkQueue.cpp
+++ b/WorkQueue/test/TestWorkQueue.cpp
@@ -17,6 +17,7 @@
#include <boost/test/unit_test.hpp>
#include <AsteriskSCF/WorkQueue.h>
+#include <AsteriskSCF/DefaultQueueListener.h>
using namespace AsteriskSCF::System::WorkQueue::V1;
using namespace AsteriskSCF::WorkQueue;
@@ -289,4 +290,47 @@ BOOST_AUTO_TEST_CASE(executionOrder2)
BOOST_CHECK(queue->getSize() == 0);
}
+BOOST_AUTO_TEST_CASE(defaultListener) // Checks that a DefaultQueueListener executes queued tasks and wakes up again for later work
+{
+ QueuePtr queue(new WorkQueue());
+ DefaultQueueListenerPtr listener(new DefaultQueueListener(queue));
+ TaskPtr work1(new Task);
+ TaskPtr work2(new Task);
+ TaskPtr work3(new Task);
+ TaskPtr work4(new Task);
+ WorkSeq works;
+
+ queue->setListener(listener);
+
+ works.push_back(work1);
+ works.push_back(work2);
+ works.push_back(work3);
+ works.push_back(work4);
+
+ queue->enqueueWorkSeq(works); // all four tasks are enqueued in a single call
+
+ while (work4->taskExecuted == false) // busy-wait with no sleep or timeout; NOTE(review): taskExecuted is polled here while presumably written by the listener thread - confirm Task makes that safe
+ {
+ BOOST_TEST_MESSAGE("Waiting for work to complete");
+ }
+
+ BOOST_CHECK(work1->taskExecuted == true); // work1-work3 are checked only after work4 is seen executed; presumably relies on in-order execution - confirm
+ BOOST_CHECK(work2->taskExecuted == true);
+ BOOST_CHECK(work3->taskExecuted == true);
+ BOOST_CHECK(work4->taskExecuted == true);
+
+ //Thread should be idle. It should get woken up if
+ //we give it more work to do.
+
+ TaskPtr work5(new Task);
+ queue->enqueueWork(work5);
+
+ while (work5->taskExecuted == false) // same busy-wait as above, now for the single late task
+ {
+ BOOST_TEST_MESSAGE("Waiting for work to complete");
+ }
+
+ BOOST_CHECK(work5->taskExecuted == true);
+}
+
BOOST_AUTO_TEST_SUITE_END()
-----------------------------------------------------------------------
--
asterisk-scf/integration/ice-util-cpp.git
More information about the asterisk-scf-commits
mailing list