[asterisk-scf-commits] asterisk-scf/integration/ice-util-cpp.git branch "workqueue" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Mon Apr 18 09:04:56 CDT 2011


branch "workqueue" has been updated
       via  e62d3ce2544ac6f072e0be683656f486eed41252 (commit)
      from  abb9e7e5a234470e1591218a514b16d93a5b8f64 (commit)

Summary of changes:
 ThreadPool/src/CMakeLists.txt                      |    2 +
 ThreadPool/src/ThreadPool.cpp                      |  266 +++++++++++++-------
 ThreadPool/test/TestThreadPool.cpp                 |   40 +++-
 .../include/AsteriskSCF/DefaultQueueListener.h     |   43 ++++
 WorkQueue/src/CMakeLists.txt                       |    3 +
 WorkQueue/src/DefaultQueueListener.cpp             |  105 ++++++++
 WorkQueue/test/TestWorkQueue.cpp                   |   44 ++++
 7 files changed, 414 insertions(+), 89 deletions(-)
 create mode 100644 WorkQueue/include/AsteriskSCF/DefaultQueueListener.h
 create mode 100644 WorkQueue/src/DefaultQueueListener.cpp


- Log -----------------------------------------------------------------
commit e62d3ce2544ac6f072e0be683656f486eed41252
Author: Mark Michelson <mmichelson at digium.com>
Date:   Mon Apr 18 08:53:53 2011 -0500

    Thread pool code is now complete, at least with regard to code correctness.
    
    * In the WorkQueue code, I implemented a "DefaultQueueListener" class. This
      class is a simple implementation of a QueueListener. It creates a thread
      that executes tasks while the WorkQueue has tasks to execute and that goes
      idle when there are none. Since I imagine that this type of construct may
      be used often, it is included in the WorkQueue source so that it may be
      used as desired.
    
    * The ThreadPool now queues all of its operations onto a "Control" queue. The
      main reason is so that PoolListener methods can be called without a lock
      held while still guaranteeing the order in which they are executed.
    
    Though the code is correct and tests pass, I want to make at least one more run
    through the code for cleanup/documentation before opening a code review.

diff --git a/ThreadPool/src/CMakeLists.txt b/ThreadPool/src/CMakeLists.txt
index b7e1aee..110e410 100644
--- a/ThreadPool/src/CMakeLists.txt
+++ b/ThreadPool/src/CMakeLists.txt
@@ -11,6 +11,7 @@ asterisk_scf_slice_include_directories(${API_SLICE_DIR})
 asterisk_scf_component_init(ThreadPool CXX)
 
 include_directories(../include)
