[asterisk-scf-commits] asterisk-scf/integration/ice-util-cpp.git branch "workqueue" created.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Wed Mar 16 16:35:15 CDT 2011


branch "workqueue" has been created
        at  777ec8309a11a35b8637e1ca464d9038af7134fe (commit)

- Log -----------------------------------------------------------------
commit 777ec8309a11a35b8637e1ca464d9038af7134fe
Author: Mark Michelson <mmichelson at digium.com>
Date:   Wed Mar 16 16:26:51 2011 -0500

    Add tests that exercise the WorkQueue implementation.
    
    I had to do some tweaking to some CMakeLists.txt files to get
    things a bit more organized. I also made some tweaks to
    WorkQueue.cpp to make tests pass as planned.
    
    Some discussion in #asterisk-scf-dev has prompted some
    new behavior to be added, so that's what I'll do next.

diff --git a/WorkQueue/CMakeLists.txt b/WorkQueue/CMakeLists.txt
index 1f20da6..b1a0931 100644
--- a/WorkQueue/CMakeLists.txt
+++ b/WorkQueue/CMakeLists.txt
@@ -6,18 +6,5 @@
 # All rights reserved.
 #
 
-asterisk_scf_slice_include_directories(${API_SLICE_DIR})
-
-asterisk_scf_component_init(WorkQueue CXX)
-
-include_directories(include)
-include_directories(${API_INCLUDE_DIR})
-
-asterisk_scf_component_add_file(WorkQueue
-    include/AsteriskSCF/WorkQueue.h)
-asterisk_scf_component_add_file(WorkQueue src/WorkQueue.cpp)
-asterisk_scf_component_add_boost_libraries(WorkQueue thread)
-
-asterisk_scf_component_build_library(WorkQueue)
-
-asterisk_scf_headers_install(include/)
+add_subdirectory(src)
+add_subdirectory(test)
diff --git a/WorkQueue/CMakeLists.txt b/WorkQueue/src/CMakeLists.txt
similarity index 70%
copy from WorkQueue/CMakeLists.txt
copy to WorkQueue/src/CMakeLists.txt
index 1f20da6..48a35e7 100644
--- a/WorkQueue/CMakeLists.txt
+++ b/WorkQueue/src/CMakeLists.txt
@@ -10,14 +10,14 @@ asterisk_scf_slice_include_directories(${API_SLICE_DIR})
 
 asterisk_scf_component_init(WorkQueue CXX)
 
-include_directories(include)
+include_directories(../include)
 include_directories(${API_INCLUDE_DIR})
 
 asterisk_scf_component_add_file(WorkQueue
-    include/AsteriskSCF/WorkQueue.h)
-asterisk_scf_component_add_file(WorkQueue src/WorkQueue.cpp)
+    ../include/AsteriskSCF/WorkQueue.h)
+asterisk_scf_component_add_file(WorkQueue WorkQueue.cpp)
 asterisk_scf_component_add_boost_libraries(WorkQueue thread)
 
 asterisk_scf_component_build_library(WorkQueue)
 
