[asterisk-scf-commits] asterisk-scf/integration/ice-util-cpp.git branch "workqueue" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Wed Mar 23 14:42:18 CDT 2011


branch "workqueue" has been updated
       via  912bde27adfb6abae62ac1c893c52b3810d6cfce (commit)
       via  152b4da7fe826cc34edba5a59e420c8028979525 (commit)
       via  2c217c8c4d8681a18fc5fa9b4eb2775d4f15761f (commit)
      from  12d737fef992d5d20ef2d3c1cfc8bd6007cd2e7e (commit)

Summary of changes:
 .../include/AsteriskSCF/SuspendableWorkQueue.h     |    4 +-
 WorkQueue/include/AsteriskSCF/WorkQueue.h          |    4 +-
 WorkQueue/src/SuspendableWorkQueue.cpp             |  251 ++++++++++++--------
 WorkQueue/src/WorkQueue.cpp                        |  150 +++++++-----
 WorkQueue/test/TestSuspendableWorkQueue.cpp        |    2 +
 5 files changed, 254 insertions(+), 157 deletions(-)


- Log -----------------------------------------------------------------
commit 912bde27adfb6abae62ac1c893c52b3810d6cfce
Author: Mark Michelson <mmichelson at digium.com>
Date:   Wed Mar 23 14:41:49 2011 -0500

    Add a findDuplicates method to WorkQueue

diff --git a/WorkQueue/src/WorkQueue.cpp b/WorkQueue/src/WorkQueue.cpp
index 5134f14..f5c4984 100644
--- a/WorkQueue/src/WorkQueue.cpp
+++ b/WorkQueue/src/WorkQueue.cpp
@@ -34,6 +34,31 @@ public:
     WorkQueuePriv(const QueueListenerPtr& listener)
         : mListener(listener) { }
 