+include_directories(../../WorkQueue/include)
 include_directories(${API_INCLUDE_DIR})
 
 asterisk_scf_component_add_file(ThreadPool
@@ -21,5 +22,6 @@ asterisk_scf_component_add_file(ThreadPool ThreadPool.cpp)
 asterisk_scf_component_add_boost_libraries(ThreadPool thread)
 
 asterisk_scf_component_build_library(ThreadPool)
+target_link_libraries(ThreadPool WorkQueue)
 
 asterisk_scf_headers_install(../include/)
diff --git a/ThreadPool/src/ThreadPool.cpp b/ThreadPool/src/ThreadPool.cpp
index e5a5d99..9de49e3 100644
--- a/ThreadPool/src/ThreadPool.cpp
+++ b/ThreadPool/src/ThreadPool.cpp
@@ -18,6 +18,8 @@
 
 #include <AsteriskSCF/ThreadPool.h>
 #include <AsteriskSCF/WorkerThread.h>
+#include <AsteriskSCF/WorkQueue.h>
+#include <AsteriskSCF/DefaultQueueListener.h>
 
 namespace AsteriskSCF
 {
@@ -98,136 +100,227 @@ class ThreadPoolPriv : public WorkerThreadListener
 {
 public:
     ThreadPoolPriv(const PoolListenerPtr& listener, const QueuePtr& queue, Pool *pool)
-        : mListener(listener), mQueue(queue), mPool(pool)
+        : mControlQueue(new AsteriskSCF::WorkQueue::WorkQueue), mListener(listener), mQueue(queue), mPool(pool), mShuttingDown(false)
     {
+        mControlQueue->setListener(new AsteriskSCF::WorkQueue::DefaultQueueListener(mControlQueue));
         mQueue->setListener(new ThreadQueueListener(this, mPool));
     }
 
     ~ThreadPoolPriv()
     {
-        resize(0);
-    }
+        //The tricky thing when destroying the thread pool is that
+        //we have to be sure that no queued operations from the worker
+        //threads interfere with destruction.
+        //
+        //First, we make it known we are shutting down, and kill off
+        //the control queue so that no more operations that might
+        //move WorkerThreads from their current containers may run.
 
-    void activeThreadIdle(WorkerThread *thread)
-    {
-        int activeSize;
-        int idleSize;
-        int zombieSize;
         {
-            boost::unique_lock<boost::mutex> lock(mLock);
-            std::vector<WorkerThread*>::iterator iter =
-                std::find(mActiveThreads.begin(), mActiveThreads.end(), thread);
-    
-            if (iter != mActiveThreads.end())
-            {
-                mIdleThreads.push_back(*iter);
-                mActiveThreads.erase(iter);
-            }
-            activeSize = mActiveThreads.size();
-            idleSize = mIdleThreads.size();
-            zombieSize = mZombieThreads.size();
-            //XXX I don't like calling listener operations without a lock held since it makes
-            //it much more difficult to call pool operations from the listener. The problem is
-            //that by NOT calling with the lock held, state changes can arrive out of order...believe
-            //me, I've seen my tests fail as a result of this.
-            mListener->stateChanged(mPool, activeSize, idleSize, zombieSize);
+            boost::lock_guard<boost::mutex> lock(mQueueLock);
+            mShuttingDown = true;
+            mControlQueue = 0;
+        }
+
+        //Now WorkerThreads are stuck in their current containers. All
+        //we need to do now is to kill them off. Poking them into the
+        //Dead state, joining, and then deleting will do the trick.
+        while (!mIdleThreads.empty())
+        {
+            std::vector<WorkerThread*>::iterator i = mIdleThreads.begin();
+            WorkerThread *doomed = *i;
+            mIdleThreads.erase(i);
+            doomed->poke(Dead);
+            doomed->join();
+            delete doomed;
+        }
+        while (!mActiveThreads.empty())
+        {
+            std::vector<WorkerThread*>::iterator i = mActiveThreads.begin();
+            WorkerThread *doomed = *i;
+            mActiveThreads.erase(i);
+            doomed->poke(Dead);
+            doomed->join();
+            delete doomed;
+        }
+        while (!mZombieThreads.empty())
+        {
+            std::vector<WorkerThread*>::iterator i = mZombieThreads.begin();
+            WorkerThread *doomed = *i;
+            mZombieThreads.erase(i);
+            doomed->poke(Dead);
+            doomed->join();
+            delete doomed;
         }
     }
-    
-    void zombieThreadDead(WorkerThread *thread)
+
+    class ActiveThreadIdle : public Work
     {
-        int activeSize;
-        int idleSize;
-        int zombieSize;
+    public:
+        ActiveThreadIdle(WorkerThread *thread, ThreadPoolPriv *priv)
+            : mPriv(priv), mWorkerThread(thread) { }
+
+        void execute()
         {
-            boost::unique_lock<boost::mutex> lock(mLock);
-            std::vector<WorkerThread*>::iterator i = std::find(mZombieThreads.begin(), mZombieThreads.end(), thread);
-            if (i != mZombieThreads.end())
+            std::vector<WorkerThread*>::iterator iter =
+                std::find(mPriv->mActiveThreads.begin(),
+                        mPriv->mActiveThreads.end(), mWorkerThread);
+    
+            if (iter != mPriv->mActiveThreads.end())
             {
-                WorkerThread *doomed = *i;
-                mZombieThreads.erase(i);
-                delete doomed;
+                mPriv->mIdleThreads.push_back(*iter);
+                mPriv->mActiveThreads.erase(iter);
             }
-            activeSize = mActiveThreads.size();
-            idleSize = mIdleThreads.size();
-            zombieSize = mZombieThreads.size();
-            mListener->stateChanged(mPool, activeSize, idleSize, zombieSize);
+            int activeSize = mPriv->mActiveThreads.size();
+            int idleSize = mPriv->mIdleThreads.size();
+            int zombieSize = mPriv->mZombieThreads.size();
+            mPriv->mListener->stateChanged(mPriv->mPool, activeSize, idleSize, zombieSize);
         }
-    }
-    
-    void grow(int numNewThreads)
+    private:
+        ThreadPoolPriv *mPriv;
+        WorkerThread *mWorkerThread;
+    };
+
+    typedef IceUtil::Handle<ActiveThreadIdle> ActiveThreadIdlePtr;
+
+    void activeThreadIdle(WorkerThread *thread)
     {
-        for (int i = 0; i < numNewThreads; ++i)
+        boost::lock_guard<boost::mutex> lock(mQueueLock);
+        if (!mShuttingDown)
         {
-            WorkerThread *newThread(new WorkerThread(mQueue, this));
-            mActiveThreads.push_back(newThread);
+            ActiveThreadIdlePtr task(new ActiveThreadIdle(thread, this));
+            mControlQueue->enqueueWork(task);
         }
     }
-    
-    void shrink(int threadsToKill)
+
+    class ZombieThreadDead : public Work
     {
-        while (threadsToKill > 0)
+    public:
+        ZombieThreadDead(WorkerThread *thread, ThreadPoolPriv *priv)
+            : mPriv(priv), mWorkerThread(thread) { }
+
+        void execute()
         {
-            std::vector<WorkerThread*>::iterator i = mIdleThreads.begin();
-            if (i == mIdleThreads.end())
+            std::vector<WorkerThread*>::iterator i =
+                std::find(mPriv->mZombieThreads.begin(),
+                        mPriv->mZombieThreads.end(), mWorkerThread);
+            if (i != mPriv->mZombieThreads.end())
             {
-                //We're out of idle threads to kill.
-                break;
+                WorkerThread *doomed = *i;
+                mPriv->mZombieThreads.erase(i);
+                delete doomed;
             }
-            WorkerThread *doomed = *i;
-            mIdleThreads.erase(i);
-            doomed->poke(Dead);
-            doomed->join();
-            delete doomed;
-    
-            --threadsToKill;
+            int activeSize = mPriv->mActiveThreads.size();
+            int idleSize = mPriv->mIdleThreads.size();
+            int zombieSize = mPriv->mZombieThreads.size();
+            mPriv->mListener->stateChanged(mPriv->mPool, activeSize, idleSize, zombieSize);
         }
+    private:
+        ThreadPoolPriv *mPriv;
+        WorkerThread *mWorkerThread;
+    };
+
+    typedef IceUtil::Handle<ZombieThreadDead> ZombieThreadDeadPtr;
     
-        while (threadsToKill > 0)
+    void zombieThreadDead(WorkerThread *thread)
+    {
+        boost::lock_guard<boost::mutex> lock(mQueueLock);
+        if (!mShuttingDown)
         {
-            // If we've made it here, then it means that there weren't enough idle
-            // threads to kill. We'll need to zombify some active threads then.
-            std::vector<WorkerThread*>::iterator i = mActiveThreads.begin();
-            if (i != mActiveThreads.end())
-            {
-                mZombieThreads.push_back(*i);
-                (*i)->poke(Zombie);
-                mActiveThreads.erase(i);
-            }
-            --threadsToKill;
+            ZombieThreadDeadPtr task(new ZombieThreadDead(thread, this));
+            mControlQueue->enqueueWork(task);
         }
     }
 
-    void resize(int numThreads)
+    class Resize : public Work
     {
-        int activeSize;
-        int idleSize;
-        int zombieSize;
+    public:
+        Resize(int numThreads, ThreadPoolPriv *priv)
+            : mNumThreads(numThreads), mPriv(priv) { }
+        void execute()
         {
-            boost::unique_lock<boost::mutex> lock(mLock);
             //We don't count zombie threads as being "live" when potentially resizing
-            int currentSize = mActiveThreads.size() + mIdleThreads.size();
+            int currentSize = mPriv->mActiveThreads.size() + mPriv->mIdleThreads.size();
 
-            if (currentSize == numThreads)
+            if (currentSize == mNumThreads)
             {
                 return;
             }
 
-            if (currentSize < numThreads)
+            if (currentSize < mNumThreads)
             {
-                grow(numThreads - currentSize);
+                grow(mNumThreads - currentSize);
             }
             else
             {
-                shrink(currentSize - numThreads);
+                shrink(currentSize - mNumThreads);
+            }
+            int activeSize = mPriv->mActiveThreads.size();
+            int idleSize = mPriv->mIdleThreads.size();
+            int zombieSize = mPriv->mZombieThreads.size();
+            mPriv->mListener->stateChanged(mPriv->mPool, activeSize, idleSize, zombieSize);
+        }
+    private:
+        void grow(int numNewThreads)
+        {
+            for (int i = 0; i < numNewThreads; ++i)
+            {
+                WorkerThread *newThread(new WorkerThread(mPriv->mQueue, mPriv));
+                mPriv->mActiveThreads.push_back(newThread);
+            }
+        }
+        
+        void shrink(int threadsToKill)
+        {
+            while (threadsToKill > 0)
+            {
+                std::vector<WorkerThread*>::iterator i = mPriv->mIdleThreads.begin();
+                if (i == mPriv->mIdleThreads.end())
+                {
+                    //We're out of idle threads to kill.
+                    break;
+                }
+                WorkerThread *doomed = *i;
+                mPriv->mIdleThreads.erase(i);
+                doomed->poke(Dead);
+                doomed->join();
+                delete doomed;
+        
+                --threadsToKill;
+            }
+        
+            while (threadsToKill > 0)
+            {
+                // If we've made it here, then it means that there weren't enough idle
+                // threads to kill. We'll need to zombify some active threads then.
+                std::vector<WorkerThread*>::iterator i = mPriv->mActiveThreads.begin();
+                if (i != mPriv->mActiveThreads.end())
+                {
+                    mPriv->mZombieThreads.push_back(*i);
+                    (*i)->poke(Zombie);
+                    mPriv->mActiveThreads.erase(i);
+                }
+                --threadsToKill;
             }
-            activeSize = mActiveThreads.size();
-            idleSize = mIdleThreads.size();
-            zombieSize = mZombieThreads.size();
-            mListener->stateChanged(mPool, activeSize, idleSize, zombieSize);
+        }
+
+        const int mNumThreads;
+        ThreadPoolPriv *mPriv;
+    };
+
+    typedef IceUtil::Handle<Resize> ResizePtr;
+
+    void resize(int numThreads)
+    {
+        boost::lock_guard<boost::mutex> lock(mQueueLock);
+        if (!mShuttingDown)
+        {
+            ResizePtr task(new Resize(numThreads, this));
+            mControlQueue->enqueueWork(task);
         }
     }
 
+    QueuePtr mControlQueue;
     AsteriskSCF::System::ThreadPool::V1::PoolListenerPtr mListener;
     AsteriskSCF::System::WorkQueue::V1::QueuePtr mQueue;
     AsteriskSCF::System::ThreadPool::V1::Pool *mPool;
@@ -236,7 +329,8 @@ public:
     std::vector<WorkerThread*> mIdleThreads;
     std::vector<WorkerThread*> mZombieThreads;
 
-    boost::mutex mLock;
+    bool mShuttingDown;
+    boost::mutex mQueueLock;
 };
 
 ThreadQueueListener::ThreadQueueListener(ThreadPoolPriv *priv, Pool *pool)
diff --git a/ThreadPool/test/TestThreadPool.cpp b/ThreadPool/test/TestThreadPool.cpp
index 966515b..5ff1920 100644
--- a/ThreadPool/test/TestThreadPool.cpp
+++ b/ThreadPool/test/TestThreadPool.cpp
@@ -169,12 +169,15 @@ BOOST_AUTO_TEST_CASE(threadDestruction)
 {
     BOOST_TEST_MESSAGE("Running threadDestruction test");
 
+    std::cout << "Really early failure?" << std::endl;
     TestListenerPtr listener(new TestListener);
     QueuePtr queue(new WorkQueue());
     PoolPtr pool(new ThreadPool(listener, queue));
     
     pool->setSize(3);
 
+    std::cout << "Early failure?" << std::endl;
+
     {
         boost::unique_lock<boost::mutex> lock(listener->mLock);
         while (listener->mIdle != 3)
@@ -183,12 +186,16 @@ BOOST_AUTO_TEST_CASE(threadDestruction)
         }
     }
 
+    std::cout << "Everything cool?" << std::endl;
+
     BOOST_CHECK(listener->mIdle == 3);
     BOOST_CHECK(listener->mActive == 0);
     BOOST_CHECK(listener->mZombie == 0);
 
+    std::cout << "Everything still cool?" << std::endl;
     pool->setSize(2);
 
+    std::cout << "How about now?" << std::endl;
     {
         boost::unique_lock<boost::mutex> lock(listener->mLock);
         while (listener->mIdle != 2)
@@ -196,6 +203,7 @@ BOOST_AUTO_TEST_CASE(threadDestruction)
             listener->mCond.wait(lock);
         }
     }