-asterisk_scf_headers_install(include/)
+asterisk_scf_headers_install(../include/)
diff --git a/WorkQueue/src/WorkQueue.cpp b/WorkQueue/src/WorkQueue.cpp
index 2821667..bf906e5 100644
--- a/WorkQueue/src/WorkQueue.cpp
+++ b/WorkQueue/src/WorkQueue.cpp
@@ -66,6 +66,11 @@ void WorkQueue::enqueueWorkSeq(const WorkSeq &works)
 void WorkQueue::cancelWork(const WorkPtr &work)
 {
     boost::unique_lock<boost::mutex> lock(mImpl->mLock);
+    if (mImpl->q.empty())
+    {
+        return;
+    }
+
     for (std::deque<WorkPtr>::iterator i = mImpl->q.begin();
             i != mImpl->q.end(); ++i)
     {
diff --git a/WorkQueue/test/CMakeLists.txt b/WorkQueue/test/CMakeLists.txt
new file mode 100644
index 0000000..5a9c74d
--- /dev/null
+++ b/WorkQueue/test/CMakeLists.txt
@@ -0,0 +1,23 @@
+#
+# Asterisk Scalable Communications Framework
+#
+# Copyright (C) 2011 -- Digium, Inc.
+#
+# All rights reserved.
+#
+
+asterisk_scf_component_init(WorkQueueTest CXX)
+
+include_directories(${API_INCLUDE_DIR})
+include_directories(../src)
+include_directories(../include)
+
+asterisk_scf_component_add_file(WorkQueueTest TestWorkQueue.cpp)
+asterisk_scf_component_add_file(WorkQueueTest test.cpp)
+asterisk_scf_component_add_boost_libraries(WorkQueueTest unit_test_framework)
+
+asterisk_scf_component_build_standalone(WorkQueueTest)
+target_link_libraries(WorkQueueTest asterisk-scf-api)
+target_link_libraries(WorkQueueTest WorkQueue)
+
+boost_add_test(WorkQueueTest)
diff --git a/WorkQueue/test/TestWorkQueue.cpp b/WorkQueue/test/TestWorkQueue.cpp
new file mode 100644
index 0000000..8da6074
--- /dev/null
+++ b/WorkQueue/test/TestWorkQueue.cpp
@@ -0,0 +1,226 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#include <boost/test/unit_test.hpp>
+
+#include <AsteriskSCF/WorkQueue.h>
+
+using namespace AsteriskSCF::System::WorkQueue::V1;
+using namespace AsteriskSCF::WorkQueue;
+
+class TestListener : public QueueListener
+{
+public:
+    TestListener()
+        : addedNotice(false),
+        addedEmptyNotice(false),
+        emptyNotice(false) { }
+
+    void workAdded(bool wasEmpty)
+    {
+        addedNotice = true;
+        addedEmptyNotice = wasEmpty;
+    }
+
+    void emptied()
+    {
+        emptyNotice = true;
+    }
+
+    void workResumable()
+    {
+        BOOST_FAIL("workResumable called from a Queue?!");
+    }
+
+    bool addedNotice;
+    bool addedEmptyNotice;
+    bool emptyNotice;
+};
+
+typedef IceUtil::Handle<TestListener> TestListenerPtr;
+
+class Task : public Work
+{
+public:
+    Task() : taskExecuted(false) { }
+    void execute()
+    {
+        taskExecuted = true;
+    }
+    bool taskExecuted;
+};
+
+typedef IceUtil::Handle<Task> TaskPtr;
+
+BOOST_AUTO_TEST_SUITE(WorkQueueTest)
+
+BOOST_AUTO_TEST_CASE(addWork)
+{
+    TestListenerPtr listener(new TestListener);
+    QueuePtr queue(new WorkQueue(listener));
+    TaskPtr work(new Task);
+
+    queue->enqueueWork(work);
+    BOOST_CHECK(listener->addedNotice == true);
+    BOOST_CHECK(listener->addedEmptyNotice == true);
+    BOOST_CHECK(queue->workCount() == 1);
+}
+
+BOOST_AUTO_TEST_CASE(addWorkSeq)
+{
+    TestListenerPtr listener(new TestListener);
+    QueuePtr queue(new WorkQueue(listener));
+    TaskPtr work1(new Task);
+    TaskPtr work2(new Task);
+    WorkSeq works;
+    works.push_back(work1);
+    works.push_back(work2);
+
+    queue->enqueueWorkSeq(works);
+    BOOST_CHECK(listener->addedNotice == true);
+    BOOST_CHECK(listener->addedEmptyNotice == true);
+    BOOST_CHECK(listener->emptyNotice == false);
+    BOOST_CHECK(queue->workCount() == 2);
+}
+
+BOOST_AUTO_TEST_CASE(addExistingWork)
+{
+    TestListenerPtr listener(new TestListener);
+    QueuePtr queue(new WorkQueue(listener));
+    TaskPtr work1(new Task);
+    TaskPtr work2(new Task);
+
+    queue->enqueueWork(work1);
+    listener->addedNotice = false;
+
+    queue->enqueueWork(work2);
+    BOOST_CHECK(listener->addedNotice == true);
+    BOOST_CHECK(listener->addedEmptyNotice == false);
+    BOOST_CHECK(listener->emptyNotice == false);
+    BOOST_CHECK(queue->workCount() == 2);
+}
+
+BOOST_AUTO_TEST_CASE(cancelWork)
+{
+    TestListenerPtr listener(new TestListener);
+    QueuePtr queue(new WorkQueue(listener));
+    TaskPtr work1(new Task);
+
+    queue->enqueueWork(work1);
+    queue->cancelWork(work1);
+
+    BOOST_CHECK(listener->emptyNotice == true);
+    BOOST_CHECK(queue->workCount() == 0);
+}
+
+BOOST_AUTO_TEST_CASE(cancelNonExistent)
+{
+    TestListenerPtr listener(new TestListener);
+    QueuePtr queue(new WorkQueue(listener));
+    TaskPtr work(new Task);
+
+    queue->cancelWork(work);
+    BOOST_CHECK(listener->addedNotice == false);
+    BOOST_CHECK(listener->addedEmptyNotice == false);
+    BOOST_CHECK(listener->emptyNotice == false);
+    BOOST_CHECK(queue->workCount() == 0);
+}
+
+BOOST_AUTO_TEST_CASE(workExecution)
+{
+    TestListenerPtr listener(new TestListener);
+    QueuePtr queue(new WorkQueue(listener));
+    TaskPtr work(new Task);
+
+    queue->enqueueWork(work);
+    queue->executeWork();
+
+    BOOST_CHECK(work->taskExecuted == true);
+    BOOST_CHECK(listener->emptyNotice == true);
+    BOOST_CHECK(queue->workCount() == 0);
+}
+
+BOOST_AUTO_TEST_CASE(executeNonExistent)
+{
+    TestListenerPtr listener(new TestListener);
+    QueuePtr queue(new WorkQueue(listener));
+
+    queue->executeWork();
+    BOOST_CHECK(queue->workCount() == 0);
+    BOOST_CHECK(listener->emptyNotice == false);
+}
+
+BOOST_AUTO_TEST_CASE(executionOrder1)
+{
+    TestListenerPtr listener(new TestListener);
+    QueuePtr queue(new WorkQueue(listener));
+    TaskPtr work1(new Task);
+    TaskPtr work2(new Task);
+
+    queue->enqueueWork(work1);
+    queue->enqueueWork(work2);
+    
+    queue->executeWork();
+    BOOST_CHECK(work1->taskExecuted == true);
+    BOOST_CHECK(work2->taskExecuted == false);
+    
+    queue->executeWork();
+    BOOST_CHECK(work2->taskExecuted == true);
+    BOOST_CHECK(listener->emptyNotice == true);
+    BOOST_CHECK(queue->workCount() == 0);
+}
+
+BOOST_AUTO_TEST_CASE(executionOrder2)
+{
+    TestListenerPtr listener(new TestListener);
+    QueuePtr queue(new WorkQueue(listener));
+    TaskPtr work1(new Task);
+    TaskPtr work2(new Task);
+    TaskPtr work3(new Task);
+    TaskPtr work4(new Task);
+    WorkSeq works1;
+    WorkSeq works2;
+
+    works1.push_back(work1);
+    works1.push_back(work2);
+    works2.push_back(work3);
+    works2.push_back(work4);
+
+    queue->enqueueWorkSeq(works1);
+    queue->enqueueWorkSeq(works2);
+
+    queue->executeWork();
+    BOOST_CHECK(work1->taskExecuted == true);
+    BOOST_CHECK(work2->taskExecuted == false);
+    BOOST_CHECK(work3->taskExecuted == false);
+    BOOST_CHECK(work4->taskExecuted == false);
+
+    queue->executeWork();
+    BOOST_CHECK(work2->taskExecuted == true);
+    BOOST_CHECK(work3->taskExecuted == false);
+    BOOST_CHECK(work4->taskExecuted == false);
+
+    queue->executeWork();
+    BOOST_CHECK(work3->taskExecuted == true);
+    BOOST_CHECK(work4->taskExecuted == false);
+
+    queue->executeWork();
+    BOOST_CHECK(work4->taskExecuted == true);
+    BOOST_CHECK(listener->emptyNotice == true);
+    BOOST_CHECK(queue->workCount() == 0);
+}
+
+BOOST_AUTO_TEST_SUITE_END()
diff --git a/WorkQueue/test/test.cpp b/WorkQueue/test/test.cpp
new file mode 100644
index 0000000..b19fc86
--- /dev/null
+++ b/WorkQueue/test/test.cpp
@@ -0,0 +1,18 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#define BOOST_TEST_MODULE WorkQueue
+#include <boost/test/unit_test.hpp>

commit e7d230d100542d6cbc0e50cecc1471c72f14af1a
Author: Mark Michelson <mmichelson at digium.com>
Date:   Wed Mar 16 14:41:33 2011 -0500

    Add another potential spot to notify of the queue's emptiness.
    
    I also removed the "XXX make asynchronous" comments because it's not
    really necessary. Since all objects are local, there's not a huge
    penalty incurred for calling out to the listener synchronously. Also,
    since we don't have a proxy to the listener but a handle, it would
    not be possible to use AMI AFAIK.

diff --git a/WorkQueue/src/WorkQueue.cpp b/WorkQueue/src/WorkQueue.cpp
index 48f8fba..2821667 100644
--- a/WorkQueue/src/WorkQueue.cpp
+++ b/WorkQueue/src/WorkQueue.cpp
@@ -51,7 +51,6 @@ void WorkQueue::enqueueWork(const WorkPtr &work)
     bool wasEmpty = mImpl->q.empty();
     mImpl->q.push_back(work);
     lock.unlock();
-    //XXX Make asynchronous
     mImpl->mListener->workAdded(wasEmpty);
 }
 
@@ -61,13 +60,12 @@ void WorkQueue::enqueueWorkSeq(const WorkSeq &works)
     bool wasEmpty = mImpl->q.empty();
     mImpl->q.insert(mImpl->q.end(), works.begin(), works.end());
     lock.unlock();
-    //XXX Make asynchronous
     mImpl->mListener->workAdded(wasEmpty);
 }
 
 void WorkQueue::cancelWork(const WorkPtr &work)
 {
-    boost::lock_guard<boost::mutex> lock(mImpl->mLock);
+    boost::unique_lock<boost::mutex> lock(mImpl->mLock);
     for (std::deque<WorkPtr>::iterator i = mImpl->q.begin();
             i != mImpl->q.end(); ++i)
     {
@@ -77,6 +75,11 @@ void WorkQueue::cancelWork(const WorkPtr &work)
             break;
         }
     }
+    if (mImpl->q.empty())
+    {
+        lock.unlock();
+        mImpl->mListener->emptied();
+    }
 }
 
 bool WorkQueue::executeWork()
@@ -95,7 +98,6 @@ bool WorkQueue::executeWork()
     if (mImpl->q.empty())
     {
         lock.unlock();
-        //XXX Make asynchronous
         mImpl->mListener->emptied();
         return false;
     }

commit f30fa702d4b1f4057d643dd36accfd640b7a0c1e
Author: Mark Michelson <mmichelson at digium.com>
Date:   Wed Mar 16 14:22:58 2011 -0500

    Add locking to protect queue accesses.

diff --git a/WorkQueue/src/WorkQueue.cpp b/WorkQueue/src/WorkQueue.cpp
index 3bcd8eb..48f8fba 100644
--- a/WorkQueue/src/WorkQueue.cpp
+++ b/WorkQueue/src/WorkQueue.cpp
@@ -35,6 +35,7 @@ public:
         : mListener(listener) { }
 
     QueueListenerPtr mListener;
+    boost::mutex mLock;
     std::deque<WorkPtr> q;
 };
 
