[asterisk-scf-commits] asterisk-scf/integration/ice-util-cpp.git branch "workqueue" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Thu Mar 24 14:52:56 CDT 2011
branch "workqueue" has been updated
via 77522e48b1b06b4b03c38c64b900f1459ead5c07 (commit)
from 912bde27adfb6abae62ac1c893c52b3810d6cfce (commit)
Summary of changes:
WorkQueue/src/SuspendableWorkQueue.cpp | 48 ++++++++++++++++++--------------
WorkQueue/src/WorkQueue.cpp | 45 ++++++++++++++++++------------
2 files changed, 54 insertions(+), 39 deletions(-)
- Log -----------------------------------------------------------------
commit 77522e48b1b06b4b03c38c64b900f1459ead5c07
Author: Mark Michelson <mmichelson at digium.com>
Date: Thu Mar 24 14:50:24 2011 -0500
Fix problems pointed out by Kevin during code review.
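* Fixed potential race conditions that could result in attempting to
send notices to listeners that had been erased. The listener and the
queue's emptiness are now captured in local variables while mLock is
held, replacing the sendEmptyNotice() helper, which re-read them after
the lock had been released.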
* Removed an unnecessary typedef (WorkListenerPtr) in SuspendableWorkQueue.cpp
* Added a comment to SuspendableWorkQueuePriv::workCount() indicating
that it expects mLock to be held as a precondition.
diff --git a/WorkQueue/src/SuspendableWorkQueue.cpp b/WorkQueue/src/SuspendableWorkQueue.cpp
index 80b6155..f70c3e4 100644
--- a/WorkQueue/src/SuspendableWorkQueue.cpp
+++ b/WorkQueue/src/SuspendableWorkQueue.cpp
@@ -34,17 +34,14 @@ public:
SuspendableWorkQueuePriv(const QueueListenerPtr& listener)
: mListener(listener), state(Ready), currentWork(0) { }
+ /**
+ * mLock is expected to be held when this function is called
+ */
int workCount()
{
return mQueue.size() + (currentWork == 0 ? 0 : 1);
}
- bool sendEmptyNotice()
- {
- boost::shared_lock<boost::shared_mutex> lock(mLock);
- return workCount() == 0 && mListener;
- }
-
void findDuplicates(const SuspendableWorkSeq& newItems)
{
SuspendableWorkSeq dupItems;
@@ -252,8 +249,6 @@ public:
boost::shared_ptr<SuspendableWorkQueuePriv> mPriv;
};
-typedef IceUtil::Handle<WorkListener> WorkListenerPtr;
-
SuspendableWorkQueue::SuspendableWorkQueue()
: mPriv(new SuspendableWorkQueuePriv) { }
@@ -265,18 +260,18 @@ void SuspendableWorkQueue::enqueueWork(const SuspendableWorkPtr& work)
mPriv->findDuplicates(work);
bool wasEmpty;
- bool listenerExists;
+ QueueListenerPtr listenerRef;
{
boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
//Call private version so we don't double grab the lock
wasEmpty = mPriv->workCount() == 0;
mPriv->mQueue.push_back(work);
- listenerExists = mPriv->mListener;
+ listenerRef = mPriv->mListener;
}
- if (listenerExists)
+ if (listenerRef != 0)
{
- mPriv->mListener->workAdded(wasEmpty);
+ listenerRef->workAdded(wasEmpty);
}
}
@@ -285,23 +280,25 @@ void SuspendableWorkQueue::enqueueWorkSeq(const SuspendableWorkSeq& works)
mPriv->findDuplicates(works);
bool wasEmpty;
- bool listenerExists;
+ QueueListenerPtr listenerRef;
{
boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
//Call private version so we don't double grab the lock
wasEmpty = mPriv->workCount() == 0;
mPriv->mQueue.insert(mPriv->mQueue.end(), works.begin(), works.end());
- listenerExists = mPriv->mListener;
+ listenerRef = mPriv->mListener;
}
- if (listenerExists)
+ if (listenerRef != 0)
{
- mPriv->mListener->workAdded(wasEmpty);
+ listenerRef->workAdded(wasEmpty);
}
}
void SuspendableWorkQueue::cancelWork(const SuspendableWorkPtr& work)
{
+ bool isEmpty;
+ QueueListenerPtr listenerRef;
{
boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
@@ -317,11 +314,13 @@ void SuspendableWorkQueue::cancelWork(const SuspendableWorkPtr& work)
}
mPriv->mQueue.erase(i);
+ isEmpty = mPriv->mQueue.empty();
+ listenerRef = mPriv->mListener;
}
- if (mPriv->sendEmptyNotice())
+ if (isEmpty && listenerRef != 0)
{
- mPriv->mListener->emptied();
+ listenerRef->emptied();
}
}
@@ -335,9 +334,11 @@ bool SuspendableWorkQueue::executeWork()
return false;
}
- WorkListenerPtr workListener(new WorkListener(mPriv));
+ SuspendableWorkListenerPtr workListener(new WorkListener(mPriv));
SuspendableWorkResult result = work->execute(workListener);
+ bool isEmpty;
+ QueueListenerPtr listenerRef;
if (result == AsteriskSCF::System::WorkQueue::V1::Suspended)
{
boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
@@ -368,11 +369,16 @@ bool SuspendableWorkQueue::executeWork()
boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
mPriv->state = SuspendableWorkQueuePriv::Ready;
mPriv->currentWork = 0;
+ isEmpty = mPriv->mQueue.empty();
+ listenerRef = mPriv->mListener;
}
- if (mPriv->sendEmptyNotice())
+ if (isEmpty)
{
- mPriv->mListener->emptied();
+ if (listenerRef != 0)
+ {
+ mPriv->mListener->emptied();
+ }
return false;
}
return true;
diff --git a/WorkQueue/src/WorkQueue.cpp b/WorkQueue/src/WorkQueue.cpp
index f5c4984..458612a 100644
--- a/WorkQueue/src/WorkQueue.cpp
+++ b/WorkQueue/src/WorkQueue.cpp
@@ -72,12 +72,6 @@ public:
return work;
}
- bool sendEmptyNotice()
- {
- boost::shared_lock<boost::shared_mutex> lock(mLock);
- return mQueue.empty() && mListener;
- }
-
QueueListenerPtr mListener;
boost::shared_mutex mLock;
std::deque<WorkPtr> mQueue;
@@ -94,17 +88,17 @@ void WorkQueue::enqueueWork(const WorkPtr& work)
mPriv->findDuplicates(work);
bool wasEmpty;
- bool listenerExists;
+ QueueListenerPtr listenerRef;
{
boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
wasEmpty = mPriv->mQueue.empty();
- listenerExists = mPriv->mListener;
+ listenerRef = mPriv->mListener;
mPriv->mQueue.push_back(work);
}
- if (listenerExists)
+ if (listenerRef != 0)
{
- mPriv->mListener->workAdded(wasEmpty);
+ listenerRef->workAdded(wasEmpty);
}
}
@@ -113,22 +107,24 @@ void WorkQueue::enqueueWorkSeq(const WorkSeq& works)
mPriv->findDuplicates(works);
bool wasEmpty;
- bool listenerExists;
+ QueueListenerPtr listenerRef;
{
boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
wasEmpty = mPriv->mQueue.empty();
- listenerExists = mPriv->mListener;
+ listenerRef = mPriv->mListener;
mPriv->mQueue.insert(mPriv->mQueue.end(), works.begin(), works.end());
}
- if (listenerExists)
+ if (listenerRef != 0)
{
- mPriv->mListener->workAdded(wasEmpty);
+ listenerRef->workAdded(wasEmpty);
}
}
void WorkQueue::cancelWork(const WorkPtr& work)
{
+ QueueListenerPtr listenerRef;
+ bool isEmpty;
{
boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
@@ -139,11 +135,13 @@ void WorkQueue::cancelWork(const WorkPtr& work)
}
mPriv->mQueue.erase(i);
+ isEmpty = mPriv->mQueue.empty();
+ listenerRef = mPriv->mListener;
}
- if (mPriv->sendEmptyNotice())
+ if (isEmpty && listenerRef != 0)
{
- mPriv->mListener->emptied();
+ listenerRef->emptied();
}
}
@@ -156,9 +154,20 @@ bool WorkQueue::executeWork()
}
work->execute();
- if (mPriv->sendEmptyNotice())
+ QueueListenerPtr listenerRef;
+ bool isEmpty;
{
- mPriv->mListener->emptied();
+ boost::shared_lock<boost::shared_mutex> lock(mPriv->mLock);
+ isEmpty = mPriv->mQueue.empty();
+ listenerRef = mPriv->mListener;
+ }
+
+ if (isEmpty)
+ {
+ if (listenerRef != 0)
+ {
+ listenerRef->emptied();
+ }
return false;
}
return true;
-----------------------------------------------------------------------
--
asterisk-scf/integration/ice-util-cpp.git
More information about the asterisk-scf-commits
mailing list