+    std::cout << "And now?" << std::endl;
 
     BOOST_CHECK(listener->mIdle == 2);
     BOOST_CHECK(listener->mActive == 0);
@@ -360,6 +368,19 @@ BOOST_AUTO_TEST_CASE(taskDistribution)
     //Since these tasks halt until they are poked,
     //The two tasks should be evenly divided amongst
     //the threads.
+    //
+    //Pool operations are asynchronous, so we need to
+    //wait until the listener is informed that there are
+    //two active threads
+    
+    {
+        boost::unique_lock<boost::mutex> lock(listener->mLock);
+        while (listener->mActive < 2)
+        {
+            listener->mCond.wait(lock);
+        }
+    }
+    std::cout << "how many active? " << listener->mActive << std::endl;
     BOOST_CHECK(listener->mActive == 2);
     BOOST_CHECK(listener->mIdle == 0);
     BOOST_CHECK(listener->mZombie == 0);
@@ -423,9 +444,13 @@ BOOST_AUTO_TEST_CASE(zombies)
 
     pool->setSize(2);
     
-    //Since these tasks halt until they are poked,
-    //The two tasks should be evenly divided among
-    //the threads.
+    {
+        boost::unique_lock<boost::mutex> lock(listener->mLock);
+        while (listener->mActive < 2)
+        {
+            listener->mCond.wait(lock);
+        }
+    }
     BOOST_CHECK(listener->mActive == 2);
     BOOST_CHECK(listener->mIdle == 0);
     BOOST_CHECK(listener->mZombie == 0);