@@ -46,22 +47,27 @@ WorkQueue::WorkQueue(const QueueListenerPtr &listener)
 
 void WorkQueue::enqueueWork(const WorkPtr &work)
 {
+    boost::unique_lock<boost::mutex> lock(mImpl->mLock);
     bool wasEmpty = mImpl->q.empty();
     mImpl->q.push_back(work);
+    lock.unlock();
     //XXX Make asynchronous
     mImpl->mListener->workAdded(wasEmpty);
 }
 
 void WorkQueue::enqueueWorkSeq(const WorkSeq &works)
 {
+    boost::unique_lock<boost::mutex> lock(mImpl->mLock);
     bool wasEmpty = mImpl->q.empty();
     mImpl->q.insert(mImpl->q.end(), works.begin(), works.end());
+    lock.unlock();
     //XXX Make asynchronous
     mImpl->mListener->workAdded(wasEmpty);
 }
 
 void WorkQueue::cancelWork(const WorkPtr &work)
 {
+    boost::lock_guard<boost::mutex> lock(mImpl->mLock);
     for (std::deque<WorkPtr>::iterator i = mImpl->q.begin();
             i != mImpl->q.end(); ++i)
     {
@@ -75,6 +81,7 @@ void WorkQueue::cancelWork(const WorkPtr &work)
 
 bool WorkQueue::executeWork()
 {
+    boost::unique_lock<boost::mutex> lock(mImpl->mLock);
     if (mImpl->q.empty())
     {
         return false;
@@ -82,9 +89,12 @@ bool WorkQueue::executeWork()
 
     WorkPtr work = mImpl->q.front();
     mImpl->q.pop_front();
+    lock.unlock();
     work->execute();
+    lock.lock();
     if (mImpl->q.empty())
     {
+        lock.unlock();
         //XXX Make asynchronous
         mImpl->mListener->emptied();
         return false;
@@ -94,6 +104,7 @@ bool WorkQueue::executeWork()
 
 int WorkQueue::workCount()
 {
+    boost::unique_lock<boost::mutex> lock(mImpl->mLock);
     return mImpl->q.size();
 }
 

commit 1e83578d1b1eeef00c244701d24eb6553a140343
Author: Mark Michelson <mmichelson at digium.com>
Date:   Wed Mar 16 13:49:57 2011 -0500

    Add listener operations.
    
    Fix up a couple of potential hazards as well, and write the
    work cancelation method.

diff --git a/WorkQueue/src/WorkQueue.cpp b/WorkQueue/src/WorkQueue.cpp
index 6fd8b2c..3bcd8eb 100644
--- a/WorkQueue/src/WorkQueue.cpp
+++ b/WorkQueue/src/WorkQueue.cpp
@@ -46,25 +46,47 @@ WorkQueue::WorkQueue(const QueueListenerPtr &listener)
 
 void WorkQueue::enqueueWork(const WorkPtr &work)
 {
+    bool wasEmpty = mImpl->q.empty();
     mImpl->q.push_back(work);
+    //XXX Make asynchronous
+    mImpl->mListener->workAdded(wasEmpty);
 }
 
 void WorkQueue::enqueueWorkSeq(const WorkSeq &works)
 {
+    bool wasEmpty = mImpl->q.empty();
     mImpl->q.insert(mImpl->q.end(), works.begin(), works.end());
+    //XXX Make asynchronous
+    mImpl->mListener->workAdded(wasEmpty);
 }
 
 void WorkQueue::cancelWork(const WorkPtr &work)
 {
+    for (std::deque<WorkPtr>::iterator i = mImpl->q.begin();
+            i != mImpl->q.end(); ++i)
+    {
+        if (*i == work)
+        {
+            mImpl->q.erase(i);
+            break;
+        }
+    }
 }
 
 bool WorkQueue::executeWork()
 {
+    if (mImpl->q.empty())
+    {
+        return false;
+    }
+
     WorkPtr work = mImpl->q.front();
     mImpl->q.pop_front();
     work->execute();
     if (mImpl->q.empty())
     {
+        //XXX Make asynchronous
+        mImpl->mListener->emptied();
         return false;
     }
     return true;

commit 1504d831a08b6e5b65d73412e00fb57fadc9d83c
Author: Mark Michelson <mmichelson at digium.com>
Date:   Wed Mar 16 13:25:03 2011 -0500

    Add initial WorkQueue implementation.
    
    This is missing two key aspects:
    
    1) Locks
    2) Callouts to the listener.
    
    These shall be added in short order.

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 67c5652..21a57d0 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -6,3 +6,4 @@ endif()
 add_subdirectory(SmartProxy)
 add_subdirectory(StateReplicator)
 add_subdirectory(AmiCollector)
+add_subdirectory(WorkQueue)
diff --git a/WorkQueue/CMakeLists.txt b/WorkQueue/CMakeLists.txt
new file mode 100644
index 0000000..1f20da6
--- /dev/null
+++ b/WorkQueue/CMakeLists.txt
@@ -0,0 +1,23 @@
+#
+# Asterisk Scalable Communications Framework
+#
+# Copyright (C) 2011 -- Digium, Inc.
+#
+# All rights reserved.
+#
+
+asterisk_scf_slice_include_directories(${API_SLICE_DIR})
+
+asterisk_scf_component_init(WorkQueue CXX)
+
+include_directories(include)
+include_directories(${API_INCLUDE_DIR})
+
+asterisk_scf_component_add_file(WorkQueue
+    include/AsteriskSCF/WorkQueue.h)
+asterisk_scf_component_add_file(WorkQueue src/WorkQueue.cpp)
+asterisk_scf_component_add_boost_libraries(WorkQueue thread)
+
+asterisk_scf_component_build_library(WorkQueue)
+
+asterisk_scf_headers_install(include/)
diff --git a/WorkQueue/include/AsteriskSCF/WorkQueue.h b/WorkQueue/include/AsteriskSCF/WorkQueue.h
new file mode 100644
index 0000000..2c47179
--- /dev/null
+++ b/WorkQueue/include/AsteriskSCF/WorkQueue.h
@@ -0,0 +1,45 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#pragma once
+
+#include <boost/shared_ptr.hpp>
+
+#include <AsteriskSCF/System/WorkQueue/WorkQueueIf.h>
+
+namespace AsteriskSCF
+{
+namespace WorkQueue
+{
+
+class WorkQueueImpl;
+
+class WorkQueue : public AsteriskSCF::System::WorkQueue::V1::Queue
+{
+public:
+    WorkQueue();
+    WorkQueue(const AsteriskSCF::System::WorkQueue::V1::QueueListenerPtr &listener);
+    void enqueueWork(const AsteriskSCF::System::WorkQueue::V1::WorkPtr &work);
+    void enqueueWorkSeq(const AsteriskSCF::System::WorkQueue::V1::WorkSeq &works);
+    void cancelWork(const AsteriskSCF::System::WorkQueue::V1::WorkPtr &work);
+    bool executeWork();
+    int workCount();
+    void setListener(const AsteriskSCF::System::WorkQueue::V1::QueueListenerPtr &listener);
+private:
+    boost::shared_ptr<WorkQueueImpl> mImpl;
+};
+
+}; //end namespace WorkQueue
+}; //end namespace AsteriskSCF
diff --git a/WorkQueue/src/WorkQueue.cpp b/WorkQueue/src/WorkQueue.cpp
new file mode 100644
index 0000000..6fd8b2c
--- /dev/null
+++ b/WorkQueue/src/WorkQueue.cpp
@@ -0,0 +1,84 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#include <deque>
+#include <boost/thread/shared_mutex.hpp>
+
+#include <AsteriskSCF/WorkQueue.h>
+
+namespace AsteriskSCF
+{
+namespace WorkQueue
+{
+
+using namespace AsteriskSCF::System::WorkQueue::V1;
+
+class WorkQueueImpl
+{
+public:
+    WorkQueueImpl()
+        : mListener(0) { }
+    WorkQueueImpl(const QueueListenerPtr &listener)
+        : mListener(listener) { }
+
+    QueueListenerPtr mListener;
+    std::deque<WorkPtr> q;
+};
+
+WorkQueue::WorkQueue()
+    : mImpl(new WorkQueueImpl) { }
+
+WorkQueue::WorkQueue(const QueueListenerPtr &listener)
+    : mImpl(new WorkQueueImpl(listener)) { }
+
+void WorkQueue::enqueueWork(const WorkPtr &work)
+{
+    mImpl->q.push_back(work);
+}
+
+void WorkQueue::enqueueWorkSeq(const WorkSeq &works)
+{
+    mImpl->q.insert(mImpl->q.end(), works.begin(), works.end());
+}
+
+void WorkQueue::cancelWork(const WorkPtr &work)
+{
+}
+
+bool WorkQueue::executeWork()
+{
+    WorkPtr work = mImpl->q.front();
+    mImpl->q.pop_front();
+    work->execute();
+    if (mImpl->q.empty())
+    {
+        return false;
+    }
+    return true;
+}
+
+int WorkQueue::workCount()
+{
+    return mImpl->q.size();
+}
+
+void WorkQueue::setListener(const QueueListenerPtr &listener)
+{
+    mImpl->mListener = listener;
+}
+
+}; // end namespace WorkQueue
+}; // end namespace AsteriskSCF

commit 853b36ba80ed5692b9f4b2acbfaa22f5f65cff2d
Author: David M. Lee <dlee at digium.com>
Date:   Thu Jan 20 12:38:37 2011 -0600

    Refactored out several install functions for installation.
    
    * asterisk_scf_component_install - Install a component
    * asterisk_scf_headers_install - Install .h files
    * asterisk_scf_slice_headers_install - Install generated .h files
    * asterisk_scf_slice_install - Install .ice files

diff --git a/AmiCollector/CMakeLists.txt b/AmiCollector/CMakeLists.txt
index 46a8e75..8233f7f 100644
--- a/AmiCollector/CMakeLists.txt
+++ b/AmiCollector/CMakeLists.txt
@@ -10,4 +10,8 @@ asterisk_scf_component_add_boost_libraries(ami-collector thread)
 
 asterisk_scf_component_build_library(ami-collector)
 
+# don't install the component.  it's just there to make Visual Studio happy
+# _do_ install the header files
+asterisk_scf_headers_install(include/)
+
 add_subdirectory(test)
diff --git a/SmartProxy/CMakeLists.txt b/SmartProxy/CMakeLists.txt
index 51e7331..bdbe333 100644
--- a/SmartProxy/CMakeLists.txt
+++ b/SmartProxy/CMakeLists.txt
@@ -29,4 +29,4 @@ target_link_libraries(SmartProxy logging-client)
 
 # don't install the component.  it's just there to make Visual Studio happy
 # _do_ install the header files
-install(DIRECTORY include/ DESTINATION ${ASTERISK_SCF_INSTALL_INCLUDE_DIR})
+asterisk_scf_headers_install(include/)
diff --git a/StateReplicator/CMakeLists.txt b/StateReplicator/CMakeLists.txt
index a17bd75..472c21c 100644
--- a/StateReplicator/CMakeLists.txt
+++ b/StateReplicator/CMakeLists.txt
@@ -23,4 +23,4 @@ add_subdirectory(test)
 
 # don't install the component.  it's just there to make Visual Studio happy
 # _do_ install the header files
-install(DIRECTORY include/ DESTINATION ${ASTERISK_SCF_INSTALL_INCLUDE_DIR})
+asterisk_scf_headers_install(include/)

commit 315704575dbed670c809a8c637b13eab1b14c64e
Author: David M. Lee <dlee at digium.com>
Date:   Thu Jan 13 15:21:21 2011 -0600

    Installation config

diff --git a/SmartProxy/CMakeLists.txt b/SmartProxy/CMakeLists.txt
index 3b38e29..51e7331 100644
--- a/SmartProxy/CMakeLists.txt
+++ b/SmartProxy/CMakeLists.txt
@@ -27,4 +27,6 @@ asterisk_scf_component_build_library(SmartProxy)
 include_directories(${API_INCLUDE_DIR})
 target_link_libraries(SmartProxy logging-client)
 
+# don't install the component.  it's just there to make Visual Studio happy
+# _do_ install the header files
 install(DIRECTORY include/ DESTINATION ${ASTERISK_SCF_INSTALL_INCLUDE_DIR})
diff --git a/StateReplicator/CMakeLists.txt b/StateReplicator/CMakeLists.txt
index ca5f819..a17bd75 100644
--- a/StateReplicator/CMakeLists.txt
+++ b/StateReplicator/CMakeLists.txt
@@ -12,13 +12,15 @@ asterisk_scf_component_init(StateReplicator CXX)
 
 include_directories(include)
 
-asterisk_scf_component_add_file(StateReplicator include/AsteriskSCF/StateReplicator.h)
+asterisk_scf_component_add_file(StateReplicator
+    include/AsteriskSCF/StateReplicator.h)
 asterisk_scf_component_add_file(StateReplicator src/StateReplicator.cpp)
 asterisk_scf_component_add_boost_libraries(StateReplicator thread)
- 
-asterisk_scf_component_build_library(StateReplicator)
 
-asterisk_scf_component_install(StateReplicator LIBRARY lib "State Replicator" statereplicator ARCHIVE DESTINATION lib)
+asterisk_scf_component_build_library(StateReplicator)
 
 add_subdirectory(test)
 
+# don't install the component.  it's just there to make Visual Studio happy
+# _do_ install the header files
+install(DIRECTORY include/ DESTINATION ${ASTERISK_SCF_INSTALL_INCLUDE_DIR})
diff --git a/StateReplicator/test/CMakeLists.txt b/StateReplicator/test/CMakeLists.txt
index 5ff08fd..075f348 100644
--- a/StateReplicator/test/CMakeLists.txt
+++ b/StateReplicator/test/CMakeLists.txt
@@ -9,6 +9,5 @@ asterisk_scf_component_add_slice(StateReplicatorTest ../testslice/StateReplicato
 asterisk_scf_component_add_boost_libraries(StateReplicatorTest unit_test_framework thread date_time)
 
 asterisk_scf_component_build_standalone(StateReplicatorTest)
-asterisk_scf_component_install(StateReplicatorTest RUNTIME bin "StateReplicatorTest Component Test Driver." Test)
 
 boost_add_test(StateReplicatorTest)

commit debc4ae991ace2e0149384996a7fbb6c6117019e
Author: David M. Lee <dlee at digium.com>
Date:   Wed Jan 19 23:29:24 2011 -0600

    Fixed dependencies for Windows build.

diff --git a/AmiCollector/CMakeLists.txt b/AmiCollector/CMakeLists.txt
index b5e6420..46a8e75 100644
--- a/AmiCollector/CMakeLists.txt
+++ b/AmiCollector/CMakeLists.txt
@@ -6,6 +6,8 @@ asterisk_scf_component_add_file(ami-collector include/AsteriskSCF/AmiCollector.h
 asterisk_scf_component_add_file(ami-collector include/AsteriskSCF/ResponseCollector.h)
 asterisk_scf_component_add_file(ami-collector src/AmiCollector.cpp)
 
+asterisk_scf_component_add_boost_libraries(ami-collector thread)
+
 asterisk_scf_component_build_library(ami-collector)
 
 add_subdirectory(test)
diff --git a/AmiCollector/test/CMakeLists.txt b/AmiCollector/test/CMakeLists.txt
index 5dbb40b..14cf003 100644
--- a/AmiCollector/test/CMakeLists.txt
+++ b/AmiCollector/test/CMakeLists.txt
@@ -6,7 +6,7 @@ asterisk_scf_component_add_file(ami-collector-test IceIntegration-test.cpp)
 asterisk_scf_component_add_file(ami-collector-test TestAmiCollector.h)
 asterisk_scf_component_add_file(ami-collector-test test.cpp)
 
-asterisk_scf_component_add_boost_libraries(ami-collector-test unit_test_framework thread)
+asterisk_scf_component_add_boost_libraries(ami-collector-test unit_test_framework thread date_time)
 
 asterisk_scf_component_build_standalone(ami-collector-test)
 
diff --git a/AmiCollector/test/IceIntegration-test.cpp b/AmiCollector/test/IceIntegration-test.cpp
index 042cabb..3a2626a 100644
--- a/AmiCollector/test/IceIntegration-test.cpp
+++ b/AmiCollector/test/IceIntegration-test.cpp
@@ -14,8 +14,6 @@
  * at the top of the source tree.
  */
 
-#include <unistd.h>
-
 #include <Ice/Ice.h>
 #include <Ice/Proxy.h>
 #include <boost/test/unit_test.hpp>

commit 2db6f77d765176d76bc4f62a28dd55efce60901b
Author: David M. Lee <dlee at digium.com>
Date:   Wed Jan 19 12:07:41 2011 -0600

    Adjusted the anonymous namespace for a gcc 4.1 compiler bug.

diff --git a/AmiCollector/test/IceIntegration-test.cpp b/AmiCollector/test/IceIntegration-test.cpp
index 06ba168..042cabb 100644
--- a/AmiCollector/test/IceIntegration-test.cpp
+++ b/AmiCollector/test/IceIntegration-test.cpp
@@ -52,7 +52,6 @@ struct Fixture
     Ice::ObjectPtr object;
     Ice::ObjectPrx proxy;
 };
-} // anonymous namespace
 
 BOOST_FIXTURE_TEST_SUITE(AmiCollectorIceIntegration, Fixture)
 
@@ -119,3 +118,4 @@ BOOST_AUTO_TEST_CASE(test_some)
 }
 
 BOOST_AUTO_TEST_SUITE_END()
+} // anonymous namespace

commit 19f9d661714335ba777a5c5898503c41a27ff446
Author: David M. Lee <dlee at digium.com>
Date:   Wed Jan 19 09:56:21 2011 -0600

    Made ctor protected; explained the separate init().
    
    See CR-ASTSCF-40.

diff --git a/AmiCollector/include/AsteriskSCF/ResponseCollector.h b/AmiCollector/include/AsteriskSCF/ResponseCollector.h
index 44e7b6c..de2e011 100644
--- a/AmiCollector/include/AsteriskSCF/ResponseCollector.h
+++ b/AmiCollector/include/AsteriskSCF/ResponseCollector.h
@@ -45,14 +45,6 @@ public:
     typedef T InvocationParamType;
     typedef ResponseCollector<T> ResponseCollectorT;
 
-    /**
-     * Constructor.  The collector is no good until it's been initialized.
-     */
-    ResponseCollector() :
-        mNumResponses(0)
-    {
-    }
-
     virtual ~ResponseCollector() {}
 
     /**
@@ -87,6 +79,17 @@ public:
     }
 
 protected:
+    /**
+     * Constructor.  The collector is no good until it's been initialized.
+     * We cannot initialize the collector in the constructor, because it may
+     * call the processCompletion() virtual function, which is a no-no from a
+     * ctor.
+     */
+    ResponseCollector() :
+        mNumResponses(0)
+    {
+    }
+
     /** Called to retrieve the results for an invocation */
     virtual void processInvocation(InvocationParamType param) = 0;
     /** Called after all the responses or exceptions have been accounted for. */

commit 13814ce199236dbbe5345f948bc5e2b151358070
Author: David M. Lee <dlee at digium.com>
Date:   Wed Jan 12 17:35:46 2011 -0600

    Fail test if exception is not passed through.
    
    See CR-ASTSCF-40.

diff --git a/AmiCollector/test/ResponseCollector-test.cpp b/AmiCollector/test/ResponseCollector-test.cpp
index 63f7403..af94552 100644
--- a/AmiCollector/test/ResponseCollector-test.cpp
+++ b/AmiCollector/test/ResponseCollector-test.cpp
@@ -76,7 +76,11 @@ BOOST_AUTO_TEST_CASE(test_one)
     BOOST_CHECK_EQUAL(0, uut.results);
     BOOST_CHECK_EQUAL(false, uut.complete);
 
-    try { uut.invoke(true); } catch (...) { /* ignore */ }
+    try
+    {
+        uut.invoke(true);
+        BOOST_FAIL("Expected exception to pass through");
+    } catch (const TestException&) { /* ignore */ }
 
     BOOST_CHECK_EQUAL(1, uut.results);
     BOOST_CHECK_EQUAL(true, uut.complete);

commit fc7cecaba535aa94217df488cfbb5c9af9c6139a
Author: David M. Lee <dlee at digium.com>
Date:   Fri Jan 7 15:45:35 2011 -0600

    Fixed AmiCollector for include file changes.

diff --git a/AmiCollector/CMakeLists.txt b/AmiCollector/CMakeLists.txt
index 47f2510..b5e6420 100644
--- a/AmiCollector/CMakeLists.txt
+++ b/AmiCollector/CMakeLists.txt
@@ -1,2 +1,11 @@
-add_subdirectory(src)
+asterisk_scf_component_init(ami-collector CXX)
+
+include_directories(include)
+
+asterisk_scf_component_add_file(ami-collector include/AsteriskSCF/AmiCollector.h)
+asterisk_scf_component_add_file(ami-collector include/AsteriskSCF/ResponseCollector.h)
+asterisk_scf_component_add_file(ami-collector src/AmiCollector.cpp)
+
+asterisk_scf_component_build_library(ami-collector)
+
 add_subdirectory(test)
diff --git a/AmiCollector/src/AmiCollector.h b/AmiCollector/include/AsteriskSCF/AmiCollector.h
similarity index 98%
rename from AmiCollector/src/AmiCollector.h
rename to AmiCollector/include/AsteriskSCF/AmiCollector.h
index 892a136..cd703c3 100644
--- a/AmiCollector/src/AmiCollector.h
+++ b/AmiCollector/include/AsteriskSCF/AmiCollector.h
@@ -22,7 +22,7 @@
 #include <IceUtil/Handle.h>
 #include <IceUtil/Shared.h>
 
-#include "ResponseCollector.h"
+#include <AsteriskSCF/ResponseCollector.h>
 
 namespace AsteriskSCF
 {
diff --git a/AmiCollector/src/ResponseCollector.h b/AmiCollector/include/AsteriskSCF/ResponseCollector.h
similarity index 100%
rename from AmiCollector/src/ResponseCollector.h
rename to AmiCollector/include/AsteriskSCF/ResponseCollector.h
diff --git a/AmiCollector/src/AmiCollector.cpp b/AmiCollector/src/AmiCollector.cpp
index a5e4797..c0244fe 100644
--- a/AmiCollector/src/AmiCollector.cpp
+++ b/AmiCollector/src/AmiCollector.cpp
@@ -14,4 +14,4 @@
  * at the top of the source tree.
  */
 
-#include "AmiCollector.h"
+#include <AsteriskSCF/AmiCollector.h>
diff --git a/AmiCollector/src/CMakeLists.txt b/AmiCollector/src/CMakeLists.txt
deleted file mode 100644
index 319aea0..0000000
--- a/AmiCollector/src/CMakeLists.txt
+++ /dev/null
@@ -1,6 +0,0 @@
-asterisk_scf_component_init(ami-collector CXX)
-
-asterisk_scf_component_add_file(ami-collector AmiCollector.h)
-asterisk_scf_component_add_file(ami-collector AmiCollector.cpp)
-
-asterisk_scf_component_build_library(ami-collector)
diff --git a/AmiCollector/test/IceIntegration-test.cpp b/AmiCollector/test/IceIntegration-test.cpp
index 343ef1e..06ba168 100644
--- a/AmiCollector/test/IceIntegration-test.cpp
+++ b/AmiCollector/test/IceIntegration-test.cpp
@@ -20,7 +20,8 @@
 #include <Ice/Proxy.h>
 #include <boost/test/unit_test.hpp>
 
-#include "AmiCollector.h"
+#include <AsteriskSCF/AmiCollector.h>
+
 #include "TestAmiCollector.h"
 
 using namespace AsteriskSCF::IceUtil;
diff --git a/AmiCollector/test/ResponseCollector-test.cpp b/AmiCollector/test/ResponseCollector-test.cpp
index 34bb0f3..63f7403 100644
--- a/AmiCollector/test/ResponseCollector-test.cpp
+++ b/AmiCollector/test/ResponseCollector-test.cpp
@@ -16,7 +16,7 @@
 
 #include <boost/test/unit_test.hpp>
 
-#include "ResponseCollector.h"
+#include <AsteriskSCF/ResponseCollector.h>
 
 using namespace AsteriskSCF;
 
diff --git a/AmiCollector/test/TestAmiCollector.h b/AmiCollector/test/TestAmiCollector.h
index 11ae789..b714f13 100644
--- a/AmiCollector/test/TestAmiCollector.h
+++ b/AmiCollector/test/TestAmiCollector.h
@@ -17,7 +17,7 @@
 #include <boost/thread/condition_variable.hpp>
 #include <IceUtil/Handle.h>
 
-#include "AmiCollector.h"
+#include <AsteriskSCF/AmiCollector.h>
 
 namespace AsteriskSCF
 {

commit 7ad9eb283116047b1aaafaced3116e67e01180cb
Author: David M. Lee <dlee at digium.com>
Date:   Thu Jan 6 08:45:57 2011 -0600

    Extracted a base ResponseCollector from AmiCollector.

diff --git a/AmiCollector/src/AmiCollector.h b/AmiCollector/src/AmiCollector.h
index a5ddbb9..892a136 100644
--- a/AmiCollector/src/AmiCollector.h
+++ b/AmiCollector/src/AmiCollector.h
@@ -16,13 +16,14 @@
 
 #pragma once
 
-#include <boost/thread/mutex.hpp>
 #include <Ice/Object.h>
 #include <Ice/Proxy.h>
 #include <Ice/OutgoingAsync.h>
 #include <IceUtil/Handle.h>
 #include <IceUtil/Shared.h>
 
+#include "ResponseCollector.h"
+
 namespace AsteriskSCF
 {
 namespace IceUtil
@@ -46,7 +47,9 @@ namespace IceUtil
 template<typename T,
          typename P,
          T (P::element_type::*EndFunction)(const Ice::AsyncResultPtr&)>
-class AmiCollector : public ::IceUtil::Shared
+class AmiCollector :
+        public ::IceUtil::Shared,
+        public ResponseCollector<const Ice::AsyncResultPtr&>
 {
 public:
     /** Proxy handle type (i.e. ObjectPrx) */
@@ -56,41 +59,29 @@ public:
     /** Return type for the end function */
     typedef T AsyncResultType;
     /** The type for AmiCollector itself */
-    typedef AmiCollector<T, P, EndFunction> Self;
+    typedef AmiCollector<T, P, EndFunction> AmiCollectorT;
 
     /**
-     * Constructor.
-     * @param numResponses The number of responses expected.
-     * @param end Pointer to the end function to call for responses.
+     * Creates an Ice CallbackPtr wrapper around this object, for use with AMI.
      */
-    AmiCollector(size_t numResponses) :
-        mNumResponses(numResponses)
+    Ice::CallbackPtr newIceCallback()
     {
+        // this happens prior to any callbacks, to no need to lock
+        return Ice::newCallback(this, &AmiCollectorT::invoke);
     }
 
     /**
-     * Creates an Ice CallbackPtr wrapper around this object, for use with AMI.
+     * While this looks useless, it's necessary so that the Ice callback can
+     * call the invoke function on the pointer we pass to it.
      */
-    Ice::CallbackPtr newIceCallback()
+    void invoke(const Ice::AsyncResultPtr& r)
     {
-        // this happens prior to any callbacks, to no need to lock
-
-        // handle the case where we don't expect any responses
-        // can't handle this in the ctor, since you can't call virtual
-        // functions in the constructor
-        if (mNumResponses == 0)
-        {
-            processCompletion();
-        }
-        return Ice::newCallback(this, &Self::finished);
+        ResponseCollectorT::invoke(r);
     }
 
-    /** The number of outstanding responses remaining. */
-    size_t getRemainingResponses() const { return mNumResponses; }
-
 protected:
     /** Protected dtor prevents creating instances on the stack */
-    ~AmiCollector() {}
+    virtual ~AmiCollector() {}
     /** Called for each response received. */
     virtual void processResult(AsyncResultType result) = 0;
     /** Called for each exception received. */
@@ -98,32 +89,18 @@ protected:
     /** Called after all the responses or exceptions have been accounted for. */
     virtual void processCompletion() {}
 
-    /**
-     * Thread safety for response counting.  Derived classes may use this for
-     * their own thread safety needs as well.
-     */
-    boost::mutex mMutex;
-
+private:
     /** Callback from Ice */
-    void finished(const Ice::AsyncResultPtr& r)
+    void processInvocation(const Ice::AsyncResultPtr& r)
     {
         try
         {
             try
             {
-                // unit test may not have an AsyncResultPtr
-                if (r)
-                {
-                    IceProxy p = IceProxy::uncheckedCast(r->getProxy());
-                    // invoke the nd function on our proxy, passing in r
-                    // the results are passed to processResult
-                    processResult(((*p).*EndFunction)(r));
-                }
-                else
-                {
-                    // we didn't have an AsyncResultPtr, so make something up
-                    processResult(AsyncResultType());
-                }
+                IceProxy p = IceProxy::uncheckedCast(r->getProxy());
+                // invoke the end function on our proxy, passing in r
+                // the results are passed to processResult
+                processResult(((*p).*EndFunction)(r));
             }
             catch(const Ice::Exception& e)
             {
@@ -142,33 +119,6 @@ protected:
             std::clog << "Unexpected exception\n";
             assert(false);
         }
-        countResponse();
-    }
-
-private:
-    size_t mNumResponses;
-
-    /**
-     * Called from each AMI callback.  When this function sees that the expected
-     * number of responses have been received, it invokes
-     * <code>processCompletion</code>
-     */
-    void countResponse()
-    {
-        bool done = false;
-        {
-            boost::lock_guard<boost::mutex> lock(mMutex);
-            assert(mNumResponses > 0); // we got more responses than expected
-            if (--mNumResponses == 0)
-            {
-                done = true;
-            }
-        }
-
-        if (done)
-        {
-            processCompletion();
-        }
     }
 };
 
diff --git a/AmiCollector/src/ResponseCollector.h b/AmiCollector/src/ResponseCollector.h
new file mode 100644
index 0000000..44e7b6c
--- /dev/null
+++ b/AmiCollector/src/ResponseCollector.h
@@ -0,0 +1,132 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#pragma once
+
+#include <boost/thread/mutex.hpp>
+#include <iostream>
+
+namespace AsteriskSCF
+{
+
+class TooManyInvocations : public std::exception
+{
+public:
+    const char* what() const throw ()
+    {
+        return "ResponseCollector invoked too many times";
+    }
+};
+
+/**
+ * Collects the results for multiple asynchronous responses.  While the response
+ * counting logic is properly locked and thread safe, it is incumbent upon the
+ * implementor of the <code>processInvocation</code> functions to write those
+ * functions to be thread safe.  Derived classes may use mMutex for their own
+ * locking.
+ */
+template<typename T>
+class ResponseCollector
+{
+public:
+    typedef T InvocationParamType;
+    typedef ResponseCollector<T> ResponseCollectorT;
+
+    /**
+     * Constructor.  The collector is no good until it's been initialized.
+     */
+    ResponseCollector() :
+        mNumResponses(0)
+    {
+    }
+
+    virtual ~ResponseCollector() {}
+
+    /**
+     * Initialize the collector.
+     * @param numResponses The number of responses expected.
+     */
+    void init(size_t numResponses)
+    {
+        mNumResponses = numResponses;
+        // handle the case where we expect no responses
+        if (numResponses == 0)
+        {
+            processCompletion();
+        }
+    }
+
+    /**
+     * Invoke this function for every response.
+     */
+    void invoke(InvocationParamType param)
+    {
+        try
+        {
+            processInvocation(param);
+        }
+        catch(...)
+        {
+            countResponse();
+            throw;
+        }
+        countResponse();
+    }
+
+protected:
+    /** Called to retrieve the results for an invocation */
+    virtual void processInvocation(InvocationParamType param) = 0;
+    /** Called after all the responses or exceptions have been accounted for. */
+    virtual void processCompletion() {}
+
+    /**
+     * Thread safety for response counting.  Derived classes may use this for
+     * their own thread safety needs as well.
+     */
+    boost::mutex mMutex;
+
+private:
+    size_t mNumResponses;
+
+    /**
+     * Called from each AMI callback.  When this function sees that the expected
+     * number of responses have been received, it invokes
+     * <code>processCompletion</code>
+     */
+    void countResponse()
+    {
+        bool done = false;
+        {
+            boost::lock_guard<boost::mutex> lock(mMutex);
+            if (mNumResponses <= 0)
+            {
+                throw TooManyInvocations();
+            }
+
+            if (--mNumResponses == 0)
+            {
+                done = true;
+            }
+        }
+
+        if (done)
+        {
+            processCompletion();
+        }
+    }
+};
+
+} // AsteriskSCF
diff --git a/AmiCollector/test/AmiCollector-test.cpp b/AmiCollector/test/AmiCollector-test.cpp
deleted file mode 100644
index 05f2973..0000000
--- a/AmiCollector/test/AmiCollector-test.cpp
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Asterisk SCF -- An open-source communications framework.
- *
- * Copyright (C) 2010, Digium, Inc.
- *
- * See http://www.asterisk.org for more information about
- * the Asterisk SCF project. Please do not directly contact
- * any of the maintainers of this project for assistance;
- * the project provides a web site, mailing lists and IRC
- * channels for your use.
- *
- * This program is free software, distributed under the terms of
- * the GNU General Public License Version 2. See the LICENSE.txt file
- * at the top of the source tree.
- */
-
-#include <boost/test/unit_test.hpp>
-
-#include "AmiCollector.h"
-#include "TestAmiCollector.h"
-
-using namespace AsteriskSCF::IceUtil;
-
-BOOST_AUTO_TEST_SUITE(AmiCollectorTest)
-
-BOOST_AUTO_TEST_CASE(test_zero)
-{
-    TestAmiCollectorPtr uut = new TestAmiCollector(0);
-
-    uut->newIceCallback();
-
-    BOOST_CHECK_EQUAL(0, uut->results);
-    BOOST_CHECK_EQUAL(0, uut->exceptions);
-    BOOST_CHECK_EQUAL(true, uut->complete);
-}
-
-BOOST_AUTO_TEST_CASE(test_one)
-{
-    TestAmiCollectorPtr uut = new TestAmiCollector(1);
-
-    uut->newIceCallback();
-
-    BOOST_CHECK_EQUAL(0, uut->results);
-    BOOST_CHECK_EQUAL(0, uut->exceptions);
-    BOOST_CHECK_EQUAL(false, uut->complete);
-
-    uut->finished(0);
-
-    BOOST_CHECK_EQUAL(1, uut->results);
-    BOOST_CHECK_EQUAL(0, uut->exceptions);
-    BOOST_CHECK_EQUAL(true, uut->complete);
-}
-
-BOOST_AUTO_TEST_SUITE_END()
diff --git a/AmiCollector/test/CMakeLists.txt b/AmiCollector/test/CMakeLists.txt
index 952caf0..5dbb40b 100644
--- a/AmiCollector/test/CMakeLists.txt
+++ b/AmiCollector/test/CMakeLists.txt
@@ -1,7 +1,7 @@
 asterisk_scf_component_init(ami-collector-test CXX)
 include_directories("../src")
 
-asterisk_scf_component_add_file(ami-collector-test AmiCollector-test.cpp)
+asterisk_scf_component_add_file(ami-collector-test ResponseCollector-test.cpp)
 asterisk_scf_component_add_file(ami-collector-test IceIntegration-test.cpp)
 asterisk_scf_component_add_file(ami-collector-test TestAmiCollector.h)
 asterisk_scf_component_add_file(ami-collector-test test.cpp)
diff --git a/AmiCollector/test/ResponseCollector-test.cpp b/AmiCollector/test/ResponseCollector-test.cpp
new file mode 100644
index 0000000..34bb0f3
--- /dev/null
+++ b/AmiCollector/test/ResponseCollector-test.cpp
@@ -0,0 +1,130 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#include <boost/test/unit_test.hpp>
+
+#include "ResponseCollector.h"
+
+using namespace AsteriskSCF;
+
+namespace
+{
+class TestException : std::exception
+{
+public:
+    const char* what() const throw() { return "TestException"; }
+};
+
+class TestCollector : public ResponseCollector<bool>
+{
+public:
+    /** Num results received */
+    int results;
+    /** Is complete? */
+    bool complete;
+
+    TestCollector() : results(0), complete(false) {}
+
+    void processInvocation(bool shouldThrow)
+    {
+        ++results;
+        if (shouldThrow)
+        {
+            throw TestException();
+        }
+    }
+    void processCompletion()
+    {
+        BOOST_CHECK_EQUAL(false, complete);
+        complete = true;
+    }
+};
+
+} // namespace
+
+BOOST_AUTO_TEST_SUITE(AmiCollectorTest)
+
+BOOST_AUTO_TEST_CASE(test_zero)
+{
+    TestCollector uut;
+
+    uut.init(0);
+
+    BOOST_CHECK_EQUAL(0, uut.results);
+    BOOST_CHECK_EQUAL(true, uut.complete);
+}
+
+BOOST_AUTO_TEST_CASE(test_one)
+{
+    TestCollector uut;
+
+    uut.init(1);
+
+    BOOST_CHECK_EQUAL(0, uut.results);
+    BOOST_CHECK_EQUAL(false, uut.complete);
+
+    try { uut.invoke(true); } catch (...) { /* ignore */ }
+
+    BOOST_CHECK_EQUAL(1, uut.results);
+    BOOST_CHECK_EQUAL(true, uut.complete);
+}
+
+BOOST_AUTO_TEST_CASE(test_one_exception)
+{
+    TestCollector uut;
+
+    uut.init(1);
+
+    uut.invoke(false);
+
+    BOOST_CHECK_EQUAL(1, uut.results);
+    BOOST_CHECK_EQUAL(true, uut.complete);
+}
+
+BOOST_AUTO_TEST_CASE(test_many)
+{
+    TestCollector uut;
+
+    uut.init(5);
+
+    try { uut.invoke(true); } catch (...) { /* ignore */ }
+    try { uut.invoke(true); } catch (...) { /* ignore */ }
+    try { uut.invoke(true); } catch (...) { /* ignore */ }
+    try { uut.invoke(true); } catch (...) { /* ignore */ }
+    BOOST_CHECK_EQUAL(false, uut.complete);
+    try { uut.invoke(true); } catch (...) { /* ignore */ }
+
+    BOOST_CHECK_EQUAL(5, uut.results);
+    BOOST_CHECK_EQUAL(true, uut.complete);
+}
+
+BOOST_AUTO_TEST_CASE(test_mixed)
+{
+    TestCollector uut;
+
+    uut.init(5);
+
+    try { uut.invoke(true); } catch (...) { /* ignore */ }
+    uut.invoke(false);
+    try { uut.invoke(true); } catch (...) { /* ignore */ }
+    uut.invoke(false);
+    try { uut.invoke(true); } catch (...) { /* ignore */ }
+
+    BOOST_CHECK_EQUAL(5, uut.results);
+    BOOST_CHECK_EQUAL(true, uut.complete);
+}
+
+BOOST_AUTO_TEST_SUITE_END()
diff --git a/AmiCollector/test/TestAmiCollector.h b/AmiCollector/test/TestAmiCollector.h
index 6291702..11ae789 100644
--- a/AmiCollector/test/TestAmiCollector.h
+++ b/AmiCollector/test/TestAmiCollector.h
@@ -34,11 +34,12 @@ class TestAmiCollector : public AsteriskSCF::IceUtil::AmiCollector<
 {
 public:
     explicit TestAmiCollector(size_t n) :
-        Self(n),
         results(0),
         exceptions(0),
         complete(false)
-    {}
+    {
+        init(n);
+    }
     /** Num results received */
     int results;
     /** Num exceptions received */
@@ -46,12 +47,6 @@ public:
     /** Is complete? */
     bool complete;
 
-    /** Exposes the 'finished' callback function for unit testing. */
-    void finished(const Ice::AsyncResultPtr& r)
-    {
-        Self::finished(r);
-    }
-
     /** Blocks until processCompletion is called. */
     void waitForCompletion()
     {

commit 4920b90d3935aa8b395111d4faa425769ac82611
Author: David M. Lee <dlee at digium.com>
Date:   Wed Jan 5 16:02:10 2011 -0600

    cleanup

diff --git a/AmiCollector/src/AmiCollector.h b/AmiCollector/src/AmiCollector.h
index 5e7e605..a5ddbb9 100644
--- a/AmiCollector/src/AmiCollector.h
+++ b/AmiCollector/src/AmiCollector.h
@@ -33,8 +33,19 @@ namespace IceUtil
  * counting logic is properly locked and thread safe, it is incumbent upon the
  * implementor of the <code>process*</code> functions to write those functions
  * to be thread safe.  Derived classes may use mMutex for their own locking.
+ *
+ * An example of the template parameters would be:
+ * <code>
+ *   <bool, Ice::ObjectPrx, &Ice::ObjectPrx::element_type::end_ice_isA>
+ * </code>
+ *
+ * @param T The asynchronous result type (return value from end function)
+ * @param P The type of the proxy class
+ * @param EndFunction The pointer to member function for the end_ function
  */
-template<typename T, typename P, T (P::element_type::*EndFunction)(const Ice::AsyncResultPtr&)>
+template<typename T,
+         typename P,
+         T (P::element_type::*EndFunction)(const Ice::AsyncResultPtr&)>
 class AmiCollector : public ::IceUtil::Shared
 {
 public:
diff --git a/AmiCollector/test/TestAmiCollector.h b/AmiCollector/test/TestAmiCollector.h
index b3cb484..6291702 100644
--- a/AmiCollector/test/TestAmiCollector.h
+++ b/AmiCollector/test/TestAmiCollector.h
@@ -27,7 +27,10 @@ namespace IceUtil
 /**
  * Simple AMI collector for testing
  */
-class TestAmiCollector : public AsteriskSCF::IceUtil::AmiCollector<bool, Ice::ObjectPrx, &Ice::ObjectPrx::element_type::end_ice_isA>
+class TestAmiCollector : public AsteriskSCF::IceUtil::AmiCollector<
+    bool,
+    Ice::ObjectPrx,
+    &Ice::ObjectPrx::element_type::end_ice_isA>
 {
 public:
     explicit TestAmiCollector(size_t n) :

commit e8ba04f58ce8cf49545cbea0a7d180ff0513ef3e
Author: David M. Lee <dlee at digium.com>
Date:   Wed Jan 5 15:56:14 2011 -0600

    PTMF is now template param.

diff --git a/AmiCollector/src/AmiCollector.h b/AmiCollector/src/AmiCollector.h
index dcae2b9..5e7e605 100644
--- a/AmiCollector/src/AmiCollector.h
+++ b/AmiCollector/src/AmiCollector.h
@@ -34,29 +34,26 @@ namespace IceUtil
  * implementor of the <code>process*</code> functions to write those functions
  * to be thread safe.  Derived classes may use mMutex for their own locking.
  */
-template<typename T, typename P>
+template<typename T, typename P, T (P::element_type::*EndFunction)(const Ice::AsyncResultPtr&)>
 class AmiCollector : public ::IceUtil::Shared
 {
 public:
     /** Proxy handle type (i.e. ObjectPrx) */
     typedef P IceProxy;
     /** Proxy element type (i.e. Proxy::Ice::Object) */
-    typedef typename IceProxy::element_type ElementType;
+    typedef typename P::element_type ElementType;
     /** Return type for the end function */
     typedef T AsyncResultType;
     /** The type for AmiCollector itself */
-    typedef AmiCollector<T, P> Self;
-    /** Pointer to member function for the end function */
-    typedef AsyncResultType (ElementType::*EndFunction)(const Ice::AsyncResultPtr&);
+    typedef AmiCollector<T, P, EndFunction> Self;
 
     /**
      * Constructor.
      * @param numResponses The number of responses expected.
      * @param end Pointer to the end function to call for responses.
      */
-    AmiCollector(size_t numResponses, EndFunction end) :
-        mNumResponses(numResponses),
-        mEnd(end)
+    AmiCollector(size_t numResponses) :
+        mNumResponses(numResponses)
     {
     }
 
@@ -107,9 +104,9 @@ protected:
                 if (r)
                 {
                     IceProxy p = IceProxy::uncheckedCast(r->getProxy());
-                    // invoke the mEnd function on our proxy, passing in r
+                    // invoke the nd function on our proxy, passing in r
                     // the results are passed to processResult
-                    processResult(((*p).*mEnd)(r));
+                    processResult(((*p).*EndFunction)(r));
                 }
                 else
                 {
@@ -139,7 +136,6 @@ protected:
 
 private:
     size_t mNumResponses;
-    EndFunction mEnd;
 
     /**
      * Called from each AMI callback.  When this function sees that the expected
diff --git a/AmiCollector/test/TestAmiCollector.h b/AmiCollector/test/TestAmiCollector.h
index 6471897..b3cb484 100644
--- a/AmiCollector/test/TestAmiCollector.h
+++ b/AmiCollector/test/TestAmiCollector.h
@@ -27,11 +27,11 @@ namespace IceUtil
 /**
  * Simple AMI collector for testing
  */
-class TestAmiCollector : public AsteriskSCF::IceUtil::AmiCollector<bool, Ice::ObjectPrx>
+class TestAmiCollector : public AsteriskSCF::IceUtil::AmiCollector<bool, Ice::ObjectPrx, &Ice::ObjectPrx::element_type::end_ice_isA>
 {
 public:
     explicit TestAmiCollector(size_t n) :
-        Self(n, &IceProxy::element_type::end_ice_isA),
+        Self(n),
         results(0),
         exceptions(0),
         complete(false)

commit a0161edf7b9144b720285b8828ee8a361773afd7
Author: David M. Lee <dlee at digium.com>
Date:   Wed Jan 5 15:27:33 2011 -0600

    Now I feel good about the AmiCollector :-)

diff --git a/AmiCollector/src/AmiCollector.h b/AmiCollector/src/AmiCollector.h
index 98d0998..dcae2b9 100644
--- a/AmiCollector/src/AmiCollector.h
+++ b/AmiCollector/src/AmiCollector.h
@@ -29,90 +29,117 @@ namespace IceUtil
 {
 
 /**
- * Collects the results for multiple asynchronous responses.  While the vote
+ * Collects the results for multiple asynchronous responses.  While the response
  * counting logic is properly locked and thread safe, it is incumbent upon the
  * implementor of the <code>process*</code> functions to write those functions
- * to be thread safe.  Feel free to use mMutex (it's protected) if needed.
+ * to be thread safe.  Derived classes may use mMutex for their own locking.
  */
-template<typename T>
+template<typename T, typename P>
 class AmiCollector : public ::IceUtil::Shared
 {
 public:
+    /** Proxy handle type (i.e. ObjectPrx) */
+    typedef P IceProxy;
+    /** Proxy element type (i.e. Proxy::Ice::Object) */
+    typedef typename IceProxy::element_type ElementType;
+    /** Return type for the end function */
     typedef T AsyncResultType;
-    typedef AmiCollector<T> Self;
+    /** The type for AmiCollector itself */
+    typedef AmiCollector<T, P> Self;
+    /** Pointer to member function for the end function */
+    typedef AsyncResultType (ElementType::*EndFunction)(const Ice::AsyncResultPtr&);
 
-    AmiCollector(size_t numVotes) : mNumVotes(numVotes)
+    /**
+     * Constructor.
+     * @param numResponses The number of responses expected.
+     * @param end Pointer to the end function to call for responses.
+     */
+    AmiCollector(size_t numResponses, EndFunction end) :
+        mNumResponses(numResponses),
+        mEnd(end)
     {
     }
 
     /**
      * Creates an Ice CallbackPtr wrapper around this object, for use with AMI.
      */
-    Ice::CallbackPtr toIceCallback()
+    Ice::CallbackPtr newIceCallback()
     {
         // this happens prior to any callbacks, to no need to lock
 
         // handle the case where we don't expect any responses
         // can't handle this in the ctor, since you can't call virtual
         // functions in the constructor
-        if (mNumVotes == 0)
+        if (mNumResponses == 0)
         {
             processCompletion();
         }
         return Ice::newCallback(this, &Self::finished);
     }
 
-    size_t getRemainingVotes() const { return mNumVotes; }
+    /** The number of outstanding responses remaining. */
+    size_t getRemainingResponses() const { return mNumResponses; }
 
 protected:
     /** Protected dtor prevents creating instances on the stack */
     ~AmiCollector() {}
-    /**
-     * Wrapper to correctly end the remote function call.
-     * Should almost always look like:
-     * <code>
-     *  DerivedPrx d = DerivedPrx::uncheckedCast(r->getProxy());
-     *  return d->end_remoteFunction(r);
-     * </code>
-     */
-    virtual AsyncResultType end(const Ice::AsyncResultPtr& proxy) = 0;
-    /**
-     * Called for each response received.
-     */
+    /** Called for each response received. */
     virtual void processResult(AsyncResultType result) = 0;
-    /**
-     * Called for each exception received.
-     */
+    /** Called for each exception received. */
     virtual void processException(const Ice::Exception& e) = 0;
-    /**
-     * Called after all the responses or exceptions have been accounted for.
-     */
+    /** Called after all the responses or exceptions have been accounted for. */
     virtual void processCompletion() {}
 
+    /**
+     * Thread safety for response counting.  Derived classes may use this for
+     * their own thread safety needs as well.
+     */
     boost::mutex mMutex;
 
+    /** Callback from Ice */
     void finished(const Ice::AsyncResultPtr& r)
     {
         try
         {
             try
             {
-                processResult(end(r));
+                // unit test may not have an AsyncResultPtr
+                if (r)
+                {
+                    IceProxy p = IceProxy::uncheckedCast(r->getProxy());
+                    // invoke the mEnd function on our proxy, passing in r
+                    // the results are passed to processResult
+                    processResult(((*p).*mEnd)(r));
+                }
+                else
+                {
+                    // we didn't have an AsyncResultPtr, so make something up
+                    processResult(AsyncResultType());
+                }
             }
             catch(const Ice::Exception& e)
             {
                 processException(e);
             }
         }
+        catch(const std::exception& e)
+        {
+            // any exceptions thrown during processing are programming errors
+            std::clog << "Unexpected exception: " << e.what() << '\n';
+            assert(false);
+        }
         catch(...)
         {
-            // ignore any processing exceptions
+            // and _please_ don't just throw char pointers :-(
+            std::clog << "Unexpected exception\n";
+            assert(false);
         }
         countResponse();
     }
 
 private:
-    size_t mNumVotes;
+    size_t mNumResponses;
+    EndFunction mEnd;
 
     /**
      * Called from each AMI callback.  When this function sees that the expected
@@ -124,8 +151,8 @@ private:
         bool done = false;
         {
             boost::lock_guard<boost::mutex> lock(mMutex);
-            assert(mNumVotes > 0); // we got more responses than expected
-            if (--mNumVotes == 0)
+            assert(mNumResponses > 0); // we got more responses than expected
+            if (--mNumResponses == 0)
             {
                 done = true;
             }
diff --git a/AmiCollector/test/AmiCollector-test.cpp b/AmiCollector/test/AmiCollector-test.cpp
index 6f659e1..05f2973 100644
--- a/AmiCollector/test/AmiCollector-test.cpp
+++ b/AmiCollector/test/AmiCollector-test.cpp
@@ -14,74 +14,20 @@
  * at the top of the source tree.
  */
 
-#include <Ice/Proxy.h>
 #include <boost/test/unit_test.hpp>
 
 #include "AmiCollector.h"
+#include "TestAmiCollector.h"
 
 using namespace AsteriskSCF::IceUtil;
 
-namespace
-{
-class TestAmiCollector : public AsteriskSCF::IceUtil::AmiCollector<bool>
-{
-public:
-    explicit TestAmiCollector(size_t n) :
-        AmiCollector<bool>(n),
-        results(0),
-        exceptions(0),
-        complete(false)
-    {}
-    int results;
-    int exceptions;
-    bool complete;
-
-    void finished(const Ice::AsyncResultPtr& r)
-    {
-        Self::finished(r);
-    }
-
-protected:
-    ~TestAmiCollector() {}
-    virtual AsyncResultType end(const Ice::AsyncResultPtr& proxy)
-    {
-        // fake - do nothing
-        return false;
-    }
-    void processResult(AsyncResultType result)
-    {
-        ++results;
-    }
-    void processException(const Ice::Exception& e)
-    {
-        ++exceptions;
-    }
-    void processCompletion()
... 6721 lines suppressed ...


-- 
asterisk-scf/integration/ice-util-cpp.git



More information about the asterisk-scf-commits mailing list