[asterisk-scf-commits] asterisk-scf/integration/ice-util-cpp.git branch "workqueue" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Thu Mar 24 14:52:56 CDT 2011


branch "workqueue" has been updated
       via  77522e48b1b06b4b03c38c64b900f1459ead5c07 (commit)
      from  912bde27adfb6abae62ac1c893c52b3810d6cfce (commit)

Summary of changes:
 WorkQueue/src/SuspendableWorkQueue.cpp |   48 ++++++++++++++++++--------------
 WorkQueue/src/WorkQueue.cpp            |   45 ++++++++++++++++++------------
 2 files changed, 54 insertions(+), 39 deletions(-)


- Log -----------------------------------------------------------------
commit 77522e48b1b06b4b03c38c64b900f1459ead5c07
Author: Mark Michelson <mmichelson at digium.com>
Date:   Thu Mar 24 14:50:24 2011 -0500

    Fix problems pointed out by Kevin during code review.
    
    * Fixed potential race conditions that could result in attempting to
      send notices to listeners that had been erased.
    
    * Removed an unnecessary typedef in SuspendableWorkQueue.cpp
    
    * Added a comment to a method of SuspendableWorkQueuePriv indicating
      it expects a lock to be held as a precondition.

diff --git a/WorkQueue/src/SuspendableWorkQueue.cpp b/WorkQueue/src/SuspendableWorkQueue.cpp
index 80b6155..f70c3e4 100644
--- a/WorkQueue/src/SuspendableWorkQueue.cpp
+++ b/WorkQueue/src/SuspendableWorkQueue.cpp
@@ -34,17 +34,14 @@ public:
     SuspendableWorkQueuePriv(const QueueListenerPtr& listener)
         : mListener(listener), state(Ready), currentWork(0) { }
 
+    /**
+     * mLock is expected to be held when this function is called
+     */
     int workCount()
     {
         return mQueue.size() + (currentWork == 0 ? 0 : 1);
     }
 
-    bool sendEmptyNotice()
-    {
-        boost::shared_lock<boost::shared_mutex> lock(mLock);
-        return workCount() == 0 && mListener;
-    }
-
     void findDuplicates(const SuspendableWorkSeq& newItems)
     {
         SuspendableWorkSeq dupItems;
@@ -252,8 +249,6 @@ public:
     boost::shared_ptr<SuspendableWorkQueuePriv> mPriv;
 };
 
-typedef IceUtil::Handle<WorkListener> WorkListenerPtr;
-
 SuspendableWorkQueue::SuspendableWorkQueue()
     : mPriv(new SuspendableWorkQueuePriv) { }
 
@@ -265,18 +260,18 @@ void SuspendableWorkQueue::enqueueWork(const SuspendableWorkPtr& work)
     mPriv->findDuplicates(work);
 
     bool wasEmpty;
-    bool listenerExists;
+    QueueListenerPtr listenerRef;
     {
         boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
         //Call private version so we don't double grab the lock
         wasEmpty = mPriv->workCount() == 0; 
         mPriv->mQueue.push_back(work);
-        listenerExists = mPriv->mListener;
+        listenerRef = mPriv->mListener;
     }
 
-    if (listenerExists)
+    if (listenerRef != 0)
     {
-        mPriv->mListener->workAdded(wasEmpty);
+        listenerRef->workAdded(wasEmpty);
     }
 }
 
@@ -285,23 +280,25 @@ void SuspendableWorkQueue::enqueueWorkSeq(const SuspendableWorkSeq& works)
     mPriv->findDuplicates(works);
 
     bool wasEmpty;
-    bool listenerExists;
+    QueueListenerPtr listenerRef;
     {
         boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
         //Call private version so we don't double grab the lock
         wasEmpty = mPriv->workCount() == 0;
         mPriv->mQueue.insert(mPriv->mQueue.end(), works.begin(), works.end());
-        listenerExists = mPriv->mListener;
+        listenerRef = mPriv->mListener;
     }
 
-    if (listenerExists)
+    if (listenerRef != 0)
     {
-        mPriv->mListener->workAdded(wasEmpty);
+        listenerRef->workAdded(wasEmpty);
     }
 }
 
 void SuspendableWorkQueue::cancelWork(const SuspendableWorkPtr& work)
 {
+    bool isEmpty;
+    QueueListenerPtr listenerRef;
     {
         boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
 
@@ -317,11 +314,13 @@ void SuspendableWorkQueue::cancelWork(const SuspendableWorkPtr& work)
         }
 
         mPriv->mQueue.erase(i);
+        isEmpty = mPriv->mQueue.empty();
+        listenerRef = mPriv->mListener;
     }
 