@@ -434,6 +459,15 @@ BOOST_AUTO_TEST_CASE(zombies)
     //result in the active threads immediately becoming
     //zombies.
     pool->setSize(0);
+
+    {
+        boost::unique_lock<boost::mutex> lock(listener->mLock);
+        while (listener->mZombie < 2)
+        {
+            listener->mCond.wait(lock);
+        }
+    }
+    std::cout << "Number of threads. Active: " << listener->mActive << " Idle: " << listener->mIdle << " Zombie: " << listener->mZombie;
     BOOST_CHECK(listener->mActive == 0);
     BOOST_CHECK(listener->mIdle == 0);
     BOOST_CHECK(listener->mZombie == 2);
diff --git a/WorkQueue/include/AsteriskSCF/DefaultQueueListener.h b/WorkQueue/include/AsteriskSCF/DefaultQueueListener.h
new file mode 100644
index 0000000..9557b05
--- /dev/null
+++ b/WorkQueue/include/AsteriskSCF/DefaultQueueListener.h
@@ -0,0 +1,43 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#pragma once
+
+#include <boost/shared_ptr.hpp>
+#include <AsteriskSCF/System/WorkQueue/WorkQueueIf.h>
+
+namespace AsteriskSCF
+{
+namespace WorkQueue
+{
+
+class DefaultQueueListenerPriv;
+
+class DefaultQueueListener : public AsteriskSCF::System::WorkQueue::V1::QueueListener
+{
+public:
+    DefaultQueueListener(const AsteriskSCF::System::WorkQueue::V1::QueuePtr& queue);
+    ~DefaultQueueListener();
+    void workAdded(int numNewWork, bool wasEmpty);
+    void workResumable();
+    void emptied();
+private:
+    boost::shared_ptr<DefaultQueueListenerPriv> mPriv;
+};
+
+typedef IceUtil::Handle<DefaultQueueListener> DefaultQueueListenerPtr;
+
+}; // end namespace WorkQueue
+}; // end namespace AsteriskSCF
diff --git a/WorkQueue/src/CMakeLists.txt b/WorkQueue/src/CMakeLists.txt
index c570188..c8e34e4 100644
--- a/WorkQueue/src/CMakeLists.txt
+++ b/WorkQueue/src/CMakeLists.txt
@@ -17,8 +17,11 @@ asterisk_scf_component_add_file(WorkQueue
     ../include/AsteriskSCF/WorkQueue.h)
 asterisk_scf_component_add_file(WorkQueue
     ../include/AsteriskSCF/SuspendableWorkQueue.h)
