[asterisk-scf-commits] asterisk-scf/integration/ice-util-cpp.git branch "workqueue" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Wed Mar 23 14:42:18 CDT 2011
branch "workqueue" has been updated
via 912bde27adfb6abae62ac1c893c52b3810d6cfce (commit)
via 152b4da7fe826cc34edba5a59e420c8028979525 (commit)
via 2c217c8c4d8681a18fc5fa9b4eb2775d4f15761f (commit)
from 12d737fef992d5d20ef2d3c1cfc8bd6007cd2e7e (commit)
Summary of changes:
.../include/AsteriskSCF/SuspendableWorkQueue.h | 4 +-
WorkQueue/include/AsteriskSCF/WorkQueue.h | 4 +-
WorkQueue/src/SuspendableWorkQueue.cpp | 251 ++++++++++++--------
WorkQueue/src/WorkQueue.cpp | 150 +++++++-----
WorkQueue/test/TestSuspendableWorkQueue.cpp | 2 +
5 files changed, 254 insertions(+), 157 deletions(-)
- Log -----------------------------------------------------------------
commit 912bde27adfb6abae62ac1c893c52b3810d6cfce
Author: Mark Michelson <mmichelson at digium.com>
Date: Wed Mar 23 14:41:49 2011 -0500
Add a findDuplicates method to WorkQueue
diff --git a/WorkQueue/src/WorkQueue.cpp b/WorkQueue/src/WorkQueue.cpp
index 5134f14..f5c4984 100644
--- a/WorkQueue/src/WorkQueue.cpp
+++ b/WorkQueue/src/WorkQueue.cpp
@@ -34,6 +34,31 @@ public:
WorkQueuePriv(const QueueListenerPtr& listener)
: mListener(listener) { }
+ void findDuplicates(const WorkSeq &works)
+ {
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+ WorkSeq exceptionItems;
+ for (WorkSeq::const_iterator iter = works.begin(); iter != works.end(); ++iter)
+ {
+ if (std::find(mQueue.begin(), mQueue.end(), *iter) != mQueue.end())
+ {
+ exceptionItems.push_back(*iter);
+ }
+ }
+
+ if (!exceptionItems.empty())
+ {
+ throw WorkExists(__FILE__, __LINE__, exceptionItems);
+ }
+ }
+
+ void findDuplicates(const WorkPtr &work)
+ {
+ WorkSeq items;
+ items.push_back(work);
+ findDuplicates(items);
+ }
+
WorkPtr getNextTask()
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
@@ -66,19 +91,12 @@ WorkQueue::WorkQueue(const QueueListenerPtr& listener)
void WorkQueue::enqueueWork(const WorkPtr& work)
{
+ mPriv->findDuplicates(work);
+
bool wasEmpty;
bool listenerExists;
{
boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
-
- if (std::find(mPriv->mQueue.begin(), mPriv->mQueue.end(), work) != mPriv->mQueue.end())
- {
- WorkSeq exceptionItems;
- exceptionItems.push_back(work);
- //XXX The file and line number are remQueueuired because
- //of a limitation regarding local exceptions in Ice.
- throw WorkExists(__FILE__, __LINE__, exceptionItems);
- }
wasEmpty = mPriv->mQueue.empty();
listenerExists = mPriv->mListener;
mPriv->mQueue.push_back(work);
@@ -92,25 +110,12 @@ void WorkQueue::enqueueWork(const WorkPtr& work)
void WorkQueue::enqueueWorkSeq(const WorkSeq& works)
{
+ mPriv->findDuplicates(works);
+
bool wasEmpty;
bool listenerExists;
{
boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
-
- WorkSeq exceptionItems;
- for (WorkSeq::const_iterator iter = works.begin(); iter != works.end(); ++iter)
- {
- if (std::find(mPriv->mQueue.begin(), mPriv->mQueue.end(), *iter) != mPriv->mQueue.end())
- {
- exceptionItems.push_back(*iter);
- }
- }
-
- if (!exceptionItems.empty())
- {
- throw WorkExists(__FILE__, __LINE__, exceptionItems);
- }
-
wasEmpty = mPriv->mQueue.empty();
listenerExists = mPriv->mListener;
mPriv->mQueue.insert(mPriv->mQueue.end(), works.begin(), works.end());
commit 152b4da7fe826cc34edba5a59e420c8028979525
Author: Mark Michelson <mmichelson at digium.com>
Date: Wed Mar 23 14:35:40 2011 -0500
Make similar changes to SuspendableWorkQueue as I did to WorkQueue.
In addition, there are findDuplicates() methods added for convenience.
I do believe I'll be adding these to the regular WorkQueue as well.
diff --git a/WorkQueue/include/AsteriskSCF/SuspendableWorkQueue.h b/WorkQueue/include/AsteriskSCF/SuspendableWorkQueue.h
index bcc586f..3f08353 100644
--- a/WorkQueue/include/AsteriskSCF/SuspendableWorkQueue.h
+++ b/WorkQueue/include/AsteriskSCF/SuspendableWorkQueue.h
@@ -25,7 +25,7 @@ namespace AsteriskSCF
namespace WorkQueue
{
-class SuspendableWorkQueueImpl;
+class SuspendableWorkQueuePriv;
class SuspendableWorkQueue : public AsteriskSCF::System::WorkQueue::V1::SuspendableQueue
{
@@ -39,7 +39,7 @@ public:
int workCount();
void setListener(const AsteriskSCF::System::WorkQueue::V1::QueueListenerPtr& listener);
private:
- boost::shared_ptr<SuspendableWorkQueueImpl> mImpl;
+ boost::shared_ptr<SuspendableWorkQueuePriv> mPriv;
};
}; //end namespace WorkQueue
diff --git a/WorkQueue/src/SuspendableWorkQueue.cpp b/WorkQueue/src/SuspendableWorkQueue.cpp
index aaaa921..80b6155 100644
--- a/WorkQueue/src/SuspendableWorkQueue.cpp
+++ b/WorkQueue/src/SuspendableWorkQueue.cpp
@@ -26,17 +26,94 @@ namespace WorkQueue
using namespace AsteriskSCF::System::WorkQueue::V1;
-class SuspendableWorkQueueImpl
+class SuspendableWorkQueuePriv
{
public:
- SuspendableWorkQueueImpl()
+ SuspendableWorkQueuePriv()
: mListener(0), state(Ready), currentWork(0) { }
- SuspendableWorkQueueImpl(const QueueListenerPtr& listener)
+ SuspendableWorkQueuePriv(const QueueListenerPtr& listener)
: mListener(listener), state(Ready), currentWork(0) { }
+ int workCount()
+ {
+ return mQueue.size() + (currentWork == 0 ? 0 : 1);
+ }
+
+ bool sendEmptyNotice()
+ {
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+ return workCount() == 0 && mListener;
+ }
+
+ void findDuplicates(const SuspendableWorkSeq& newItems)
+ {
+ SuspendableWorkSeq dupItems;
+ {
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+
+ if (workCount() == 0)
+ {
+ return;
+ }
+
+ for (SuspendableWorkSeq::const_iterator iter = newItems.begin();
+ iter != newItems.end(); ++iter)
+ {
+ if (*iter == currentWork ||
+ std::find(mQueue.begin(), mQueue.end(), *iter) != mQueue.end())
+ {
+ dupItems.push_back(*iter);
+ }
+ }
+ }
+
+ if (!dupItems.empty())
+ {
+ throw SuspendableWorkExists(__FILE__, __LINE__, dupItems);
+ }
+ }
+
+ void findDuplicates(const SuspendableWorkPtr &item)
+ {
+ SuspendableWorkSeq items;
+ items.push_back(item);
+ findDuplicates(items);
+ }
+
+ SuspendableWorkPtr getNextTask()
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+
+ SuspendableWorkPtr work;
+ switch (state)
+ {
+ case Ready:
+ assert(currentWork == 0);
+ if (mQueue.empty())
+ {
+ return false;
+ }
+ work = mQueue.front();
+ mQueue.pop_front();
+ state = Executing;
+ break;
+ case Resumable:
+ assert(currentWork != 0);
+ work = currentWork;
+ state = Executing;
+ break;
+ case Suspended:
+ case Executing:
+ default:
+ work = 0;
+ }
+
+ return work;
+ }
+
QueueListenerPtr mListener;
- boost::mutex mLock;
- std::deque<SuspendableWorkPtr> q;
+ boost::shared_mutex mLock;
+ std::deque<SuspendableWorkPtr> mQueue;
enum WorkState
{
/**
@@ -145,14 +222,16 @@ public:
class WorkListener : public SuspendableWorkListener
{
public:
- WorkListener(boost::shared_ptr<SuspendableWorkQueueImpl> impl)
- : mImpl(impl) { }
+ WorkListener(boost::shared_ptr<SuspendableWorkQueuePriv> impl)
+ : mPriv(impl) { }
void workResumable()
{
- boost::unique_lock<boost::mutex> lock(mImpl->mLock);
- SuspendableWorkQueueImpl::WorkState previousState = mImpl->state;
- mImpl->state = SuspendableWorkQueueImpl::Resumable;
- lock.unlock();
+ SuspendableWorkQueuePriv::WorkState previousState;
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
+ previousState = mPriv->state;
+ mPriv->state = SuspendableWorkQueuePriv::Resumable;
+ }
// If the workResumable notification is received and the
// state of things was not suspended, then that must mean
@@ -164,128 +243,105 @@ public:
// that work is resumable, because no entity outside the
// queue itself knows the work ever tried to enter a
// suspended state.
- if (previousState == SuspendableWorkQueueImpl::Suspended)
+ if (previousState == SuspendableWorkQueuePriv::Suspended)
{
- mImpl->mListener->workResumable();
+ mPriv->mListener->workResumable();
}
}
- boost::shared_ptr<SuspendableWorkQueueImpl> mImpl;
+ boost::shared_ptr<SuspendableWorkQueuePriv> mPriv;
};
typedef IceUtil::Handle<WorkListener> WorkListenerPtr;
SuspendableWorkQueue::SuspendableWorkQueue()
- : mImpl(new SuspendableWorkQueueImpl) { }
+ : mPriv(new SuspendableWorkQueuePriv) { }
SuspendableWorkQueue::SuspendableWorkQueue(const QueueListenerPtr& listener)
- : mImpl(new SuspendableWorkQueueImpl(listener)) { }
+ : mPriv(new SuspendableWorkQueuePriv(listener)) { }
void SuspendableWorkQueue::enqueueWork(const SuspendableWorkPtr& work)
{
- boost::unique_lock<boost::mutex> lock(mImpl->mLock);
+ mPriv->findDuplicates(work);
- if (work == mImpl->currentWork ||
- std::find(mImpl->q.begin(), mImpl->q.end(), work) != mImpl->q.end())
+ bool wasEmpty;
+ bool listenerExists;
{
- SuspendableWorkSeq exceptionItems;
- exceptionItems.push_back(work);
- throw SuspendableWorkExists(__FILE__, __LINE__, exceptionItems);
+ boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
+ //Call private version so we don't double grab the lock
+ wasEmpty = mPriv->workCount() == 0;
+ mPriv->mQueue.push_back(work);
+ listenerExists = mPriv->mListener;
+ }
+
+ if (listenerExists)
+ {
+ mPriv->mListener->workAdded(wasEmpty);
}
-
- bool wasEmpty = mImpl->q.empty();
- mImpl->q.push_back(work);
- lock.unlock();
- mImpl->mListener->workAdded(wasEmpty);
}
void SuspendableWorkQueue::enqueueWorkSeq(const SuspendableWorkSeq& works)
{
- boost::unique_lock<boost::mutex> lock(mImpl->mLock);
-
- SuspendableWorkSeq exceptionItems;
- for (SuspendableWorkSeq::const_iterator iter = works.begin(); iter != works.end(); ++iter)
+ mPriv->findDuplicates(works);
+
+ bool wasEmpty;
+ bool listenerExists;
{
- if (*iter == mImpl->currentWork ||
- std::find(mImpl->q.begin(), mImpl->q.end(), *iter) != mImpl->q.end())
- {
- exceptionItems.push_back(*iter);
- }
+ boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
+ //Call private version so we don't double grab the lock
+ wasEmpty = mPriv->workCount() == 0;
+ mPriv->mQueue.insert(mPriv->mQueue.end(), works.begin(), works.end());
+ listenerExists = mPriv->mListener;
}
- if (!exceptionItems.empty())
+ if (listenerExists)
{
- throw SuspendableWorkExists(__FILE__, __LINE__, exceptionItems);
+ mPriv->mListener->workAdded(wasEmpty);
}
-
- bool wasEmpty = mImpl->q.empty();
- mImpl->q.insert(mImpl->q.end(), works.begin(), works.end());
- lock.unlock();
- mImpl->mListener->workAdded(wasEmpty);
}
void SuspendableWorkQueue::cancelWork(const SuspendableWorkPtr& work)
{
- boost::unique_lock<boost::mutex> lock(mImpl->mLock);
-
- if (work == mImpl->currentWork)
{
- throw WorkNotFound(__FILE__, __LINE__);
- }
+ boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
- std::deque<SuspendableWorkPtr>::iterator i = std::find(mImpl->q.begin(), mImpl->q.end(), work);
- if (i == mImpl->q.end())
- {
- throw WorkNotFound(__FILE__, __LINE__);
- }
+ if (work == mPriv->currentWork)
+ {
+ throw WorkNotFound(__FILE__, __LINE__);
+ }
- mImpl->q.erase(i);
+ std::deque<SuspendableWorkPtr>::iterator i = std::find(mPriv->mQueue.begin(), mPriv->mQueue.end(), work);
+ if (i == mPriv->mQueue.end())
+ {
+ throw WorkNotFound(__FILE__, __LINE__);
+ }
- if (mImpl->q.empty())
+ mPriv->mQueue.erase(i);
+ }
+
+ if (mPriv->sendEmptyNotice())
{
- lock.unlock();
- mImpl->mListener->emptied();
+ mPriv->mListener->emptied();
}
}
bool SuspendableWorkQueue::executeWork()
{
- boost::unique_lock<boost::mutex> lock(mImpl->mLock);
- if (mImpl->state != SuspendableWorkQueueImpl::Ready
- && mImpl->state != SuspendableWorkQueueImpl::Resumable)
- {
- return false;
- }
+ SuspendableWorkPtr work = mPriv->getNextTask();
- SuspendableWorkPtr work;
- if (mImpl->state == SuspendableWorkQueueImpl::Ready)
- {
- assert(mImpl->currentWork == 0);
- if (mImpl->q.empty())
- {
- return false;
- }
- work = mImpl->q.front();
- mImpl->q.pop_front();
- }
- else //state is Resumable
+ if (work == 0)
{
- assert(mImpl->currentWork != 0);
- work = mImpl->currentWork;
+ return false;
}
-
- mImpl->state = SuspendableWorkQueueImpl::Executing;
- lock.unlock();
-
- WorkListenerPtr workListener(new WorkListener(mImpl));
+
+ WorkListenerPtr workListener(new WorkListener(mPriv));
SuspendableWorkResult result = work->execute(workListener);
- lock.lock();
-
if (result == AsteriskSCF::System::WorkQueue::V1::Suspended)
{
- mImpl->currentWork = work;
+ boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
+ mPriv->currentWork = work;
// It's possible that a piece of work called out to an
// asynchronous method and needed to be suspended. Before
@@ -297,24 +353,26 @@ bool SuspendableWorkQueue::executeWork()
// can forego setting the state to Suspended and instead
// treat the situation as if the work's execution had
// actually returned "Complete"
- if (mImpl->state == SuspendableWorkQueueImpl::Resumable)
+ if (mPriv->state == SuspendableWorkQueuePriv::Resumable)
{
return true;
}
else //State is still "Executing"
{
- mImpl->state = SuspendableWorkQueueImpl::Suspended;
+ mPriv->state = SuspendableWorkQueuePriv::Suspended;
return false;
}
}
+ else
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
+ mPriv->state = SuspendableWorkQueuePriv::Ready;
+ mPriv->currentWork = 0;
+ }
- mImpl->state = SuspendableWorkQueueImpl::Ready;
- mImpl->currentWork = 0;
-
- if (mImpl->q.empty())
+ if (mPriv->sendEmptyNotice())
{
- lock.unlock();
- mImpl->mListener->emptied();
+ mPriv->mListener->emptied();
return false;
}
return true;
@@ -322,13 +380,14 @@ bool SuspendableWorkQueue::executeWork()
int SuspendableWorkQueue::workCount()
{
- boost::unique_lock<boost::mutex> lock(mImpl->mLock);
- return mImpl->q.size() + (mImpl->currentWork ? 1 : 0);
+ boost::shared_lock<boost::shared_mutex> lock(mPriv->mLock);
+ return mPriv->workCount();
}
void SuspendableWorkQueue::setListener(const QueueListenerPtr& listener)
{
- mImpl->mListener = listener;
+ boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
+ mPriv->mListener = listener;
}
}; //end namespace WorkQueue
diff --git a/WorkQueue/test/TestSuspendableWorkQueue.cpp b/WorkQueue/test/TestSuspendableWorkQueue.cpp
index 121cee4..d4b26d6 100644
--- a/WorkQueue/test/TestSuspendableWorkQueue.cpp
+++ b/WorkQueue/test/TestSuspendableWorkQueue.cpp
@@ -528,7 +528,9 @@ BOOST_AUTO_TEST_CASE(complexWork)
moreWork = queue->executeWork();
BOOST_CHECK(moreWork == false);
+ std::cout << "Work count is " << queue->workCount() << std::endl;
BOOST_CHECK(queue->workCount() == 0);
+ std::cout << "State of work is " << work->currentState << std::endl;
BOOST_CHECK(work->currentState == ComplexTask::Task2Complete);
BOOST_CHECK(listener->emptyNotice == true);
}
commit 2c217c8c4d8681a18fc5fa9b4eb2775d4f15761f
Author: Mark Michelson <mmichelson at digium.com>
Date: Wed Mar 23 13:22:12 2011 -0500
Address Brent's comments on CR-ASTSCF-69 for WorkQueue.
* Changed instances of "Impl" to "Priv"
* Changed 'q' member to "mQueue"
* Use scopes instead of explicit unlock statements
* Simplify flow of several method calls.
* Account for potential mListener == 0 cases
* Lock when setting mListener
* Despite what I said on the review, use a shared_mutex and shared_locks
where applicable.
Tests all still pass.
diff --git a/WorkQueue/include/AsteriskSCF/WorkQueue.h b/WorkQueue/include/AsteriskSCF/WorkQueue.h
index 86b07d5..04e8a08 100644
--- a/WorkQueue/include/AsteriskSCF/WorkQueue.h
+++ b/WorkQueue/include/AsteriskSCF/WorkQueue.h
@@ -24,7 +24,7 @@ namespace AsteriskSCF
namespace WorkQueue
{
-class WorkQueueImpl;
+class WorkQueuePriv;
class WorkQueue : public AsteriskSCF::System::WorkQueue::V1::Queue
{
@@ -38,7 +38,7 @@ public:
int workCount();
void setListener(const AsteriskSCF::System::WorkQueue::V1::QueueListenerPtr& listener);
private:
- boost::shared_ptr<WorkQueueImpl> mImpl;
+ boost::shared_ptr<WorkQueuePriv> mPriv;
};
}; //end namespace WorkQueue
diff --git a/WorkQueue/src/WorkQueue.cpp b/WorkQueue/src/WorkQueue.cpp
index de80d03..5134f14 100644
--- a/WorkQueue/src/WorkQueue.cpp
+++ b/WorkQueue/src/WorkQueue.cpp
@@ -26,104 +26,134 @@ namespace WorkQueue
using namespace AsteriskSCF::System::WorkQueue::V1;
-class WorkQueueImpl
+class WorkQueuePriv
{
public:
- WorkQueueImpl()
+ WorkQueuePriv()
: mListener(0) { }
- WorkQueueImpl(const QueueListenerPtr& listener)
+ WorkQueuePriv(const QueueListenerPtr& listener)
: mListener(listener) { }
+ WorkPtr getNextTask()
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ if (mQueue.empty())
+ {
+ return 0;
+ }
+
+ WorkPtr work = mQueue.front();
+ mQueue.pop_front();
+ return work;
+ }
+
+ bool sendEmptyNotice()
+ {
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+ return mQueue.empty() && mListener;
+ }
+
QueueListenerPtr mListener;
- boost::mutex mLock;
- std::deque<WorkPtr> q;
+ boost::shared_mutex mLock;
+ std::deque<WorkPtr> mQueue;
};
WorkQueue::WorkQueue()
- : mImpl(new WorkQueueImpl) { }
+ : mPriv(new WorkQueuePriv) { }
WorkQueue::WorkQueue(const QueueListenerPtr& listener)
- : mImpl(new WorkQueueImpl(listener)) { }
+ : mPriv(new WorkQueuePriv(listener)) { }
void WorkQueue::enqueueWork(const WorkPtr& work)
{
- boost::unique_lock<boost::mutex> lock(mImpl->mLock);
-
- if (std::find(mImpl->q.begin(), mImpl->q.end(), work) != mImpl->q.end())
+ bool wasEmpty;
+ bool listenerExists;
{
- WorkSeq exceptionItems;
- exceptionItems.push_back(work);
- //XXX The file and line number are required because
- //of a limitation regarding local exceptions in Ice.
- throw WorkExists(__FILE__, __LINE__, exceptionItems);
+ boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
+
+ if (std::find(mPriv->mQueue.begin(), mPriv->mQueue.end(), work) != mPriv->mQueue.end())
+ {
+ WorkSeq exceptionItems;
+ exceptionItems.push_back(work);
+ //XXX The file and line number are remQueueuired because
+ //of a limitation regarding local exceptions in Ice.
+ throw WorkExists(__FILE__, __LINE__, exceptionItems);
+ }
+ wasEmpty = mPriv->mQueue.empty();
+ listenerExists = mPriv->mListener;
+ mPriv->mQueue.push_back(work);
}
- bool wasEmpty = mImpl->q.empty();
- mImpl->q.push_back(work);
- lock.unlock();
- mImpl->mListener->workAdded(wasEmpty);
+ if (listenerExists)
+ {
+ mPriv->mListener->workAdded(wasEmpty);
+ }
}
void WorkQueue::enqueueWorkSeq(const WorkSeq& works)
{
- boost::unique_lock<boost::mutex> lock(mImpl->mLock);
-
- WorkSeq exceptionItems;
- for (WorkSeq::const_iterator iter = works.begin(); iter != works.end(); ++iter)
+ bool wasEmpty;
+ bool listenerExists;
{
- if (std::find(mImpl->q.begin(), mImpl->q.end(), *iter) != mImpl->q.end())
+ boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
+
+ WorkSeq exceptionItems;
+ for (WorkSeq::const_iterator iter = works.begin(); iter != works.end(); ++iter)
+ {
+ if (std::find(mPriv->mQueue.begin(), mPriv->mQueue.end(), *iter) != mPriv->mQueue.end())
+ {
+ exceptionItems.push_back(*iter);
+ }
+ }
+
+ if (!exceptionItems.empty())
{
- exceptionItems.push_back(*iter);
+ throw WorkExists(__FILE__, __LINE__, exceptionItems);
}
+
+ wasEmpty = mPriv->mQueue.empty();
+ listenerExists = mPriv->mListener;
+ mPriv->mQueue.insert(mPriv->mQueue.end(), works.begin(), works.end());
}
- if (!exceptionItems.empty())
+ if (listenerExists)
{
- throw WorkExists(__FILE__, __LINE__, exceptionItems);
+ mPriv->mListener->workAdded(wasEmpty);
}
-
- bool wasEmpty = mImpl->q.empty();
- mImpl->q.insert(mImpl->q.end(), works.begin(), works.end());
- lock.unlock();
- mImpl->mListener->workAdded(wasEmpty);
}
void WorkQueue::cancelWork(const WorkPtr& work)
{
- boost::unique_lock<boost::mutex> lock(mImpl->mLock);
-
- std::deque<WorkPtr>::iterator i = std::find(mImpl->q.begin(), mImpl->q.end(), work);
- if (i == mImpl->q.end())
{
- throw WorkNotFound(__FILE__, __LINE__);
- }
+ boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
+
+ std::deque<WorkPtr>::iterator i = std::find(mPriv->mQueue.begin(), mPriv->mQueue.end(), work);
+ if (i == mPriv->mQueue.end())
+ {
+ throw WorkNotFound(__FILE__, __LINE__);
+ }
- mImpl->q.erase(i);
+ mPriv->mQueue.erase(i);
+ }
- if (mImpl->q.empty())
+ if (mPriv->sendEmptyNotice())
{
- lock.unlock();
- mImpl->mListener->emptied();
+ mPriv->mListener->emptied();
}
}
bool WorkQueue::executeWork()
{
- boost::unique_lock<boost::mutex> lock(mImpl->mLock);
- if (mImpl->q.empty())
+ WorkPtr work = mPriv->getNextTask();
+ if (work == 0)
{
return false;
}
-
- WorkPtr work = mImpl->q.front();
- mImpl->q.pop_front();
- lock.unlock();
work->execute();
- lock.lock();
- if (mImpl->q.empty())
+
+ if (mPriv->sendEmptyNotice())
{
- lock.unlock();
- mImpl->mListener->emptied();
+ mPriv->mListener->emptied();
return false;
}
return true;
@@ -131,13 +161,14 @@ bool WorkQueue::executeWork()
int WorkQueue::workCount()
{
- boost::unique_lock<boost::mutex> lock(mImpl->mLock);
- return mImpl->q.size();
+ boost::shared_lock<boost::shared_mutex> lock(mPriv->mLock);
+ return mPriv->mQueue.size();
}
void WorkQueue::setListener(const QueueListenerPtr& listener)
{
- mImpl->mListener = listener;
+ boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
+ mPriv->mListener = listener;
}
}; // end namespace WorkQueue
-----------------------------------------------------------------------
--
asterisk-scf/integration/ice-util-cpp.git
More information about the asterisk-scf-commits
mailing list