[asterisk-scf-commits] asterisk-scf/integration/bridging.git branch "async-bridging" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Tue Apr 5 13:35:50 CDT 2011
branch "async-bridging" has been updated
via e82d5aaaecfa0571d3af66ae59ba4cbc0e485ee6 (commit)
via b484e71afcf0662502434ec025a3d5856f6b2bc5 (commit)
via c6c40843202ec7a607979e4f29fb41190a7d8e44 (commit)
via 5a4d3ce25852972ef47157c4e190ec01aba58065 (commit)
via 504ababbd67d5a477d145b5f363654ca10b086a1 (commit)
from 67870b0b7764e1ddb9447a84d93ee6d954db2cce (commit)
Summary of changes:
README.txt | 2 +-
config/test_bridging.conf.in | 142 +++++++++++++++++++---
src/BridgeImpl.cpp | 7 +-
src/BridgeImpl.h | 2 +-
src/BridgeListenerMgr.cpp | 2 +-
src/BridgeListenerMgr.h | 2 +-
src/BridgeManagerImpl.cpp | 11 +--
src/BridgeManagerImpl.h | 2 +-
src/BridgeManagerListenerMgr.cpp | 2 +-
src/BridgeManagerListenerMgr.h | 2 +-
src/BridgeReplicatorService.cpp | 2 +-
src/BridgeReplicatorStateListenerI.cpp | 2 +-
src/BridgeReplicatorStateListenerI.h | 2 +-
src/BridgeServiceConfig.h | 2 +-
src/CMakeLists.txt | 6 +
src/DebugUtil.h | 2 +-
src/MediaSplicer.cpp | 2 +-
src/MediaSplicer.h | 2 +-
src/Service.cpp | 21 ++--
src/ServiceUtil.h | 41 +++++--
src/SessionCollection.cpp | 2 +-
src/SessionCollection.h | 2 +-
src/SessionListener.cpp | 2 +-
src/SessionListener.h | 2 +-
src/SessionOperations.cpp | 2 +-
src/SessionOperations.h | 2 +-
src/SessionWrapper.cpp | 2 +-
src/SessionWrapper.h | 2 +-
src/Tasks.h | 212 +++++++++++++++++++++++++++-----
test/BridgeListenerI.cpp | 2 +-
test/BridgeListenerI.h | 2 +-
test/BridgeManagerListenerI.cpp | 2 +-
test/BridgeManagerListenerI.h | 2 +-
test/SessionListenerI.cpp | 2 +-
test/SessionListenerI.h | 2 +-
test/TestBridging.cpp | 2 +-
test/TestCommandDriver.cpp | 2 +-
test/TestCommandDriver.h | 2 +-
test/UnitTests.cpp | 2 +-
39 files changed, 392 insertions(+), 112 deletions(-)
- Log -----------------------------------------------------------------
commit e82d5aaaecfa0571d3af66ae59ba4cbc0e485ee6
Author: Brent Eagles <beagles at digium.com>
Date: Tue Apr 5 16:04:15 2011 -0230
* Add some missing files to CMakeLists.txt
* Add some documentation to Tasks.h
diff --git a/src/BridgeImpl.cpp b/src/BridgeImpl.cpp
index 469364b..f386ffd 100644
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@ -804,7 +804,7 @@ void BridgeImpl::replaceSession_async(const AMD_Bridge_replaceSessionPtr& callba
bool BridgeImpl::destroyed()
{
boost::shared_lock<boost::shared_mutex> lock(mLock);
- mLogger(Debug) << FUNLOG << ": " << (mState->runningState == Destroyed ? "yes, I am destroyed." : "no, I am not destroyed") ;=
+ mLogger(Debug) << FUNLOG << ": " << (mState->runningState == Destroyed ? "yes, I am destroyed." : "no, I am not destroyed") ;
return (mState->runningState == Destroyed);
}
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 1da4fa2..1c0397a 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -8,6 +8,7 @@ asterisk_scf_component_add_file(bridgeservice BridgeListenerMgr.h)
asterisk_scf_component_add_file(bridgeservice BridgeListenerMgr.cpp)
asterisk_scf_component_add_file(bridgeservice BridgeManagerListenerMgr.h)
asterisk_scf_component_add_file(bridgeservice BridgeManagerListenerMgr.cpp)
+asterisk_scf_component_add_file(bridgeservice BridgeReplicatorStateListenerI.h)
asterisk_scf_component_add_file(bridgeservice BridgeReplicatorStateListenerI.cpp)
asterisk_scf_component_add_file(bridgeservice BridgeManagerImpl.h)
asterisk_scf_component_add_file(bridgeservice BridgeManagerImpl.cpp)
@@ -23,6 +24,9 @@ asterisk_scf_component_add_file(bridgeservice ServiceUtil.h)
asterisk_scf_component_add_file(bridgeservice DebugUtil.h)
asterisk_scf_component_add_file(bridgeservice MediaSplicer.h)
asterisk_scf_component_add_file(bridgeservice MediaSplicer.cpp)
+asterisk_scf_component_add_file(bridgeservice Tasks.h)
+asterisk_scf_component_add_file(bridgeservice InternalExceptions.h)
+asterisk_scf_component_add_file(bridgeservice ListenerManager.h)
asterisk_scf_component_add_slice(bridgeservice ./BridgeReplicatorIf.ice)
asterisk_scf_component_add_ice_libraries(bridgeservice IceStorm)
diff --git a/src/Tasks.h b/src/Tasks.h
index 06cc867..bfaa6d5 100644
--- a/src/Tasks.h
+++ b/src/Tasks.h
@@ -25,6 +25,23 @@ namespace AsteriskSCF
namespace BridgeService
{
+/**
+ *
+ * These classes will most likely be replaced with the Asterisk SCF utility library's WorkQueue class. They are a much
+ * simplified version of the SuspendableWorkQueue. The names of these classes were deliberately kept distinct from
+ * those in the WorkQueue class to avoid confusion if/when migration occurs (subtle name differences are difficult to
+ * "see")
+ *
+ **/
+
+/**
+ *
+ * TaskListener is an abstract base class for classes wishing to receive notification
+ * if a task has succeeded or failed. It's very basic and has no support for
+ * information to be passed on notification. Parameter passing, etc. is expected
+ * to be implemented as an independent mechanism.
+ *
+ **/
class TaskListener : virtual public IceUtil::Shared
{
public:
@@ -35,6 +52,12 @@ public:
};
typedef IceUtil::Handle<TaskListener> TaskListenerPtr;
+/**
+ *
+ * A queued task is basically a functor with some additional interfaces
+ * to support the executor protocol.
+ *
+ **/
class QueuedTask : virtual public IceUtil::Shared
{
//
@@ -47,9 +70,22 @@ public:
virtual ~QueuedTask() {}
- //
- // All three of these methods should not throw exceptions! Return true if should continue
- //
+ /**
+ *
+ * All three of these methods should not throw exceptions! Return true if should continue. Allowing an exception to
+ * pass would bypass any error recovery mechanisms that might be implemented by Queued tasks that might be part of
+ * the task list. DO NOT OVERRIDE THESE METHODS! SEE THE xImpl() METHODS IN THE PROTECTED SECTION!
+ *
+ **/
+
+ /**
+ *
+ * Performs the operation encapsulated by this task.
+ *
+ * @returns true if the next task should be executed, false if not (false is typically returned if the operation
+ * encapsulates an asynchronous operation and things should not continue until that async operation completes).
+ *
+ **/
bool execute()
{
try
@@ -62,7 +98,13 @@ public:
}
return true;
}
-
+
+ /**
+ *
+ * Notification when the operation has not been executed but an overall exception situation has occurred. The task
+ * should release any resources that it has acquired. No further methods will be called on it by the managing queue.
+ *
+ **/
void fail()
{
try
@@ -74,6 +116,12 @@ public:
}
}
+ /**
+ *
+ * Notification that the task should release any resources, but does not indicate an error condition. No further
+ * methods will be called on it by the managing queue.
+ *
+ **/
void destroy()
{
try
@@ -85,12 +133,22 @@ public:
}
}
+ /**
+ *
+ * We only permit one listener. Using "listener" here is somewhat of a misnomer, but it suits for the time
+ * being. setListener() is implicitly called when you add the QueuedTask to the Executor.
+ *
+ **/
void setListener(const TaskListenerPtr& listener)
{
assert(!mListener);
mListener = listener;
}
+ //
+ // Tasks may be named, allowing for information tracing/logging
+ // in whatever is managing the tasks.
+ //
std::string name()
{
return mName;
@@ -109,13 +167,29 @@ protected:
mName(name)
{
}
+
+ /**
+ *
+ * The downcalls that QueuedTask derived objects must implement.
+ *
+ **/
+ /**
+ *
+ * Performs the operation encapsulated by this task.
+ *
+ * @returns true if the next task should be executed, false if not (false is typically returned if the operation
+ * encapsulates an asynchronous operation and things should not continue until that async operation completes).
+ *
+ **/
virtual bool executeImpl() = 0;
- //
- // Default implementations are provided for the following two methods as it is entirely possible that the task will
- // not have anything to do.
- //
+ /**
+ *
+ * Default implementations are provided for the following two methods as it is entirely possible that the task will
+ * not have anything to do.
+ *
+ **/
virtual void failImpl() {}
virtual void destroyImpl() {}
};
@@ -140,12 +214,21 @@ public:
mLogger(logger),
mStopped(true)
{
+ //
+ // As indicated above, the tasks are associated with this "executor" instance
+ // as they are added.
+ //
for (QueuedTasks::iterator i = mTasks.begin(); i != mTasks.end(); ++i)
{
(*i)->setListener(this);
}
}
+ /**
+ *
+ * Start running the tasks!
+ *
+ **/
void start()
{
{
@@ -156,28 +239,58 @@ public:
}
mStopped = false;
}
+ //
+ // We delegate to the logic in the succeeded() callback.
+ //
succeeded();
}
+ /**
+ *
+ * Prevent execution of remaining queued tasks. The result of calling stop() will not be immediate. Tasks currently
+ * in the process of being executed will be unaffected. Execution can be resumed by calling start().
+ *
+ **/
void stop()
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
mStopped = true;
}
- //
- // A note on reference counts. In order to notify the Executor that an operation has succeeded or failed, the
- // operation must maintain a reference to it. This may seem like a circular reference, but it works out. Once things
- // have been set in motion, the only references held to the executor are held by the tasks themselves. The very last
- // reference to the executor will be held by the current executing task.
- //
+ /**
+ * TaskListener interface implementation
+ */
+
+ /**
+ * A note on reference counts. In order to notify the Executor that an operation has succeeded or failed, the
+ * operation must maintain a reference to it. This may seem like a circular reference, but it works out. Once things
+ * have been set in motion, the only references held to the executor are held by the tasks themselves. The very last
+ * reference to the executor will be held by the current executing task.
+ **/
+
+ /**
+ *
+ * Notification that a queued task has completed successfully and the next task can be executed. Implementation of
+ * TaskListener::succeeded(). Queued tasks invoke this callback if their execute() method returns false once their
+ * work has completed (e.g. AMI callback).
+ *
+ **/
void succeeded()
{
QueuedTaskPtr current = popNextTask();
while (current)
{
mLogger(AsteriskSCF::System::Logging::Debug) << ": executing " << current->name();
-
+
+ //
+ // Properly implemented tasks will return true if the next task should
+ // be executed immediately. If it returns false, the loop will break and
+ // execution will resume the next time the executor receives a succeeded callback.
+ //
+ // NOTE: It *might* appear that there is a race condition as the task being
+ // executed might call "succeeded" before this loop completes. It *should* be
+ // ok as this loop will still terminate as needed.
+ //
if (!current->execute())
{
break;
@@ -185,19 +298,13 @@ public:
current = popNextTask();
}
}
-
- QueuedTaskPtr popNextTask()
- {
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- if (mTasks.empty() || mStopped)
- {
- return 0;
- }
- QueuedTaskPtr t = mTasks.front();
- mTasks.pop_front();
- return t;
- }
+ /**
+ *
+ * Called by an executing task indicating that an operation has failed. The implementation notifies all
+ * remaining queued tasks of the failure. Implements TaskListener::failed()
+ *
+ **/
void failed()
{
QueuedTaskPtr current = popNextTask();
@@ -209,10 +316,19 @@ public:
}
}
+ //
+ // End of TaskListener interface implementation
+ //
+
+ /**
+ *
+ * Notifies queued tasks that the queue is being destroyed.
+ *
+ **/
void destroy()
{
//
- // Note that a request that is in progress will not be destroyed.
+ // NOTE: a request that is in progress will not be destroyed.
//
QueuedTaskPtr current = popNextTask();
while (current)
@@ -223,12 +339,24 @@ public:
}
}
- bool done()
+ /**
+ *
+ * Simple query to determine if all of the queued items have completed.
+ *
+ * @returns true if there are no queued tasks.
+ *
+ **/
+ bool isDone()
{
boost::shared_lock<boost::shared_mutex> lock(mLock);
return mTasks.size() == 0;
}
+ /**
+ *
+ * Adds a task to the queue, implicitly calling setListener() on it.
+ *
+ **/
void append(const QueuedTaskPtr& newTask)
{
bool restart = false;
@@ -245,6 +373,24 @@ public:
}
}
+ /**
+ *
+ * Retrieves and removes the next queued task.
+ *
+ * @returns the next task or 0 if there are no more tasks or if the queue has been stopped.
+ *
+ **/
+ QueuedTaskPtr popNextTask()
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ if (mTasks.empty() || mStopped)
+ {
+ return 0;
+ }
+ QueuedTaskPtr t = mTasks.front();
+ mTasks.pop_front();
+ return t;
+ }
protected:
boost::shared_mutex mLock;
@@ -254,6 +400,12 @@ protected:
};
typedef IceUtil::Handle<Executor> ExecutorPtr;
+/**
+ *
+ * The QueueableExecutor is a "mash-up" of a queued task and a queue of tasks, allowing
+ * a QueuedTask instance to effectively be made up of other QueuedTask instances.
+ *
+ **/
class QueueableExecutor : virtual public Executor, virtual public QueuedTask
{
public:
@@ -273,7 +425,7 @@ public:
void succeeded()
{
Executor::succeeded();
- if (done())
+ if (isDone())
{
mListener->succeeded();
}
commit b484e71afcf0662502434ec025a3d5856f6b2bc5
Merge: 5a4d3ce c6c4084
Author: Brent Eagles <beagles at digium.com>
Date: Tue Apr 5 15:16:00 2011 -0230
Merge updates from 'bridge-replication' branch.
commit 5a4d3ce25852972ef47157c4e190ec01aba58065
Merge: 67870b0 504abab
Author: Brent Eagles <beagles at digium.com>
Date: Tue Apr 5 12:02:04 2011 -0230
Merge branch updates to 'bridge-replication'
diff --cc src/BridgeImpl.cpp
index 9a4c051,acc4410..469364b
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@@ -186,319 -186,6 +186,314 @@@ static void checkSessions(const Session
}
}
- //
- // Compiled in constants.
- //
- static const string TopicPrefix("AsteriskSCF.Bridge.");
-
+class ShutdownThread : public IceUtil::Thread
+{
+public:
+ ShutdownThread(const BridgePrx& bridge) :
+ mBridge(bridge)
+ {
+ }
+
+ void run()
+ {
+ try
+ {
+ mBridge->shutdown();
+ }
+ catch (...)
+ {
+ }
+ }
+private:
+ BridgePrx mBridge;
+};
+
+class SessionsTracker : public IceUtil::Shared
+{
+public:
+ SessionsTracker() :
+ mResponses(0)
+ {
+ }
+
+ void add(const SessionPrx& s)
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ mSessions.push_back(s);
+ ++mResponses;
+ }
+
+ SessionSeq getSessions()
+ {
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+ return mSessions;
+ }
+
+ void addException(const SessionPrx& session, const Ice::Exception& ex)
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ SessionError error;
+ error.failedSession = session;
+ error.message = ex.what();
+ mExceptions.push_back(error);
+ ++mResponses;
+ }
+
+ void addExceptionMessage(const SessionPrx& session, const string& msg)
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ SessionError error;
+ error.failedSession = session;
+ error.message = msg;
+ mExceptions.push_back(error);
+ ++mResponses;
+ }
+
+ SessionErrorSeq getExceptions()
+ {
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+ return mExceptions;
+ }
+
+ size_t responseCount()
+ {
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+ return mResponses;
+ }
+
+private:
+ boost::shared_mutex mLock;
+ SessionSeq mSessions;
+ SessionErrorSeq mExceptions;
+ size_t mResponses;
+};
+typedef IceUtil::Handle<SessionsTracker> SessionsTrackerPtr;
+
+class RemoveSessionsNotify : public QueuedTask
+{
+public:
+ RemoveSessionsNotify(const BridgeListenerMgrPtr& bridgeListeners,
+ const SessionsTrackerPtr& tracker) :
+ QueuedTask("RemoveSessionsNotify"),
+ mBridgeListeners(bridgeListeners),
+ mTracker(tracker)
+ {
+ }
+
+protected:
+ bool executeImpl()
+ {
+ SessionSeq sessions = mTracker->getSessions();
+ if (!sessions.empty())
+ {
+ mBridgeListeners->sessionsRemoved(sessions);
+ }
+ return true;
+ }
+
+private:
+ BridgeListenerMgrPtr mBridgeListeners;
+ SessionsTrackerPtr mTracker;
+};
+
+class SetBridgeTask : public QueuedTask
+{
+public:
+ SetBridgeTask(const SessionCollectionPtr& sessionCollection, const BridgePrx& bridge,
+ const SessionListenerPrx& listener, const SessionSeq& sessions, const SessionsTrackerPtr& tracker):
+ QueuedTask("SetBridgeTask"),
+ mSessionManager(sessionCollection),
+ mBridge(bridge),
+ mSessionListener(listener),
+ mSessions(sessions),
+ mTracker(tracker)
+ {
+ }
+
+protected:
+ bool executeImpl()
+ {
+ bool tasksDispatched = false;
+ for (SessionSeq::iterator i = mSessions.begin(); i != mSessions.end(); ++i)
+ {
+ SessionWrapperPtr session = mSessionManager->addSession(*i);
+ if (session == 0)
+ {
+ //
+ // This shouldn't happen!
+ //
+ mTracker->addExceptionMessage(*i, "session already added");
+ continue;
+ }
+ tasksDispatched = true;
+ (*i)->begin_setBridge(mBridge, mSessionListener,
+ newCallback_Session_setBridge(this, &SetBridgeTask::set,
+ &SetBridgeTask::failed), session);
+ }
+ return !tasksDispatched;
+ }
+
+ void set(const SessionInfoPtr& info, const SessionWrapperPtr& session)
+ {
+ mTracker->add(session->getSession());
+ if (info->currentState != "ready")
+ {
+ //
+ // setupMedia is an AMI backed implementation, so should not block here.
+ //
+ session->setupMedia();
+ }
+ if (mTracker->responseCount() == mSessions.size())
+ {
+ mListener->succeeded();
+ }
+ }
+
+ void failed(const Ice::Exception& ex, const SessionWrapperPtr& session)
+ {
+ //
+ // TODO:
+ // * Log exception.
+ // * Rollback
+ // * Interpret exception and decide whether or not to fail the entire operation.
+ // Currently the semantics allow some operations to fail.
+ //
+ mTracker->addException(session->getSession(), ex);
+ try
+ {
+ //
+ // We want to make sure that session isn't left lying around. The session collection will
+ // take care of cleaning it up as long as it is marked as destroyed.
+ //
+ session->destroy();
+ }
+ catch(...)
+ {
+ }
+ if (mTracker->responseCount() == mSessions.size())
+ {
+ SessionErrorSeq exceptions = mTracker->getExceptions();
+ if (exceptions.size() == mSessions.size())
+ {
+ mListener->failed();
+ }
+ else
+ {
+ mListener->succeeded();
+ }
+ }
+ }
+
+private:
+ SessionCollectionPtr mSessionManager;
+ BridgePrx mBridge;
+ SessionListenerPrx mSessionListener;
+ SessionSeq mSessions;
+ SessionsTrackerPtr mTracker;
+};
+
+class AddToListeners : public QueuedTask
+{
+public:
+ AddToListeners(const BridgeListenerMgrPtr& listeners, const SessionsTrackerPtr& tracker) :
+ QueuedTask("AddToListeners"),
+ mListeners(listeners),
+ mTracker(tracker)
+ {
+ }
+
+protected:
+ bool executeImpl()
+ {
+ mListeners->sessionsAdded(mTracker->getSessions());
+ return true;
+ }
+
+private:
+ BridgeListenerMgrPtr mListeners;
+ SessionsTrackerPtr mTracker;
+};
+
+class CheckShutdown : public QueuedTask
+{
+public:
+ CheckShutdown(const BridgeImplPtr& bridge, const BridgePrx& proxy) :
+ QueuedTask("CheckShutdown"),
+ mBridge(bridge),
+ mPrx(proxy)
+ {
+ }
+
+protected:
+ bool executeImpl()
+ {
+ if (mBridge->sessions()->size() < 2 && mPrx)
+ {
+ IceUtil::Handle<IceUtil::Thread> t = new ShutdownThread(mPrx);
+ t->start();
+ }
+ return true;
+ }
+private:
+ BridgeImplPtr mBridge;
+ BridgePrx mPrx;
+};
+
+template <class T>
+class GenericAMDCallback : public QueuedTask
+{
+public:
+ GenericAMDCallback(const T& cb, const SessionsTrackerPtr& tracker) :
+ QueuedTask("GenericAMDCallback"),
+ mCallback(cb),
+ mTracker(tracker)
+ {
+ }
+protected:
+
+ bool executeImpl()
+ {
+ mCallback->ice_response();
+ return true;
+ }
+
+ void failImpl()
+ {
+ SessionErrorSeq errors(mTracker->getExceptions());
+ if (!errors.empty())
+ {
+ mCallback->ice_exception(BridgeSessionOperationFailed(errors));
+ }
+ else
+ {
+ mCallback->ice_exception();
+ }
+ }
+
+private:
+ T mCallback;
+ SessionsTrackerPtr mTracker;
+};
+
+class UpdateTask : public QueuedTask
+{
+public:
+ UpdateTask(const BridgeImplPtr& bridge) :
+ QueuedTask("UpdateTask"),
+ mBridge(bridge)
+ {
+ }
+
+protected:
+ bool executeImpl()
+ {
+ mBridge->forceUpdate();
+ return true;
+ }
+private:
+ BridgeImplPtr mBridge;
+};
+
} // End of anonymous namespace
BridgeImpl::BridgeImpl(const string& name, const Ice::ObjectAdapterPtr& adapter,
@@@ -809,8 -578,8 +804,8 @@@ void BridgeImpl::replaceSession_async(c
bool BridgeImpl::destroyed()
{
boost::shared_lock<boost::shared_mutex> lock(mLock);
-- mLogger(Debug) << FUNLOG << ": " << (mState->runningState == Destroyed ? "yes, I am destroyed." : "no, I am not destroyed") ;
- return mState->runningState == Destroyed;
++ mLogger(Debug) << FUNLOG << ": " << (mState->runningState == Destroyed ? "yes, I am destroyed." : "no, I am not destroyed") ;=
+ return (mState->runningState == Destroyed);
}
void BridgeImpl::destroyImpl()
-----------------------------------------------------------------------
--
asterisk-scf/integration/bridging.git
More information about the asterisk-scf-commits
mailing list