+asterisk_scf_component_add_file(WorkQueue
+    ../include/AsteriskSCF/DefaultQueueListener.h)
 asterisk_scf_component_add_file(WorkQueue WorkQueue.cpp)
 asterisk_scf_component_add_file(WorkQueue SuspendableWorkQueue.cpp)
+asterisk_scf_component_add_file(WorkQueue DefaultQueueListener.cpp)
 asterisk_scf_component_add_boost_libraries(WorkQueue thread)
 
 asterisk_scf_component_build_library(WorkQueue)
diff --git a/WorkQueue/src/DefaultQueueListener.cpp b/WorkQueue/src/DefaultQueueListener.cpp
new file mode 100644
index 0000000..651ff94
--- /dev/null
+++ b/WorkQueue/src/DefaultQueueListener.cpp
@@ -0,0 +1,105 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#include <boost/thread.hpp>
+#include <AsteriskSCF/DefaultQueueListener.h>
+
+namespace AsteriskSCF
+{
+namespace WorkQueue
+{
+
+enum ListenerState
+{
+    Active,
+    Idle,
+    Dead
+};
+
+using namespace AsteriskSCF::System::WorkQueue::V1;
+
+class DefaultQueueListenerPriv
+{
+public:
+    DefaultQueueListenerPriv(const QueuePtr& queue)
+        : mState(Active), mQueue(queue.get()),
+        mThread(boost::bind(&DefaultQueueListenerPriv::run, this)) { }
+
+    void idle()
+    {
+        boost::unique_lock<boost::mutex> lock(mLock);
+        if (mState == Dead)
+        {
+            return;
+        }
+
+        mState = Idle;
+        while (mState == Idle)
+        {
+            mCond.wait(lock);
+        }
+    }
+
+    void run()
+    {
+        while (mState != Dead)
+        {
+            if (!mQueue->executeWork())
+            {
+                idle();
+            }
+        }
+    }
+
+    void poke(ListenerState newState)
+    {
+        boost::unique_lock<boost::mutex> lock(mLock);
+        mState = newState;
+        mCond.notify_one();
+    }
+
+    ListenerState mState;
+    Queue *mQueue;
+    boost::mutex mLock;
+    boost::condition_variable mCond;
+    boost::thread mThread;
+};
+
+DefaultQueueListener::DefaultQueueListener(const QueuePtr& queue)
+    : mPriv(new DefaultQueueListenerPriv(queue)) { }
+
+DefaultQueueListener::~DefaultQueueListener()
+{
+    mPriv->poke(Dead);
+    mPriv->mThread.join();
+}
+
+void DefaultQueueListener::workAdded(int numNewWork, bool wasEmpty)
+{
+    mPriv->poke(Active);
+}
+
+void DefaultQueueListener::workResumable()
+{
+    mPriv->poke(Active);
+}
+
+void DefaultQueueListener::emptied()
+{
+}
+
+}; //end namespace WorkQueue
+}; //end namespace AsteriskSCF
diff --git a/WorkQueue/test/TestWorkQueue.cpp b/WorkQueue/test/TestWorkQueue.cpp
index 7cbf09b..3244eb0 100644
--- a/WorkQueue/test/TestWorkQueue.cpp
+++ b/WorkQueue/test/TestWorkQueue.cpp
@@ -17,6 +17,7 @@
 #include <boost/test/unit_test.hpp>
 
 #include <AsteriskSCF/WorkQueue.h>
