[asterisk-scf-commits] asterisk-scf/integration/bridging.git branch "async-bridging" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Mon Mar 14 10:01:26 CDT 2011


branch "async-bridging" has been updated
       via  4ba15a34d409f5d9cac4de90a6da902f74dda3ee (commit)
      from  f86a76ed0e55bd3c1ddacb80d87b8f8384d22ed8 (commit)

Summary of changes:
 src/Tasks.h |  311 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 311 insertions(+), 0 deletions(-)
 create mode 100644 src/Tasks.h


- Log -----------------------------------------------------------------
commit 4ba15a34d409f5d9cac4de90a6da902f74dda3ee
Author: Brent Eagles <beagles at digium.com>
Date:   Mon Mar 14 12:31:15 2011 -0230

    Added missing file.

diff --git a/src/Tasks.h b/src/Tasks.h
new file mode 100644
index 0000000..85a61f6
--- /dev/null
+++ b/src/Tasks.h
@@ -0,0 +1,311 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#pragma once
+
+#include <Ice/Ice.h>
+#include <boost/thread/locks.hpp>
+#include <list>
+
+namespace AsteriskSCF
+{
+namespace BridgeService
+{
+
+class TaskListener : virtual public IceUtil::Shared
+{
+public:
+    virtual ~TaskListener() {}
+
+    virtual void succeeded() = 0;
+    virtual void failed() = 0;
+};
+typedef IceUtil::Handle<TaskListener> TaskListenerPtr;
+
+class QueuedTask : virtual public IceUtil::Shared
+{
+    //
+    // Not implemented.
+    //
+    QueuedTask(const QueuedTask&);
+    void operator=(const QueuedTask&);
+    
+public:
+    virtual ~QueuedTask() {}
+    
+    //
+    // All three of these methods should not throw exceptions! Return true if should continue
+    //
+    bool execute()
+    {
+        try
+        {
+            return executeImpl();
+        }
+        catch(...)
+        {
+            mListener->failed();
+        }
+        return true;
+    }
+    
+    void fail()
+    {
+        try
+        {
+            failImpl();
+        }
+        catch(...)
+        {
+        }
+    }
+
+    void destroy()
+    {
+        try
+        {
+            destroyImpl();
+        }
+        catch(...)
+        {
+        }
+    }
+
+    void setListener(const TaskListenerPtr& listener)
+    {
+        assert(!mListener);
+        mListener = listener;
+    }
+
+protected:
+    TaskListenerPtr mListener;
+    
+    QueuedTask() {} // for those derived classes that don't require arguments.
+
+    virtual bool executeImpl() = 0;
+    
+    //
+    // Default implementations are provided for the following two methods as it is entirely possible that the task will
+    // not have anything to do.
+    //
+    virtual void failImpl() {}
+    virtual void destroyImpl() {}
+};
+typedef IceUtil::Handle<QueuedTask> QueuedTaskPtr;
+typedef std::list<QueuedTaskPtr> QueuedTasks;
+
+//
+// TODO: It would be nice to make the locking part of a policy that templated in/out.
+// TODO: Rollback?
+//
+class Executor : virtual public TaskListener
+{
+public:
+    Executor() :
+        mStopped(true)
+    {
+    }
+    
+    Executor(const QueuedTasks& tasks) :
+        mTasks(tasks),
+        mStopped(true)
+    {
+        for (QueuedTasks::iterator i = mTasks.begin(); i != mTasks.end(); ++i)
+        {
+            (*i)->setListener(this);
+        }
+    }
+
+    void start()
+    {
+        QueuedTaskPtr t;
+        {
+            boost::unique_lock<boost::shared_mutex> lock(mLock);
+            if (mStopped)
+            {
+                t = mTasks.front();
+                mStopped = false;
+            }
+        }
+        if (t->execute())
+        {
+            succeeded();
+        }
+    }
+
+    void stop()
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+        mStopped = true;
+    }
+
+    //
+    // A note on reference counts. In order to notify the Executor that an operation has succeeded or failed, the
+    // operation must maintain a reference to it. This may seem like a circular reference, but it works out. Once things
+    // have been set in motion, the only references held to the executor are held by the tasks themselves. The very last
+    // reference to the executor will be held by the current executing task.
+    //
+    void succeeded()
+    {
+        while (mTasks.size() > 0)
+        {
+            QueuedTaskPtr t;
+            {
+                boost::unique_lock<boost::shared_mutex> lock(mLock);
+                if (!mStopped)
+                {
+                    mTasks.pop_front();
+                    t = mTasks.front();
+                }
+            }
+            if (t)
+            {
+                //
+                // TODO: this is too easy to get wrong, should revisit.
+                //
+                // If execute returns false, the queue should suspend until it gets started again
+                // by some outside agent. If a task returns true, it should not call succeeded.
+                //
+                if (!t->execute())
+                {
+                    break;
+                }
+            }
+        }
+    }
+    
+    void failed()
+    {
+        while (mTasks.size() > 0)
+        {
+            QueuedTaskPtr i;
+            {
+                boost::unique_lock<boost::shared_mutex> lock(mLock);
+                i = mTasks.front();
+                mTasks.pop_front();
+            }
+            i->fail();
+        }
+    }
+
+    void destroy()
+    {
+        //
+        // Note that a request that is in progress will not be destroyed.
+        //
+        while (mTasks.size() > 0)
+        {
+            
+            QueuedTaskPtr i;
+            {
+                boost::unique_lock<boost::shared_mutex> lock(mLock);
+                i = mTasks.front();
+                mTasks.pop_front();
+            }
+            i->destroy();
+        }
+    }
+
+    bool done()
+    {
+        boost::shared_lock<boost::shared_mutex> lock(mLock);
+        return mTasks.size() == 0;
+    }
+
+    void append(const QueuedTaskPtr& newTask)
+    {
+        bool restart = false;
+        {
+            newTask->setListener(this);
+            boost::unique_lock<boost::shared_mutex> lock(mLock);
+            restart = (mTasks.size() == 0);
+            mTasks.push_back(newTask);
+        }
+        if (restart)
+        {
+            start();
+        }
+    }
+
+
+protected:
+    boost::shared_mutex mLock;
+    QueuedTasks mTasks;
+    bool mStopped;
+};
+typedef IceUtil::Handle<Executor> ExecutorPtr;
+
+class QueueableExecutor : virtual public Executor, virtual public QueuedTask
+{
+public:
+    //
+    // Useful for derived classes that can only do initialization in the constructor body.
+    //
+    QueueableExecutor()
+    {
+    }
+
+    QueueableExecutor(const QueuedTasks& tasks) :
+        Executor(tasks)
+    {
+    }
+
+    void succeeded()
+    {
+        Executor::succeeded();
+        if (done())
+        {
+            mListener->succeeded();
+        }
+    }
+
+    void failed()
+    {
+        mListener->failed();
+    }
+
+protected:
+
+    void setTasks(const QueuedTasks& tasks)
+    {
+        if (tasks.size() > 0)
+        {
+            mTasks.assign(tasks.begin(), tasks.end());
+        }
+        for (QueuedTasks::iterator i = mTasks.begin() ; i != mTasks.end(); ++i)
+        {
+            (*i)->setListener(this);
+        }
+    }
+    
+    bool executeImpl()
+    {
+        Executor::start();
+        return false;
+    }
+
+    void failImpl()
+    {
+        Executor::failed();
+    }
+
+    void destroyImpl()
+    {
+        Executor::destroy();
+    }
+};
+
+} /* End of namespace BridgeService */
+} /* End of namespace AsteriskSCF */

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/bridging.git



More information about the asterisk-scf-commits mailing list