+    void findDuplicates(const WorkSeq &works)
+    {
+        boost::shared_lock<boost::shared_mutex> lock(mLock);
+        WorkSeq exceptionItems;
+        for (WorkSeq::const_iterator iter = works.begin(); iter != works.end(); ++iter)
+        {
+            if (std::find(mQueue.begin(), mQueue.end(), *iter) != mQueue.end())
+            {
+                exceptionItems.push_back(*iter);
+            }
+        }
+    
+        if (!exceptionItems.empty())
+        {
+            throw WorkExists(__FILE__, __LINE__, exceptionItems);
+        }
+    }
+
+    void findDuplicates(const WorkPtr &work)
+    {
+        WorkSeq items;
+        items.push_back(work);
+        findDuplicates(items);
+    }
+
     WorkPtr getNextTask()
     {
         boost::unique_lock<boost::shared_mutex> lock(mLock);
@@ -66,19 +91,12 @@ WorkQueue::WorkQueue(const QueueListenerPtr& listener)
 
 void WorkQueue::enqueueWork(const WorkPtr& work)
 {
+    mPriv->findDuplicates(work);
+
     bool wasEmpty;
     bool listenerExists;
     {
         boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
-    
-        if (std::find(mPriv->mQueue.begin(), mPriv->mQueue.end(), work) != mPriv->mQueue.end())
-        {
-            WorkSeq exceptionItems;
-            exceptionItems.push_back(work);
-        //XXX The file and line number are required because
-            //of a limitation regarding local exceptions in Ice.
-            throw WorkExists(__FILE__, __LINE__, exceptionItems);
-        }
         wasEmpty = mPriv->mQueue.empty();
         listenerExists = mPriv->mListener;
         mPriv->mQueue.push_back(work);
@@ -92,25 +110,12 @@ void WorkQueue::enqueueWork(const WorkPtr& work)
 
 void WorkQueue::enqueueWorkSeq(const WorkSeq& works)
 {
+    mPriv->findDuplicates(works);
+
     bool wasEmpty;
     bool listenerExists;
     {
         boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
-    
-        WorkSeq exceptionItems;
-        for (WorkSeq::const_iterator iter = works.begin(); iter != works.end(); ++iter)
-        {
-            if (std::find(mPriv->mQueue.begin(), mPriv->mQueue.end(), *iter) != mPriv->mQueue.end())
-            {
-                exceptionItems.push_back(*iter);
-            }
-        }
-    
-        if (!exceptionItems.empty())
-        {
-            throw WorkExists(__FILE__, __LINE__, exceptionItems);
-        }
-    
         wasEmpty = mPriv->mQueue.empty();
         listenerExists = mPriv->mListener;
         mPriv->mQueue.insert(mPriv->mQueue.end(), works.begin(), works.end());

commit 152b4da7fe826cc34edba5a59e420c8028979525
Author: Mark Michelson <mmichelson at digium.com>
Date:   Wed Mar 23 14:35:40 2011 -0500

    Make similar changes to SuspendableWorkQueue as I did to WorkQueue.
    
    In addition, there are findDuplicates() methods added for convenience.
    I do believe I'll be adding these to the regular WorkQueue as well.

diff --git a/WorkQueue/include/AsteriskSCF/SuspendableWorkQueue.h b/WorkQueue/include/AsteriskSCF/SuspendableWorkQueue.h
index bcc586f..3f08353 100644
--- a/WorkQueue/include/AsteriskSCF/SuspendableWorkQueue.h
+++ b/WorkQueue/include/AsteriskSCF/SuspendableWorkQueue.h
@@ -25,7 +25,7 @@ namespace AsteriskSCF
 namespace WorkQueue
 {
 
-class SuspendableWorkQueueImpl;
+class SuspendableWorkQueuePriv;
 
 class SuspendableWorkQueue : public AsteriskSCF::System::WorkQueue::V1::SuspendableQueue
 {
@@ -39,7 +39,7 @@ public:
     int workCount();
     void setListener(const AsteriskSCF::System::WorkQueue::V1::QueueListenerPtr& listener);
 private:
-    boost::shared_ptr<SuspendableWorkQueueImpl> mImpl;
+    boost::shared_ptr<SuspendableWorkQueuePriv> mPriv;
 };
 
 }; //end namespace WorkQueue
diff --git a/WorkQueue/src/SuspendableWorkQueue.cpp b/WorkQueue/src/SuspendableWorkQueue.cpp
index aaaa921..80b6155 100644
--- a/WorkQueue/src/SuspendableWorkQueue.cpp
+++ b/WorkQueue/src/SuspendableWorkQueue.cpp
@@ -26,17 +26,94 @@ namespace WorkQueue
 
 using namespace AsteriskSCF::System::WorkQueue::V1;
 
-class SuspendableWorkQueueImpl
+class SuspendableWorkQueuePriv
 {
 public:
-    SuspendableWorkQueueImpl()
+    SuspendableWorkQueuePriv()
         : mListener(0), state(Ready), currentWork(0) { }
-    SuspendableWorkQueueImpl(const QueueListenerPtr& listener)
+    SuspendableWorkQueuePriv(const QueueListenerPtr& listener)
         : mListener(listener), state(Ready), currentWork(0) { }
 
+    int workCount()
+    {
+        return mQueue.size() + (currentWork == 0 ? 0 : 1);
+    }
+
+    bool sendEmptyNotice()
+    {
+        boost::shared_lock<boost::shared_mutex> lock(mLock);
+        return workCount() == 0 && mListener;
+    }
+
+    void findDuplicates(const SuspendableWorkSeq& newItems)
+    {
+        SuspendableWorkSeq dupItems;
+        {
+            boost::shared_lock<boost::shared_mutex> lock(mLock);
+
+            if (workCount() == 0)
+            {
+                return;
+            }
+
+            for (SuspendableWorkSeq::const_iterator iter = newItems.begin();
+                    iter != newItems.end(); ++iter)
+            {
+                if (*iter == currentWork ||
+                        std::find(mQueue.begin(), mQueue.end(), *iter) != mQueue.end())
+                {
+                    dupItems.push_back(*iter);
+                }
+            }
+        }
+
+        if (!dupItems.empty())
+        {
+            throw SuspendableWorkExists(__FILE__, __LINE__, dupItems);
+        }
+    }
+
+    void findDuplicates(const SuspendableWorkPtr &item)
+    {
+        SuspendableWorkSeq items;
+        items.push_back(item);
+        findDuplicates(items);
+    }
+
+    SuspendableWorkPtr getNextTask()
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+
+        SuspendableWorkPtr work;
+        switch (state)
+        {
+        case Ready:
+            assert(currentWork == 0);
+            if (mQueue.empty())
+            {
+                return false;
+            }
+            work = mQueue.front();
+            mQueue.pop_front();
+            state = Executing;
+            break;
+        case Resumable:
+            assert(currentWork != 0);
+            work = currentWork;
+            state = Executing;
+            break;
+        case Suspended:
+        case Executing:
+        default:
+            work = 0;
+        }
+
+        return work;
+    }
+
     QueueListenerPtr mListener;
-    boost::mutex mLock;
-    std::deque<SuspendableWorkPtr> q;
+    boost::shared_mutex mLock;
+    std::deque<SuspendableWorkPtr> mQueue;
     enum WorkState
     {
         /**
@@ -145,14 +222,16 @@ public:
 class WorkListener : public SuspendableWorkListener
 {
 public:
-    WorkListener(boost::shared_ptr<SuspendableWorkQueueImpl> impl)
-        : mImpl(impl) { }
+    WorkListener(boost::shared_ptr<SuspendableWorkQueuePriv> impl)
+        : mPriv(impl) { }
     void workResumable()
     {
-        boost::unique_lock<boost::mutex> lock(mImpl->mLock);
-        SuspendableWorkQueueImpl::WorkState previousState = mImpl->state;
-        mImpl->state = SuspendableWorkQueueImpl::Resumable;
-        lock.unlock();
+        SuspendableWorkQueuePriv::WorkState previousState;
+        {
+            boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
+            previousState = mPriv->state;
+            mPriv->state = SuspendableWorkQueuePriv::Resumable;
+        }
 
         // If the workResumable notification is received and the
         // state of things was not suspended, then that must mean
@@ -164,128 +243,105 @@ public:
         // that work is resumable, because no entity outside the
         // queue itself knows the work ever tried to enter a
         // suspended state.
-        if (previousState == SuspendableWorkQueueImpl::Suspended)
+        if (previousState == SuspendableWorkQueuePriv::Suspended)
         {
-            mImpl->mListener->workResumable();
+            mPriv->mListener->workResumable();
         }
     }
 
-    boost::shared_ptr<SuspendableWorkQueueImpl> mImpl;
+    boost::shared_ptr<SuspendableWorkQueuePriv> mPriv;
 };
 
 typedef IceUtil::Handle<WorkListener> WorkListenerPtr;
 
 SuspendableWorkQueue::SuspendableWorkQueue()
-    : mImpl(new SuspendableWorkQueueImpl) { }
+    : mPriv(new SuspendableWorkQueuePriv) { }
 
 SuspendableWorkQueue::SuspendableWorkQueue(const QueueListenerPtr& listener)
-    : mImpl(new SuspendableWorkQueueImpl(listener)) { }
+    : mPriv(new SuspendableWorkQueuePriv(listener)) { }
 
 void SuspendableWorkQueue::enqueueWork(const SuspendableWorkPtr& work)
 {
-    boost::unique_lock<boost::mutex> lock(mImpl->mLock);
+    mPriv->findDuplicates(work);
 
-    if (work == mImpl->currentWork ||
-            std::find(mImpl->q.begin(), mImpl->q.end(), work) != mImpl->q.end())
+    bool wasEmpty;
+    bool listenerExists;
     {
-        SuspendableWorkSeq exceptionItems;
-        exceptionItems.push_back(work);
-        throw SuspendableWorkExists(__FILE__, __LINE__, exceptionItems);
+        boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
+        //Call private version so we don't double grab the lock
+        wasEmpty = mPriv->workCount() == 0; 
+        mPriv->mQueue.push_back(work);
+        listenerExists = mPriv->mListener;
+    }
+
+    if (listenerExists)
+    {
+        mPriv->mListener->workAdded(wasEmpty);
     }
-    
-    bool wasEmpty = mImpl->q.empty();
-    mImpl->q.push_back(work);
-    lock.unlock();
-    mImpl->mListener->workAdded(wasEmpty);
 }
 
 void SuspendableWorkQueue::enqueueWorkSeq(const SuspendableWorkSeq& works)
 {
-    boost::unique_lock<boost::mutex> lock(mImpl->mLock);
-    
-    SuspendableWorkSeq exceptionItems;
-    for (SuspendableWorkSeq::const_iterator iter = works.begin(); iter != works.end(); ++iter)
+    mPriv->findDuplicates(works);
+
+    bool wasEmpty;
+    bool listenerExists;
     {
-        if (*iter == mImpl->currentWork ||
-                std::find(mImpl->q.begin(), mImpl->q.end(), *iter) != mImpl->q.end())
-        {
-            exceptionItems.push_back(*iter);
-        }
+        boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
+        //Call private version so we don't double grab the lock
+        wasEmpty = mPriv->workCount() == 0;
+        mPriv->mQueue.insert(mPriv->mQueue.end(), works.begin(), works.end());
+        listenerExists = mPriv->mListener;
     }
 
-    if (!exceptionItems.empty())
+    if (listenerExists)
     {
-        throw SuspendableWorkExists(__FILE__, __LINE__, exceptionItems);
+        mPriv->mListener->workAdded(wasEmpty);
     }
-
-    bool wasEmpty = mImpl->q.empty();
-    mImpl->q.insert(mImpl->q.end(), works.begin(), works.end());
-    lock.unlock();
-    mImpl->mListener->workAdded(wasEmpty);
 }
 
 void SuspendableWorkQueue::cancelWork(const SuspendableWorkPtr& work)
 {
-    boost::unique_lock<boost::mutex> lock(mImpl->mLock);
-
-    if (work == mImpl->currentWork)
     {
-        throw WorkNotFound(__FILE__, __LINE__);
-    }
+        boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
 
-    std::deque<SuspendableWorkPtr>::iterator i = std::find(mImpl->q.begin(), mImpl->q.end(), work);
-    if (i == mImpl->q.end())
-    {
-        throw WorkNotFound(__FILE__, __LINE__);
-    }
+        if (work == mPriv->currentWork)
+        {
+            throw WorkNotFound(__FILE__, __LINE__);
+        }
 
-    mImpl->q.erase(i);
+        std::deque<SuspendableWorkPtr>::iterator i = std::find(mPriv->mQueue.begin(), mPriv->mQueue.end(), work);
+        if (i == mPriv->mQueue.end())
+        {
+            throw WorkNotFound(__FILE__, __LINE__);
+        }
 
-    if (mImpl->q.empty())
+        mPriv->mQueue.erase(i);
+    }
+
+    if (mPriv->sendEmptyNotice())
     {
-        lock.unlock();
-        mImpl->mListener->emptied();
+        mPriv->mListener->emptied();
     }
 }
 
 bool SuspendableWorkQueue::executeWork()
 {
-    boost::unique_lock<boost::mutex> lock(mImpl->mLock);
 
-    if (mImpl->state != SuspendableWorkQueueImpl::Ready
-            && mImpl->state != SuspendableWorkQueueImpl::Resumable)
-    {
-        return false;
-    }
+    SuspendableWorkPtr work = mPriv->getNextTask();
 
-    SuspendableWorkPtr work;
-    if (mImpl->state == SuspendableWorkQueueImpl::Ready)
-    {
-        assert(mImpl->currentWork == 0);
-        if (mImpl->q.empty())
-        {
-            return false;
-        }
-        work = mImpl->q.front();
-        mImpl->q.pop_front();
-    }
-    else //state is Resumable
+    if (work == 0)
     {
-        assert(mImpl->currentWork != 0);
-        work = mImpl->currentWork;
+        return false;
     }
-
-    mImpl->state = SuspendableWorkQueueImpl::Executing;
-    lock.unlock();
-
-    WorkListenerPtr workListener(new WorkListener(mImpl));
+    
+    WorkListenerPtr workListener(new WorkListener(mPriv));
     SuspendableWorkResult result = work->execute(workListener);
 
-    lock.lock();
-
     if (result == AsteriskSCF::System::WorkQueue::V1::Suspended)
     {
-        mImpl->currentWork = work;
+        boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
+        mPriv->currentWork = work;
 
         // It's possible that a piece of work called out to an
         // asynchronous method and needed to be suspended. Before
@@ -297,24 +353,26 @@ bool SuspendableWorkQueue::executeWork()
         // can forego setting the state to Suspended and instead
         // treat the situation as if the work's execution had
         // actually returned "Complete"
-        if (mImpl->state == SuspendableWorkQueueImpl::Resumable)
+        if (mPriv->state == SuspendableWorkQueuePriv::Resumable)
         {
             return true;
         }
         else //State is still "Executing"
         {
-            mImpl->state = SuspendableWorkQueueImpl::Suspended;
+            mPriv->state = SuspendableWorkQueuePriv::Suspended;
             return false;
         }
     }
+    else
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
+        mPriv->state = SuspendableWorkQueuePriv::Ready;
+        mPriv->currentWork = 0;
+    }
     
-    mImpl->state = SuspendableWorkQueueImpl::Ready;
-    mImpl->currentWork = 0;
-    
-    if (mImpl->q.empty())
+    if (mPriv->sendEmptyNotice())
     {
-        lock.unlock();
-        mImpl->mListener->emptied();
+        mPriv->mListener->emptied();
         return false;
     }
     return true;
@@ -322,13 +380,14 @@ bool SuspendableWorkQueue::executeWork()
 
 int SuspendableWorkQueue::workCount()
 {
-    boost::unique_lock<boost::mutex> lock(mImpl->mLock);
-    return mImpl->q.size() + (mImpl->currentWork ? 1 : 0);
+    boost::shared_lock<boost::shared_mutex> lock(mPriv->mLock);
+    return mPriv->workCount();
 }
 
 void SuspendableWorkQueue::setListener(const QueueListenerPtr& listener)
 {
-    mImpl->mListener = listener;
+    boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
+    mPriv->mListener = listener;
 }
 
 }; //end namespace WorkQueue
diff --git a/WorkQueue/test/TestSuspendableWorkQueue.cpp b/WorkQueue/test/TestSuspendableWorkQueue.cpp
index 121cee4..d4b26d6 100644
--- a/WorkQueue/test/TestSuspendableWorkQueue.cpp
+++ b/WorkQueue/test/TestSuspendableWorkQueue.cpp
@@ -528,7 +528,9 @@ BOOST_AUTO_TEST_CASE(complexWork)
     moreWork = queue->executeWork();
 
     BOOST_CHECK(moreWork == false);
+    std::cout << "Work count is " << queue->workCount() << std::endl;
     BOOST_CHECK(queue->workCount() == 0);
+    std::cout << "State of work is " << work->currentState << std::endl;
     BOOST_CHECK(work->currentState == ComplexTask::Task2Complete);
     BOOST_CHECK(listener->emptyNotice == true);
 }

commit 2c217c8c4d8681a18fc5fa9b4eb2775d4f15761f
Author: Mark Michelson <mmichelson at digium.com>
Date:   Wed Mar 23 13:22:12 2011 -0500

    Address Brent's comments on CR-ASTSCF-69 for WorkQueue.
    
    * Changed instances of "Impl" to "Priv"
    * Changed 'q' member to "mQueue"
    * Use scopes instead of explicit unlock statements
    * Simplify flow of several method calls.
    * Account for potential mListener == 0 cases
    * Lock when setting mListener
    * Despite what I said on the review, use a shared_mutex and shared_locks
      where applicable.
    
    Tests all still pass.

diff --git a/WorkQueue/include/AsteriskSCF/WorkQueue.h b/WorkQueue/include/AsteriskSCF/WorkQueue.h
index 86b07d5..04e8a08 100644
--- a/WorkQueue/include/AsteriskSCF/WorkQueue.h
+++ b/WorkQueue/include/AsteriskSCF/WorkQueue.h
@@ -24,7 +24,7 @@ namespace AsteriskSCF
 namespace WorkQueue
 {
 
-class WorkQueueImpl;
+class WorkQueuePriv;
 
 class WorkQueue : public AsteriskSCF::System::WorkQueue::V1::Queue
 {
@@ -38,7 +38,7 @@ public:
     int workCount();
     void setListener(const AsteriskSCF::System::WorkQueue::V1::QueueListenerPtr& listener);
 private:
-    boost::shared_ptr<WorkQueueImpl> mImpl;
+    boost::shared_ptr<WorkQueuePriv> mPriv;
 };
 
 }; //end namespace WorkQueue
diff --git a/WorkQueue/src/WorkQueue.cpp b/WorkQueue/src/WorkQueue.cpp
index de80d03..5134f14 100644
--- a/WorkQueue/src/WorkQueue.cpp
+++ b/WorkQueue/src/WorkQueue.cpp
@@ -26,104 +26,134 @@ namespace WorkQueue
 
 using namespace AsteriskSCF::System::WorkQueue::V1;
 
-class WorkQueueImpl
+class WorkQueuePriv
 {
 public:
-    WorkQueueImpl()
+    WorkQueuePriv()
         : mListener(0) { }
-    WorkQueueImpl(const QueueListenerPtr& listener)
+    WorkQueuePriv(const QueueListenerPtr& listener)
         : mListener(listener) { }
 
+    WorkPtr getNextTask()
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+        if (mQueue.empty())
+        {
+            return 0;
+        }
+
+        WorkPtr work = mQueue.front();
+        mQueue.pop_front();
+        return work;
+    }
+
+    bool sendEmptyNotice()
+    {
+        boost::shared_lock<boost::shared_mutex> lock(mLock);
+        return mQueue.empty() && mListener;
+    }
+
     QueueListenerPtr mListener;
-    boost::mutex mLock;
-    std::deque<WorkPtr> q;
+    boost::shared_mutex mLock;
+    std::deque<WorkPtr> mQueue;
 };
 
 WorkQueue::WorkQueue()
-    : mImpl(new WorkQueueImpl) { }
+    : mPriv(new WorkQueuePriv) { }
 
 WorkQueue::WorkQueue(const QueueListenerPtr& listener)
-    : mImpl(new WorkQueueImpl(listener)) { }
+    : mPriv(new WorkQueuePriv(listener)) { }
 
 void WorkQueue::enqueueWork(const WorkPtr& work)
 {
-    boost::unique_lock<boost::mutex> lock(mImpl->mLock);
-    
-    if (std::find(mImpl->q.begin(), mImpl->q.end(), work) != mImpl->q.end())
+    bool wasEmpty;
+    bool listenerExists;
     {
-        WorkSeq exceptionItems;
-        exceptionItems.push_back(work);
-        //XXX The file and line number are required because
-        //of a limitation regarding local exceptions in Ice.
-        throw WorkExists(__FILE__, __LINE__, exceptionItems);
+        boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
+    
+        if (std::find(mPriv->mQueue.begin(), mPriv->mQueue.end(), work) != mPriv->mQueue.end())
+        {
+            WorkSeq exceptionItems;
+            exceptionItems.push_back(work);
+            //XXX The file and line number are required because
+            //of a limitation regarding local exceptions in Ice.
+            throw WorkExists(__FILE__, __LINE__, exceptionItems);
+        }
+        wasEmpty = mPriv->mQueue.empty();
+        listenerExists = mPriv->mListener;
+        mPriv->mQueue.push_back(work);
     }
 
-    bool wasEmpty = mImpl->q.empty();
-    mImpl->q.push_back(work);
-    lock.unlock();
-    mImpl->mListener->workAdded(wasEmpty);
+    if (listenerExists)
+    {
+        mPriv->mListener->workAdded(wasEmpty);
+    }
 }
 
 void WorkQueue::enqueueWorkSeq(const WorkSeq& works)
 {
-    boost::unique_lock<boost::mutex> lock(mImpl->mLock);
-    
-    WorkSeq exceptionItems;
-    for (WorkSeq::const_iterator iter = works.begin(); iter != works.end(); ++iter)
+    bool wasEmpty;
+    bool listenerExists;
     {
-        if (std::find(mImpl->q.begin(), mImpl->q.end(), *iter) != mImpl->q.end())
+        boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
+    
+        WorkSeq exceptionItems;
+        for (WorkSeq::const_iterator iter = works.begin(); iter != works.end(); ++iter)
+        {
+            if (std::find(mPriv->mQueue.begin(), mPriv->mQueue.end(), *iter) != mPriv->mQueue.end())
+            {
+                exceptionItems.push_back(*iter);
+            }
+        }
+    
+        if (!exceptionItems.empty())
         {
-            exceptionItems.push_back(*iter);
+            throw WorkExists(__FILE__, __LINE__, exceptionItems);
         }
+    
+        wasEmpty = mPriv->mQueue.empty();
+        listenerExists = mPriv->mListener;
+        mPriv->mQueue.insert(mPriv->mQueue.end(), works.begin(), works.end());
     }
 
-    if (!exceptionItems.empty())
+    if (listenerExists)
     {
-        throw WorkExists(__FILE__, __LINE__, exceptionItems);
+        mPriv->mListener->workAdded(wasEmpty);
     }
-
-    bool wasEmpty = mImpl->q.empty();
-    mImpl->q.insert(mImpl->q.end(), works.begin(), works.end());
-    lock.unlock();
-    mImpl->mListener->workAdded(wasEmpty);
 }
 
 void WorkQueue::cancelWork(const WorkPtr& work)
 {
-    boost::unique_lock<boost::mutex> lock(mImpl->mLock);
-
-    std::deque<WorkPtr>::iterator i = std::find(mImpl->q.begin(), mImpl->q.end(), work);
-    if (i == mImpl->q.end())
     {
-        throw WorkNotFound(__FILE__, __LINE__);
-    }
+        boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
+
+        std::deque<WorkPtr>::iterator i = std::find(mPriv->mQueue.begin(), mPriv->mQueue.end(), work);
+        if (i == mPriv->mQueue.end())
+        {
+            throw WorkNotFound(__FILE__, __LINE__);
+        }
 
-    mImpl->q.erase(i);
+        mPriv->mQueue.erase(i);
+    }
 
-    if (mImpl->q.empty())
+    if (mPriv->sendEmptyNotice())
     {
-        lock.unlock();
-        mImpl->mListener->emptied();
+        mPriv->mListener->emptied();
     }
 }
 
 bool WorkQueue::executeWork()
 {
-    boost::unique_lock<boost::mutex> lock(mImpl->mLock);
-    if (mImpl->q.empty())
+    WorkPtr work = mPriv->getNextTask();
+    if (work == 0)
     {
         return false;
     }
-
-    WorkPtr work = mImpl->q.front();
-    mImpl->q.pop_front();
-    lock.unlock();
     work->execute();
-    lock.lock();
-    if (mImpl->q.empty())
+
+    if (mPriv->sendEmptyNotice())
     {
-        lock.unlock();
-        mImpl->mListener->emptied();
+        mPriv->mListener->emptied();
         return false;
     }
     return true;
@@ -131,13 +161,14 @@ bool WorkQueue::executeWork()
 
 int WorkQueue::workCount()
 {
-    boost::unique_lock<boost::mutex> lock(mImpl->mLock);
-    return mImpl->q.size();
+    boost::shared_lock<boost::shared_mutex> lock(mPriv->mLock);
+    return mPriv->mQueue.size();
 }
 
 void WorkQueue::setListener(const QueueListenerPtr& listener)
 {
-    mImpl->mListener = listener;
+    boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
+    mPriv->mListener = listener;
 }
 
 }; // end namespace WorkQueue

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/ice-util-cpp.git



More information about the asterisk-scf-commits mailing list