[asterisk-scf-commits] asterisk-scf/release/util-cpp.git branch "master" created.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Fri Apr 22 13:38:14 CDT 2011
branch "master" has been created
at 02001bfcdaafd6eae7d02d3a7d92fe087b3c7d83 (commit)
- Log -----------------------------------------------------------------
commit 02001bfcdaafd6eae7d02d3a7d92fe087b3c7d83
Author: Ken Hunt <ken.hunt at digium.com>
Date: Fri Apr 22 13:34:35 2011 -0500
Added unit tests for release branch creation.
diff --git a/CMakeLists.txt b/CMakeLists.txt
index bb1db1d..eebab3a 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1,3 +1,10 @@
+#
+# Asterisk Scalable Communications Framework
+#
+# Copyright (C) 2010, 2011 -- Digium, Inc.
+#
+# All rights reserved.
+#
if (integrated_build STREQUAL "true")
set(util_cpp_dir ${CMAKE_CURRENT_SOURCE_DIR} PARENT_SCOPE)
set(util_cpp_bindir ${CMAKE_CURRENT_BINARY_DIR} PARENT_SCOPE)
diff --git a/StateMachine/CMakeLists.txt b/StateMachine/CMakeLists.txt
index 8c4b205..0042652 100644
--- a/StateMachine/CMakeLists.txt
+++ b/StateMachine/CMakeLists.txt
@@ -6,25 +6,5 @@
# All rights reserved.
#
-asterisk_scf_component_init(StateMachine CXX)
-
-include_directories(include)
-
-asterisk_scf_component_add_slice(StateMachine LoggerIf)
-asterisk_scf_component_add_file(StateMachine include/AsteriskSCF/StateMachine/SimpleStateMachine.h)
-asterisk_scf_component_add_file(StateMachine src/SimpleStateMachine.cpp)
-asterisk_scf_component_add_boost_libraries(StateMachine core)
-
-if(NOT logger_dir)
- message(FATAL_ERROR "The logger directory could not be found ${logger_dir}")
-endif()
-
-include_directories(${logger_dir}/common)
-include_directories(${logger_dir}/client/src)
-
-asterisk_scf_component_build_library(StateMachine)
-
-target_link_libraries(StateMachine logging-client)
-
-asterisk_scf_component_install(StateMachine LIBRARY lib "State Machine" StateMachine ARCHIVE DESTINATION lib)
-
+add_subdirectory(src)
+add_subdirectory(test)
\ No newline at end of file
diff --git a/StateMachine/include/AsteriskSCF/StateMachine/SimpleStateMachine.h b/StateMachine/include/AsteriskSCF/StateMachine/SimpleStateMachine.h
index 673c93b..58e9a0a 100644
--- a/StateMachine/include/AsteriskSCF/StateMachine/SimpleStateMachine.h
+++ b/StateMachine/include/AsteriskSCF/StateMachine/SimpleStateMachine.h
@@ -20,6 +20,7 @@
#include <boost/function.hpp>
#include <map>
#include <vector>
+#include <algorithm>
namespace AsteriskSCF
{
@@ -59,7 +60,7 @@ public:
class ExecutionGuard
{
public:
- ExecutionGuard(bool &val) : mGuard(val) {mGuard = true;}
+ ExecutionGuard(bool &refToGuardVar) : mGuard(refToGuardVar) {mGuard = true;}
~ExecutionGuard() {mGuard = false;}
private:
@@ -94,7 +95,12 @@ public:
void removeListener(const boost::shared_ptr<StateMachineListener >& listener)
{
- mListeners.remove(listener);
+ mListeners.erase(std::remove(mListeners.begin(), mListeners.end(), listener), mListeners.end());
+ }
+
+ int getNumListeners()
+ {
+ return mListeners.size();
}
/// State management.
@@ -107,6 +113,11 @@ public:
mStates[state] = handler;
}
+ int getNumStates()
+ {
+ return mStates.size();
+ }
+
/**
* Sets the next state. State transitions occur after the current state's execution is complete.
*/
diff --git a/StateMachine/src/CMakeLists.txt b/StateMachine/src/CMakeLists.txt
new file mode 100644
index 0000000..98bd14f
--- /dev/null
+++ b/StateMachine/src/CMakeLists.txt
@@ -0,0 +1,13 @@
+
+asterisk_scf_component_init(StateMachine CXX)
+
+include_directories(../include)
+
+asterisk_scf_component_add_slice(StateMachine LoggerIf)
+asterisk_scf_component_add_file(StateMachine ../include/AsteriskSCF/StateMachine/SimpleStateMachine.h)
+asterisk_scf_component_add_file(StateMachine SimpleStateMachine.cpp)
+asterisk_scf_component_add_boost_libraries(StateMachine core)
+
+asterisk_scf_component_build_library(StateMachine)
+
+asterisk_scf_component_install(StateMachine LIBRARY lib "State Machine" StateMachine ARCHIVE DESTINATION lib)
diff --git a/StateMachine/test/CMakeLists.txt b/StateMachine/test/CMakeLists.txt
new file mode 100644
index 0000000..3013313
--- /dev/null
+++ b/StateMachine/test/CMakeLists.txt
@@ -0,0 +1,14 @@
+# Create State Machine test project.
+asterisk_scf_component_init(StateMachineTest CXX)
+
+include_directories(../include)
+
+asterisk_scf_component_add_file(StateMachineTest StateMachineTest.cpp)
+
+asterisk_scf_component_add_boost_libraries(StateMachineTest unit_test_framework date_time)
+
+include_directories(${util_cpp_dir}/StateMachine/include)
+
+asterisk_scf_component_build_standalone(StateMachineTest)
+
+boost_add_test(StateMachineTest)
diff --git a/StateMachine/test/StateMachineTest.cpp b/StateMachine/test/StateMachineTest.cpp
new file mode 100644
index 0000000..12fa477
--- /dev/null
+++ b/StateMachine/test/StateMachineTest.cpp
@@ -0,0 +1,241 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010-2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#define BOOST_TEST_MODULE StateMachine
+
+#include <boost/test/unit_test.hpp>
+#include <boost/test/debug.hpp>
+
+// #include "../include/AsteriskSCF/StateMachine/SimpleStateMachine.h"
+#include <AsteriskSCF/StateMachine/SimpleStateMachine.h>
+
+using namespace std;
+using namespace AsteriskSCF::StateMachine;
+
+namespace Mock
+{
+
+enum MockMachineStates
+{
+ CYAN,
+ MAGENTA,
+ YELLOW,
+ INVALID // Just for testing. Something unused to init values to.
+};
+
+const MockMachineStates DEFAULT_STATE(CYAN);
+
+class MockStateMachineClient
+{
+public:
+ MockStateMachineClient() : mYellowCount(0), mCyanCount(0), mMagentaCount(0), mStateMachine(DEFAULT_STATE)
+ {
+ mStateMachine.addState(CYAN, boost::bind(&MockStateMachineClient::cyanState, this));
+ mStateMachine.addState(MAGENTA, boost::bind(&MockStateMachineClient::magentaState, this));
+ mStateMachine.addState(YELLOW, boost::bind(&MockStateMachineClient::yellowState, this));
+ }
+
+ void cyanState()
+ {
+ mCyanCount++;
+ mStateMachine.setNextState(Mock::MAGENTA);
+ }
+
+ void magentaState()
+ {
+ mMagentaCount++;
+ mStateMachine.setNextState(Mock::YELLOW);
+ }
+
+ void yellowState()
+ {
+ mYellowCount++;
+ mStateMachine.shutdown();
+ }
+
+ void reset()
+ {
+ mYellowCount = 0;
+ mCyanCount = 0;
+ mMagentaCount = 0;
+ }
+
+ int mYellowCount;
+ int mCyanCount;
+ int mMagentaCount;
+
+public:
+ AsteriskSCF::StateMachine::SimpleStateMachine<MockMachineStates> mStateMachine;
+};
+
+class MockListener : public AsteriskSCF::StateMachine::SimpleStateMachine<MockMachineStates>::StateMachineListener
+{
+public:
+ MockListener()
+ {
+ reset();
+ }
+
+ ~MockListener()
+ {
+ }
+
+ void stateExecutionStart(MockMachineStates state)
+ {
+ mStartNotice = state;
+ }
+
+ void stateExecutionComplete(MockMachineStates state)
+ {
+ mCompleteNotice = state;
+ }
+
+ void shutdown()
+ {
+ mShutdownNotice = true;
+ }
+
+ void stateTransition(MockMachineStates oldState, MockMachineStates newState)
+ {
+ mTransitionOld = oldState;
+ mTransitionNew = newState;
+ }
+
+ void reset()
+ {
+ mStartNotice = INVALID;
+ mCompleteNotice = INVALID;
+ mTransitionOld = INVALID;
+ mTransitionNew = INVALID;
+ mShutdownNotice = false;
+ }
+
+ MockMachineStates mStartNotice;
+ MockMachineStates mCompleteNotice;
+ MockMachineStates mTransitionOld;
+ MockMachineStates mTransitionNew;
+ bool mShutdownNotice;
+};
+typedef boost::shared_ptr<MockListener> MockListenerPtr;
+
+} // end Mock namespace
+
+BOOST_AUTO_TEST_SUITE(StateMachineTest)
+
+BOOST_AUTO_TEST_CASE(addRemoveListener)
+{
+ Mock::MockListenerPtr listener(new Mock::MockListener());
+ Mock::MockStateMachineClient c;
+
+ BOOST_CHECK(c.mStateMachine.getNumListeners() == 0);
+
+ c.mStateMachine.addListener(listener);
+
+ BOOST_CHECK(c.mStateMachine.getNumListeners() == 1);
+
+ c.mStateMachine.removeListener(listener);
+
+ BOOST_CHECK(c.mStateMachine.getNumListeners() == 0);
+}
+
+BOOST_AUTO_TEST_CASE(executeState)
+{
+ Mock::MockListenerPtr listener(new Mock::MockListener());
+ Mock::MockStateMachineClient c;
+ c.mStateMachine.addListener(listener);
+
+ BOOST_CHECK(c.mStateMachine.getNumStates() == 3);
+
+ BOOST_CHECK(listener->mStartNotice == Mock::INVALID);
+ BOOST_CHECK(listener->mCompleteNotice == Mock::INVALID);
+ BOOST_CHECK(listener->mTransitionOld == Mock::INVALID);
+ BOOST_CHECK(listener->mTransitionNew == Mock::INVALID);
+
+ // Execute the default state.
+ c.mStateMachine.execute();
+
+ BOOST_CHECK(listener->mStartNotice == Mock::CYAN);
+ BOOST_CHECK(listener->mCompleteNotice == Mock::CYAN);
+ BOOST_CHECK(listener->mTransitionOld == Mock::CYAN);
+ BOOST_CHECK(listener->mTransitionNew == Mock::MAGENTA);
+
+ BOOST_CHECK(c.mCyanCount == 1);
+ BOOST_CHECK(c.mYellowCount == 0);
+ BOOST_CHECK(c.mMagentaCount == 0);
+
+}
+
+BOOST_AUTO_TEST_CASE(transitionStates)
+{
+ Mock::MockListenerPtr listener(new Mock::MockListener());
+ Mock::MockStateMachineClient c;
+ c.mStateMachine.addListener(listener);
+
+ BOOST_CHECK(listener->mStartNotice == Mock::INVALID);
+ BOOST_CHECK(listener->mCompleteNotice == Mock::INVALID);
+ BOOST_CHECK(listener->mTransitionOld == Mock::INVALID);
+ BOOST_CHECK(listener->mTransitionNew == Mock::INVALID);
+
+ // Execute the default state.
+ c.mStateMachine.execute();
+
+ BOOST_CHECK(listener->mStartNotice == Mock::CYAN);
+ BOOST_CHECK(listener->mCompleteNotice == Mock::CYAN);
+ BOOST_CHECK(listener->mTransitionOld == Mock::CYAN);
+ BOOST_CHECK(listener->mTransitionNew == Mock::MAGENTA);
+
+ BOOST_CHECK(listener->mShutdownNotice == false);
+
+ BOOST_CHECK(c.mCyanCount == 1);
+ BOOST_CHECK(c.mYellowCount == 0);
+ BOOST_CHECK(c.mMagentaCount == 0);
+
+ listener->reset();
+
+ // Execute the magenta state.
+ c.mStateMachine.execute();
+
+ BOOST_CHECK(listener->mStartNotice == Mock::MAGENTA);
+ BOOST_CHECK(listener->mCompleteNotice == Mock::MAGENTA);
+ BOOST_CHECK(listener->mTransitionOld == Mock::MAGENTA);
+ BOOST_CHECK(listener->mTransitionNew == Mock::YELLOW);
+
+ BOOST_CHECK(listener->mShutdownNotice == false);
+
+ BOOST_CHECK(c.mCyanCount == 1);
+ BOOST_CHECK(c.mYellowCount == 0);
+ BOOST_CHECK(c.mMagentaCount == 1);
+
+
+ listener->reset();
+
+ // Execute the final state.
+ c.mStateMachine.execute();
+
+ BOOST_CHECK(listener->mStartNotice == Mock::YELLOW);
+ BOOST_CHECK(listener->mCompleteNotice == Mock::YELLOW);
+ BOOST_CHECK(listener->mTransitionOld == Mock::INVALID);
+ BOOST_CHECK(listener->mTransitionNew == Mock::INVALID);
+
+ BOOST_CHECK(listener->mShutdownNotice == true);
+
+ BOOST_CHECK(c.mCyanCount == 1);
+ BOOST_CHECK(c.mYellowCount == 1);
+ BOOST_CHECK(c.mMagentaCount == 1);
+
+}
+
+BOOST_AUTO_TEST_SUITE_END()
diff --git a/Threading/CMakeLists.txt b/Threading/CMakeLists.txt
index 7e60fb4..f00c977 100644
--- a/Threading/CMakeLists.txt
+++ b/Threading/CMakeLists.txt
@@ -33,3 +33,4 @@ target_link_libraries(Threading logging-client)
asterisk_scf_component_install(Threading LIBRARY lib "Threading" Threading ARCHIVE DESTINATION lib)
+add_subdirectory(test)
\ No newline at end of file
diff --git a/Threading/include/AsteriskSCF/Threading/PausibleWorkQueue.h b/Threading/include/AsteriskSCF/Threading/PausibleWorkQueue.h
index 36187e8..153042a 100644
--- a/Threading/include/AsteriskSCF/Threading/PausibleWorkQueue.h
+++ b/Threading/include/AsteriskSCF/Threading/PausibleWorkQueue.h
@@ -25,7 +25,7 @@ namespace Threading
* is not derived, so implementations of this interface should derive from
* both WorkQueue and PausibleWorkQueue.
*/
-class PausibleWorkQueue
+class ASTERISK_SCF_ICEBOX_EXPORT PausibleWorkQueue
{
public:
virtual bool isRunning() = 0;
diff --git a/Threading/include/AsteriskSCF/Threading/SimpleWorkQueue.h b/Threading/include/AsteriskSCF/Threading/SimpleWorkQueue.h
index 3ecc00c..a03be58 100644
--- a/Threading/include/AsteriskSCF/Threading/SimpleWorkQueue.h
+++ b/Threading/include/AsteriskSCF/Threading/SimpleWorkQueue.h
@@ -20,8 +20,6 @@
#include <boost/thread/condition.hpp>
#include <boost/shared_ptr.hpp>
-#include <AsteriskSCF/logger.h>
-
#include "WorkQueue.h"
#include "PausibleWorkQueue.h"
@@ -36,11 +34,11 @@ namespace Threading
*/
class SimpleWorkQueuePriv;
-class ASTERISK_SCF_ICEBOX_EXPORT SimpleWorkQueue : public WorkQueue, public PausibleWorkQueue, boost::noncopyable
+class ASTERISK_SCF_ICEBOX_EXPORT SimpleWorkQueue : public WorkQueue, public PausibleWorkQueue
{
public:
- SimpleWorkQueue(const std::string& id, const AsteriskSCF::System::Logging::Logger& logger);
+ SimpleWorkQueue(const std::string& id);
~SimpleWorkQueue();
// Overrides for the WorkQueue interface.
@@ -56,7 +54,8 @@ public:
virtual void pause();
virtual void resume();
-private: boost::shared_ptr<SimpleWorkQueuePriv> mImpl;
+private:
+ boost::shared_ptr<SimpleWorkQueuePriv> mImpl;
};
} // end namespace Threading
diff --git a/Threading/include/AsteriskSCF/Threading/WorkQueue.h b/Threading/include/AsteriskSCF/Threading/WorkQueue.h
index 65c7f88..262b437 100644
--- a/Threading/include/AsteriskSCF/Threading/WorkQueue.h
+++ b/Threading/include/AsteriskSCF/Threading/WorkQueue.h
@@ -26,7 +26,7 @@ namespace Threading
* This class defines an interface to a work queue. A work queue manages one or
* more processing threads, and allows work to be enqueued.
*/
-class WorkQueue
+class ASTERISK_SCF_ICEBOX_EXPORT WorkQueue
{
public:
diff --git a/Threading/src/SimpleWorkQueue.cpp b/Threading/src/SimpleWorkQueue.cpp
index 77fe099..e059035 100644
--- a/Threading/src/SimpleWorkQueue.cpp
+++ b/Threading/src/SimpleWorkQueue.cpp
@@ -25,14 +25,11 @@
#include <boost/thread.hpp>
#include <list>
-#include <AsteriskSCF/logger.h>
-
#include "AsteriskSCF/Threading/SimpleWorkQueue.h"
using namespace boost;
using namespace AsteriskSCF;
using namespace AsteriskSCF::Threading;
-using namespace AsteriskSCF::System::Logging;
namespace
{
@@ -56,20 +53,17 @@ namespace Threading
class SimpleWorkQueuePriv
{
public:
- SimpleWorkQueuePriv(const std::string& id, const Logger& logger)
- : mLogger(logger),
- mQid(id),
+ SimpleWorkQueuePriv(const std::string& id)
+ : mQid(id),
mFinished(false),
mPaused(false), // runs by default.
- mNoOpPoolIdPtr (new WorkQueue::PoolId),
- mThread(boost::bind(&SimpleWorkQueuePriv::execute, this))
+ mNoOpPoolIdPtr (new WorkQueue::PoolId)
{
- mLogger(Debug) << BOOST_CURRENT_FUNCTION << ": called. Queue ID:" << mQid;
+ mThread.reset(new boost::thread(boost::bind(&SimpleWorkQueuePriv::execute, this)));
}
~SimpleWorkQueuePriv()
{
- mLogger(Debug) << BOOST_CURRENT_FUNCTION << ": called. Queue ID:" << mQid;
}
/**
@@ -77,8 +71,6 @@ public:
*/
void terminate()
{
- mLogger(Debug) << BOOST_CURRENT_FUNCTION << ": called for queue " << mQid ;
-
mFinished = true;
mPaused = false;
@@ -101,7 +93,7 @@ public:
*/
void join()
{
- mThread.join();
+ mThread->join();
}
WorkPtr dequeue();
@@ -109,7 +101,6 @@ public:
void execute();
bool isPaused();
- const Logger& mLogger;
std::string mQid;
bool mFinished;
bool mPaused;
@@ -120,18 +111,16 @@ public:
boost::condition mPauseCondition;
boost::shared_ptr<WorkQueue::PoolId> mNoOpPoolIdPtr;
- boost::thread mThread; // This variable is last so that the class is fully initialized before the thread executes.
+ boost::shared_ptr<boost::thread> mThread; // This variable is last so that the class is fully initialized before the thread executes.
};
-SimpleWorkQueue::SimpleWorkQueue(const std::string& qid, const Logger& logger) : mImpl(new SimpleWorkQueuePriv(qid, logger))
+SimpleWorkQueue::SimpleWorkQueue(const std::string& qid) : mImpl(new SimpleWorkQueuePriv(qid))
{
- mImpl->mLogger(Debug) << BOOST_CURRENT_FUNCTION << ": called. Queue ID:" << mImpl->mQid;
}
SimpleWorkQueue::~SimpleWorkQueue()
{
- mImpl->mLogger(Debug) << BOOST_CURRENT_FUNCTION << ": called. Queue ID:" << mImpl->mQid;
mImpl->terminate();
// Wait for worker thread to shut down.
@@ -148,8 +137,6 @@ bool SimpleWorkQueue::isRunning()
*/
void SimpleWorkQueue::pause()
{
- mImpl->mLogger(Debug) << BOOST_CURRENT_FUNCTION << ": called for queue " << mImpl->mQid;
-
boost::lock_guard<boost::mutex> lock(mImpl->mPauseMutex);
mImpl->mPaused = true;
}
@@ -159,8 +146,6 @@ void SimpleWorkQueue::pause()
*/
void SimpleWorkQueue::resume()
{
- mImpl->mLogger(Debug) << BOOST_CURRENT_FUNCTION << ": called for queue " << mImpl->mQid;
-
boost::lock_guard<boost::mutex> lock(mImpl->mPauseMutex);
mImpl->mPaused = false;
mImpl->mPauseCondition.notify_all();
@@ -205,19 +190,14 @@ WorkPtr SimpleWorkQueuePriv::waitAndDequeue()
while (mQueue.empty())
{
- mLogger(Debug) << BOOST_CURRENT_FUNCTION << ": Waiting on empty queue. Queue ID:" << mQid;
-
if (mFinished)
{
- mLogger(Debug) << BOOST_CURRENT_FUNCTION << ": Returning the NO_WORK token. Queue ID:" << mQid;
return HiddenNoWorkPtr;
}
mEmptyQueueCondition.wait(lock);
}
- mLogger(Debug) << BOOST_CURRENT_FUNCTION << ": Dequeuing some work. Queue ID:" << mQid;
-
shared_ptr<WorkQueue::Work> work = mQueue.front();
mQueue.pop_front();
@@ -241,8 +221,6 @@ void SimpleWorkQueuePriv::execute()
boost::unique_lock<boost::mutex> lock(mPauseMutex);
while(mPaused)
{
- mLogger(Debug) << BOOST_CURRENT_FUNCTION << ": Waiting while paused. Queue ID:" << mQid;
-
mPauseCondition.wait(lock);
}
@@ -251,31 +229,19 @@ void SimpleWorkQueuePriv::execute()
break;
}
} // end lock scope
-
- mLogger(Debug) << BOOST_CURRENT_FUNCTION << ": Pinging the work queue. Queue ID:" << mQid;
-
WorkPtr work = waitAndDequeue();
- mLogger(Debug) << BOOST_CURRENT_FUNCTION << ": Doing the work. Queue ID:" << mQid;
-
try
{
work->doWork();
}
- catch(const std::exception& e)
- {
- // Workers should be catching/managing their own exceptions!
- mLogger(Warning) << BOOST_CURRENT_FUNCTION << ": Work item threw exception for queue " << mQid;
- mLogger(Warning) << " Details: " << e.what();
- }
catch(...)
{
- mLogger(Warning) << BOOST_CURRENT_FUNCTION << ": Work item threw non-standard exception. ";
+ // Workers should be catching/managing their own exceptions!
}
} // while !mFinished
- mLogger(Debug) << BOOST_CURRENT_FUNCTION << ": Exiting the thread for good. Queue ID:" << mQid;
}
} // end namespace Threading
diff --git a/Threading/test/CMakeLists.txt b/Threading/test/CMakeLists.txt
new file mode 100644
index 0000000..a20d74d
--- /dev/null
+++ b/Threading/test/CMakeLists.txt
@@ -0,0 +1,22 @@
+# Create SimpleWorkQueue test project.
+asterisk_scf_component_init(SimpleWorkQueueTest CXX)
+
+include_directories(../include)
+
+asterisk_scf_component_add_file(SimpleWorkQueueTest SimpleWorkQueueTest.cpp)
+
+asterisk_scf_component_add_boost_libraries(SimpleWorkQueueTest unit_test_framework)
+
+include_directories(${util_cpp_dir}/Threading/include)
+
+if(NOT logger_dir)
+ message(FATAL_ERROR "The logger directory could not be found ${logger_dir}")
+endif()
+include_directories(${logger_dir}/include)
+
+asterisk_scf_component_build_standalone(SimpleWorkQueueTest)
+target_link_libraries(SimpleWorkQueueTest logging-client)
+
+target_link_libraries(SimpleWorkQueueTest Threading)
+
+boost_add_test(SimpleWorkQueueTest)
diff --git a/Threading/test/SimpleWorkQueueTest.cpp b/Threading/test/SimpleWorkQueueTest.cpp
new file mode 100644
index 0000000..b9b374c
--- /dev/null
+++ b/Threading/test/SimpleWorkQueueTest.cpp
@@ -0,0 +1,73 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010-2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#define BOOST_TEST_MODULE SimpleWorkQueue
+
+#include <boost/test/unit_test.hpp>
+#include <boost/test/debug.hpp>
+
+#include <AsteriskSCF/Threading/SimpleWorkQueue.h>
+
+using namespace std;
+using namespace AsteriskSCF::Threading;
+
+class MockWork : public AsteriskSCF::Threading::WorkQueue::Work
+{
+public:
+ MockWork() : mWorkDone(false) {}
+
+ /**
+ * An implementation of the WorkQueue::Work interface.
+ */
+ virtual void doWork()
+ {
+ mWorkDone = true;
+ }
+
+bool mWorkDone;
+};
+typedef boost::shared_ptr<MockWork> MockWorkPtr;
+
+BOOST_AUTO_TEST_SUITE(SimpleWorkQueueTest)
+
+BOOST_AUTO_TEST_CASE(enqueueWork)
+{
+
+ MockWorkPtr w1(new MockWork());
+ MockWorkPtr w2(new MockWork());
+ MockWorkPtr w3(new MockWork());
+
+ SimpleWorkQueue wq("TestWorkQueue");
+
+ BOOST_CHECK(wq.workPending() == false);
+
+ BOOST_CHECK(wq.isRunning() == true);
+
+ wq.enqueue(w1);
+ wq.enqueue(w2);
+ wq.enqueue(w3);
+
+ boost::this_thread::sleep( boost::get_system_time() +
+ boost::posix_time::milliseconds( std::max<long>(100,0) ));
+
+ BOOST_CHECK(w1->mWorkDone == true);
+ BOOST_CHECK(w2->mWorkDone == true);
+ BOOST_CHECK(w3->mWorkDone == true);
+
+}
+
+
+BOOST_AUTO_TEST_SUITE_END()
commit d2ea2f1f04fd9ad34552d5c79f806d2b9bd952af
Author: Ken Hunt <ken.hunt at digium.com>
Date: Sat Apr 16 21:12:38 2011 -0500
Added thread safety for execute() operation.
diff --git a/StateMachine/include/AsteriskSCF/StateMachine/SimpleStateMachine.h b/StateMachine/include/AsteriskSCF/StateMachine/SimpleStateMachine.h
index 538c941..673c93b 100644
--- a/StateMachine/include/AsteriskSCF/StateMachine/SimpleStateMachine.h
+++ b/StateMachine/include/AsteriskSCF/StateMachine/SimpleStateMachine.h
@@ -15,6 +15,7 @@
*/
#pragma once
+#include <boost/thread.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/function.hpp>
#include <map>
@@ -26,13 +27,12 @@ namespace StateMachine
{
/**
- * A simple state machine. User's provide a definition of the state type (probably an enum type) and
- * a type that can be executed using operator() to handle execution for a given state.
+ * A simple state machine. Users provide a definition of the enum type that defines the
+ * valid states.
* A listener interface is provided to allow monitoring state changes and state execution.
- * - typename S State type
- * - typename F An executable operation for state handler
+ * - typename S Enum type that identifies all valid states.
*/
-template<typename S, typename F>
+template<typename S>
class SimpleStateMachine
{
public:
@@ -40,7 +40,6 @@ public:
/**
* Listener interface for monitoring the state machine.
*/
-
class StateMachineListener
{
public:
@@ -48,21 +47,37 @@ public:
virtual void stateExecutionStart(S state) = 0;
virtual void stateExecutionComplete(S state) = 0;
virtual void stateTransition(S oldState, S newState) = 0;
+ virtual void shutdown() = 0;
protected:
StateMachineListener() {}
};
- //typedef StateMachineListener<S> StateMachineListenerType;
+ /**
+ * RAII setter for a boolean.
+ */
+ class ExecutionGuard
+ {
+ public:
+ ExecutionGuard(bool &val) : mGuard(val) {mGuard = true;}
+ ~ExecutionGuard() {mGuard = false;}
+
+ private:
+ bool &mGuard;
+ };
public:
/**
* Constructor.
* @param defaultState The default state for the state machine.
*/
- SimpleStateMachine(S defaultState)
+ SimpleStateMachine(S defaultState)
+ : mCurrentState(defaultState),
+ mNextState(defaultState),
+ mShutdown(false),
+ mInExecution(false)
+
{
- mCurrentState = mNextState = defaultState;
}
~SimpleStateMachine()
@@ -87,7 +102,7 @@ public:
/**
* Sets the execution handler for a given state.
*/
- void addState(S state, const F& handler)
+ void addState(S state, const boost::function<void ()>& handler)
{
mStates[state] = handler;
}
@@ -105,10 +120,14 @@ public:
*/
void execute()
{
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+
+ ExecutionGuard guard(mInExecution);
+
// Notify all listeners that execution is starting.
sendExecutionStartNotice(mCurrentState);
- std::map<S, F>::iterator it = mStates.find(mCurrentState);
+ std::map<S, boost::function<void ()> >::iterator it = mStates.find(mCurrentState);
if (it != mStates.end())
{
// Execute
@@ -125,8 +144,31 @@ public:
sendTransitionNotice(mCurrentState, mNextState);
mCurrentState = mNextState;
}
+
+ if (mShutdown)
+ {
+ // Send shutdown notice to the listeners.
+ sendShutdownNotice();
+ }
}
+ /**
+ * Shutdown the state machine.
+ */
+ void shutdown()
+ {
+ // If we are being called as a result of state execution (as expected),
+ // mark the state machine for shutdown, and let execute() handle it.
+ if (mInExecution)
+ {
+ mShutdown = true;
+ }
+ else
+ {
+ // Send shutdown notice to the listeners.
+ sendShutdownNotice();
+ }
+ }
private:
@@ -146,9 +188,6 @@ private:
{
x->stateExecutionStart(mState);
}
- catch(const Ice::Exception&)
- {
- }
catch(...)
{
}
@@ -181,9 +220,6 @@ private:
{
x->stateExecutionComplete(mState);
}
- catch(const Ice::Exception&)
- {
- }
catch(...)
{
}
@@ -215,9 +251,6 @@ private:
{
x->stateTransition(mOldState, mNewState);
}
- catch(const Ice::Exception&)
- {
- }
catch(...)
{
}
@@ -234,11 +267,44 @@ private:
std::for_each(mListeners.begin(), mListeners.end(), TransitionNotice<S> (oldState, newState));
}
+ /**
+ * Functor for forwarding shutdown() notices.
+ */
+ template<typename S>
+ class ShutdownNotice
+ {
+ // Types: S - State
+ public:
+ ShutdownNotice() {}
+ ~ShutdownNotice() {}
+ void operator() (boost::shared_ptr<StateMachineListener >& x)
+ {
+ try
+ {
+ x->shutdown();
+ }
+ catch(...)
+ {
+ }
+ }
+ };
+
+ /**
+ * Forward the shutdown notice to all registered listeners.
+ */
+ void sendShutdownNotice()
+ {
+ std::for_each(mListeners.begin(), mListeners.end(), ShutdownNotice<S> ());
+ }
+
/// Class state
- std::map<S, F> mStates;
+ std::map<S, boost::function<void ()> > mStates;
S mCurrentState;
S mNextState;
+ bool mShutdown;
+ bool mInExecution;
+ boost::shared_mutex mLock;
std::vector<boost::shared_ptr<StateMachineListener > > mListeners;
};
commit 841b6f6da15190289390c5b4ba900793a497dc7f
Author: Ken Hunt <ken.hunt at digium.com>
Date: Mon Mar 7 01:25:11 2011 -0600
Changes for routing service replication.
diff --git a/StateMachine/include/AsteriskSCF/StateMachine/SimpleStateMachine.h b/StateMachine/include/AsteriskSCF/StateMachine/SimpleStateMachine.h
index acabef9..538c941 100644
--- a/StateMachine/include/AsteriskSCF/StateMachine/SimpleStateMachine.h
+++ b/StateMachine/include/AsteriskSCF/StateMachine/SimpleStateMachine.h
@@ -40,16 +40,21 @@ public:
/**
* Listener interface for monitoring the state machine.
*/
- template<typename S>
+
class StateMachineListener
{
public:
- ~StateMachineListener() {}
+ virtual ~StateMachineListener() {}
virtual void stateExecutionStart(S state) = 0;
virtual void stateExecutionComplete(S state) = 0;
virtual void stateTransition(S oldState, S newState) = 0;
+
+ protected:
+ StateMachineListener() {}
};
+ //typedef StateMachineListener<S> StateMachineListenerType;
+
public:
/**
* Constructor.
@@ -60,14 +65,19 @@ public:
mCurrentState = mNextState = defaultState;
}
+ ~SimpleStateMachine()
+ {
+ mListeners.clear();
+ }
+
//// Listener management.
- void addListener(const boost::shared_ptr<StateMachineListener<S> > & listener)
+ void addListener(const boost::shared_ptr<StateMachineListener >& listener)
{
- mListeners.add(listener);
+ mListeners.push_back(listener);
}
- void removeListener(const boost::shared_ptr<StateMachineListener<S> > & listener)
+ void removeListener(const boost::shared_ptr<StateMachineListener >& listener)
{
mListeners.remove(listener);
}
@@ -130,7 +140,7 @@ private:
public:
ExecutionStartNotice(S state) : mState(state) {}
~ExecutionStartNotice() {}
- void operator() (const boost::shared_ptr<StateMachineListener<S> >& x)
+ void operator() (boost::shared_ptr<StateMachineListener >& x)
{
try
{
@@ -165,7 +175,7 @@ private:
public:
ExecutionCompleteNotice(S state) : mState(state) {}
~ExecutionCompleteNotice() {}
- void operator() (const boost::shared_ptr<StateMachineListener<S> >& x)
+ void operator() (boost::shared_ptr<StateMachineListener >& x)
{
try
{
@@ -199,7 +209,7 @@ private:
public:
TransitionNotice(S oldState, S newState) : mOldState(oldState), mNewState(newState) {}
~TransitionNotice() {}
- void operator() (const boost::shared_ptr<StateMachineListener<S> >& x)
+ void operator() (boost::shared_ptr<StateMachineListener >& x)
{
try
{
@@ -229,7 +239,7 @@ private:
std::map<S, F> mStates;
S mCurrentState;
S mNextState;
- std::vector<const boost::shared_ptr<StateMachineListener<S> > > mListeners;
+ std::vector<boost::shared_ptr<StateMachineListener > > mListeners;
};
} // end namespace StateMachine
diff --git a/Threading/CMakeLists.txt b/Threading/CMakeLists.txt
index 6275408..7e60fb4 100644
--- a/Threading/CMakeLists.txt
+++ b/Threading/CMakeLists.txt
@@ -10,6 +10,8 @@ asterisk_scf_component_init(Threading CXX)
include_directories(include)
+asterisk_scf_slice_include_directories(${API_SLICE_DIR})
+
asterisk_scf_component_add_slice(Threading LoggerIf)
asterisk_scf_component_add_file(Threading include/AsteriskSCF/Threading/WorkQueue.h)
asterisk_scf_component_add_file(Threading include/AsteriskSCF/Threading/SimpleWorkQueue.h)
@@ -21,8 +23,9 @@ if(NOT logger_dir)
message(FATAL_ERROR "The logger directory could not be found ${logger_dir}")
endif()
-include_directories(${logger_dir}/common)
+include_directories(${logger_dir}/include)
include_directories(${logger_dir}/client/src)
+include_directories(${API_INCLUDE_DIR})
asterisk_scf_component_build_library(Threading)
diff --git a/Threading/include/AsteriskSCF/Threading/SimpleWorkQueue.h b/Threading/include/AsteriskSCF/Threading/SimpleWorkQueue.h
index 61ace49..3ecc00c 100644
--- a/Threading/include/AsteriskSCF/Threading/SimpleWorkQueue.h
+++ b/Threading/include/AsteriskSCF/Threading/SimpleWorkQueue.h
@@ -20,9 +20,10 @@
#include <boost/thread/condition.hpp>
#include <boost/shared_ptr.hpp>
+#include <AsteriskSCF/logger.h>
+
#include "WorkQueue.h"
#include "PausibleWorkQueue.h"
-#include "logger.h"
namespace AsteriskSCF
{
diff --git a/Threading/src/SimpleWorkQueue.cpp b/Threading/src/SimpleWorkQueue.cpp
index 0eb74cb..77fe099 100644
--- a/Threading/src/SimpleWorkQueue.cpp
+++ b/Threading/src/SimpleWorkQueue.cpp
@@ -25,7 +25,7 @@
#include <boost/thread.hpp>
#include <list>
-#include "logger.h"
+#include <AsteriskSCF/logger.h>
#include "AsteriskSCF/Threading/SimpleWorkQueue.h"
commit f6464a5c703bd8365d015795f78c3bf0c7ae4c52
Author: Ken Hunt <ken.hunt at digium.com>
Date: Thu Mar 3 11:07:05 2011 -0600
Added state machine implementation.
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 4bfc63e..bb1db1d 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -4,3 +4,4 @@ if (integrated_build STREQUAL "true")
endif()
add_subdirectory(Threading)
+add_subdirectory(StateMachine)
diff --git a/StateMachine/CMakeLists.txt b/StateMachine/CMakeLists.txt
new file mode 100644
index 0000000..8c4b205
--- /dev/null
+++ b/StateMachine/CMakeLists.txt
@@ -0,0 +1,30 @@
+#
+# Asterisk Scalable Communications Framework
+#
+# Copyright (C) 2010, 2011 -- Digium, Inc.
+#
+# All rights reserved.
+#
+
+asterisk_scf_component_init(StateMachine CXX)
+
+include_directories(include)
+
+asterisk_scf_component_add_slice(StateMachine LoggerIf)
+asterisk_scf_component_add_file(StateMachine include/AsteriskSCF/StateMachine/SimpleStateMachine.h)
+asterisk_scf_component_add_file(StateMachine src/SimpleStateMachine.cpp)
+asterisk_scf_component_add_boost_libraries(StateMachine core)
+
+if(NOT logger_dir)
+ message(FATAL_ERROR "The logger directory could not be found ${logger_dir}")
+endif()
+
+include_directories(${logger_dir}/common)
+include_directories(${logger_dir}/client/src)
+
+asterisk_scf_component_build_library(StateMachine)
+
+target_link_libraries(StateMachine logging-client)
+
+asterisk_scf_component_install(StateMachine LIBRARY lib "State Machine" StateMachine ARCHIVE DESTINATION lib)
+
diff --git a/StateMachine/include/AsteriskSCF/StateMachine/SimpleStateMachine.h b/StateMachine/include/AsteriskSCF/StateMachine/SimpleStateMachine.h
new file mode 100644
index 0000000..acabef9
--- /dev/null
+++ b/StateMachine/include/AsteriskSCF/StateMachine/SimpleStateMachine.h
@@ -0,0 +1,236 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010-2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#pragma once
+
+#include <boost/shared_ptr.hpp>
+#include <boost/function.hpp>
+#include <map>
+#include <vector>
+
+namespace AsteriskSCF
+{
+namespace StateMachine
+{
+
+/**
+ * A simple state machine. Users provide a definition of the state type (probably an enum type) and
+ * a type that can be executed using operator() to handle execution for a given state.
+ * A listener interface is provided to allow monitoring state changes and state execution.
+ * - typename S State type
+ * - typename F An executable operation for state handler
+ */
+template<typename S, typename F>
+class SimpleStateMachine
+{
+public:
+
+ /**
+ * Listener interface for monitoring the state machine.
+ */
+ template<typename S>
+ class StateMachineListener
+ {
+ public:
+ ~StateMachineListener() {}
+ virtual void stateExecutionStart(S state) = 0;
+ virtual void stateExecutionComplete(S state) = 0;
+ virtual void stateTransition(S oldState, S newState) = 0;
+ };
+
+public:
+ /**
+ * Constructor.
+ * @param defaultState The default state for the state machine.
+ */
+ SimpleStateMachine(S defaultState)
+ {
+ mCurrentState = mNextState = defaultState;
+ }
+
+ //// Listener management.
+
+ void addListener(const boost::shared_ptr<StateMachineListener<S> > & listener)
+ {
+ mListeners.add(listener);
+ }
+
+ void removeListener(const boost::shared_ptr<StateMachineListener<S> > & listener)
+ {
+ mListeners.remove(listener);
+ }
+
+ /// State management.
+
+ /**
+ * Sets the execution handler for a given state.
+ */
+ void addState(S state, const F& handler)
+ {
+ mStates[state] = handler;
+ }
+
+ /**
+ * Sets the next state. State transitions occur after the current state's execution is complete.
+ */
+ void setNextState(S state)
+ {
+ mNextState = state;
+ }
+
+ /**
+ * This executes the state machine's current state.
+ */
+ void execute()
+ {
+ // Notify all listeners that execution is starting.
+ sendExecutionStartNotice(mCurrentState);
+
+ std::map<S, F>::iterator it = mStates.find(mCurrentState);
+ if (it != mStates.end())
+ {
+ // Execute
+ it->second();
+ }
+
+ // Notify all listeners that execution is complete.
+ sendExecutionCompleteNotice(mCurrentState);
+
+ // Did state transition occur?
+ if (mCurrentState != mNextState)
+ {
+ // Notify listeners if machine transitioned to new state.
+ sendTransitionNotice(mCurrentState, mNextState);
+ mCurrentState = mNextState;
+ }
+ }
+
+
+private:
+
+ /**
+ * Functor for forwarding stateExecutionStart() notices.
+ */
+ template<typename S>
+ class ExecutionStartNotice
+ {
+ // Types: S - State
+ public:
+ ExecutionStartNotice(S state) : mState(state) {}
+ ~ExecutionStartNotice() {}
+ void operator() (const boost::shared_ptr<StateMachineListener<S> >& x)
+ {
+ try
+ {
+ x->stateExecutionStart(mState);
+ }
+ catch(const Ice::Exception&)
+ {
+ }
+ catch(...)
+ {
+ }
+ }
+ S mState;
+ };
+
+ /**
+ * Forward the Execution Start notice to all registered listeners.
+ */
+ void sendExecutionStartNotice(S state)
+ {
+ std::for_each(mListeners.begin(), mListeners.end(), ExecutionStartNotice<S>(state));
+ }
+
+
+ /**
+ * Functor for forwarding stateExecutionComplete() notices.
+ */
+ template<typename S>
+ class ExecutionCompleteNotice
+ {
+ // Types: S - State
+ public:
+ ExecutionCompleteNotice(S state) : mState(state) {}
+ ~ExecutionCompleteNotice() {}
+ void operator() (const boost::shared_ptr<StateMachineListener<S> >& x)
+ {
+ try
+ {
+ x->stateExecutionComplete(mState);
+ }
+ catch(const Ice::Exception&)
+ {
+ }
+ catch(...)
+ {
+ }
+ }
+ S mState;
+ };
+
+ /**
+ * Forward the Execution Complete notice to all registered listeners.
+ */
+ void sendExecutionCompleteNotice(S state)
+ {
+ std::for_each(mListeners.begin(), mListeners.end(), ExecutionCompleteNotice<S> (state));
+ }
+
+ /**
+ * Functor for forwarding stateTransition() notices.
+ */
+ template<typename S>
+ class TransitionNotice
+ {
+ // Types: S - State
+ public:
+ TransitionNotice(S oldState, S newState) : mOldState(oldState), mNewState(newState) {}
+ ~TransitionNotice() {}
+ void operator() (const boost::shared_ptr<StateMachineListener<S> >& x)
+ {
+ try
+ {
+ x->stateTransition(mOldState, mNewState);
+ }
+ catch(const Ice::Exception&)
+ {
+ }
+ catch(...)
+ {
+ }
+ }
+ S mOldState;
+ S mNewState;
+ };
+
+ /**
+ * Forward the state transition notice to all registered listeners.
+ */
+ void sendTransitionNotice(S oldState, S newState)
+ {
+ std::for_each(mListeners.begin(), mListeners.end(), TransitionNotice<S> (oldState, newState));
+ }
+
+ /// Class state
+
+ std::map<S, F> mStates;
+ S mCurrentState;
+ S mNextState;
+ std::vector<const boost::shared_ptr<StateMachineListener<S> > > mListeners;
+};
+
+} // end namespace StateMachine
+} // end namespace AsteriskSCF
diff --git a/StateMachine/src/SimpleStateMachine.cpp b/StateMachine/src/SimpleStateMachine.cpp
new file mode 100644
index 0000000..7d73a7f
--- /dev/null
+++ b/StateMachine/src/SimpleStateMachine.cpp
@@ -0,0 +1,26 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010-2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#include "AsteriskSCF/StateMachine/SimpleStateMachine.h"
+
+namespace AsteriskSCF
+{
+namespace StateMachine
+{
+
+
+} // end namespace StateMachine
+} // end namespace AsteriskSCF
diff --git a/Threading/CMakeLists.txt~ b/Threading/CMakeLists.txt~
deleted file mode 100644
index 61c91ab..0000000
--- a/Threading/CMakeLists.txt~
+++ /dev/null
@@ -1,32 +0,0 @@
-#
-# Asterisk Scalable Communications Framework
-#
-# Copyright (C) 2010, 2011 -- Digium, Inc.
-#
-# All rights reserved.
-#
-
-asterisk_scf_component_init(Threading CXX)
-
-include_directories(include)
-
-asterisk_scf_component_add_slice(Threading LoggerIf)
-asterisk_scf_component_add_file(Threading include/AsteriskSCF/Threading/Threading.h)
-asterisk_scf_component_add_file(Threading include/AsteriskSCF/Threading/SimpleThreading.h)
-asterisk_scf_component_add_file(Threading include/AsteriskSCF/Threading/PausibleThreading.h)
-asterisk_scf_component_add_file(Threading src/SimpleThreading.cpp)
-asterisk_scf_component_add_boost_libraries(Threading core thread)
-
-if(NOT logger_dir)
- message(FATAL_ERROR "The logger directory could not be found ${logger_dir}")
-endif()
-
-include_directories(${logger_dir}/common)
-include_directories(${logger_dir}/client/src)
-
-asterisk_scf_component_build_library(Threading)
-
-target_link_libraries(Threading logging-client)
-
-asterisk_scf_component_install(Threading LIBRARY lib "Threading" Threading ARCHIVE DESTINATION lib)
-
diff --git a/Threading/src/SimpleWorkQueue.cpp b/Threading/src/SimpleWorkQueue.cpp
index 85766bf..0eb74cb 100644
--- a/Threading/src/SimpleWorkQueue.cpp
+++ b/Threading/src/SimpleWorkQueue.cpp
@@ -80,7 +80,12 @@ public:
mLogger(Debug) << BOOST_CURRENT_FUNCTION << ": called for queue " << mQid ;
mFinished = true;
- resume();
+ mPaused = false;
+
+ { // scope for the lock.
+ boost::lock_guard<boost::mutex> lock(mPauseMutex);
+ mPauseCondition.notify_all();
+ }
{ // scope for the lock.
boost::lock_guard<boost::mutex> lock(mQueueMutex);
diff --git a/Threading/src/SimpleWorkQueue.cpp~ b/Threading/src/SimpleWorkQueue.cpp~
deleted file mode 100644
index 943fcf2..0000000
--- a/Threading/src/SimpleWorkQueue.cpp~
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Asterisk SCF -- An open-source communications framework.
- *
- * Copyright (C) 2010, Digium, Inc.
- *
- * See http://www.asterisk.org for more information about
- * the Asterisk SCF project. Please do not directly contact
- * any of the maintainers of this project for assistance;
- * the project provides a web site, mailing lists and IRC
- * channels for your use.
- *
- * This program is free software, distributed under the terms of
- * the GNU General Public License Version 2. See the LICENSE.txt file
- * at the top of the source tree.
- */
-
-/**
- * A simple Work Queue implementation. On construction, starts an internal thread.
- * Work can be enqueued via the thread-safe enqueue() method. All work must implement
- * the Work interface.
- */
-#include <iostream>
-#include <boost/shared_ptr.hpp>
-#include <boost/bind.hpp>
-#include <boost/thread.hpp>
-#include <list>
-
-#include "logger.h"
-
-#include "AsteriskSCF/WorkQueue/SimpleWorkQueue.h"
-
-using namespace AsteriskSCF;
-using namespace AsteriskSCF::System::Logging;
-using namespace boost;
-
-namespace
-{
- /**
- * This is a private no-op implementation of a work item. Returned from WaitAndDequeue
- * if the program is Terminated while waiting on the EmptyQueueCondition.
- */
- class HiddenNoWorkClass : public WorkQueue::Work
- {
- public:
- HiddenNoWorkClass() {};
- void doWork() {} // Do nothing
- };
-}
-
-namespace AsteriskSCF
-{
-class SimpleWorkQueuePriv
-{
-public:
- SimpleWorkQueuePriv(const std::string& id, const Logger& logger)
- : mLogger(logger),
- mQid(id),
- mInitialized(false),
- mFinished(false),
- mPaused(false), // runs by default.
- mThread(boost::bind(&SimpleWorkQueuePriv::execute, this))
-
- {
- mLogger(Debug) << BOOST_CURRENT_FUNCTION << ": called. Queue ID:" << mQid;
- }
-
- ~SimpleWorkQueuePriv()
- {
- mLogger(Debug) << BOOST_CURRENT_FUNCTION << ": called. Queue ID:" << mQid;
- }
-
- WorkPtr dequeue();
- WorkPtr waitAndDequeue();
- void execute();
- bool isPaused();
-
- const Logger& mLogger;
- std::string mQid;
- bool mInitialized;
- bool mFinished;
- bool mPaused;
- std::list<WorkPtr> mQueue;
- boost::mutex mQueueMutex;
- boost::condition mEmptyQueueCondition;
- boost::mutex mPauseMutex;
- boost::condition mPauseCondition;
- boost::thread mThread; // This variable is last so that the class is fully initialized before the thread executes.
-};
-}
-
-SimpleWorkQueue::SimpleWorkQueue(const std::string& qid, const Logger& logger) : mImpl(new SimpleWorkQueuePriv(qid, logger))
-{
- mImpl->mLogger(Debug) << BOOST_CURRENT_FUNCTION << ": called. Queue ID:" << mImpl->mQid;
- mImpl->mInitialized = true;
-}
-
-SimpleWorkQueue::~SimpleWorkQueue()
-{
- mImpl->mLogger(Debug) << BOOST_CURRENT_FUNCTION << ": called. Queue ID:" << mImpl->mQid;
- terminate();
-
- // Wait for worker thread to shut down.
- join();
-}
-
-bool SimpleWorkQueue::isRunning()
-{
- return (mImpl->mInitialized && !mImpl->mPaused && !mImpl->mFinished);
-}
-
-/**
- * Pause the SimpleWorkQueue's thread.
- */
-void SimpleWorkQueue::pause()
-{
- mImpl->mLogger(Info) << BOOST_CURRENT_FUNCTION << ": called for queue " << mImpl->mQid;
-
- boost::lock_guard<boost::mutex> lock(mImpl->mPauseMutex);
- mImpl->mPaused = true;
-}
-
-/**
- * Resume from a Paused state.
- */
-void SimpleWorkQueue::resume()
-{
- mImpl->mLogger(Info) << BOOST_CURRENT_FUNCTION << ": called for queue " << mImpl->mQid;
-
- boost::lock_guard<boost::mutex> lock(mImpl->mPauseMutex);
- mImpl->mPaused = false;
- mImpl->mPauseCondition.notify_all();
-}
-
-/**
- * Stops this thread from executing.
- */
-void SimpleWorkQueue::terminate()
-{
- mImpl->mLogger(Info) << BOOST_CURRENT_FUNCTION << ": called for queue " << mImpl->mQid ;
-
- mImpl->mFinished = true;
- mImpl->mPaused = false;
-
- { // scope for the lock.
- boost::lock_guard<boost::mutex> lock(mImpl->mPauseMutex);
- mImpl->mPauseCondition.notify_all(); // In case the thread was waiting on the PauseCondition.
- }
-
- { // scope for the lock.
- boost::lock_guard<boost::mutex> lock(mImpl->mQueueMutex);
- mImpl->mEmptyQueueCondition.notify_all(); // In case the thread was waiting on an EmptyQueueCondition
- }
-}
-
-/**
- * A convenience method to determine if there is any pending work on the queue.
- */
-bool SimpleWorkQueue::workPending()
-{
- return !mImpl->mQueue.empty();
-}
-
-/**
- * Allows other thread to join to this thread. The caller needs to
- * call this object's Terminate method, or the join will block
- * indefinitely.
- */
-void SimpleWorkQueue::join()
-{
- mImpl->mThread.join();
-}
-
-static WorkQueue::PoolId noOpPoolId;
-
-/**
- * Enqueue an item of work for processing on this queue's thread.
- */
-WorkQueue::PoolId SimpleWorkQueue::enqueue(const WorkPtr& w)
-{
- bool wasEmpty(false);
-
- { // scope for the mutex.
- boost::lock_guard<boost::mutex> lock(mImpl->mQueueMutex);
- wasEmpty = mImpl->mQueue.empty();
- mImpl->mQueue.push_back(w);
-
- if (wasEmpty)
- {
- mImpl->mEmptyQueueCondition.notify_all();
- }
- }
-
- return noOpPoolId;
-}
-
-static shared_ptr<WorkQueue::Work> HiddenNoWorkPtr(new HiddenNoWorkClass());
-
-/**
- * This method returns the next work from the queue. If no work available,
- * this method waits on the EmptyQueueCondition.
- */
-WorkPtr SimpleWorkQueuePriv::waitAndDequeue()
-{
- boost::unique_lock<boost::mutex> lock(mQueueMutex);
-
- while (mQueue.empty())
- {
- mLogger(Debug) << BOOST_CURRENT_FUNCTION << ": Waiting on empty queue. Queue ID:" << mQid;
-
- if (mFinished)
- {
- mLogger(Info) << BOOST_CURRENT_FUNCTION << ": Returning the NO_WORK token. Queue ID:" << mQid;
- return HiddenNoWorkPtr;
- }
-
- mEmptyQueueCondition.wait(lock);
- }
-
- mLogger(Debug) << BOOST_CURRENT_FUNCTION << ": Dequeuing some work. Queue ID:" << mQid;
-
- shared_ptr<WorkQueue::Work> work = mQueue.front();
- mQueue.pop_front();
-
- return work;
-}
-
-bool SimpleWorkQueuePriv::isPaused()
-{
- boost::lock_guard<boost::mutex> lock(mPauseMutex);
- return mPaused;
-}
-
-/**
- * This is the thread's event loop. The thread terminates when this method returns.
- */
-void SimpleWorkQueuePriv::execute()
-{
- while (!mFinished)
- {
- { // scope the lock
- boost::unique_lock<boost::mutex> lock(mPauseMutex);
- while(mPaused)
- {
- mLogger(Debug) << BOOST_CURRENT_FUNCTION << ": Waiting while paused. Queue ID:" << mQid;
-
- mPauseCondition.wait(lock);
- }
-
- if (mFinished) // In case Terminate was called while in PauseCondition
- {
- break;
- }
- } // end lock scope
-
- mLogger(Debug) << BOOST_CURRENT_FUNCTION << ": Pinging the work queue. Queue ID:" << mQid;
-
- WorkPtr work = waitAndDequeue();
-
- mLogger(Debug) << BOOST_CURRENT_FUNCTION << ": Doing the work. Queue ID:" << mQid;
-
- try
- {
- work->doWork();
- }
- catch(const std::exception& e)
- {
- // Workers should be catching/managing their own exceptions!
- mLogger(Warning) << BOOST_CURRENT_FUNCTION << ": Work item threw exception for queue " << mQid;
- mLogger(Warning) << " Details: " << e.what();
- }
-
- } // while !mFinished
-
- mLogger(Debug) << BOOST_CURRENT_FUNCTION << ": Exiting the thread for good. Queue ID:" << mQid;
-}
-
commit 127ae4a06949d2bba30c7948ad499f9296272ef6
Author: Ken Hunt <ken.hunt at digium.com>
Date: Fri Jan 21 17:00:48 2011 -0600
Incorporating more review feedback.
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 2a28ed3..4bfc63e 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -3,4 +3,4 @@ if (integrated_build STREQUAL "true")
set(util_cpp_bindir ${CMAKE_CURRENT_BINARY_DIR} PARENT_SCOPE)
endif()
-add_subdirectory(WorkQueue)
+add_subdirectory(Threading)
diff --git a/Threading/CMakeLists.txt b/Threading/CMakeLists.txt
index d4f5412..6275408 100644
--- a/Threading/CMakeLists.txt
+++ b/Threading/CMakeLists.txt
@@ -6,15 +6,16 @@
# All rights reserved.
#
-asterisk_scf_component_init(WorkQueue CXX)
+asterisk_scf_component_init(Threading CXX)
include_directories(include)
-asterisk_scf_component_add_slice(WorkQueue LoggerIf)
-asterisk_scf_component_add_file(WorkQueue include/AsteriskSCF/WorkQueue/WorkQueue.h)
-asterisk_scf_component_add_file(WorkQueue include/AsteriskSCF/WorkQueue/SimpleWorkQueue.h)
-asterisk_scf_component_add_file(WorkQueue src/SimpleWorkQueue.cpp)
-asterisk_scf_component_add_boost_libraries(WorkQueue core thread)
+asterisk_scf_component_add_slice(Threading LoggerIf)
+asterisk_scf_component_add_file(Threading include/AsteriskSCF/Threading/WorkQueue.h)
+asterisk_scf_component_add_file(Threading include/AsteriskSCF/Threading/SimpleWorkQueue.h)
+asterisk_scf_component_add_file(Threading include/AsteriskSCF/Threading/PausibleWorkQueue.h)
+asterisk_scf_component_add_file(Threading src/SimpleWorkQueue.cpp)
+asterisk_scf_component_add_boost_libraries(Threading core thread)
if(NOT logger_dir)
message(FATAL_ERROR "The logger directory could not be found ${logger_dir}")
@@ -23,9 +24,9 @@ endif()
include_directories(${logger_dir}/common)
include_directories(${logger_dir}/client/src)
-asterisk_scf_component_build_library(WorkQueue)
+asterisk_scf_component_build_library(Threading)
-target_link_libraries(WorkQueue logging-client)
+target_link_libraries(Threading logging-client)
-asterisk_scf_component_install(WorkQueue LIBRARY lib "Work Queue" WorkQueue ARCHIVE DESTINATION lib)
+asterisk_scf_component_install(Threading LIBRARY lib "Threading" Threading ARCHIVE DESTINATION lib)
diff --git a/Threading/CMakeLists.txt~ b/Threading/CMakeLists.txt~
new file mode 100644
index 0000000..61c91ab
--- /dev/null
+++ b/Threading/CMakeLists.txt~
@@ -0,0 +1,32 @@
+#
+# Asterisk Scalable Communications Framework
+#
+# Copyright (C) 2010, 2011 -- Digium, Inc.
+#
+# All rights reserved.
+#
+
+asterisk_scf_component_init(Threading CXX)
+
+include_directories(include)
+
+asterisk_scf_component_add_slice(Threading LoggerIf)
+asterisk_scf_component_add_file(Threading include/AsteriskSCF/Threading/Threading.h)
+asterisk_scf_component_add_file(Threading include/AsteriskSCF/Threading/SimpleThreading.h)
+asterisk_scf_component_add_file(Threading include/AsteriskSCF/Threading/PausibleThreading.h)
+asterisk_scf_component_add_file(Threading src/SimpleThreading.cpp)
+asterisk_scf_component_add_boost_libraries(Threading core thread)
+
+if(NOT logger_dir)
+ message(FATAL_ERROR "The logger directory could not be found ${logger_dir}")
+endif()
+
+include_directories(${logger_dir}/common)
+include_directories(${logger_dir}/client/src)
+
+asterisk_scf_component_build_library(Threading)
+
+target_link_libraries(Threading logging-client)
+
+asterisk_scf_component_install(Threading LIBRARY lib "Threading" Threading ARCHIVE DESTINATION lib)
+
diff --git a/Threading/include/AsteriskSCF/Threading/SimpleWorkQueue.h b/Threading/include/AsteriskSCF/Threading/SimpleWorkQueue.h
index d597271..61ace49 100644
--- a/Threading/include/AsteriskSCF/Threading/SimpleWorkQueue.h
+++ b/Threading/include/AsteriskSCF/Threading/SimpleWorkQueue.h
@@ -1,7 +1,7 @@
/*
* Asterisk SCF -- An open-source communications framework.
*
- * Copyright (C) 2010, Digium, Inc.
+ * Copyright (C) 2010-2011, Digium, Inc.
*
* See http://www.asterisk.org for more information about
* the Asterisk SCF project. Please do not directly contact
@@ -15,42 +15,48 @@
*/
#pragma once
-#include <list>
#include <string>
#include <boost/thread.hpp>
#include <boost/thread/condition.hpp>
#include <boost/shared_ptr.hpp>
#include "WorkQueue.h"
+#include "PausibleWorkQueue.h"
#include "logger.h"
namespace AsteriskSCF
{
+namespace Threading
+{
+
/**
* A simple Work Queue implementation.
* See WorkQueue for more information.
*/
class SimpleWorkQueuePriv;
-class ASTERISK_SCF_ICEBOX_EXPORT SimpleWorkQueue : public WorkQueue, boost::noncopyable
+class ASTERISK_SCF_ICEBOX_EXPORT SimpleWorkQueue : public WorkQueue, public PausibleWorkQueue, boost::noncopyable
{
public:
SimpleWorkQueue(const std::string& id, const AsteriskSCF::System::Logging::Logger& logger);
~SimpleWorkQueue();
- virtual WorkQueue::PoolId enqueue(const WorkPtr& w);
- virtual void terminate();
- virtual void join();
+ // Overrides for the WorkQueue interface.
+ virtual boost::shared_ptr<WorkQueue::PoolId> enqueue(const WorkPtr& w);
+ virtual void enqueue(const WorkPtr& w, PoolId)
+ {
+ enqueue(w);
+ }
- // This implementation adds the concept of Pausing to the generic WorkQueue.
- bool isRunning();
- bool workPending();
- void pause();
- void resume();
+ // Overrides for the PausibleWorkQueue mixin interface.
+ virtual bool isRunning();
+ virtual bool workPending();
+ virtual void pause();
+ virtual void resume();
-private:
- boost::shared_ptr<SimpleWorkQueuePriv> mImpl;
+private: boost::shared_ptr<SimpleWorkQueuePriv> mImpl;
};
-};
+} // end namespace Threading
+} // end namespace AsteriskSCF
diff --git a/Threading/include/AsteriskSCF/Threading/WorkQueue.h b/Threading/include/AsteriskSCF/Threading/WorkQueue.h
index 5b306cc..65c7f88 100644
--- a/Threading/include/AsteriskSCF/Threading/WorkQueue.h
+++ b/Threading/include/AsteriskSCF/Threading/WorkQueue.h
@@ -1,7 +1,7 @@
/*
* Asterisk SCF -- An open-source communications framework.
*
- * Copyright (C) 2010, Digium, Inc.
+ * Copyright (C) 2010-2011, Digium, Inc.
*
* See http://www.asterisk.org for more information about
* the Asterisk SCF project. Please do not directly contact
@@ -19,6 +19,9 @@
namespace AsteriskSCF
{
+namespace Threading
+{
+
/**
* This class defines an interface to a work queue. A work queue manages one or
* more processing threads, and allows work to be enqueued.
@@ -52,35 +55,22 @@ public:
/**
* Enqueue work to be peformed.
*/
- virtual PoolId enqueue(const WorkPtr& w) = 0;
+ virtual boost::shared_ptr<PoolId> enqueue(const WorkPtr& w) = 0;
/**
* Enqueue work to be performed, and specify a particular thread
- * of a Thread Pool to do the work.
- * Note: Implementations of Thread Pools are expected to override this
- * default implementation.
- */
- virtual void enqueue(const WorkPtr& w, PoolId)
- {
- enqueue(w);
- }
-
- /**
- * Terminate the work queue.
+ * of a Thread Pool to do the work. Note: For single threaded work queues,
+ * the PoolId can be ignored.
*/
- virtual void terminate() = 0;
-
- /**
- * Block until processing on all worker threads completes.
- */
- virtual void join() = 0;
+ virtual void enqueue(const WorkPtr& w, PoolId) = 0;
virtual ~WorkQueue() {};
protected:
- WorkQueue() {}; // Hide the constructor since this is an interface.
+ WorkQueue() {}; // Hide the constructor since this is an interface.
... 1443 lines suppressed ...
--
asterisk-scf/release/util-cpp.git
More information about the asterisk-scf-commits
mailing list