[asterisk-scf-commits] asterisk-scf/integration/ice-util-cpp.git branch "workqueue" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Fri Apr 22 18:14:06 CDT 2011
branch "workqueue" has been updated
via de35ff4fc373c9ff094d829edb5f49560a041a13 (commit)
via d39c4bcb29e915d8280db0ad8f79fa4a10776fd6 (commit)
via 4777c584cd5fe65bce274934d30800b020bfb91f (commit)
via f486e4d7f6d69ebb2f9e10761784a4b67100968d (commit)
via bc6730c67619939c6f18e256d20e7af6e4cc2912 (commit)
via 157e8b7fa03a4a6a4157ccf7363152bfd1a9ff61 (commit)
via 7f20d7dc771b827c218cced058407ba356f16008 (commit)
via 824604f5626f26fb023845ebba935317be589796 (commit)
from 4c34a2ae84137864c55e7e62d72b83ed8faf9acc (commit)
Summary of changes:
ThreadPool/include/AsteriskSCF/ThreadPool.h | 48 +-------
ThreadPool/include/AsteriskSCF/WorkerThread.h | 16 ++--
ThreadPool/src/ThreadPool.cpp | 159 ++++++++++++++-----------
ThreadPool/src/WorkerThread.cpp | 12 ++-
ThreadPool/test/TestThreadPool.cpp | 30 +++--
WorkQueue/src/DefaultQueueListener.cpp | 10 +-
WorkQueue/src/SuspendableWorkQueue.cpp | 2 +-
7 files changed, 139 insertions(+), 138 deletions(-)
- Log -----------------------------------------------------------------
commit de35ff4fc373c9ff094d829edb5f49560a041a13
Author: Mark Michelson <mmichelson at digium.com>
Date: Fri Apr 22 18:00:10 2011 -0500
Remove an unnecessary call since boost::thread has an == overload.
diff --git a/ThreadPool/src/WorkerThread.cpp b/ThreadPool/src/WorkerThread.cpp
index 98e3aa7..821b3df 100644
--- a/ThreadPool/src/WorkerThread.cpp
+++ b/ThreadPool/src/WorkerThread.cpp
@@ -112,7 +112,7 @@ void WorkerThread::setState(ThreadState newState)
bool WorkerThread::operator==(const WorkerThread& rhs)
{
- return mPriv->mThread.get_id() == rhs.mPriv->mThread.get_id();
+ return mPriv->mThread == rhs.mPriv->mThread;
}
}; //end namespace ThreadPool
commit d39c4bcb29e915d8280db0ad8f79fa4a10776fd6
Author: Mark Michelson <mmichelson at digium.com>
Date: Fri Apr 22 17:59:26 2011 -0500
Privatise the ThreadQueueListener more.
diff --git a/ThreadPool/include/AsteriskSCF/ThreadPool.h b/ThreadPool/include/AsteriskSCF/ThreadPool.h
index 5831534..d86fa92 100644
--- a/ThreadPool/include/AsteriskSCF/ThreadPool.h
+++ b/ThreadPool/include/AsteriskSCF/ThreadPool.h
@@ -29,37 +29,14 @@ namespace ThreadPool
// AsteriskSCF/System/ThreadPool/ThreadPoolIf.ice
class ThreadPool;
-/**
- * An implementation of a QueueListener used by a ThreadPool.
- *
- * The main job of this listener is to redirect the events to the
- * Pool's PoolListener.
- *
- * For more information on the QueueListener's methods, see
- * AsteriskSCF/System/WorkQueue/WorkQueueIf.ice
- */
-class ThreadQueueListener : public AsteriskSCF::System::WorkQueue::V1::QueueListener
+class ThreadQueueListener;
+typedef IceUtil::Handle<ThreadQueueListener> ThreadQueueListenerPtr;
+
+class ThreadQueueListenerFactory
{
public:
- ThreadQueueListener(ThreadPool *pool);
-
- /**
- * Results in PoolListener::queueWorkAdded being called
- */
- void workAdded(int numNewWork, bool wasEmpty);
-
- /**
- * Should never be called since a ThreadPool does not
- * use a SuspendableQueue
- */
- void workResumable();
-
- /**
- * Results in PoolListener::queueEmptied being called
- */
- void emptied();
-private:
- ThreadPool *mThreadPool;
+ ThreadQueueListenerFactory();
+ ThreadQueueListenerPtr createThreadQueueListener(ThreadPool *pool);
};
// For more information on these methods, see
diff --git a/ThreadPool/src/ThreadPool.cpp b/ThreadPool/src/ThreadPool.cpp
index 5aee763..3b984c7 100644
--- a/ThreadPool/src/ThreadPool.cpp
+++ b/ThreadPool/src/ThreadPool.cpp
@@ -42,7 +42,10 @@ public:
: mControlQueue(new AsteriskSCF::WorkQueue::WorkQueue), mListener(listener), mQueue(queue), mShuttingDown(false)
{
mControlQueue->setListener(new AsteriskSCF::WorkQueue::DefaultQueueListener(mControlQueue));
- mQueue->setListener(new ThreadQueueListener(this));
+ ThreadQueueListenerPtr tqListener;
+ ThreadQueueListenerFactory factory;
+ tqListener = factory.createThreadQueueListener(this);
+ mQueue->setListener(tqListener);
}
~ThreadPool()
@@ -313,37 +316,66 @@ public:
boost::mutex mQueueLock;
};
-ThreadQueueListener::ThreadQueueListener(ThreadPool *priv)
- : mThreadPool(priv)
-{
-}
-
-void ThreadQueueListener::workAdded(int numNewWork, bool wasEmpty)
+/**
+ * An implementation of a QueueListener used by a ThreadPool.
+ *
+ * The main job of this listener is to redirect the events to the
+ * Pool's PoolListener.
+ *
+ * For more information on the QueueListener's methods, see
+ * AsteriskSCF/System/WorkQueue/WorkQueueIf.ice
+ */
+class ThreadQueueListener : public AsteriskSCF::System::WorkQueue::V1::QueueListener
{
- mThreadPool->mListener->queueWorkAdded(mThreadPool, numNewWork, wasEmpty);
- // XXX This could potentially stand to be more sophisticated, but poking all the
- // idle threads will get the job done too.
- //
- // A potential alternative would be to poke a number of idle threads equal to the
- // new work count.
- for (ThreadIterator i = mThreadPool->mIdleThreads.begin();
- i != mThreadPool->mIdleThreads.end(); ++i)
+public:
+ ThreadQueueListener(ThreadPool *pool)
+ : mThreadPool(pool) { }
+ /**
+ * Results in PoolListener::queueWorkAdded being called
+ */
+ void workAdded(int numNewWork, bool wasEmpty)
{
- (*i)->setState(Active);
+ mThreadPool->mListener->queueWorkAdded(mThreadPool, numNewWork, wasEmpty);
+ // XXX This could potentially stand to be more sophisticated, but poking all the
+ // idle threads will get the job done too.
+ //
+ // A potential alternative would be to poke a number of idle threads equal to the
+ // new work count.
+ for (ThreadIterator i = mThreadPool->mIdleThreads.begin();
+ i != mThreadPool->mIdleThreads.end(); ++i)
+ {
+ (*i)->setState(Active);
+ }
}
-}
-void ThreadQueueListener::workResumable()
-{
- /* This should never be called since the ThreadPool does not
+ /**
+ * Should never be called since a ThreadPool does not
* use a SuspendableQueue
*/
- assert(false);
-}
+ void workResumable()
+ {
+ /* This should never be called since the ThreadPool does not
+ * use a SuspendableQueue
+ */
+ assert(false);
+ }
+
+ /**
+ * Results in PoolListener::queueEmptied being called
+ */
+ void emptied()
+ {
+ mThreadPool->mListener->queueEmptied(mThreadPool);
+ }
+private:
+ ThreadPool *mThreadPool;
+};
+
+ThreadQueueListenerFactory::ThreadQueueListenerFactory() { }
-void ThreadQueueListener::emptied()
+ThreadQueueListenerPtr ThreadQueueListenerFactory::createThreadQueueListener(ThreadPool* pool)
{
- mThreadPool->mListener->queueEmptied(mThreadPool);
+ return new ThreadQueueListener(pool);
}
ThreadPoolFactory::ThreadPoolFactory() { }
commit 4777c584cd5fe65bce274934d30800b020bfb91f
Author: Mark Michelson <mmichelson at digium.com>
Date: Fri Apr 22 16:08:22 2011 -0500
Don't confuse matters by returning false when an Ice handle is the return type.
diff --git a/WorkQueue/src/SuspendableWorkQueue.cpp b/WorkQueue/src/SuspendableWorkQueue.cpp
index 9893212..d7bc7cf 100644
--- a/WorkQueue/src/SuspendableWorkQueue.cpp
+++ b/WorkQueue/src/SuspendableWorkQueue.cpp
@@ -62,7 +62,7 @@ public:
assert(currentWork == 0);
if (mQueue.empty())
{
- return false;
+ return 0;
}
work = mQueue.front();
mQueue.pop_front();
commit f486e4d7f6d69ebb2f9e10761784a4b67100968d
Author: Mark Michelson <mmichelson at digium.com>
Date: Fri Apr 22 15:26:19 2011 -0500
Do some re-arranging to privatize things that could be made more private.
This results in the ThreadPool class being declared entirely in ThreadPool.cpp,
thus alleviating the need for a ThreadPoolPriv class.
diff --git a/ThreadPool/include/AsteriskSCF/ThreadPool.h b/ThreadPool/include/AsteriskSCF/ThreadPool.h
index 0d38c76..5831534 100644
--- a/ThreadPool/include/AsteriskSCF/ThreadPool.h
+++ b/ThreadPool/include/AsteriskSCF/ThreadPool.h
@@ -25,19 +25,9 @@ namespace AsteriskSCF
namespace ThreadPool
{
-class ThreadPoolPriv;
-
// For more information on these methods, see
// AsteriskSCF/System/ThreadPool/ThreadPoolIf.ice
-class ThreadPool : public AsteriskSCF::System::ThreadPool::V1::Pool
-{
-public:
- ThreadPool(const AsteriskSCF::System::ThreadPool::V1::PoolListenerPtr& listener, const AsteriskSCF::System::WorkQueue::V1::QueuePtr& queue);
- void setSize(::Ice::Int);
- AsteriskSCF::System::WorkQueue::V1::QueuePtr getQueue();
-private:
- boost::shared_ptr<ThreadPoolPriv> mThreadPoolPriv;
-};
+class ThreadPool;
/**
* An implementation of a QueueListener used by a ThreadPool.
@@ -51,8 +41,8 @@ private:
class ThreadQueueListener : public AsteriskSCF::System::WorkQueue::V1::QueueListener
{
public:
- ThreadQueueListener(ThreadPoolPriv *Priv,
- AsteriskSCF::System::ThreadPool::V1::Pool *pool);
+ ThreadQueueListener(ThreadPool *pool);
+
/**
* Results in PoolListener::queueWorkAdded being called
*/
@@ -69,8 +59,7 @@ public:
*/
void emptied();
private:
- ThreadPoolPriv *mThreadPoolPriv;
- AsteriskSCF::System::ThreadPool::V1::Pool *mPool;
+ ThreadPool *mThreadPool;
};
// For more information on these methods, see
diff --git a/ThreadPool/src/ThreadPool.cpp b/ThreadPool/src/ThreadPool.cpp
index 2b72598..5aee763 100644
--- a/ThreadPool/src/ThreadPool.cpp
+++ b/ThreadPool/src/ThreadPool.cpp
@@ -34,18 +34,18 @@ class ThreadQueueListener;
typedef std::vector<WorkerThreadPtr> ThreadContainer;
typedef ThreadContainer::iterator ThreadIterator;
-class ThreadPoolPriv : public WorkerThreadListener
+class ThreadPool : public Pool, public WorkerThreadListener
{
public:
- ThreadPoolPriv(const PoolListenerPtr& listener, const QueuePtr& queue, Pool *pool)
- : mControlQueue(new AsteriskSCF::WorkQueue::WorkQueue), mListener(listener), mQueue(queue), mPool(pool), mShuttingDown(false)
+ ThreadPool(const PoolListenerPtr& listener, const QueuePtr& queue)
+ : mControlQueue(new AsteriskSCF::WorkQueue::WorkQueue), mListener(listener), mQueue(queue), mShuttingDown(false)
{
mControlQueue->setListener(new AsteriskSCF::WorkQueue::DefaultQueueListener(mControlQueue));
- mQueue->setListener(new ThreadQueueListener(this, mPool));
+ mQueue->setListener(new ThreadQueueListener(this));
}
- ~ThreadPoolPriv()
+ ~ThreadPool()
{
//The tricky thing when destroying the thread pool is that
//we have to be sure that no queued operations from the worker
@@ -67,12 +67,31 @@ public:
killThreads(mZombieThreads, mZombieThreads.begin(), mZombieThreads.end());
}
+ /**
+ * Override of AsteriskSCF::System::ThreadPool::V1::Pool::setSize
+ */
+ void setSize(int size)
+ {
+ if (size >= 0)
+ {
+ resize(size);
+ }
+ }
+
+ /**
+ * Override of AsteriskSCF::System::ThreadPool::V1::Pool::getQueue
+ */
+ QueuePtr getQueue()
+ {
+ return mQueue;
+ }
+
void sendStateChanged()
{
int activeSize = mActiveThreads.size();
int idleSize = mIdleThreads.size();
int zombieSize = mZombieThreads.size();
- mListener->stateChanged(mPool, activeSize, idleSize, zombieSize);
+ mListener->stateChanged(this, activeSize, idleSize, zombieSize);
}
void killThreads(ThreadContainer& container,
@@ -101,7 +120,7 @@ public:
class ActiveThreadIdle : public Work
{
public:
- ActiveThreadIdle(WorkerThreadPtr thread, ThreadPoolPriv *priv)
+ ActiveThreadIdle(WorkerThreadPtr thread, ThreadPool *priv)
: mPriv(priv), mWorkerThread(thread) { }
void execute()
@@ -118,7 +137,7 @@ public:
mPriv->sendStateChanged();
}
private:
- ThreadPoolPriv *mPriv;
+ ThreadPool *mPriv;
WorkerThreadPtr mWorkerThread;
};
@@ -144,7 +163,7 @@ public:
class ZombieThreadDead : public Work
{
public:
- ZombieThreadDead(WorkerThreadPtr thread, ThreadPoolPriv *priv)
+ ZombieThreadDead(WorkerThreadPtr thread, ThreadPool *priv)
: mPriv(priv), mWorkerThread(thread) { }
void execute()
@@ -158,7 +177,7 @@ public:
mPriv->sendStateChanged();
}
private:
- ThreadPoolPriv *mPriv;
+ ThreadPool *mPriv;
WorkerThreadPtr mWorkerThread;
};
@@ -184,7 +203,7 @@ public:
class Resize : public Work
{
public:
- Resize(int numThreads, ThreadPoolPriv *priv)
+ Resize(int numThreads, ThreadPool *priv)
: mNumThreads(numThreads), mPriv(priv) { }
void execute()
{
@@ -250,7 +269,7 @@ public:
}
const size_t mNumThreads;
- ThreadPoolPriv *mPriv;
+ ThreadPool *mPriv;
};
typedef IceUtil::Handle<Resize> ResizePtr;
@@ -279,11 +298,6 @@ public:
PoolListenerPtr mListener;
QueuePtr mQueue;
- //The ThreadPoolPriv has to keep a pointer to
- //the public portion of the Pool since all listener
- //methods take a pointer to the Pool as a parameter.
- Pool *mPool;
-
ThreadContainer mActiveThreads;
ThreadContainer mIdleThreads;
ThreadContainer mZombieThreads;
@@ -299,21 +313,21 @@ public:
boost::mutex mQueueLock;
};
-ThreadQueueListener::ThreadQueueListener(ThreadPoolPriv *priv, Pool *pool)
- : mThreadPoolPriv(priv), mPool(pool)
+ThreadQueueListener::ThreadQueueListener(ThreadPool *priv)
+ : mThreadPool(priv)
{
}
void ThreadQueueListener::workAdded(int numNewWork, bool wasEmpty)
{
- mThreadPoolPriv->mListener->queueWorkAdded(mPool, numNewWork, wasEmpty);
+ mThreadPool->mListener->queueWorkAdded(mThreadPool, numNewWork, wasEmpty);
// XXX This could potentially stand to be more sophisticated, but poking all the
// idle threads will get the job done too.
//
// A potential alternative would be to poke a number of idle threads equal to the
// new work count.
- for (ThreadIterator i = mThreadPoolPriv->mIdleThreads.begin();
- i != mThreadPoolPriv->mIdleThreads.end(); ++i)
+ for (ThreadIterator i = mThreadPool->mIdleThreads.begin();
+ i != mThreadPool->mIdleThreads.end(); ++i)
{
(*i)->setState(Active);
}
@@ -329,25 +343,7 @@ void ThreadQueueListener::workResumable()
void ThreadQueueListener::emptied()
{
- mThreadPoolPriv->mListener->queueEmptied(mPool);
-}
-
-ThreadPool::ThreadPool(const PoolListenerPtr& listener, const QueuePtr& queue)
- : mThreadPoolPriv(new ThreadPoolPriv(listener, queue, this))
-{
-}
-
-void ThreadPool::setSize(int size)
-{
- if (size >= 0)
- {
- mThreadPoolPriv->resize(size);
- }
-}
-
-QueuePtr ThreadPool::getQueue()
-{
- return mThreadPoolPriv->mQueue;
+ mThreadPool->mListener->queueEmptied(mThreadPool);
}
ThreadPoolFactory::ThreadPoolFactory() { }
diff --git a/ThreadPool/test/TestThreadPool.cpp b/ThreadPool/test/TestThreadPool.cpp
index 99e7e95..cef9102 100644
--- a/ThreadPool/test/TestThreadPool.cpp
+++ b/ThreadPool/test/TestThreadPool.cpp
@@ -172,7 +172,8 @@ BOOST_AUTO_TEST_CASE(threadCreation)
TestListenerPtr listener(new TestListener);
QueuePtr queue(new WorkQueue());
- PoolPtr pool(new ThreadPool(listener, queue));
+ ThreadPoolFactoryPtr factory(new ThreadPoolFactory);
+ PoolPtr pool = factory->createPool(listener, queue);
pool->setSize(1);
@@ -192,7 +193,8 @@ BOOST_AUTO_TEST_CASE(threadDestruction)
TestListenerPtr listener(new TestListener);
QueuePtr queue(new WorkQueue());
- PoolPtr pool(new ThreadPool(listener, queue));
+ ThreadPoolFactoryPtr factory(new ThreadPoolFactory);
+ PoolPtr pool = factory->createPool(listener, queue);
pool->setSize(3);
@@ -218,7 +220,8 @@ BOOST_AUTO_TEST_CASE(oneTaskOneThread)
TestListenerPtr listener(new TestListener);
QueuePtr queue(new WorkQueue());
SimpleTaskPtr work(new SimpleTask());
- PoolPtr pool(new ThreadPool(listener, queue));
+ ThreadPoolFactoryPtr factory(new ThreadPoolFactory);
+ PoolPtr pool = factory->createPool(listener, queue);
queue->enqueueWork(work);
pool->setSize(1);
@@ -245,7 +248,8 @@ BOOST_AUTO_TEST_CASE(oneThreadOneTask)
TestListenerPtr listener(new TestListener);
QueuePtr queue(new WorkQueue());
- PoolPtr pool(new ThreadPool(listener, queue));
+ ThreadPoolFactoryPtr factory(new ThreadPoolFactory);
+ PoolPtr pool = factory->createPool(listener, queue);
SimpleTaskPtr work(new SimpleTask());
pool->setSize(1);
@@ -275,7 +279,8 @@ BOOST_AUTO_TEST_CASE(oneThreadMultipleTasks)
TestListenerPtr listener(new TestListener);
QueuePtr queue(new WorkQueue());
- PoolPtr pool(new ThreadPool(listener, queue));
+ ThreadPoolFactoryPtr factory(new ThreadPoolFactory);
+ PoolPtr pool = factory->createPool(listener, queue);
SimpleTaskPtr work1(new SimpleTask());
SimpleTaskPtr work2(new SimpleTask());
SimpleTaskPtr work3(new SimpleTask());
@@ -312,7 +317,8 @@ BOOST_AUTO_TEST_CASE(taskDistribution)
TestListenerPtr listener(new TestListener);
QueuePtr queue(new WorkQueue());
- PoolPtr pool(new ThreadPool(listener, queue));
+ ThreadPoolFactoryPtr factory(new ThreadPoolFactory);
+ PoolPtr pool = factory->createPool(listener, queue);
ComplexTaskPtr work1(new ComplexTask());
ComplexTaskPtr work2(new ComplexTask());
WorkSeq works;
@@ -362,7 +368,8 @@ BOOST_AUTO_TEST_CASE(zombies)
TestListenerPtr listener(new TestListener);
QueuePtr queue(new WorkQueue());
- PoolPtr pool(new ThreadPool(listener, queue));
+ ThreadPoolFactoryPtr factory(new ThreadPoolFactory);
+ PoolPtr pool = factory->createPool(listener, queue);
ComplexTaskPtr work1(new ComplexTask());
ComplexTaskPtr work2(new ComplexTask());
WorkSeq works;
@@ -418,7 +425,8 @@ BOOST_AUTO_TEST_CASE(moreThreadDestruction)
TestListenerPtr listener(new TestListener);
QueuePtr queue(new WorkQueue());
- PoolPtr pool(new ThreadPool(listener, queue));
+ ThreadPoolFactoryPtr factory(new ThreadPoolFactory);
+ PoolPtr pool = factory->createPool(listener, queue);
ComplexTaskPtr work1(new ComplexTask());
ComplexTaskPtr work2(new ComplexTask());
WorkSeq works;
commit bc6730c67619939c6f18e256d20e7af6e4cc2912
Author: Mark Michelson <mmichelson at digium.com>
Date: Fri Apr 22 11:48:36 2011 -0500
Change "poke" to "setState"
diff --git a/ThreadPool/include/AsteriskSCF/WorkerThread.h b/ThreadPool/include/AsteriskSCF/WorkerThread.h
index 2ee1b1b..36f0124 100644
--- a/ThreadPool/include/AsteriskSCF/WorkerThread.h
+++ b/ThreadPool/include/AsteriskSCF/WorkerThread.h
@@ -63,7 +63,7 @@ public:
*
* @param newState The state to change to
*/
- void poke(ThreadState newState);
+ void setState(ThreadState newState);
bool operator==(const WorkerThread& rhs);
private:
diff --git a/ThreadPool/src/ThreadPool.cpp b/ThreadPool/src/ThreadPool.cpp
index dab2a0c..2b72598 100644
--- a/ThreadPool/src/ThreadPool.cpp
+++ b/ThreadPool/src/ThreadPool.cpp
@@ -86,7 +86,7 @@ public:
for (ThreadIterator iter = first; iter != last; ++iter)
{
mZombieThreads.push_back(*iter);
- (*iter)->poke(Zombie);
+ (*iter)->setState(Zombie);
}
mActiveThreads.erase(first, last);
}
@@ -315,7 +315,7 @@ void ThreadQueueListener::workAdded(int numNewWork, bool wasEmpty)
for (ThreadIterator i = mThreadPoolPriv->mIdleThreads.begin();
i != mThreadPoolPriv->mIdleThreads.end(); ++i)
{
- (*i)->poke(Active);
+ (*i)->setState(Active);
}
}
diff --git a/ThreadPool/src/WorkerThread.cpp b/ThreadPool/src/WorkerThread.cpp
index fa724f6..98e3aa7 100644
--- a/ThreadPool/src/WorkerThread.cpp
+++ b/ThreadPool/src/WorkerThread.cpp
@@ -99,11 +99,11 @@ WorkerThread::WorkerThread(const QueuePtr& workQueue, WorkerThreadListener *list
WorkerThread::~WorkerThread()
{
- poke(Dead);
+ setState(Dead);
mPriv->mThread.join();
}
-void WorkerThread::poke(ThreadState newState)
+void WorkerThread::setState(ThreadState newState)
{
boost::unique_lock<boost::mutex> lock(mPriv->mLock);
mPriv->mState = newState;
diff --git a/WorkQueue/src/DefaultQueueListener.cpp b/WorkQueue/src/DefaultQueueListener.cpp
index 563380c..77043cb 100644
--- a/WorkQueue/src/DefaultQueueListener.cpp
+++ b/WorkQueue/src/DefaultQueueListener.cpp
@@ -64,7 +64,7 @@ public:
}
}
- void poke(ListenerState newState)
+ void setState(ListenerState newState)
{
boost::unique_lock<boost::mutex> lock(mLock);
mState = newState;
@@ -83,18 +83,18 @@ DefaultQueueListener::DefaultQueueListener(const QueuePtr& queue)
DefaultQueueListener::~DefaultQueueListener()
{
- mPriv->poke(Dead);
+ mPriv->setState(Dead);
mPriv->mThread.join();
}
void DefaultQueueListener::workAdded(int, bool)
{
- mPriv->poke(Active);
+ mPriv->setState(Active);
}
void DefaultQueueListener::workResumable()
{
- mPriv->poke(Active);
+ mPriv->setState(Active);
}
void DefaultQueueListener::emptied()
commit 157e8b7fa03a4a6a4157ccf7363152bfd1a9ff61
Author: Mark Michelson <mmichelson at digium.com>
Date: Fri Apr 22 11:45:53 2011 -0500
Get rid of some unused parameters.
diff --git a/WorkQueue/src/DefaultQueueListener.cpp b/WorkQueue/src/DefaultQueueListener.cpp
index 651ff94..563380c 100644
--- a/WorkQueue/src/DefaultQueueListener.cpp
+++ b/WorkQueue/src/DefaultQueueListener.cpp
@@ -87,7 +87,7 @@ DefaultQueueListener::~DefaultQueueListener()
mPriv->mThread.join();
}
-void DefaultQueueListener::workAdded(int numNewWork, bool wasEmpty)
+void DefaultQueueListener::workAdded(int, bool)
{
mPriv->poke(Active);
}
commit 7f20d7dc771b827c218cced058407ba356f16008
Author: Mark Michelson <mmichelson at digium.com>
Date: Fri Apr 22 11:44:53 2011 -0500
Convert worker threads to be smart pointers instead of bare pointers.
diff --git a/ThreadPool/include/AsteriskSCF/WorkerThread.h b/ThreadPool/include/AsteriskSCF/WorkerThread.h
index 741be3a..2ee1b1b 100644
--- a/ThreadPool/include/AsteriskSCF/WorkerThread.h
+++ b/ThreadPool/include/AsteriskSCF/WorkerThread.h
@@ -53,25 +53,25 @@ class WorkerThreadListener;
class WorkerThreadPriv;
-class WorkerThread
+class WorkerThread : public IceUtil::Shared
{
public:
WorkerThread(const AsteriskSCF::System::WorkQueue::V1::QueuePtr& workQueue, WorkerThreadListener *listener);
+ ~WorkerThread();
/**
* Tells a worker thread to change states
*
* @param newState The state to change to
*/
void poke(ThreadState newState);
- /**
- * Wait for the worker thread to complete
- */
- void join();
+ bool operator==(const WorkerThread& rhs);
private:
boost::shared_ptr<WorkerThreadPriv> mPriv;
};
+typedef IceUtil::Handle<WorkerThread> WorkerThreadPtr;
+
/**
* A listener for WorkerThread state changes
*/
@@ -81,11 +81,11 @@ public:
/**
* An active thread has become idle
*/
- virtual void activeThreadIdle(WorkerThread *thread) = 0;
+ virtual void activeThreadIdle(WorkerThreadPtr thread) = 0;
/**
* A zombie thread has died
*/
- virtual void zombieThreadDead(WorkerThread *thread) = 0;
+ virtual void zombieThreadDead(WorkerThreadPtr thread) = 0;
};
}; // end namespace ThreadPool
diff --git a/ThreadPool/src/ThreadPool.cpp b/ThreadPool/src/ThreadPool.cpp
index f8b55fb..dab2a0c 100644
--- a/ThreadPool/src/ThreadPool.cpp
+++ b/ThreadPool/src/ThreadPool.cpp
@@ -31,7 +31,7 @@ using namespace AsteriskSCF::System::WorkQueue::V1;
class ThreadQueueListener;
-typedef std::vector<WorkerThread*> ThreadContainer;
+typedef std::vector<WorkerThreadPtr> ThreadContainer;
typedef ThreadContainer::iterator ThreadIterator;
class ThreadPoolPriv : public WorkerThreadListener
@@ -78,13 +78,6 @@ public:
void killThreads(ThreadContainer& container,
ThreadIterator first, ThreadIterator last)
{
- for (ThreadIterator iter = first; iter != last; ++iter)
- {
- WorkerThread *doomed = *iter;
- doomed->poke(Dead);
- doomed->join();
- delete doomed;
- }
container.erase(first, last);
}
@@ -108,7 +101,7 @@ public:
class ActiveThreadIdle : public Work
{
public:
- ActiveThreadIdle(WorkerThread *thread, ThreadPoolPriv *priv)
+ ActiveThreadIdle(WorkerThreadPtr thread, ThreadPoolPriv *priv)
: mPriv(priv), mWorkerThread(thread) { }
void execute()
@@ -126,12 +119,12 @@ public:
}
private:
ThreadPoolPriv *mPriv;
- WorkerThread *mWorkerThread;
+ WorkerThreadPtr mWorkerThread;
};
typedef IceUtil::Handle<ActiveThreadIdle> ActiveThreadIdlePtr;
- void activeThreadIdle(WorkerThread *thread)
+ void activeThreadIdle(WorkerThreadPtr thread)
{
boost::lock_guard<boost::mutex> lock(mQueueLock);
if (!mShuttingDown)
@@ -151,7 +144,7 @@ public:
class ZombieThreadDead : public Work
{
public:
- ZombieThreadDead(WorkerThread *thread, ThreadPoolPriv *priv)
+ ZombieThreadDead(WorkerThreadPtr thread, ThreadPoolPriv *priv)
: mPriv(priv), mWorkerThread(thread) { }
void execute()
@@ -166,12 +159,12 @@ public:
}
private:
ThreadPoolPriv *mPriv;
- WorkerThread *mWorkerThread;
+ WorkerThreadPtr mWorkerThread;
};
typedef IceUtil::Handle<ZombieThreadDead> ZombieThreadDeadPtr;
- void zombieThreadDead(WorkerThread *thread)
+ void zombieThreadDead(WorkerThreadPtr thread)
{
boost::lock_guard<boost::mutex> lock(mQueueLock);
if (!mShuttingDown)
@@ -225,7 +218,7 @@ public:
{
for (size_t i = 0; i < numNewThreads; ++i)
{
- WorkerThread *newThread(new WorkerThread(mPriv->mQueue, mPriv));
+ WorkerThreadPtr newThread(new WorkerThread(mPriv->mQueue, mPriv));
mPriv->mActiveThreads.push_back(newThread);
}
}
diff --git a/ThreadPool/src/WorkerThread.cpp b/ThreadPool/src/WorkerThread.cpp
index b65a355..fa724f6 100644
--- a/ThreadPool/src/WorkerThread.cpp
+++ b/ThreadPool/src/WorkerThread.cpp
@@ -97,6 +97,12 @@ public:
WorkerThread::WorkerThread(const QueuePtr& workQueue, WorkerThreadListener *listener)
: mPriv(new WorkerThreadPriv(workQueue, listener, this)) { }
+WorkerThread::~WorkerThread()
+{
+ poke(Dead);
+ mPriv->mThread.join();
+}
+
void WorkerThread::poke(ThreadState newState)
{
boost::unique_lock<boost::mutex> lock(mPriv->mLock);
@@ -104,9 +110,9 @@ void WorkerThread::poke(ThreadState newState)
mPriv->mCond.notify_one();
}
-void WorkerThread::join()
+bool WorkerThread::operator==(const WorkerThread& rhs)
{
- mPriv->mThread.join();
+ return mPriv->mThread.get_id() == rhs.mPriv->mThread.get_id();
}
}; //end namespace ThreadPool
commit 824604f5626f26fb023845ebba935317be589796
Author: Mark Michelson <mmichelson at digium.com>
Date: Fri Apr 22 11:16:05 2011 -0500
Get rid of unused parameter warnings in TestThreadPool.
diff --git a/ThreadPool/test/TestThreadPool.cpp b/ThreadPool/test/TestThreadPool.cpp
index 7417063..99e7e95 100644
--- a/ThreadPool/test/TestThreadPool.cpp
+++ b/ThreadPool/test/TestThreadPool.cpp
@@ -33,7 +33,7 @@ public:
TestListener() : mActive(0), mIdle(0), mZombie(0), mTasks(0),
mWorkAddedNotice(false), mWasEmpty(false), mEmptyNotice(false) { }
- void stateChanged(const PoolPtr& pool, int active, int idle, int zombie)
+ void stateChanged(const PoolPtr&, int active, int idle, int zombie)
{
boost::unique_lock<boost::mutex> lock(mLock);
mActive = active;
@@ -42,7 +42,7 @@ public:
mDone.notify_one();
}
- void queueWorkAdded(const PoolPtr& pool, int count, bool wasEmpty)
+ void queueWorkAdded(const PoolPtr&, int count, bool wasEmpty)
{
boost::lock_guard<boost::mutex> lock(mLock);
mTasks = count;
@@ -50,7 +50,7 @@ public:
mWorkAddedNotice = true;
}
- void queueEmptied(const PoolPtr& pool)
+ void queueEmptied(const PoolPtr&)
{
boost::lock_guard<boost::mutex> lock(mLock);
mEmptyNotice = true;
-----------------------------------------------------------------------
--
asterisk-scf/integration/ice-util-cpp.git
More information about the asterisk-scf-commits
mailing list