[asterisk-scf-commits] asterisk-scf/integration/ice-util-cpp.git branch "workqueue" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Fri Mar 25 09:35:47 CDT 2011
branch "workqueue" has been updated
via 894bfb514ff55ea84a7ac6294a037a579b45292e (commit)
from 77522e48b1b06b4b03c38c64b900f1459ead5c07 (commit)
Summary of changes:
{WorkQueue => ThreadPool}/CMakeLists.txt | 0
ThreadPool/include/AsteriskSCF/ThreadPool.h | 60 +++++++
ThreadPool/src/CMakeLists.txt | 23 +++
ThreadPool/src/ThreadPool.cpp | 239 +++++++++++++++++++++++++++
4 files changed, 322 insertions(+), 0 deletions(-)
copy {WorkQueue => ThreadPool}/CMakeLists.txt (100%)
create mode 100644 ThreadPool/include/AsteriskSCF/ThreadPool.h
create mode 100644 ThreadPool/src/CMakeLists.txt
create mode 100644 ThreadPool/src/ThreadPool.cpp
- Log -----------------------------------------------------------------
commit 894bfb514ff55ea84a7ac6294a037a579b45292e
Author: Mark Michelson <mmichelson at digium.com>
Date: Fri Mar 25 09:34:49 2011 -0500
Adding some initial code for ThreadPool implementation.
It likely doesn't even compile. I'm just pushing it so that I can
get to it when I work on it later over the weekend.
diff --git a/ThreadPool/CMakeLists.txt b/ThreadPool/CMakeLists.txt
new file mode 100644
index 0000000..b1a0931
--- /dev/null
+++ b/ThreadPool/CMakeLists.txt
@@ -0,0 +1,10 @@
+#
+# Asterisk Scalable Communications Framework
+#
+# Copyright (C) 2011 -- Digium, Inc.
+#
+# All rights reserved.
+#
+
+add_subdirectory(src)
+add_subdirectory(test)
diff --git a/ThreadPool/include/AsteriskSCF/ThreadPool.h b/ThreadPool/include/AsteriskSCF/ThreadPool.h
new file mode 100644
index 0000000..ebe3135
--- /dev/null
+++ b/ThreadPool/include/AsteriskSCF/ThreadPool.h
@@ -0,0 +1,60 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#pragma once
+
+#include <boost/shared_ptr.hpp>
+
+#include <AsteriskSCF/System/ThreadPool/ThreadPoolIf.h>
+
+namespace AsteriskSCF
+{
+namespace ThreadPool
+{
+
+class WorkerThread;
+
+class ThreadPool : public AsteriskSCF::System::ThreadPool::V1::Pool, public AsteriskSCF::System::WorkQueue::V1::QueueListener
+{
+public:
+ ThreadPool();
+ ThreadPool(int size);
+ void setSize(::Ice::Int);
+ AsteriskSCF::System::WorkQueue::V1::QueuePtr getQueue();
+private:
+ void grow(int numNewThreads);
+ void shrink(int threadsToKill);
+ void activeThreadIdle(const boost::shared_ptr<WorkerThread> &thread);
+ void ThreadPool::zombieThreadDead(const boost::shared_ptr<WorkerThread> &thread);
+
+ AsteriskSCF::System::ThreadPool::V1::PoolListenerPtr mListener;
+ AsteriskSCF::System::WorkQueue::V1::QueuePtr mQueue;
+
+ std::vector<boost::shared_ptr<WorkerThread> > mActiveThreads;
+ std::vector<boost::shared_ptr<WorkerThread> > mIdleThreads;
+ std::vector<boost::shared_ptr<WorkerThread> > mZombieThreads;
+};
+
+class ThreadPoolFactory : public AsteriskSCF::System::ThreadPool::V1::PoolFactory
+{
+public:
+ ThreadPoolFactory();
+ AsteriskSCF::System::ThreadPool::V1::PoolPtr createPool(const AsteriskSCF::System::ThreadPool::V1::PoolListenerPtr& listener,
+ const AsteriskSCF::System::WorkQueue::V1::QueuePtr& queue);
+private:
+};
+
+}; // end namespace ThreadPool
+}; // end namespace AsteriskSCF
diff --git a/ThreadPool/src/CMakeLists.txt b/ThreadPool/src/CMakeLists.txt
new file mode 100644
index 0000000..d26144c
--- /dev/null
+++ b/ThreadPool/src/CMakeLists.txt
@@ -0,0 +1,23 @@
+#
+# Asterisk Scalable Communications Framework
+#
+# Copyright (C) 2011 -- Digium, Inc.
+#
+# All rights reserved.
+#
+
+asterisk_scf_slice_include_directories(${API_SLICE_DIR})
+
+asterisk_scf_component_init(ThreadPool CXX)
+
+include_directories(../include)
+include_directories(${API_INCLUDE_DIR})
+
+asterisk_scf_component_add_file(ThreadPool
+ ../include/AsteriskSCF/ThreadPool.h)
+asterisk_scf_component_add_file(ThreadPool ThreadPool.cpp)
+asterisk_scf_component_add_boost_libraries(ThreadPool thread)
+
+asterisk_scf_component_build_library(ThreadPool)
+
+asterisk_scf_headers_install(../include/)
diff --git a/ThreadPool/src/ThreadPool.cpp b/ThreadPool/src/ThreadPool.cpp
new file mode 100644
index 0000000..b64d705
--- /dev/null
+++ b/ThreadPool/src/ThreadPool.cpp
@@ -0,0 +1,239 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#include <AsteriskSCF/ThreadPool.h>
+#include <AsteriskSCF/WorkQueue.h>
+
+namespace AsteriskSCF
+{
+namespace ThreadPool
+{
+
+using namespace AsteriskSCF::System::ThreadPool::V1;
+using namespace AsteriskSCF::System::WorkQueue::V1;
+
+class WorkerThread
+{
+public:
+ WorkerThread(const QueuePtr& workQueue)
+ : mQueue(workQueue), mState(Active), mThread(boost::bind(&WorkerThread::active, this)) { }
+
+ void active()
+ {
+ while (mState == Active)
+ {
+ if (!mQueue->executeWork())
+ {
+ idle();
+ }
+ }
+
+ // Reaching this portion means the thread is
+ // on death's door. It may have been killed while
+ // it was idle, in which case it can just die
+ // peacefully. If it's a zombie, though, then
+ // it needs to let the ThreadPoolImpl know so
+ // that the thread can be removed from the
+ // vector of zombie threads.
+ if (mState == Zombie)
+ {
+ mPool->zombieThreadDead(this);
+ }
+ }
+
+ void idle()
+ {
+ boost::unique_lock<boost::mutex> lock(mLock);
+ // If we've been turned into a zombie while we
+ // were active, then just go ahead and return.
+ if (mState == Zombie)
+ {
+ return;
+ }
+
+ // Otherwise, we'll set ourselves idle and wait
+ // for a poke
+ mState = Idle;
+ mPool->activeThreadIdle(this);
+ mCond.wait(lock);
+ }
+
+ void poke(ThreadState newState)
+ {
+ boost::unique_lock<boost::mutex> lock(mLock);
+ mState = newState;
+ mCond.notify_one();
+ }
+
+ enum ThreadState
+ {
+ /**
+ * Actively doing work
+ */
+ Active,
+ /**
+ * Nothing to do; waiting to be poked
+ */
+ Idle,
+ /**
+ * Marked for deletion. May still be executing
+ * code but next time it checks its state, it
+ * will die.
+ *
+ * Basically what happens when we try to kill
+ * a thread when it is Active
+ */
+ Zombie,
+ /**
+ * The ThreadPoolImpl considers this thread to
+ * be gone. The thread just needs to get out of
+ * the way ASAP.
+ *
+ * Basically what happens when we try to kill
+ * a thread when it is Idle
+ */
+ Dead
+ } mState;
+
+ QueuePtr mQueue;
+ boost::thread mThread;
+ boost::condition_variable mCond;
+ boost::mutex mLock;
+};
+
+void ThreadPool::grow(int numNewThreads)
+{
+ for (int i = 0; i < numNewThreads; ++i)
+ {
+ boost::shared_ptr<WorkerThread> newThread(new WorkerThread(mQueue));
+ mActiveThreads.push_back(newThread);
+ }
+}
+
+void ThreadPool::shrink(int threadsToKill)
+{
+ for (std::vector<boost::shared_ptr<WorkerThread> >::iterator i = mIdleThreads.begin();
+ i != mIdleThreads.end(); ++i)
+ {
+ mIdleThreads.erase(i);
+ (*i)->poke(WorkerThread::Dead);
+
+ if (--threadsToKill == 0)
+ {
+ return;
+ }
+ }
+
+ // If we've made it here, then it means that there weren't enough idle
+ // threads to kill. We'll need to zombify some active threads then.
+ for (std::vector<boost::shared_ptr<WorkerThread> >::iterator i = mActiveThreads.begin();
+ i != mThreads.end(); ++i)
+ {
+ //Active threads, on the other hand, need to at least temporarily be
+ //pushed into the zombie container.
+ mZombieThreads.push_back(*i);
+ mActiveThreads.erase(i);
+ (*i)->poke(WorkerThread::Zombie);
+
+ if (--threadsToKill == 0)
+ {
+ return;
+ }
+ }
+}
+
+void ThreadPool::activeThreadIdle(const boost::shared_ptr<WorkerThread> &thread)
+{
+ std::vector<boost::shared_ptr<WorkerThread> >::iterator iter =
+ std::find(mActiveThreads.begin(), mActiveThreads.end(), thread);
+
+ if (iter != mActiveThreads.end())
+ {
+ mIdleThreads.push_back(*iter);
+ mActiveThreads.erase(iter);
+ }
+ mListener->stateChanged(mActiveThreads.size(), mIdleThreads.size(), mZombieThreads.size());
+}
+
+void ThreadPool::zombieThreadDead(const boost::shared_ptr<WorkerThread> &thread)
+{
+ mZombieThreads.erase(std::find(mActiveThreads.begin(), mActiveThreads.end(), thread));
+ mListener->stateChanged(mActiveThreads.size(), mIdleThreads.size(), mZombieThreads.size());
+}
+
+ThreadPool::ThreadPool(const ThreadPoolListenerPtr &listener, const QueuePtr &queue)
+ : mQueue(queue), mListener(listener), mQueueListener(new ThreadPoolQueueListener(this))
+{
+ queue->setListener(mQueue);
+}
+
+void ThreadPool::setSize(int size)
+{
+ int currentSize = mActiveThreads.size() + mIdleThreads.size() + mZombieThreads.size();
+ if (size < currentSize)
+ {
+ shrink(currentSize - size);
+ }
+ else
+ {
+ grow(size - currentSize);
+ }
+ mListener->stateChanged(this, mActiveThreads.size(), mIdleThreads.size(), mZombieThreads.size());
+}
+
+QueuePtr ThreadPool::getQueue()
+{
+ return mQueue;
+}
+
+//QueueListener overloads
+
+void ThreadPool::workAdded(bool wasEmpty)
+{
+ // XXX The interface for PoolListener says that the second parameter of this
+ // method should be the amount of new items added. Well, that's just a pain to
+ // try to keep up with and report accurately. For now, I'm just going to report
+ // the total work count.
+ //
+ // If we want to report the amount of new work, then I would strongly suggest
+ // changing the QueueListener's workAdded method to include the amount of new
+ // work added.
+ mListener->queueWorkAdded(this, mQueue->workCount(), wasEmpty);
+ // XXX This could potentially stand to be more sophisticated, but poking all the
+ // idle threads will get the job done too.
+ //
+ // A potential alternative would be to poke a number of idle threads equal to the
+ // new work count.
+ for (std::vector<boost::shared_ptr<WorkerThread> >::iterator i = mIdleThreads.begin();
+ i != mIdleThreads.end(); ++i)
+ {
+ (*i)->poke(WorkerThread::Active);
+ }
+}
+void ThreadPool::workResumable()
+{
+ /* This should never be called since we use a standard
+ * work queue in the thread pool.
+ */
+ assert(false);
+}
+void ThreadPool::emptied()
+{
+ mListener->queueEmptied(this)
+}
+
+}; // end namespace ThreadPool
+}; // end namespace Asterisk SCF
-----------------------------------------------------------------------
--
asterisk-scf/integration/ice-util-cpp.git
More information about the asterisk-scf-commits
mailing list