[asterisk-scf-commits] asterisk-scf/integration/ice-util-cpp.git branch "thread_pool_update" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Fri Jul 27 15:06:32 CDT 2012
branch "thread_pool_update" has been updated
via ee188408f940769bd9860a6cb2ae61fca50ff8c6 (commit)
from d50bd6e62bd9a6f58fde0cd65047049ac2b09106 (commit)
Summary of changes:
include/AsteriskSCF/ThreadPool/ThreadPool.h | 5 +-
.../AsteriskSCF/WorkQueue/DefaultQueueListener.h | 2 +-
src/ThreadPool/ThreadPool.cpp | 65 +++++++++++++++----
src/WorkQueue/DefaultQueueListener.cpp | 4 +-
src/WorkQueue/SuspendableWorkQueue.cpp | 12 ++--
src/WorkQueue/WorkQueue.cpp | 5 +-
test/ThreadPool/TestThreadPool.cpp | 13 +++-
test/WorkQueue/TestSuspendableWorkQueue.cpp | 4 +-
test/WorkQueue/TestWorkQueue.cpp | 4 +-
9 files changed, 80 insertions(+), 34 deletions(-)
- Log -----------------------------------------------------------------
commit ee188408f940769bd9860a6cb2ae61fca50ff8c6
Author: Ken Hunt <ken.hunt at digium.com>
Date: Fri Jul 27 15:02:01 2012 -0500
Thread pool updates to provide additional details to listener.
diff --git a/include/AsteriskSCF/ThreadPool/ThreadPool.h b/include/AsteriskSCF/ThreadPool/ThreadPool.h
index 93846c0..b947965 100644
--- a/include/AsteriskSCF/ThreadPool/ThreadPool.h
+++ b/include/AsteriskSCF/ThreadPool/ThreadPool.h
@@ -52,8 +52,9 @@ class ASTSCF_DLL_EXPORT ThreadPoolFactory : public AsteriskSCF::System::ThreadPo
{
public:
ThreadPoolFactory();
- AsteriskSCF::System::ThreadPool::V1::PoolPtr createPool(const AsteriskSCF::System::ThreadPool::V1::PoolListenerPtr& listener,
- const AsteriskSCF::System::WorkQueue::V1::QueuePtr& queue);
+ AsteriskSCF::System::ThreadPool::V1::PoolPtr createPool(
+ const AsteriskSCF::System::ThreadPool::V1::PoolListenerPtr& listener,
+ const AsteriskSCF::System::WorkQueue::V1::QueuePtr& queue);
private:
};
diff --git a/include/AsteriskSCF/WorkQueue/DefaultQueueListener.h b/include/AsteriskSCF/WorkQueue/DefaultQueueListener.h
index 37df9ea..b1f5fa0 100644
--- a/include/AsteriskSCF/WorkQueue/DefaultQueueListener.h
+++ b/include/AsteriskSCF/WorkQueue/DefaultQueueListener.h
@@ -48,7 +48,7 @@ public:
DefaultQueueListener(const AsteriskSCF::System::WorkQueue::V1::QueueBasePtr& queue, const Ice::ThreadNotificationPtr& threadHook);
~DefaultQueueListener();
- void workAdded(const AsteriskSCF::System::WorkQueue::V1::QueueBasePtr&, Ice::Long numNewWork, bool wasEmpty);
+ void workAdded(const AsteriskSCF::System::WorkQueue::V1::QueueBasePtr&, Ice::Long numNewWork, Ice::Long newQueueSize);
void workResumable(const AsteriskSCF::System::WorkQueue::V1::QueueBasePtr&);
void emptied(const AsteriskSCF::System::WorkQueue::V1::QueueBasePtr&);
void shuttingDown(const AsteriskSCF::System::WorkQueue::V1::QueueBasePtr&);
diff --git a/src/ThreadPool/ThreadPool.cpp b/src/ThreadPool/ThreadPool.cpp
index 005bd27..f4caf13 100644
--- a/src/ThreadPool/ThreadPool.cpp
+++ b/src/ThreadPool/ThreadPool.cpp
@@ -31,7 +31,7 @@ using namespace AsteriskSCF::System::WorkQueue::V1;
class ThreadQueueListener;
-typedef std::vector<WorkerThreadPtr> ThreadContainer;
+typedef std::deque<WorkerThreadPtr> ThreadContainer;
typedef ThreadContainer::iterator ThreadIterator;
class ThreadPool : public Pool, public WorkerThreadListener
@@ -46,6 +46,8 @@ public:
ThreadQueueListenerFactory factory;
tqListener = factory.createThreadQueueListener(this);
mQueue->setListener(tqListener);
+
+ initialized();
}
~ThreadPool()
@@ -301,7 +303,8 @@ public:
* Queued task called when we are notified work has been added to the queue.
*
* When executed, we notify our listener that work has been added and we
- * awaken idle threads.
+ * awaken as many idle threads as we can, up to the number of work items
+ * added.
*
* XXX The current method awakens all idle threads. A potential alternative
* would be to awaken a number of idle threads equal to the number of
@@ -310,33 +313,67 @@ public:
class WorkAdded : public Work
{
public:
- WorkAdded(Ice::Long numNewWork, bool wasEmpty, ThreadPool *pool)
- : mNewWork(numNewWork), mWasEmpty(wasEmpty), mPool(pool) { }
+ WorkAdded(Ice::Long numNewWork, Ice::Long newQueueSize, ThreadPool *pool)
+ : mNumNewWork(numNewWork), mNewQueueSize(newQueueSize), mPool(pool) { }
void execute()
{
- mPool->mListener->queueWorkAdded(mPool, mNewWork, mWasEmpty);
- for (ThreadIterator i = mPool->mIdleThreads.begin();
- i != mPool->mIdleThreads.end(); ++i)
+ mPool->mListener->queueWorkAdded(mPool, mNumNewWork, mNewQueueSize);
+
+ ThreadIterator i;
+ int count = 0;
+ for (i = mPool->mIdleThreads.begin();
+ i != mPool->mIdleThreads.end() && count < mNumNewWork; ++i, ++count)
{
mPool->mActiveThreads.push_back(*i);
(*i)->setState(Alive);
}
- mPool->mIdleThreads.erase(mPool->mIdleThreads.begin(), mPool->mIdleThreads.end());
+ mPool->mIdleThreads.erase(mPool->mIdleThreads.begin(), i);
}
private:
- const Ice::Long mNewWork;
- const bool mWasEmpty;
+ const Ice::Long mNumNewWork;
+ const Ice::Long mNewQueueSize;
ThreadPool *mPool;
};
- void handleWorkAdded(Ice::Long numNewWork, bool wasEmpty)
+ void handleWorkAdded(Ice::Long numNewWork, Ice::Long newQueueSize)
+ {
+ boost::lock_guard<boost::mutex> lock(mQueueLock);
+ if (!mShuttingDown)
+ {
+ mControlQueue->enqueueWork(new WorkAdded(numNewWork, newQueueSize, this));
+ }
+ }
+
+ /**
+ * Queued task called when the ThreadPool is initialized.
+ *
+ * When executed, we notify our listener that the pool is fully initialized.
+ * By default there are no threads, so this allows the listener to specify
+ * a pool size.
+ *
+ */
+ class Initialized : public Work
+ {
+ public:
+ Initialized(ThreadPool *pool)
+ : mPool(pool) { }
+
+ void execute()
+ {
+ mPool->mListener->initialized(mPool);
+ }
+ private:
+ ThreadPool *mPool;
+ };
+
+ void initialized()
{
boost::lock_guard<boost::mutex> lock(mQueueLock);
if (!mShuttingDown)
{
- mControlQueue->enqueueWork(new WorkAdded(numNewWork, wasEmpty, this));
+ mControlQueue->enqueueWork(new Initialized(this));
}
}
@@ -419,9 +456,9 @@ public:
/**
* Results in PoolListener::queueWorkAdded being called
*/
- void workAdded(const QueueBasePtr&, Ice::Long numNewWork, bool wasEmpty)
+ void workAdded(const QueueBasePtr&, Ice::Long numNewWork, Ice::Long newQueueSize)
{
- mThreadPool->handleWorkAdded(numNewWork, wasEmpty);
+ mThreadPool->handleWorkAdded(numNewWork, newQueueSize);
}
/**
diff --git a/src/WorkQueue/DefaultQueueListener.cpp b/src/WorkQueue/DefaultQueueListener.cpp
index 4d46860..2376274 100644
--- a/src/WorkQueue/DefaultQueueListener.cpp
+++ b/src/WorkQueue/DefaultQueueListener.cpp
@@ -108,9 +108,9 @@ DefaultQueueListener::~DefaultQueueListener()
}
}
-void DefaultQueueListener::workAdded(const QueueBasePtr&, Ice::Long, bool wasEmpty)
+void DefaultQueueListener::workAdded(const QueueBasePtr&, Ice::Long numWorkAdded, Ice::Long newQueueSize)
{
- if (wasEmpty)
+ if (numWorkAdded == newQueueSize)
{
mPriv->setDead(false);
}
diff --git a/src/WorkQueue/SuspendableWorkQueue.cpp b/src/WorkQueue/SuspendableWorkQueue.cpp
index 0419bb3..6939e36 100644
--- a/src/WorkQueue/SuspendableWorkQueue.cpp
+++ b/src/WorkQueue/SuspendableWorkQueue.cpp
@@ -243,39 +243,39 @@ SuspendableWorkQueue::SuspendableWorkQueue(const QueueListenerPtr& listener)
void SuspendableWorkQueue::enqueueWork(const SuspendableWorkPtr& work)
{
- bool wasEmpty;
+ Ice::Long newSize = 0;
QueueListenerPtr listenerRef;
{
boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
mPriv->checkForShuttingDown();
//Call private version so we don't double grab the lock
- wasEmpty = mPriv->getSize() == 0;
+ newSize = mPriv->getSize() + 1;
mPriv->mQueue.push_back(work);
listenerRef = mPriv->mListener;
}
if (listenerRef != 0)
{
- listenerRef->workAdded(this, 1, wasEmpty);
+ listenerRef->workAdded(this, 1, newSize);
}
}
void SuspendableWorkQueue::enqueueWorkSeq(const SuspendableWorkSeq& works)
{
- bool wasEmpty;
+ int newSize = 0;
QueueListenerPtr listenerRef;
{
boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
mPriv->checkForShuttingDown();
//Call private version so we don't double grab the lock
- wasEmpty = mPriv->getSize() == 0;
+ newSize = mPriv->getSize() + works.size();
mPriv->mQueue.insert(mPriv->mQueue.end(), works.begin(), works.end());
listenerRef = mPriv->mListener;
}
if (listenerRef != 0)
{
- listenerRef->workAdded(this, works.size(), wasEmpty);
+ listenerRef->workAdded(this, works.size(), newSize);
}
}
diff --git a/src/WorkQueue/WorkQueue.cpp b/src/WorkQueue/WorkQueue.cpp
index 34a022e..0e3eca8 100644
--- a/src/WorkQueue/WorkQueue.cpp
+++ b/src/WorkQueue/WorkQueue.cpp
@@ -102,18 +102,19 @@ void WorkQueue::enqueueWork(const WorkPtr& work)
void WorkQueue::enqueueWorkSeq(const WorkSeq& works)
{
bool wasEmpty;
+ int newSize=0;
QueueListenerPtr listenerRef;
{
boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
mPriv->checkForShuttingDown();
- wasEmpty = mPriv->mQueue.empty();
listenerRef = mPriv->mListener;
mPriv->mQueue.insert(mPriv->mQueue.end(), works.begin(), works.end());
+ newSize = mPriv->mQueue.size();
}
if (listenerRef != 0)
{
- listenerRef->workAdded(this, static_cast<long>(works.size()), wasEmpty);
+ listenerRef->workAdded(this, static_cast<long>(works.size()), newSize);
}
}
diff --git a/test/ThreadPool/TestThreadPool.cpp b/test/ThreadPool/TestThreadPool.cpp
index 5f77851..e2356a8 100644
--- a/test/ThreadPool/TestThreadPool.cpp
+++ b/test/ThreadPool/TestThreadPool.cpp
@@ -31,7 +31,7 @@ class TestListener : public PoolListener
{
public:
TestListener() : mActive(0), mIdle(0), mZombie(0), mTasks(0),
- mWorkAddedNotice(false), mWasEmpty(false), mEmptyNotice(false) { }
+ mWorkAddedNotice(false), mWasEmpty(false), mEmptyNotice(false), mInitialized(false) { }
void stateChanged(const PoolPtr&, Ice::Long active, Ice::Long idle, Ice::Long zombie)
{
@@ -42,11 +42,11 @@ public:
mDone.notify_one();
}
- void queueWorkAdded(const PoolPtr&, Ice::Long count, bool wasEmpty)
+ void queueWorkAdded(const PoolPtr&, Ice::Long count, Ice::Long newQueueSize)
{
boost::lock_guard<boost::mutex> lock(mLock);
mTasks = count;
- mWasEmpty = wasEmpty;
+ mWasEmpty = (newQueueSize == count);
mWorkAddedNotice = true;
mDone.notify_one();
}
@@ -57,6 +57,11 @@ public:
mEmptyNotice = true;
}
+ void initialized(const PoolPtr&)
+ {
+ mInitialized = true;
+ }
+
void threadStart()
{
}
@@ -73,6 +78,7 @@ public:
bool mWorkAddedNotice;
bool mWasEmpty;
bool mEmptyNotice;
+ bool mInitialized;
boost::mutex mLock;
boost::condition_variable mDone;
@@ -187,6 +193,7 @@ BOOST_AUTO_TEST_CASE(addWork)
waitForWorkNotice(listener);
+ BOOST_CHECK(listener->mInitialized == true);
BOOST_CHECK(listener->mWorkAddedNotice == true);
BOOST_CHECK(listener->mWasEmpty == true);
BOOST_CHECK(listener->mTasks == 1);
diff --git a/test/WorkQueue/TestSuspendableWorkQueue.cpp b/test/WorkQueue/TestSuspendableWorkQueue.cpp
index 7e6d6a0..ed75dab 100644
--- a/test/WorkQueue/TestSuspendableWorkQueue.cpp
+++ b/test/WorkQueue/TestSuspendableWorkQueue.cpp
@@ -32,10 +32,10 @@ public:
resumableNotice(false),
shutdownNotice(false) { }
- void workAdded(const QueueBasePtr&, Ice::Long, bool wasEmpty)
+ void workAdded(const QueueBasePtr&, Ice::Long added, Ice::Long newSize)
{
addedNotice = true;
- addedEmptyNotice = wasEmpty;
+ addedEmptyNotice = (newSize == added);
}
void emptied(const QueueBasePtr&)
diff --git a/test/WorkQueue/TestWorkQueue.cpp b/test/WorkQueue/TestWorkQueue.cpp
index 99dd979..dc842a8 100644
--- a/test/WorkQueue/TestWorkQueue.cpp
+++ b/test/WorkQueue/TestWorkQueue.cpp
@@ -32,10 +32,10 @@ public:
addedEmptyNotice(false),
emptyNotice(false), shutdownNotice(false) { }
- void workAdded(const QueueBasePtr&, Ice::Long numNewWork, bool wasEmpty)
+ void workAdded(const QueueBasePtr&, Ice::Long numNewWork, Ice::Long newQueueSize)
{
addedNotice = true;
- addedEmptyNotice = wasEmpty;
+ addedEmptyNotice = (newQueueSize == numNewWork);
tasksAdded = numNewWork;
}
-----------------------------------------------------------------------
--
asterisk-scf/integration/ice-util-cpp.git
More information about the asterisk-scf-commits
mailing list