-    if (mPriv->sendEmptyNotice())
+    if (isEmpty && listenerRef != 0)
     {
-        mPriv->mListener->emptied();
+        listenerRef->emptied();
     }
 }
 
@@ -335,9 +334,11 @@ bool SuspendableWorkQueue::executeWork()
         return false;
     }
     
-    WorkListenerPtr workListener(new WorkListener(mPriv));
+    SuspendableWorkListenerPtr workListener(new WorkListener(mPriv));
     SuspendableWorkResult result = work->execute(workListener);
 
+    bool isEmpty;
+    QueueListenerPtr listenerRef;
     if (result == AsteriskSCF::System::WorkQueue::V1::Suspended)
     {
         boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
@@ -368,11 +369,16 @@ bool SuspendableWorkQueue::executeWork()
         boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
         mPriv->state = SuspendableWorkQueuePriv::Ready;
         mPriv->currentWork = 0;
+        isEmpty = mPriv->mQueue.empty();
+        listenerRef = mPriv->mListener;
     }
     
-    if (mPriv->sendEmptyNotice())
+    if (isEmpty)
     {
-        mPriv->mListener->emptied();
+        if (listenerRef != 0)
+        {
+            listenerRef->emptied(); // use the handle copied under mLock, not mPriv->mListener
+        }
         return false;
     }
     return true;
diff --git a/WorkQueue/src/WorkQueue.cpp b/WorkQueue/src/WorkQueue.cpp
index f5c4984..458612a 100644
--- a/WorkQueue/src/WorkQueue.cpp
+++ b/WorkQueue/src/WorkQueue.cpp
@@ -72,12 +72,6 @@ public:
         return work;
     }
 
-    bool sendEmptyNotice()
-    {
-        boost::shared_lock<boost::shared_mutex> lock(mLock);
-        return mQueue.empty() && mListener;
-    }
-
     QueueListenerPtr mListener;
     boost::shared_mutex mLock;
     std::deque<WorkPtr> mQueue;
@@ -94,17 +88,17 @@ void WorkQueue::enqueueWork(const WorkPtr& work)
     mPriv->findDuplicates(work);
 
     bool wasEmpty;
-    bool listenerExists;
+    QueueListenerPtr listenerRef;
     {
         boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
         wasEmpty = mPriv->mQueue.empty();
-        listenerExists = mPriv->mListener;
+        listenerRef = mPriv->mListener;
         mPriv->mQueue.push_back(work);
     }
 
-    if (listenerExists)
+    if (listenerRef != 0)
     {
-        mPriv->mListener->workAdded(wasEmpty);
+        listenerRef->workAdded(wasEmpty);
     }
 }
 
@@ -113,22 +107,24 @@ void WorkQueue::enqueueWorkSeq(const WorkSeq& works)
     mPriv->findDuplicates(works);
 
     bool wasEmpty;
-    bool listenerExists;
+    QueueListenerPtr listenerRef;
     {
         boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
         wasEmpty = mPriv->mQueue.empty();
-        listenerExists = mPriv->mListener;
+        listenerRef = mPriv->mListener;
         mPriv->mQueue.insert(mPriv->mQueue.end(), works.begin(), works.end());
     }
 
-    if (listenerExists)
+    if (listenerRef != 0)
     {
-        mPriv->mListener->workAdded(wasEmpty);
+        listenerRef->workAdded(wasEmpty);
     }
 }
 
 void WorkQueue::cancelWork(const WorkPtr& work)
 {
+    QueueListenerPtr listenerRef;
+    bool isEmpty;
     {
         boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
 
@@ -139,11 +135,13 @@ void WorkQueue::cancelWork(const WorkPtr& work)
         }
 
         mPriv->mQueue.erase(i);
+        isEmpty = mPriv->mQueue.empty();
+        listenerRef = mPriv->mListener;
     }
 
-    if (mPriv->sendEmptyNotice())
+    if (isEmpty && listenerRef != 0)
     {
-        mPriv->mListener->emptied();
+        listenerRef->emptied();
     }
 }
 
@@ -156,9 +154,20 @@ bool WorkQueue::executeWork()
     }
     work->execute();
 
-    if (mPriv->sendEmptyNotice())
+    QueueListenerPtr listenerRef;
+    bool isEmpty;
     {
-        mPriv->mListener->emptied();
+        boost::shared_lock<boost::shared_mutex> lock(mPriv->mLock);
+        isEmpty = mPriv->mQueue.empty();
+        listenerRef = mPriv->mListener;
+    }
+
+    if (isEmpty)
+    {
+        if (listenerRef != 0)
+        {
+            listenerRef->emptied();
+        }
         return false;
     }
     return true;

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/ice-util-cpp.git



More information about the asterisk-scf-commits mailing list