[asterisk-scf-commits] asterisk-scf/integration/bridging.git branch "async-bridging" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Mon Mar 14 10:01:26 CDT 2011
branch "async-bridging" has been updated
via 4ba15a34d409f5d9cac4de90a6da902f74dda3ee (commit)
from f86a76ed0e55bd3c1ddacb80d87b8f8384d22ed8 (commit)
Summary of changes:
src/Tasks.h | 311 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 311 insertions(+), 0 deletions(-)
create mode 100644 src/Tasks.h
- Log -----------------------------------------------------------------
commit 4ba15a34d409f5d9cac4de90a6da902f74dda3ee
Author: Brent Eagles <beagles at digium.com>
Date: Mon Mar 14 12:31:15 2011 -0230
Added missing file.
diff --git a/src/Tasks.h b/src/Tasks.h
new file mode 100644
index 0000000..85a61f6
--- /dev/null
+++ b/src/Tasks.h
@@ -0,0 +1,311 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#pragma once
+
+#include <Ice/Ice.h>
+#include <boost/thread/locks.hpp>
+#include <list>
+
+namespace AsteriskSCF
+{
+namespace BridgeService
+{
+
+//
+// Callback interface through which the outcome of a task is reported. Instances are handled through
+// TaskListenerPtr.
+//
+class TaskListener : virtual public IceUtil::Shared
+{
+public:
+    // Reports that the task completed.
+    virtual void succeeded() = 0;
+
+    // Reports that the task did not complete.
+    virtual void failed() = 0;
+
+    virtual ~TaskListener() {}
+};
+typedef IceUtil::Handle<TaskListener> TaskListenerPtr;
+
+//
+// One unit of work run by an Executor. Derived classes implement executeImpl() and may override failImpl() and
+// destroyImpl(); the public wrappers below stop exceptions from escaping to the caller.
+//
+class QueuedTask : virtual public IceUtil::Shared
+{
+    //
+    // Not implemented.
+    //
+    QueuedTask(const QueuedTask&);
+    void operator=(const QueuedTask&);
+
+public:
+    virtual ~QueuedTask() {}
+
+    //
+    // None of execute(), fail() and destroy() lets an exception escape.
+    //
+    // execute() runs executeImpl() and returns its result: true if the caller should continue with the next
+    // task. If executeImpl() throws, the listener (if one is set) is told that the task failed and true is
+    // returned.
+    //
+    bool execute()
+    {
+        try
+        {
+            return executeImpl();
+        }
+        catch(...)
+        {
+            //
+            // Reporting the failure must not throw either: dereferencing an unset listener handle throws, and
+            // failed() is arbitrary code that may throw as well.
+            //
+            try
+            {
+                if (mListener)
+                {
+                    mListener->failed();
+                }
+            }
+            catch(...)
+            {
+            }
+        }
+        return true;
+    }
+
+    // Runs failImpl(); anything it throws is discarded.
+    void fail()
+    {
+        try
+        {
+            failImpl();
+        }
+        catch(...)
+        {
+        }
+    }
+
+    // Runs destroyImpl(); anything it throws is discarded.
+    void destroy()
+    {
+        try
+        {
+            destroyImpl();
+        }
+        catch(...)
+        {
+        }
+    }
+
+    // Sets the listener to notify. Asserts that no listener has been set on this task yet.
+    void setListener(const TaskListenerPtr& listener)
+    {
+        assert(!mListener);
+        mListener = listener;
+    }
+
+protected:
+    TaskListenerPtr mListener;
+
+    QueuedTask() {} // for those derived classes that don't require arguments.
+
+    // The task's actual work. The return value is passed back unchanged by execute().
+    virtual bool executeImpl() = 0;
+
+    //
+    // Default implementations are provided for the following two methods as it is entirely possible that the task will
+    // not have anything to do.
+    //
+    virtual void failImpl() {}
+    virtual void destroyImpl() {}
+};
+typedef IceUtil::Handle<QueuedTask> QueuedTaskPtr;
+typedef std::list<QueuedTaskPtr> QueuedTasks;
+
+//
+// Runs a list of tasks one after another. The front of mTasks is the task that is executing or is next to execute.
+// mStopped is true whenever nothing is being driven: before start(), after stop() and once the list has drained,
+// which is what lets append() restart an executor that has run out of work.
+//
+// TODO: It would be nice to make the locking part of a policy that is templated in/out.
+// TODO: Rollback?
+//
+class Executor : virtual public TaskListener
+{
+public:
+    Executor() :
+        mStopped(true)
+    {
+    }
+
+    // Copies the task list and sets this executor as the listener of every task in the copy.
+    Executor(const QueuedTasks& tasks) :
+        mTasks(tasks),
+        mStopped(true)
+    {
+        for (QueuedTasks::iterator i = mTasks.begin(); i != mTasks.end(); ++i)
+        {
+            (*i)->setListener(this);
+        }
+    }
+
+    //
+    // Starts (or resumes) execution with the task at the front of the list. Does nothing if the executor is
+    // already running or if no task is queued, so a null task is never dereferenced.
+    //
+    void start()
+    {
+        QueuedTaskPtr t;
+        {
+            boost::unique_lock<boost::shared_mutex> lock(mLock);
+            if (mStopped && !mTasks.empty())
+            {
+                t = mTasks.front();
+                mStopped = false;
+            }
+        }
+        if (t && t->execute())
+        {
+            succeeded();
+        }
+    }
+
+    // Suspends the queue: no further task is started until start() is called.
+    void stop()
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+        mStopped = true;
+    }
+
+    //
+    // A note on reference counts. In order to notify the Executor that an operation has succeeded or failed, the
+    // operation must maintain a reference to it. This may seem like a circular reference, but it works out. Once things
+    // have been set in motion, the only references held to the executor are held by the tasks themselves. The very last
+    // reference to the executor will be held by the current executing task.
+    //
+    // succeeded() removes the task at the front of the list (the one that just completed) and keeps executing the
+    // tasks behind it until the list is empty, the executor has been stopped, or a task returns false.
+    //
+    void succeeded()
+    {
+        for (;;)
+        {
+            QueuedTaskPtr t;
+            {
+                boost::unique_lock<boost::shared_mutex> lock(mLock);
+                if (!mTasks.empty())
+                {
+                    mTasks.pop_front();
+                }
+                if (mTasks.empty())
+                {
+                    // Out of work (front() must not be called on an empty list): idle until append() or start().
+                    mStopped = true;
+                    return;
+                }
+                if (mStopped)
+                {
+                    // Suspended: start() resumes with the new front instead of spinning here.
+                    return;
+                }
+                t = mTasks.front();
+            }
+            //
+            // TODO: this is too easy to get wrong, should revisit.
+            //
+            // If execute returns false, the queue should suspend until it gets started again
+            // by some outside agent. If a task returns true, it should not call succeeded.
+            //
+            if (!t->execute())
+            {
+                return;
+            }
+        }
+    }
+
+    // Removes every queued task, in order, and calls fail() on each.
+    void failed()
+    {
+        for (QueuedTaskPtr t = takeFront(); t; t = takeFront())
+        {
+            t->fail();
+        }
+    }
+
+    //
+    // Removes every queued task, in order, and calls destroy() on each.
+    //
+    // Note that a request that is in progress will not be destroyed.
+    // NOTE(review): the task at the front of the list is removed and destroyed like the rest, so the line above
+    // presumably refers to work the task has already handed off - confirm.
+    //
+    void destroy()
+    {
+        for (QueuedTaskPtr t = takeFront(); t; t = takeFront())
+        {
+            t->destroy();
+        }
+    }
+
+    // Returns true when no task is queued.
+    bool done()
+    {
+        boost::shared_lock<boost::shared_mutex> lock(mLock);
+        return mTasks.empty();
+    }
+
+    // Sets this executor as newTask's listener, queues the task, and starts executing if the list was empty.
+    void append(const QueuedTaskPtr& newTask)
+    {
+        newTask->setListener(this);
+        bool restart = false;
+        {
+            boost::unique_lock<boost::shared_mutex> lock(mLock);
+            restart = mTasks.empty();
+            mTasks.push_back(newTask);
+        }
+        if (restart)
+        {
+            start();
+        }
+    }
+
+protected:
+    boost::shared_mutex mLock;
+    QueuedTasks mTasks;
+    bool mStopped;
+
+private:
+    //
+    // Removes and returns the task at the front of the list. The emptiness check and the removal happen under
+    // the same lock. When no task is left, a null handle is returned and the executor is marked idle.
+    //
+    QueuedTaskPtr takeFront()
+    {
+        QueuedTaskPtr t;
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+        if (mTasks.empty())
+        {
+            mStopped = true;
+        }
+        else
+        {
+            t = mTasks.front();
+            mTasks.pop_front();
+        }
+        return t;
+    }
+};
+typedef IceUtil::Handle<Executor> ExecutorPtr;
+
+//
+// An Executor that is also a QueuedTask, so that a whole list of tasks can be queued as a single entry.
+//
+class QueueableExecutor : virtual public Executor, virtual public QueuedTask
+{
+public:
+    //
+    // Default construction, for derived classes that can only do their initialization in the constructor body.
+    //
+    QueueableExecutor()
+    {
+    }
+
+    QueueableExecutor(const QueuedTasks& tasks) :
+        Executor(tasks)
+    {
+    }
+
+    //
+    // Lets the Executor advance through its list, then reports success to this task's own listener if
+    // nothing is left in the list.
+    //
+    void succeeded()
+    {
+        Executor::succeeded();
+        if (!done())
+        {
+            return;
+        }
+        mListener->succeeded();
+    }
+
+    // Passes a failure straight on to this task's own listener.
+    void failed()
+    {
+        mListener->failed();
+    }
+
+protected:
+
+    //
+    // Replaces the task list with a copy of tasks, unless tasks is empty, in which case the current list is kept.
+    // Either way, this object is then set as the listener of every task in the list.
+    //
+    void setTasks(const QueuedTasks& tasks)
+    {
+        if (!tasks.empty())
+        {
+            mTasks.assign(tasks.begin(), tasks.end());
+        }
+        QueuedTasks::iterator cur = mTasks.begin();
+        while (cur != mTasks.end())
+        {
+            (*cur)->setListener(this);
+            ++cur;
+        }
+    }
+
+    // Executing this task starts the contained list. Always returns false.
+    bool executeImpl()
+    {
+        Executor::start();
+        return false;
+    }
+
+    // Failing this task fails the contained list.
+    void failImpl()
+    {
+        Executor::failed();
+    }
+
+    // Destroying this task destroys the contained list.
+    void destroyImpl()
+    {
+        Executor::destroy();
+    }
+};
+
+} /* End of namespace BridgeService */
+} /* End of namespace AsteriskSCF */
-----------------------------------------------------------------------
--
asterisk-scf/integration/bridging.git
More information about the asterisk-scf-commits
mailing list