[asterisk-scf-commits] asterisk-scf/integration/ice-util-cpp.git branch "workqueue" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Thu Mar 17 17:14:00 CDT 2011


branch "workqueue" has been updated
       via  fc35d57a2ee51e0ef9b679c4d4f7baeae2a031af (commit)
       via  cae3cdb44f21eeec4c6c004001afa1a09425a1f8 (commit)
       via  b63d7360b8cf70353de0dd80bb71506ea5883062 (commit)
       via  804fed670f876a3d52b8684a97da05bec3a64a29 (commit)
      from  f91844c5783119bb2e40bbe0eb2e7c4639915e5d (commit)

Summary of changes:
 .../{WorkQueue.h => SuspendableWorkQueue.h}        |   17 +-
 WorkQueue/src/CMakeLists.txt                       |    3 +
 WorkQueue/src/SuspendableWorkQueue.cpp             |  335 ++++++++++++++++++++
 WorkQueue/src/WorkQueue.cpp                        |    4 -
 WorkQueue/test/CMakeLists.txt                      |    9 +
 ...tWorkQueue.cpp => TestSuspendableWorkQueue.cpp} |  183 ++++++++---
 WorkQueue/test/{test.cpp => test2.cpp}             |    2 +-
 7 files changed, 494 insertions(+), 59 deletions(-)
 copy WorkQueue/include/AsteriskSCF/{WorkQueue.h => SuspendableWorkQueue.h} (65%)
 create mode 100644 WorkQueue/src/SuspendableWorkQueue.cpp
 copy WorkQueue/test/{TestWorkQueue.cpp => TestSuspendableWorkQueue.cpp} (61%)
 copy WorkQueue/test/{test.cpp => test2.cpp} (92%)


- Log -----------------------------------------------------------------
commit fc35d57a2ee51e0ef9b679c4d4f7baeae2a031af
Author: Mark Michelson <mmichelson at digium.com>
Date:   Thu Mar 17 17:12:50 2011 -0500

    Add some test code.
    
    This has all the same tests from WorkQueue and defines some
    further tasks that can be used to test task suspension. Currently
    test code compiles but tests do not pass. I will have to investigate
    this later.

diff --git a/WorkQueue/include/AsteriskSCF/SuspendableWorkQueue.h b/WorkQueue/include/AsteriskSCF/SuspendableWorkQueue.h
index 36f7afa..dc65f34 100644
--- a/WorkQueue/include/AsteriskSCF/SuspendableWorkQueue.h
+++ b/WorkQueue/include/AsteriskSCF/SuspendableWorkQueue.h
@@ -27,7 +27,7 @@ namespace WorkQueue
 
 class SuspendableWorkQueueImpl;
 