+#include <AsteriskSCF/DefaultQueueListener.h>
 
 using namespace AsteriskSCF::System::WorkQueue::V1;
 using namespace AsteriskSCF::WorkQueue;
@@ -289,4 +290,47 @@ BOOST_AUTO_TEST_CASE(executionOrder2)
     BOOST_CHECK(queue->getSize() == 0);
 }
 
+BOOST_AUTO_TEST_CASE(defaultListener)
+{
+    QueuePtr queue(new WorkQueue());
+    DefaultQueueListenerPtr listener(new DefaultQueueListener(queue));
+    TaskPtr work1(new Task);
+    TaskPtr work2(new Task);
+    TaskPtr work3(new Task);
+    TaskPtr work4(new Task);
+    WorkSeq works;
+
+    queue->setListener(listener);
+
+    works.push_back(work1);
+    works.push_back(work2);
+    works.push_back(work3);
+    works.push_back(work4);
+
+    queue->enqueueWorkSeq(works);
+
+    while (work4->taskExecuted == false)
+    {
+        BOOST_TEST_MESSAGE("Waiting for work to complete");
+    }
+
+    BOOST_CHECK(work1->taskExecuted == true);
+    BOOST_CHECK(work2->taskExecuted == true);
+    BOOST_CHECK(work3->taskExecuted == true);
+    BOOST_CHECK(work4->taskExecuted == true);
+
+    //Thread should be idle. It should get woken up if
+    //we give it more work to do.
+
+    TaskPtr work5(new Task);
+    queue->enqueueWork(work5);
+
+    while (work5->taskExecuted == false)
+    {
+        BOOST_TEST_MESSAGE("Waiting for work to complete");
+    }
+
+    BOOST_CHECK(work5->taskExecuted == true);
+}
+
 BOOST_AUTO_TEST_SUITE_END()

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/ice-util-cpp.git



More information about the asterisk-scf-commits mailing list