-class SuspendableWorkQueue : public AsteriskSCF::System::WorkQueue::V1::Queue
+class SuspendableWorkQueue : public AsteriskSCF::System::WorkQueue::V1::SuspendableQueue
 {
 public:
     SuspendableWorkQueue();
diff --git a/WorkQueue/test/CMakeLists.txt b/WorkQueue/test/CMakeLists.txt
index 5a9c74d..d4adb38 100644
--- a/WorkQueue/test/CMakeLists.txt
+++ b/WorkQueue/test/CMakeLists.txt
@@ -20,4 +20,13 @@ asterisk_scf_component_build_standalone(WorkQueueTest)
 target_link_libraries(WorkQueueTest asterisk-scf-api)
 target_link_libraries(WorkQueueTest WorkQueue)
 
+asterisk_scf_component_add_file(SuspendableWorkQueueTest TestSuspendableWorkQueue.cpp)
+asterisk_scf_component_add_file(SuspendableWorkQueueTest test2.cpp)
+asterisk_scf_component_add_boost_libraries(SuspendableWorkQueueTest unit_test_framework thread)
+
+asterisk_scf_component_build_standalone(SuspendableWorkQueueTest)
+target_link_libraries(SuspendableWorkQueueTest asterisk-scf-api)
+target_link_libraries(SuspendableWorkQueueTest WorkQueue)
+
 boost_add_test(WorkQueueTest)
+boost_add_test(SuspendableWorkQueueTest)
diff --git a/WorkQueue/test/TestSuspendableWorkQueue.cpp b/WorkQueue/test/TestSuspendableWorkQueue.cpp
new file mode 100644
index 0000000..4afe13f
--- /dev/null
+++ b/WorkQueue/test/TestSuspendableWorkQueue.cpp
@@ -0,0 +1,460 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#include <boost/test/unit_test.hpp>
+#include <boost/thread.hpp>
+
+#include <AsteriskSCF/SuspendableWorkQueue.h>
+
+using namespace AsteriskSCF::System::WorkQueue::V1;
+using namespace AsteriskSCF::WorkQueue;
+
+class TestListener : public QueueListener
+{
+public:
+    TestListener()
+        : addedNotice(false),
+        addedEmptyNotice(false),
+        emptyNotice(false),
+        resumableNotice(false) { }
+
+    void workAdded(bool wasEmpty)
+    {
+        addedNotice = true;
+        addedEmptyNotice = wasEmpty;
+    }
+
+    void emptied()
+    {
+        emptyNotice = true;
+    }
+
+    void workResumable()
+    {
+        resumableNotice = true;
+    }
+
+    bool addedNotice;
+    bool addedEmptyNotice;
+    bool emptyNotice;
+    bool resumableNotice;
+};
+
+typedef IceUtil::Handle<TestListener> TestListenerPtr;
+
+class SimpleTask : public SuspendableWork
+{
+public:
+    SimpleTask() : taskExecuted(false) { }
+    SuspendableWorkResult execute(const SuspendableWorkListenerPtr &listener)
+    {
+        taskExecuted = true;
+        return Complete;
+    }
+    bool taskExecuted;
+};
+
+typedef IceUtil::Handle<SimpleTask> SimpleTaskPtr;
+
+static boost::condition_variable globalCond;
+static boost::mutex globalLock;
+static bool globalGoOn;
+
+class ComplexTask : public SuspendableWork
+{
+public:
+    ComplexTask() : currentState(Initial), mThread(boost::bind(&ComplexTask::thread, this)) { }
+
+    ~ComplexTask()
+    {
+        boost::unique_lock<boost::mutex> lock(globalLock);
+        globalGoOn = true;
+        globalCond.notify_one();
+        lock.unlock();
+        mThread.join();
+    }
+
+    SuspendableWorkResult execute(const SuspendableWorkListenerPtr &listener)
+    {
+        mListener = listener;
+        switch (currentState)
+        {
+        case Initial:
+            currentState = Task1Complete;
+            return Suspended;
+        case Task1Complete:
+            currentState = Task2Complete;
+            return Complete;
+        case Task2Complete:
+        default:
+            BOOST_FAIL("ComplexTask executed in bad state");
+        }
+    }
+
+    void thread()
+    {
+        boost::unique_lock<boost::mutex> lock(globalLock);
+        while (!globalGoOn)
+        {
+            globalCond.wait(lock);
+        }
+        lock.unlock();
+        mListener->workResumable();
+    }
+    enum State
+    {
+        Initial,
+        Task1Complete,
+        Task2Complete
+    } currentState;
+
+    boost::thread mThread;
+    SuspendableWorkListenerPtr mListener;
+};
+
+class RacyTask : public SuspendableWork
+{
+public:
+    RacyTask() : currentState(Initial) { }
+    SuspendableWorkResult execute(const SuspendableWorkListenerPtr &listener)
+    {
+        switch (currentState)
+        {
+        case Initial:
+            currentState = Task1Complete;
+            // We purposely tell the listener that work can be
+            // resumed before returning Suspended to simulate
+            // a race condition.
+            listener->workResumable();
+            return Suspended;
+        case Task1Complete:
+            currentState = Task2Complete;
+            return Complete;
+        case Task2Complete:
+        default:
+            BOOST_FAIL("RacyTask executed in bad state");
+        }
+    }
+    enum State
+    {
+        Initial,
+        Task1Complete,
+        Task2Complete
+    } currentState;
+};
+
+BOOST_AUTO_TEST_SUITE(SuspendableWorkQueueTest)
+
+BOOST_AUTO_TEST_CASE(addWork)
+{
+    TestListenerPtr listener(new TestListener);
+    SuspendableQueuePtr queue(new SuspendableWorkQueue(listener));
+    SimpleTaskPtr work(new SimpleTask);
+
+    bool excepted = false;
+    try
+    {
+        queue->enqueueWork(work);
+    }
+    catch (const WorkExists &)
+    {
+        excepted = true;
+    }
+
+    BOOST_CHECK(excepted == false);
+    BOOST_CHECK(listener->addedNotice == true);
+    BOOST_CHECK(listener->addedEmptyNotice == true);
+    BOOST_CHECK(queue->workCount() == 1);
+}
+
+BOOST_AUTO_TEST_CASE(addWorkSeq)
+{
+    TestListenerPtr listener(new TestListener);
+    SuspendableQueuePtr queue(new SuspendableWorkQueue(listener));
+    SimpleTaskPtr work1(new SimpleTask);
+    SimpleTaskPtr work2(new SimpleTask);
+    SuspendableWorkSeq works;
+    works.push_back(work1);
+    works.push_back(work2);
+
+    bool excepted = false;
+    try
+    {
+        queue->enqueueWorkSeq(works);
+    }
+    catch (const WorkExists &)
+    {
+        excepted = true;
+    }
+
+    BOOST_CHECK(excepted == false);
+    BOOST_CHECK(listener->addedNotice == true);
+    BOOST_CHECK(listener->addedEmptyNotice == true);
+    BOOST_CHECK(listener->emptyNotice == false);
+    BOOST_CHECK(queue->workCount() == 2);
+}
+
+BOOST_AUTO_TEST_CASE(appendWork)
+{
+    TestListenerPtr listener(new TestListener);
+    SuspendableQueuePtr queue(new SuspendableWorkQueue(listener));
+    SimpleTaskPtr work1(new SimpleTask);
+    SimpleTaskPtr work2(new SimpleTask);
+
+    queue->enqueueWork(work1);
+    listener->addedNotice = false;
+
+    bool excepted = false;
+    try
+    {
+        queue->enqueueWork(work2);
+    }
+    catch (const WorkExists &)
+    {
+        excepted = true;
+    }
+
+    BOOST_CHECK(excepted == false);
+    BOOST_CHECK(listener->addedNotice == true);
+    BOOST_CHECK(listener->addedEmptyNotice == false);
+    BOOST_CHECK(listener->emptyNotice == false);
+    BOOST_CHECK(queue->workCount() == 2);
+}
+
+BOOST_AUTO_TEST_CASE(addExistentWork)
+{
+    TestListenerPtr listener(new TestListener);
+    SuspendableQueuePtr queue(new SuspendableWorkQueue(listener));
+    SimpleTaskPtr work(new SimpleTask);
+
+    queue->enqueueWork(work);
+
+    bool excepted = false;
+    try
+    {
+        queue->enqueueWork(work);
+    }
+    catch(const WorkExists&)
+    {
+        excepted = true;
+    }
+
+    BOOST_CHECK(excepted == true);
+    BOOST_CHECK(queue->workCount() == 1);
+}
+
+BOOST_AUTO_TEST_CASE(addExistentWorkSeq)
+{
+    TestListenerPtr listener(new TestListener);
+    SuspendableQueuePtr queue(new SuspendableWorkQueue(listener));
+    SimpleTaskPtr work1(new SimpleTask);
+    SimpleTaskPtr work2(new SimpleTask);
+    SimpleTaskPtr work3(new SimpleTask);
+    SuspendableWorkSeq works1;
+    SuspendableWorkSeq works2;
+    works1.push_back(work1);
+    works1.push_back(work2);
+    works2.push_back(work3);
+    works2.push_back(work2);
+    
+    queue->enqueueWorkSeq(works1);
+
+    bool excepted = false;
+    try
+    {
+        queue->enqueueWorkSeq(works2);
+    }
+    catch (const WorkExists &)
+    {
+        excepted = true;
+    }
+
+    BOOST_CHECK(excepted == true);
+    BOOST_CHECK(listener->addedNotice == true);
+    BOOST_CHECK(listener->addedEmptyNotice == true);
+    BOOST_CHECK(listener->emptyNotice == false);
+    BOOST_CHECK(queue->workCount() == 2);
+}
+
+BOOST_AUTO_TEST_CASE(cancelWork)
+{
+    TestListenerPtr listener(new TestListener);
+    SuspendableQueuePtr queue(new SuspendableWorkQueue(listener));
+    SimpleTaskPtr work1(new SimpleTask);
+
+    queue->enqueueWork(work1);
+
+    bool excepted = false;
+    try
+    {
+        queue->cancelWork(work1);
+    }
+    catch (const WorkNotFound &)
+    {
+        excepted = true;
+    }
+
+    BOOST_CHECK(excepted == false);
+    BOOST_CHECK(listener->emptyNotice == true);
+    BOOST_CHECK(queue->workCount() == 0);
+}
+
+BOOST_AUTO_TEST_CASE(cancelNonExistent1)
+{
+    TestListenerPtr listener(new TestListener);
+    SuspendableQueuePtr queue(new SuspendableWorkQueue(listener));
+    SimpleTaskPtr work(new SimpleTask);
+
+    bool excepted = false;
+    try
+    {
+        queue->cancelWork(work);
+    }
+    catch (const WorkNotFound &)
+    {
+        excepted = true;
+    }
+
+    BOOST_CHECK(excepted == true);
+    BOOST_CHECK(listener->addedNotice == false);
+    BOOST_CHECK(listener->addedEmptyNotice == false);
+    BOOST_CHECK(listener->emptyNotice == false);
+    BOOST_CHECK(queue->workCount() == 0);
+}
+
+BOOST_AUTO_TEST_CASE(cancelNonExistent2)
+{
+    TestListenerPtr listener(new TestListener);
+    SuspendableQueuePtr queue(new SuspendableWorkQueue(listener));
+    SimpleTaskPtr work1(new SimpleTask);
+    SimpleTaskPtr work2(new SimpleTask);
+
+    queue->enqueueWork(work1);
+
+    bool excepted = false;
+    try
+    {
+        queue->cancelWork(work2);
+    }
+    catch (const WorkNotFound &wnf)
+    {
+        excepted = true;
+    }
+
+    BOOST_CHECK(excepted == true);
+    BOOST_CHECK(queue->workCount() == 1);
+}
+
+BOOST_AUTO_TEST_CASE(simpleWorkExecution)
+{
+    TestListenerPtr listener(new TestListener);
+    SuspendableQueuePtr queue(new SuspendableWorkQueue(listener));
+    SimpleTaskPtr work(new SimpleTask);
+
+    queue->enqueueWork(work);
+    bool moreWork = queue->executeWork();
+
+    BOOST_CHECK(moreWork == false);
+    BOOST_CHECK(work->taskExecuted == true);
+    BOOST_CHECK(listener->emptyNotice == true);
+    BOOST_CHECK(queue->workCount() == 0);
+}
+
+BOOST_AUTO_TEST_CASE(executeNonExistent)
+{
+    TestListenerPtr listener(new TestListener);
+    SuspendableQueuePtr queue(new SuspendableWorkQueue(listener));
+
+    bool moreWork = queue->executeWork();
+
+    BOOST_CHECK(moreWork == false);
+    BOOST_CHECK(queue->workCount() == 0);
+    BOOST_CHECK(listener->emptyNotice == false);
+}
+
+BOOST_AUTO_TEST_CASE(executionOrder1)
+{
+    TestListenerPtr listener(new TestListener);
+    SuspendableQueuePtr queue(new SuspendableWorkQueue(listener));
+    SimpleTaskPtr work1(new SimpleTask);
+    SimpleTaskPtr work2(new SimpleTask);
+
+    queue->enqueueWork(work1);
+    queue->enqueueWork(work2);
+    
+    bool moreWork = queue->executeWork();
+
+    BOOST_CHECK(moreWork == true);
+    BOOST_CHECK(work1->taskExecuted == true);
+    BOOST_CHECK(work2->taskExecuted == false);
+    
+    moreWork = queue->executeWork();
+
+    BOOST_CHECK(moreWork == false);
+    BOOST_CHECK(work2->taskExecuted == true);
+    BOOST_CHECK(listener->emptyNotice == true);
+    BOOST_CHECK(queue->workCount() == 0);
+}
+
+BOOST_AUTO_TEST_CASE(executionOrder2)
+{
+    TestListenerPtr listener(new TestListener);
+    SuspendableQueuePtr queue(new SuspendableWorkQueue(listener));
+    SimpleTaskPtr work1(new SimpleTask);
+    SimpleTaskPtr work2(new SimpleTask);
+    SimpleTaskPtr work3(new SimpleTask);
+    SimpleTaskPtr work4(new SimpleTask);
+    SuspendableWorkSeq works1;
+    SuspendableWorkSeq works2;
+
+    works1.push_back(work1);
+    works1.push_back(work2);
+    works2.push_back(work3);
+    works2.push_back(work4);
+
+    queue->enqueueWorkSeq(works1);
+    queue->enqueueWorkSeq(works2);
+
+    bool moreWork = queue->executeWork();
+
+    BOOST_CHECK(moreWork == true);
+    BOOST_CHECK(work1->taskExecuted == true);
+    BOOST_CHECK(work2->taskExecuted == false);
+    BOOST_CHECK(work3->taskExecuted == false);
+    BOOST_CHECK(work4->taskExecuted == false);
+
+    moreWork = queue->executeWork();
+
+    BOOST_CHECK(moreWork == true);
+    BOOST_CHECK(work2->taskExecuted == true);
+    BOOST_CHECK(work3->taskExecuted == false);
+    BOOST_CHECK(work4->taskExecuted == false);
+
+    moreWork = queue->executeWork();
+
+    BOOST_CHECK(moreWork == true);
+    BOOST_CHECK(work3->taskExecuted == true);
+    BOOST_CHECK(work4->taskExecuted == false);
+
+    moreWork = queue->executeWork();
+
+    BOOST_CHECK(moreWork == false);
+    BOOST_CHECK(work4->taskExecuted == true);
+    BOOST_CHECK(listener->emptyNotice == true);
+    BOOST_CHECK(queue->workCount() == 0);
+}
+
+BOOST_AUTO_TEST_SUITE_END()
diff --git a/WorkQueue/test/test2.cpp b/WorkQueue/test/test2.cpp
new file mode 100644
index 0000000..81310b6
--- /dev/null
+++ b/WorkQueue/test/test2.cpp
@@ -0,0 +1,18 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#define BOOST_TEST_MODULE SuspendableWorkQueue
+#include <boost/test/unit_test.hpp>

commit cae3cdb44f21eeec4c6c004001afa1a09425a1f8
Author: Mark Michelson <mmichelson at digium.com>
Date:   Thu Mar 17 15:33:01 2011 -0500

    Clarify some language and fix some bugs.

diff --git a/WorkQueue/src/SuspendableWorkQueue.cpp b/WorkQueue/src/SuspendableWorkQueue.cpp
index 619bd1e..39a1ab0 100644
--- a/WorkQueue/src/SuspendableWorkQueue.cpp
+++ b/WorkQueue/src/SuspendableWorkQueue.cpp
@@ -159,9 +159,11 @@ public:
         // that suspended work completed and this listener was notified
         // before the queue could move past the step of executing
         // work and on to the step of marking the work as Suspended.
+        //
         // In this case, we don't need to notify the queue's listener
-        // that work is resumable, because the queue already knows
-        // this and will return appropriately.
+        // that work is resumable, because no entity outside the
+        // queue itself knows the work ever tried to enter a
+        // suspended state.
         if (previousState == SuspendableWorkQueueImpl::Suspended)
         {
             mImpl->mListener->workResumable();
@@ -288,26 +290,26 @@ bool SuspendableWorkQueue::executeWork()
 
     if (result == AsteriskSCF::System::WorkQueue::V1::Suspended)
     {
-        /**
-         * This should be incredibly rare. It may be that
-         * before we execute this section of code, the suspended
-         * work could have completed and set the state to 
-         * Resumable. If this is the case, then there is no
-         * reason to set the state to Suspended because work
-         * may continue as normal.
-         */
-        if (mImpl->state == SuspendableWorkQueueImpl::Executing)
+        // It's possible that a piece of work called out to an
+        // asynchronous method and needed to be suspended. Before
+        // the work's execute() method could return a "Suspended"
+        // result, the asynchronous method completed and notified
+        // the work listener that the work could be resumed.
+        //
+        // If the state has transitioned to Resumable, then we
+        // can forego setting the state to Suspended and instead
+        // treat the situation as if the work's execution had
+        // actually returned "Complete"
+        if (mImpl->state != SuspendableWorkQueueImpl::Resumable)
         {
             mImpl->state = SuspendableWorkQueueImpl::Suspended;
             mImpl->currentWork = work;
             return false;
         }
     }
-    else
-    {
-        mImpl->state = SuspendableWorkQueueImpl::Ready;
-        mImpl->currentWork = 0;
-    }
+    
+    mImpl->state = SuspendableWorkQueueImpl::Ready;
+    mImpl->currentWork = 0;
     
     if (mImpl->q.empty())
     {
@@ -321,7 +323,7 @@ bool SuspendableWorkQueue::executeWork()
 int SuspendableWorkQueue::workCount()
 {
     boost::unique_lock<boost::mutex> lock(mImpl->mLock);
-    return mImpl->q.size();
+    return mImpl->q.size() + mImpl->currentWork ? 1 : 0;
 }
 
 void SuspendableWorkQueue::setListener(const QueueListenerPtr &listener)

commit b63d7360b8cf70353de0dd80bb71506ea5883062
Author: Mark Michelson <mmichelson at digium.com>
Date:   Thu Mar 17 14:59:58 2011 -0500

    Add initial implementation of SuspendableWorkQueue.

diff --git a/WorkQueue/include/AsteriskSCF/SuspendableWorkQueue.h b/WorkQueue/include/AsteriskSCF/SuspendableWorkQueue.h
new file mode 100644
index 0000000..36f7afa
--- /dev/null
+++ b/WorkQueue/include/AsteriskSCF/SuspendableWorkQueue.h
@@ -0,0 +1,46 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#pragma once
+
+#include <boost/shared_ptr.hpp>
+
+#include <AsteriskSCF/System/WorkQueue/WorkQueueIf.h>
+
+namespace AsteriskSCF
+{
+namespace WorkQueue
+{
+
+class SuspendableWorkQueueImpl;
+
+class SuspendableWorkQueue : public AsteriskSCF::System::WorkQueue::V1::Queue
+{
+public:
+    SuspendableWorkQueue();
+    SuspendableWorkQueue(const AsteriskSCF::System::WorkQueue::V1::QueueListenerPtr &listener);
+    void enqueueWork(const AsteriskSCF::System::WorkQueue::V1::SuspendableWorkPtr &work);
+    void enqueueWorkSeq(const AsteriskSCF::System::WorkQueue::V1::SuspendableWorkSeq &works);
+    void cancelWork(const AsteriskSCF::System::WorkQueue::V1::SuspendableWorkPtr &work);
+    bool executeWork();
+    int workCount();
+    void setListener(const AsteriskSCF::System::WorkQueue::V1::QueueListenerPtr &listener);
+private:
+    boost::shared_ptr<SuspendableWorkQueueImpl> mImpl;
+};
+
+}; //end namespace WorkQueue
+}; //end namespace AsteriskSCF
diff --git a/WorkQueue/src/CMakeLists.txt b/WorkQueue/src/CMakeLists.txt
index 48a35e7..c570188 100644
--- a/WorkQueue/src/CMakeLists.txt
+++ b/WorkQueue/src/CMakeLists.txt
@@ -15,7 +15,10 @@ include_directories(${API_INCLUDE_DIR})
 
 asterisk_scf_component_add_file(WorkQueue
     ../include/AsteriskSCF/WorkQueue.h)
+asterisk_scf_component_add_file(WorkQueue
+    ../include/AsteriskSCF/SuspendableWorkQueue.h)
 asterisk_scf_component_add_file(WorkQueue WorkQueue.cpp)
+asterisk_scf_component_add_file(WorkQueue SuspendableWorkQueue.cpp)
 asterisk_scf_component_add_boost_libraries(WorkQueue thread)
 
 asterisk_scf_component_build_library(WorkQueue)
diff --git a/WorkQueue/src/SuspendableWorkQueue.cpp b/WorkQueue/src/SuspendableWorkQueue.cpp
new file mode 100644
index 0000000..619bd1e
--- /dev/null
+++ b/WorkQueue/src/SuspendableWorkQueue.cpp
@@ -0,0 +1,333 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#include <deque>
+#include <boost/thread/shared_mutex.hpp>
+
+#include <AsteriskSCF/SuspendableWorkQueue.h>
+
+namespace AsteriskSCF
+{
+namespace WorkQueue
+{
+
+using namespace AsteriskSCF::System::WorkQueue::V1;
+
+class SuspendableWorkQueueImpl
+{
+public:
+    SuspendableWorkQueueImpl()
+        : mListener(0) { }
+    SuspendableWorkQueueImpl(const QueueListenerPtr &listener)
+        : mListener(listener) { }
+
+    QueueListenerPtr mListener;
+    boost::mutex mLock;
+    std::deque<SuspendableWorkPtr> q;
+    enum WorkState
+    {
+        /**
+         * There currently is no work being executed and
+         * no work is suspended.
+         *
+         * If asked to execute work while in this state,
+         * the queue will pop the front item from the
+         * queue and execute it.
+         *
+         * Transitions to this state:
+         * * Executing: When a work item's execute method
+         *   returns Complete
+         * * Resumable: When a work item's execute method
+         *   returns Suspended.
+         *
+         * Transitions from this state:
+         * * Executing: When a work item is executed
+         *
+         * This state implies that currentWork == 0
+         */
+        Ready,
+
+        /**
+         * A piece of work is currently being executed.
+         *
+         * If asked to execute work while in this state,
+         * the queue will refuse to do so.
+         *
+         * Transitions to this state:
+         * * Ready: When a work item is executed
+         * * Resumable: When a previously suspended
+         *   work item is executed.
+         * 
+         * Transitions from this state:
+         * * Ready: When a work item's execute method
+         *   returns Complete
+         * * Suspended: When a work item's execute method
+         *   returns Suspended
+         * * Resumable: When an executing work item's
+         *   work listener is notified that suspended
+         *   work may be resumed before the execution
+         *   method has had a chance to be notified the
+         *   work was ever suspended. This should be
+         *   very rare.
+         *
+         * This state implies that currentWork != 0
+         */
+        Executing,
+
+        /**
+         * An executed item of work has been suspended.
+         *
+         * If asked to execute work while in this state,
+         * the queue will refuse to do so.
+         *
+         * Transitions to this state:
+         * * Executing: When a work item's execute method
+         *   returns Suspended
+         *
+         * Transitions from this state:
+         * * Resumable: When a suspended work item notifies
+         *   its listener that work may be resumed.
+         *
+         * This state implies that currentWork != 0
+         */
+        Suspended,
+
+        /**
+         * A suspended item of work has notified a listener
+         * that work may be resumed.
+         *
+         * If asked to execute work while in this state,
+         * the queue will call currentWork's execute
+         * method.
+         *
+         * Transitions to this state:
+         * * Executing: When an executing work item's
+         *   work listener is notified that suspended
+         *   work may be resumed before the execution
+         *   method has had a chance to be notified
+         *   the work was ever suspended. This should
+         *   be very rare.
+         * * Suspended: When a suspended work item notifies
+         *   its listener that work may be resumed.
+         *
+         * Transitions from this state:
+         * * Executing: When a previously suspended work
+         *   item is executed.
+         *
+         * This state implies that currentWork != 0
+         */
+        Resumable
+    } state;
+
+    /**
+     * When an item needs to be executed, it is popped
+     * off the queue and executed. If the item is
+     * suspended, then we keep a reference to the suspended
+     * work item here so we can attempt execution again
+     * when asked.
+     */
+    SuspendableWorkPtr currentWork;
+};
+
+class WorkListener : public SuspendableWorkListener
+{
+public:
+    WorkListener(boost::shared_ptr<SuspendableWorkQueueImpl> impl)
+        : mImpl(impl) { }
+    void workResumable()
+    {
+        boost::unique_lock<boost::mutex> lock(mImpl->mLock);
+        SuspendableWorkQueueImpl::WorkState previousState = mImpl->state;
+        mImpl->state = SuspendableWorkQueueImpl::Resumable;
+        lock.unlock();
+
+        // If the workResumable notification is received and the
+        // state of things was not suspended, then that must mean
+        // that suspended work completed and this listener was notified
+        // before the queue could move past the step of executing
+        // work and on to the step of marking the work as Suspended.
+        // In this case, we don't need to notify the queue's listener
+        // that work is resumable, because the queue already knows
+        // this and will return appropriately.
+        if (previousState == SuspendableWorkQueueImpl::Suspended)
+        {
+            mImpl->mListener->workResumable();
+        }
+    }
+
+    boost::shared_ptr<SuspendableWorkQueueImpl> mImpl;
+};
+
+typedef IceUtil::Handle<WorkListener> WorkListenerPtr;
+
+SuspendableWorkQueue::SuspendableWorkQueue()
+    : mImpl(new SuspendableWorkQueueImpl) { }
+
+SuspendableWorkQueue::SuspendableWorkQueue(const QueueListenerPtr &listener)
+    : mImpl(new SuspendableWorkQueueImpl(listener)) { }
+
+void SuspendableWorkQueue::enqueueWork(const SuspendableWorkPtr &work)
+{
+    boost::unique_lock<boost::mutex> lock(mImpl->mLock);
+
+    if (work == mImpl->currentWork)
+    {
+        throw WorkExists();
+    }
+    
+    if (std::find(mImpl->q.begin(), mImpl->q.end(), work) != mImpl->q.end())
+    {
+        //XXX When slice2cpp bug is cleared, we'll want
+        //to actually include the duplicated work item in
+        //the exception
+        throw WorkExists();
+    }
+
+    bool wasEmpty = mImpl->q.empty();
+    mImpl->q.push_back(work);
+    lock.unlock();
+    mImpl->mListener->workAdded(wasEmpty);
+}
+
+void SuspendableWorkQueue::enqueueWorkSeq(const SuspendableWorkSeq &works)
+{
+    boost::unique_lock<boost::mutex> lock(mImpl->mLock);
+    
+    for (SuspendableWorkSeq::const_iterator iter = works.begin(); iter != works.end(); ++iter)
+    {
+        if (*iter == mImpl->currentWork)
+        {
+            throw WorkExists();
+        }
+        if (std::find(mImpl->q.begin(), mImpl->q.end(), *iter) != mImpl->q.end())
+        {
+            //XXX When slice2cpp bug is cleared, we'll need
+            //not to throw the exception immediately. Instead,
+            //we'll keep a running list of duplicated items so
+            //we can throw an exception with all duplicated items.
+            throw WorkExists();
+        }
+    }
+
+    bool wasEmpty = mImpl->q.empty();
+    mImpl->q.insert(mImpl->q.end(), works.begin(), works.end());
+    lock.unlock();
+    mImpl->mListener->workAdded(wasEmpty);
+}
+
+void SuspendableWorkQueue::cancelWork(const SuspendableWorkPtr &work)
+{
+    boost::unique_lock<boost::mutex> lock(mImpl->mLock);
+
+    if (work == mImpl->currentWork)
+    {
+        throw WorkNotFound();
+    }
+
+    std::deque<SuspendableWorkPtr>::iterator i = std::find(mImpl->q.begin(), mImpl->q.end(), work);
+    if (i == mImpl->q.end())
+    {
+        throw WorkNotFound();
+    }
+
+    mImpl->q.erase(i);
+
+    if (mImpl->q.empty())
+    {
+        lock.unlock();
+        mImpl->mListener->emptied();
+    }
+}
+
+bool SuspendableWorkQueue::executeWork()
+{
+    boost::unique_lock<boost::mutex> lock(mImpl->mLock);
+
+    if (mImpl->state != SuspendableWorkQueueImpl::Ready
+            && mImpl->state != SuspendableWorkQueueImpl::Resumable)
+    {
+        return false;
+    }
+
+    SuspendableWorkPtr work;
+    if (mImpl->state == SuspendableWorkQueueImpl::Ready)
+    {
+        if (mImpl->q.empty())
+        {
+            return false;
+        }
+        work = mImpl->q.front();
+        mImpl->q.pop_front();
+    }
+    else //state is Resumable
+    {
+        assert(mImpl->currentWork != 0);
+        work = mImpl->currentWork;
+    }
+
+    mImpl->state = SuspendableWorkQueueImpl::Executing;
+    lock.unlock();
+
+    WorkListenerPtr workListener(new WorkListener(mImpl));
+    SuspendableWorkResult result = work->execute(workListener);
+
+    lock.lock();
+
+    if (result == AsteriskSCF::System::WorkQueue::V1::Suspended)
+    {
+        /**
+         * This should be incredibly rare. It may be that
+         * before we execute this section of code, the suspended
+         * work could have completed and set the state to 
+         * Resumable. If this is the case, then there is no
+         * reason to set the state to Suspended because work
+         * may continue as normal.
+         */
+        if (mImpl->state == SuspendableWorkQueueImpl::Executing)
+        {
+            mImpl->state = SuspendableWorkQueueImpl::Suspended;
+            mImpl->currentWork = work;
+            return false;
+        }
+    }
+    else
+    {
+        mImpl->state = SuspendableWorkQueueImpl::Ready;
+        mImpl->currentWork = 0;
+    }
+    
+    if (mImpl->q.empty())
+    {
+        lock.unlock();
+        mImpl->mListener->emptied();
+        return false;
+    }
+    return true;
+}
+
+int SuspendableWorkQueue::workCount()
+{
+    boost::unique_lock<boost::mutex> lock(mImpl->mLock);
+    return mImpl->q.size();
+}
+
+void SuspendableWorkQueue::setListener(const QueueListenerPtr &listener)
+{
+    mImpl->mListener = listener;
+}
+
+}; //end namespace WorkQueue
+}; //end namespace AsteriskSCF

commit 804fed670f876a3d52b8684a97da05bec3a64a29
Author: Mark Michelson <mmichelson at digium.com>
Date:   Thu Mar 17 10:54:06 2011 -0500

    Remove redundant check for emptiness when canceling work.
    
    The std::find operation is really all that's needed. It doesn't
    require a special case for an empty queue.

diff --git a/WorkQueue/src/WorkQueue.cpp b/WorkQueue/src/WorkQueue.cpp
index df045e7..0f1a1ea 100644
--- a/WorkQueue/src/WorkQueue.cpp
+++ b/WorkQueue/src/WorkQueue.cpp
@@ -88,10 +88,6 @@ void WorkQueue::enqueueWorkSeq(const WorkSeq &works)
 void WorkQueue::cancelWork(const WorkPtr &work)
 {
     boost::unique_lock<boost::mutex> lock(mImpl->mLock);
-    if (mImpl->q.empty())
-    {
-        throw WorkNotFound();
-    }
 
     std::deque<WorkPtr>::iterator i = std::find(mImpl->q.begin(), mImpl->q.end(), work);
     if (i == mImpl->q.end())

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/ice-util-cpp.git



More information about the asterisk-scf-commits mailing list