[asterisk-scf-commits] asterisk-scf/integration/bridging.git branch "async-bridging" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Fri Mar 18 13:07:45 CDT 2011
branch "async-bridging" has been updated
via 9b6ca85f6ca32fb9125b985d4ae1d62d145cf100 (commit)
from a911c2e9da43c5b8ff6e54da7d91e452765a0c3a (commit)
Summary of changes:
config/test_bridging.conf.in | 1 +
src/BridgeImpl.cpp | 637 +++++++++++++++++++++++++++---------------
src/BridgeImpl.h | 10 +-
src/BridgeManagerImpl.cpp | 102 ++++---
src/MediaSplicer.cpp | 55 +++-
src/SessionWrapper.cpp | 19 +-
src/Tasks.h | 118 ++++----
7 files changed, 596 insertions(+), 346 deletions(-)
- Log -----------------------------------------------------------------
commit 9b6ca85f6ca32fb9125b985d4ae1d62d145cf100
Author: Brent Eagles <beagles at digium.com>
Date: Fri Mar 18 15:36:49 2011 -0230
Cleanup and fix the executors and add some logging.
Hook up AMD callbacks to task queues.
Finish AMD'ifying BridgeManager and Bridge Methods.
diff --git a/config/test_bridging.conf.in b/config/test_bridging.conf.in
index 461f245..ec7ced3 100644
--- a/config/test_bridging.conf.in
+++ b/config/test_bridging.conf.in
@@ -5,6 +5,7 @@ AsteriskSCFIceStorm.Publish.Endpoints=default -p 55556
AsteriskSCFIceStorm.Publish.ThreadPool.Size=4
AsteriskSCFIceStorm.Transient=1
TopicManager.Proxy=AsteriskSCFIceStorm/TopicManager:default -p 55555
+Ice.ThreadPool.Client.Size=4
LoggerAdapter.Endpoints=default
AsteriskSCF.LoggingService.Endpoints=default
diff --git a/src/BridgeImpl.cpp b/src/BridgeImpl.cpp
index d29fb29..9a4c051 100644
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@ -77,7 +77,6 @@ public:
//
// AsteriskSCF::SessionCommunications::Bridging::Bridge Interface
//
- void addSessions(const SessionSeq& sessions);
void addSessions_async(const AMD_Bridge_addSessionsPtr& callback, const SessionSeq& sessions, const Ice::Current&);
void removeSessions(const SessionSeq& sessions, const Ice::Current& current);
void removeSessions_async(const AMD_Bridge_removeSessionsPtr& callback, const SessionSeq& sessions,
@@ -90,7 +89,6 @@ public:
void addListener(const BridgeListenerPrx& listener, const Ice::Current& current);
void removeListener(const BridgeListenerPrx& listener, const Ice::Current& current);
- void replaceSession(const SessionPrx&, const SessionSeq& newSessions, const Ice::Current& current);
void replaceSession_async(const AMD_Bridge_replaceSessionPtr& callbac, const SessionPrx& sessionToReplace,
const SessionSeq& newSessions, const Ice::Current& current);
@@ -110,7 +108,9 @@ public:
string id();
SessionCollectionPtr sessions();
- void spawnShutdown();
+ void forceUpdate();
+
+ void getAddSessionsTasks(QueuedTasks& tasks, const SessionSeq& sessions);
private:
@@ -153,11 +153,6 @@ private:
//
Logger mLogger;
- //
- // TODO: check to see if this is really needed. If so, it should really be done by a thread pool.
- //
- IceUtil::Handle<IceUtil::Thread> mShutdownThread;
-
void statePreCheck();
BridgeStateItemPtr createUpdate();
void pushUpdate(const BridgeStateItemPtr& update);
@@ -196,6 +191,314 @@ static void checkSessions(const SessionSeq& sessions)
//
static const string TopicPrefix("AsteriskSCF.Bridge.");
+class ShutdownThread : public IceUtil::Thread
+{
+public:
+ ShutdownThread(const BridgePrx& bridge) :
+ mBridge(bridge)
+ {
+ }
+
+ void run()
+ {
+ try
+ {
+ mBridge->shutdown();
+ }
+ catch (...)
+ {
+ }
+ }
+private:
+ BridgePrx mBridge;
+};
+
+class SessionsTracker : public IceUtil::Shared
+{
+public:
+ SessionsTracker() :
+ mResponses(0)
+ {
+ }
+
+ void add(const SessionPrx& s)
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ mSessions.push_back(s);
+ ++mResponses;
+ }
+
+ SessionSeq getSessions()
+ {
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+ return mSessions;
+ }
+
+ void addException(const SessionPrx& session, const Ice::Exception& ex)
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ SessionError error;
+ error.failedSession = session;
+ error.message = ex.what();
+ mExceptions.push_back(error);
+ ++mResponses;
+ }
+
+ void addExceptionMessage(const SessionPrx& session, const string& msg)
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ SessionError error;
+ error.failedSession = session;
+ error.message = msg;
+ mExceptions.push_back(error);
+ ++mResponses;
+ }
+
+ SessionErrorSeq getExceptions()
+ {
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+ return mExceptions;
+ }
+
+ size_t responseCount()
+ {
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+ return mResponses;
+ }
+
+private:
+ boost::shared_mutex mLock;
+ SessionSeq mSessions;
+ SessionErrorSeq mExceptions;
+ size_t mResponses;
+};
+typedef IceUtil::Handle<SessionsTracker> SessionsTrackerPtr;
+
+class RemoveSessionsNotify : public QueuedTask
+{
+public:
+ RemoveSessionsNotify(const BridgeListenerMgrPtr& bridgeListeners,
+ const SessionsTrackerPtr& tracker) :
+ QueuedTask("RemoveSessionsNotify"),
+ mBridgeListeners(bridgeListeners),
+ mTracker(tracker)
+ {
+ }
+
+protected:
+ bool executeImpl()
+ {
+ SessionSeq sessions = mTracker->getSessions();
+ if (!sessions.empty())
+ {
+ mBridgeListeners->sessionsRemoved(sessions);
+ }
+ return true;
+ }
+
+private:
+ BridgeListenerMgrPtr mBridgeListeners;
+ SessionsTrackerPtr mTracker;
+};
+
+class SetBridgeTask : public QueuedTask
+{
+public:
+ SetBridgeTask(const SessionCollectionPtr& sessionCollection, const BridgePrx& bridge,
+ const SessionListenerPrx& listener, const SessionSeq& sessions, const SessionsTrackerPtr& tracker):
+ QueuedTask("SetBridgeTask"),
+ mSessionManager(sessionCollection),
+ mBridge(bridge),
+ mSessionListener(listener),
+ mSessions(sessions),
+ mTracker(tracker)
+ {
+ }
+
+protected:
+ bool executeImpl()
+ {
+ bool tasksDispatched = false;
+ for (SessionSeq::iterator i = mSessions.begin(); i != mSessions.end(); ++i)
+ {
+ SessionWrapperPtr session = mSessionManager->addSession(*i);
+ if (session == 0)
+ {
+ //
+ // This shouldn't happen!
+ //
+ mTracker->addExceptionMessage(*i, "session already added");
+ continue;
+ }
+ tasksDispatched = true;
+ (*i)->begin_setBridge(mBridge, mSessionListener,
+ newCallback_Session_setBridge(this, &SetBridgeTask::set,
+ &SetBridgeTask::failed), session);
+ }
+ return !tasksDispatched;
+ }
+
+ void set(const SessionInfoPtr& info, const SessionWrapperPtr& session)
+ {
+ mTracker->add(session->getSession());
+ if (info->currentState != "ready")
+ {
+ //
+ // setupMedia is an AMI backed implementation, so should not block here.
+ //
+ session->setupMedia();
+ }
+ if (mTracker->responseCount() == mSessions.size())
+ {
+ mListener->succeeded();
+ }
+ }
+
+ void failed(const Ice::Exception& ex, const SessionWrapperPtr& session)
+ {
+ //
+ // TODO:
+ // * Log exception.
+ // * Rollback
+ // * Interpret exception and decide whether or not to fail the entire operations.
+ // Currently the semantics allow some operations to fail.
+ //
+ mTracker->addException(session->getSession(), ex);
+ try
+ {
+ //
+ // We want to make sure that session is laying around. The session collection will
+ // take care of cleaning it up as long as it is marked as destroyed.
+ //
+ session->destroy();
+ }
+ catch(...)
+ {
+ }
+ if (mTracker->responseCount() == mSessions.size())
+ {
+ SessionErrorSeq exceptions = mTracker->getExceptions();
+ if (exceptions.size() == mSessions.size())
+ {
+ mListener->failed();
+ }
+ else
+ {
+ mListener->succeeded();
+ }
+ }
+ }
+
+private:
+ SessionCollectionPtr mSessionManager;
+ BridgePrx mBridge;
+ SessionListenerPrx mSessionListener;
+ SessionSeq mSessions;
+ SessionsTrackerPtr mTracker;
+};
+
+class AddToListeners : public QueuedTask
+{
+public:
+ AddToListeners(const BridgeListenerMgrPtr& listeners, const SessionsTrackerPtr& tracker) :
+ QueuedTask("AddToListeners"),
+ mListeners(listeners),
+ mTracker(tracker)
+ {
+ }
+
+protected:
+ bool executeImpl()
+ {
+ mListeners->sessionsAdded(mTracker->getSessions());
+ return true;
+ }
+
+private:
+ BridgeListenerMgrPtr mListeners;
+ SessionsTrackerPtr mTracker;
+};
+
+class CheckShutdown : public QueuedTask
+{
+public:
+ CheckShutdown(const BridgeImplPtr& bridge, const BridgePrx& proxy) :
+ QueuedTask("CheckShutdown"),
+ mBridge(bridge),
+ mPrx(proxy)
+ {
+ }
+
+protected:
+ bool executeImpl()
+ {
+ if (mBridge->sessions()->size() < 2 && mPrx)
+ {
+ IceUtil::Handle<IceUtil::Thread> t = new ShutdownThread(mPrx);
+ t->start();
+ }
+ return true;
+ }
+private:
+ BridgeImplPtr mBridge;
+ BridgePrx mPrx;
+};
+
+template <class T>
+class GenericAMDCallback : public QueuedTask
+{
+public:
+ GenericAMDCallback(const T& cb, const SessionsTrackerPtr& tracker) :
+ QueuedTask("GenericAMDCallback"),
+ mCallback(cb),
+ mTracker(tracker)
+ {
+ }
+protected:
+
+ bool executeImpl()
+ {
+ mCallback->ice_response();
+ return true;
+ }
+
+ void failImpl()
+ {
+ SessionErrorSeq errors(mTracker->getExceptions());
+ if (!errors.empty())
+ {
+ mCallback->ice_exception(BridgeSessionOperationFailed(errors));
+ }
+ else
+ {
+ mCallback->ice_exception();
+ }
+ }
+
+private:
+ T mCallback;
+ SessionsTrackerPtr mTracker;
+};
+
+class UpdateTask : public QueuedTask
+{
+public:
+ UpdateTask(const BridgeImplPtr& bridge) :
+ QueuedTask("UpdateTask"),
+ mBridge(bridge)
+ {
+ }
+
+protected:
+ bool executeImpl()
+ {
+ mBridge->forceUpdate();
+ return true;
+ }
+private:
+ BridgeImplPtr mBridge;
+};
+
} // End of anonymous namespace
BridgeImpl::BridgeImpl(const string& name, const Ice::ObjectAdapterPtr& adapter,
@@ -231,91 +534,33 @@ BridgeImpl::~BridgeImpl()
//
}
-void BridgeImpl::addSessions(const SessionSeq& sessions)
-{
- if (sessions.size() == 0)
- {
- return;
- }
- checkSessions(sessions);
- statePreCheck();
- mLogger(Debug) << FUNLOG << ": adding " << sessions.size() << " sessions";
-
- SessionSeq addedSessions;
- for (SessionSeq::const_iterator i = sessions.begin(); i != sessions.end(); ++i)
- {
- try
- {
- SessionWrapperPtr session = mSessions->addSession(*i);
- if (session)
- {
- SessionInfoPtr info;
- try
- {
- RetryPolicy policy(5, 500);
- //
- // canRetry should never return false since we throw ourselves out of this loop. But
- // we'll do it here in case we decide to do something else.
- //
- while (policy.canRetry())
- {
- try
- {
- //
- // TODO : AMI!
- //
- info = (*i)->setBridge(mPrx, mSessionListenerPrx);
- addedSessions.push_back(*i);
- break;
- }
- catch (const Ice::ConnectionLostException&)
- {
- if (!policy.retry())
- {
- throw;
- }
- }
- }
- }
- catch (const Ice::Exception& ex)
- {
- mLogger(Debug) << FUNLOG << ": " << (*i)->ice_toString() << " threw " << ex.what()
- << " continuing";
- }
- if (info->currentState != "ready")
- {
- //
- // We setup media.
- // TODO: AMI should come into play here.
- //
- session->setupMedia();
- }
- }
- }
- catch (const Ice::Exception&)
- {
- }
- }
-
- BridgeStateItemPtr update;
- if (addedSessions.size())
- {
- mListeners->sessionsAdded(addedSessions);
- {
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- update = createUpdate();
- }
- }
- pushUpdate(update);
-}
-
void BridgeImpl::addSessions_async(const AMD_Bridge_addSessionsPtr& callback, const SessionSeq& sessions,
const Ice::Current& current)
{
try
{
- addSessions(sessions);
- callback->ice_response();
+ if (sessions.size() == 0 && callback)
+ {
+ callback->ice_response();
+ return;
+ }
+ checkSessions(sessions);
+ statePreCheck();
+ mLogger(Debug) << FUNLOG << ": adding " << sessions.size() << " sessions";
+
+ SessionsTrackerPtr tracker(new SessionsTracker);
+ QueuedTasks tasks;
+ tasks.push_back(new SetBridgeTask(mSessions, mPrx, mSessionListenerPrx, sessions, tracker));
+ tasks.push_back(new AddToListeners(mListeners, tracker));
+ tasks.push_back(new GenericAMDCallback<AMD_Bridge_addSessionsPtr>(callback, tracker));
+ tasks.push_back(new UpdateTask(this));
+ ExecutorPtr executor(new Executor(tasks, mLogger));
+ executor->start();
+ //
+ // When the operations have all completed, that last task will take care of handling
+ // the callback. It's all left withing the try/catch in the event something happens during
+ // task startup.
+ //
}
catch (const std::exception& ex)
{
@@ -327,45 +572,42 @@ void BridgeImpl::addSessions_async(const AMD_Bridge_addSessionsPtr& callback, co
}
}
-void BridgeImpl::removeSessions(const SessionSeq& sessions, const Ice::Current& current)
-{
- if (sessions.size() == 0)
- {
- return;
- }
- checkSessions(sessions);
- SessionSeq removedSessions;
- statePreCheck();
- for (SessionSeq::const_iterator i = sessions.begin(); i != sessions.end(); ++i)
- {
- SessionWrapperPtr session = mSessions->getSession(*i);
- if (session)
- {
- session->shutdown(mSessionListenerPrx, new ResponseCode);
- removedSessions.push_back(*i);
- }
- }
-
- if (removedSessions.size() != 0)
- {
- mListeners->sessionsRemoved(removedSessions);
- }
-
- SessionSeq sessionsLeft(mSessions->getSessionSeq());
- if (sessionsLeft.size() < 2)
- {
- spawnShutdown();
- }
- mSessions->reap();
-}
-
void BridgeImpl::removeSessions_async(const AMD_Bridge_removeSessionsPtr& callback, const SessionSeq& sessions,
const Ice::Current& current)
{
try
{
- removeSessions(sessions, current);
- callback->ice_response();
+ if (sessions.size() == 0)
+ {
+ callback->ice_response();
+ return;
+ }
+ checkSessions(sessions);
+ statePreCheck();
+
+ //
+ // The shutdown of individual sessions are implemented as series of AMI requests. Once initiated,
+ // we allow them to proceed asynchronously and do not concern ourselves with the result.
+ // The logic of shutdown should remove them either because the operations succeeded or
+ // *couldn't* be accomplished because of some terminal condition. At any rate, waiting around for them
+ // is pointless.
+ //
+ SessionsTrackerPtr removed(new SessionsTracker);
+ for (SessionSeq::const_iterator i = sessions.begin(); i != sessions.end(); ++i)
+ {
+ SessionWrapperPtr session = mSessions->getSession(*i);
+ if (session)
+ {
+ session->shutdown(mSessionListenerPrx, new ResponseCode);
+ removed->add(session->getSession());
+ }
+ }
+ QueuedTasks tasks;
+ tasks.push_back(new RemoveSessionsNotify(mListeners, removed));
+ tasks.push_back(new GenericAMDCallback<AMD_Bridge_removeSessionsPtr>(callback, removed));
+ tasks.push_back(new CheckShutdown(this, mPrx));
+ ExecutorPtr runner(new Executor(tasks, mLogger));
+ runner->start();
}
catch (const std::exception& ex)
{
@@ -519,90 +761,40 @@ void BridgeImpl::removeListener(const BridgeListenerPrx& listener, const Ice::Cu
}
}
-void BridgeImpl::replaceSession(const SessionPrx& oldSession, const SessionSeq& newSessions, const Ice::Current& current)
-{
- mLogger(Debug) << FUNLOG << ":" << objectIdFromCurrent(current);
-
- checkSessions(newSessions);
- statePreCheck();
-
- SessionWrapperPtr session = mSessions->getSession(oldSession);
- //
- // If the session did not exist on this bridge, then this operation should not proceed.
- //
- if (!session)
- {
- throw SessionNotFound(oldSession);
- }
- session->shutdown(mSessionListenerPrx, new ResponseCode);
-
- SessionSeq removed;
- removed.push_back(oldSession);
- mListeners->sessionsRemoved(removed);
-
- SessionSeq added;
- for (SessionSeq::const_iterator i = newSessions.begin(); i != newSessions.end(); ++i)
- {
- try
- {
- RetryPolicy policy(5, 500);
- //
- // canRetry should never return false since we throw ourselves out of this loop. But
- // we'll do it here in case we decide to do something else.
- //
- while (policy.canRetry())
- {
- try
- {
- SessionInfoPtr info = (*i)->setBridge(mPrx, mSessionListenerPrx);
- SessionWrapperPtr session = mSessions->addSession(*i);
- if (info->currentState != "ready")
- {
- //
- // We setup media.
- // TODO: AMI should come into play here.
- //
- session->setupMedia();
- }
- added.push_back(*i);
-
- break;
- }
- catch (const Ice::ConnectionLostException&)
- {
- if (!policy.retry())
- {
- throw;
- }
- }
- }
- }
- catch (const Ice::Exception& ex)
- {
- //
- // We need to continue if setBridge fails for some reason. Rolling
- // back the other sessions would be difficult. The bridge does not
- // know enough to try for system wide consistency. An OTS would
- // really be required if things like replaceSessions() were to be
- // atomic.
- //
- mLogger(Info) << FUNLOG << ": setting the bridge on " << *i << " threw " << ex.what();
- }
- }
-
- //
- // Now update the listeners.
- //
- mListeners->sessionsAdded(added);
-}
-
void BridgeImpl::replaceSession_async(const AMD_Bridge_replaceSessionPtr& callback, const SessionPrx& sessionToReplace,
const SessionSeq& newSessions, const Ice::Current& current)
{
try
{
- replaceSession(sessionToReplace, newSessions, current);
- callback->ice_response();
+ mLogger(Debug) << FUNLOG << ":" << objectIdFromCurrent(current);
+
+ checkSessions(newSessions);
+ statePreCheck();
+
+ SessionWrapperPtr session = mSessions->getSession(sessionToReplace);
+ //
+ // If the session did not exist on this bridge, then this operation should not proceed.
+ //
+ if (!session)
+ {
+ throw SessionNotFound(sessionToReplace);
+ }
+ //
+ // Shutdown is inheritently asynchronous (see SessionWrapper::shutdown())
+ //
+ SessionsTrackerPtr removeTracker(new SessionsTracker);
+ removeTracker->add(session->getSession());
+ session->shutdown(mSessionListenerPrx, new ResponseCode);
+
+ SessionsTrackerPtr tracker(new SessionsTracker);
+ QueuedTasks tasks;
+ tasks.push_back(new RemoveSessionsNotify(mListeners, removeTracker));
+ tasks.push_back(new SetBridgeTask(mSessions, mPrx, mSessionListenerPrx, newSessions, tracker));
+ tasks.push_back(new AddToListeners(mListeners, tracker));
+ tasks.push_back(new GenericAMDCallback<AMD_Bridge_replaceSessionPtr>(callback, tracker));
+ tasks.push_back(new UpdateTask(this));
+ ExecutorPtr executor(new Executor(tasks, mLogger));
+ executor->start();
}
catch (const std::exception& ex)
{
@@ -725,39 +917,23 @@ SessionCollectionPtr BridgeImpl::sessions()
return mSessions;
}
-namespace
+void BridgeImpl::forceUpdate()
{
-class ShutdownThread : public IceUtil::Thread
-{
-public:
- ShutdownThread(const BridgePrx& bridge) :
- mBridge(bridge)
- {
- }
-
- void run()
+ BridgeStateItemPtr update;
{
- try
- {
- mBridge->shutdown();
- }
- catch (...)
- {
- }
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ update = createUpdate();
}
-private:
- BridgePrx mBridge;
-};
+ pushUpdate(update);
}
-void BridgeImpl::spawnShutdown()
+void BridgeImpl::getAddSessionsTasks(QueuedTasks& tasks,
+ const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions)
{
- boost::shared_lock<boost::shared_mutex> lock(mLock);
- if (!mShutdownThread)
- {
- mShutdownThread = new ShutdownThread(mPrx);
- mShutdownThread->start();
- }
+ SessionsTrackerPtr tracker(new SessionsTracker);
+ tasks.push_back(new SetBridgeTask(mSessions, mPrx, mSessionListenerPrx, sessions, tracker));
+ tasks.push_back(new AddToListeners(mListeners, tracker));
+ tasks.push_back(new UpdateTask(this));
}
void BridgeImpl::statePreCheck()
@@ -805,7 +981,7 @@ void BridgeImpl::pushUpdate(const BridgeStateItemPtr& update)
void BridgeImpl::pushUpdates(const ReplicatedStateItemSeq& update)
{
- if (update.size() > 0 && replicate())
+ if (!update.empty() && replicate())
{
mReplicator->setState(update);
}
@@ -848,7 +1024,8 @@ AsteriskSCF::BridgeService::BridgeServant::create(const Ice::ObjectAdapterPtr& o
const AsteriskSCF::Bridge::V1::BridgeStateItemPtr& state)
{
logger(Debug) << FUNLOG << ": creating replica for " << state->bridgeId;
- IceUtil::Handle<AsteriskSCF::BridgeService::BridgeServant> bridge(new BridgeImpl(state->bridgeId, objectAdapter, vector<BridgeListenerPrx>(),
+ IceUtil::Handle<AsteriskSCF::BridgeService::BridgeServant> bridge(
+ new BridgeImpl(state->bridgeId, objectAdapter, vector<BridgeListenerPrx>(),
listenerMgr, replicator, state, logger));
return bridge;
diff --git a/src/BridgeImpl.h b/src/BridgeImpl.h
index 1276482..fd5dbfa 100644
--- a/src/BridgeImpl.h
+++ b/src/BridgeImpl.h
@@ -96,8 +96,6 @@ public:
**/
virtual void activate(const AsteriskSCF::SessionCommunications::V1::BridgePrx& proxy) = 0;
- virtual void addSessions(const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions) = 0;
-
/**
*
* Replication helper methods
@@ -113,6 +111,14 @@ public:
virtual SessionCollectionPtr sessions() = 0;
+ //
+ // Allows an external agent to force a state update replication.
+ //
+ virtual void forceUpdate() = 0;
+
+ virtual void getAddSessionsTasks(QueuedTasks& tasks,
+ const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions) = 0;
+
/**
*
* Internal implementation detail interface.
diff --git a/src/BridgeManagerImpl.cpp b/src/BridgeManagerImpl.cpp
index 64b13ad..7d90ed5 100644
--- a/src/BridgeManagerImpl.cpp
+++ b/src/BridgeManagerImpl.cpp
@@ -60,8 +60,6 @@ public:
//
// AsteriskSCF::SessionCommunications::V1::BridgeManager Interface
//
- BridgePrx createBridge(const SessionSeq& endpoints, const BridgeListenerPrx& listener, const Ice::Current& current);
-
void createBridge_async(const AMD_BridgeManager_createBridgePtr& request, const SessionSeq& endpoints,
const BridgeListenerPrx& listener, const Ice::Current&);
@@ -172,59 +170,81 @@ BridgeManagerImpl::~BridgeManagerImpl()
mListeners = 0;
}
-BridgePrx BridgeManagerImpl::createBridge(const SessionSeq& sessions,
- const BridgeListenerPrx& listener, const Ice::Current& current)
+class FinishUp : public QueuedTask
{
- mLogger(Debug) << FUNLOG << ":" << objectIdFromCurrent(current);
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- statePreCheck(BOOST_CURRENT_FUNCTION);
- reap();
-
- string stringId = string("bridge.") + IceUtil::generateUUID();
- Ice::Identity id(mAdapter->getCommunicator()->stringToIdentity(stringId));
- BridgePrx prx(BridgePrx::uncheckedCast(mAdapter->createProxy(id)));
- BridgeListenerMgrPtr mgr(new BridgeListenerMgr(mAdapter->getCommunicator(), stringId, prx));
- vector<BridgeListenerPrx> listeners(mState->defaultBridgeListeners);
- if (listener)
+public:
+ FinishUp(const AMD_BridgeManager_createBridgePtr& callback,
+ const BridgeManagerListenerMgrPtr& listenerMgr,
+ const BridgePrx& bridgeProxy) :
+ QueuedTask("FinishUp"),
+ mCallback(callback),
+ mListenerMgr(listenerMgr),
+ mBridgeProxy(bridgeProxy)
{
- listeners.push_back(listener);
}
+
+protected:
- BridgeServantPtr bridge = BridgeServant::create(stringId, mAdapter, listeners, mgr, mReplicator, mLogger);
- Ice::ObjectPrx obj = mAdapter->add(bridge, id);
-
- mLogger(Info) << objectIdFromCurrent(current) << ": creating new bridge " << obj->ice_toString() << "." ;
- BridgeInfo info;
- info.servant = bridge;
- info.proxy = BridgePrx::uncheckedCast(obj);
-
- bridge->activate(info.proxy);
-
- if (mListeners)
+ bool executeImpl()
{
- mListeners->bridgeCreated(info.proxy);
+ if (mListenerMgr)
+ {
+ mListenerMgr->bridgeCreated(mBridgeProxy);
+ }
+ mCallback->ice_response(mBridgeProxy);
+ return true;
}
- else
+
+ void failImpl()
{
- mLogger(Debug) << ": bridgeCreated event not published as there are no listeners configured.";
+ mCallback->ice_exception();
}
- mBridges.push_back(info);
-
- //
- // Don't forget to add the initial sessions.
- //
- bridge->addSessions(sessions);
- return info.proxy;
-}
+private:
+ AMD_BridgeManager_createBridgePtr mCallback;
+ BridgeManagerListenerMgrPtr mListenerMgr;
+ BridgePrx mBridgeProxy;
+};
void BridgeManagerImpl::createBridge_async(const AMD_BridgeManager_createBridgePtr& callback,
- const SessionSeq& endpoints, const BridgeListenerPrx& listener, const Ice::Current& current)
+ const SessionSeq& sessions, const BridgeListenerPrx& listener, const Ice::Current& current)
{
try
{
- BridgePrx result = createBridge(endpoints, listener, current);
- callback->ice_response(result);
+ mLogger(Debug) << FUNLOG << ":" << objectIdFromCurrent(current);
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ statePreCheck(BOOST_CURRENT_FUNCTION);
+ reap();
+
+ string stringId = string("bridge.") + IceUtil::generateUUID();
+ Ice::Identity id(mAdapter->getCommunicator()->stringToIdentity(stringId));
+ BridgePrx prx(BridgePrx::uncheckedCast(mAdapter->createProxy(id)));
+ BridgeListenerMgrPtr mgr(new BridgeListenerMgr(mAdapter->getCommunicator(), stringId, prx));
+ vector<BridgeListenerPrx> listeners(mState->defaultBridgeListeners);
+ if (listener)
+ {
+ listeners.push_back(listener);
+ }
+
+ BridgeServantPtr bridge = BridgeServant::create(stringId, mAdapter, listeners, mgr, mReplicator, mLogger);
+ Ice::ObjectPrx obj = mAdapter->add(bridge, id);
+
+ mLogger(Info) << objectIdFromCurrent(current) << ": creating new bridge " << obj->ice_toString() << "." ;
+ BridgeInfo info;
+ info.servant = bridge;
+ info.proxy = BridgePrx::uncheckedCast(obj);
+ bridge->activate(info.proxy);
+ mBridges.push_back(info);
+
+ QueuedTasks tasks;
+ if (!sessions.empty())
+ {
+ bridge->getAddSessionsTasks(tasks, sessions);
+ }
+
+ tasks.push_back(new FinishUp(callback, mListeners, info.proxy));
+ ExecutorPtr runner(new Executor(tasks, mLogger));
+ runner->start();
}
catch (const std::exception& ex)
{
diff --git a/src/MediaSplicer.cpp b/src/MediaSplicer.cpp
index 7fab2b1..619d5c7 100644
--- a/src/MediaSplicer.cpp
+++ b/src/MediaSplicer.cpp
@@ -23,6 +23,13 @@
#include "ServiceUtil.h"
#include "SessionWrapper.h"
+//
+// TODO:
+// * Reasonable retry semantics.
+// * Rollback on connection failures.
+// * Cleanup of MediaConnectorI.
+// * Conference bridging.
+//
using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::Media::V1;
using namespace AsteriskSCF::Bridge::V1;
@@ -36,21 +43,25 @@ namespace BridgeService
// Forward declarations.
//
class MediaSplicerI;
-class MedixMixerPtr;
+class MediaConnectorI;
+typedef IceUtil::Handle<MediaConnectorI> MediaConnectorIPtr;
+typedef IceUtil::Handle<MediaSplicerI> MediaSplicerIPtr;
+//
+// Some types that are used throughout.
+//
typedef pair<AsteriskSCF::Media::V1::StreamSinkPrx, AsteriskSCF::Media::V1::StreamSourcePrx> OutgoingPairing;
typedef pair<AsteriskSCF::Media::V1::StreamSourcePrx, AsteriskSCF::Media::V1::StreamSinkPrx> IncomingPairing;
typedef vector<OutgoingPairing> OutgoingPairings;
typedef vector<IncomingPairing> IncomingPairings;
-class MediaConnectorI;
-typedef IceUtil::Handle<MediaConnectorI> MediaConnectorIPtr;
-class MediaSplicerI;
-typedef IceUtil::Handle<MediaSplicerI> MediaSplicerIPtr;
-
+//
+// This structure acts as a sort of token for list of media connection operations. These operations are performed
+// asynchronously and helper classes to move through each operation, effectively serializing the entire affair while
+// letting independent subtasks to be performed independently.
+//
struct MediaConnectorBuilder : public IceUtil::Shared
{
-public:
AsteriskSCF::Media::V1::SessionPrx mediaSession;
AsteriskSCF::Media::V1::StreamSourceSeq sources;
AsteriskSCF::Media::V1::StreamSinkSeq sinks;
@@ -61,10 +72,10 @@ public:
typedef IceUtil::Handle<MediaConnectorBuilder> MediaConnectorBuilderPtr;
//
-// TODO: These proxies could use some retry properties added to them...
-// particularily for the media stream connect/disconnect operations.
+// An implementation of the MediaConnector interface. The details of "hookup up" media is all hidden behind this
+// interface. It's undergone several revisions in the past, some pretty hasty so it probably warrants a refactoring and
+// a close look on its behavior.
//
-
class MediaConnectorI : public MediaConnector
{
public:
@@ -413,7 +424,8 @@ private:
Logger mLogger;
};
-QueuedTasks createMediaConnectTasks(const SessionWrapperPtr& session, const MediaConnectorIPtr& peer, const MediaSplicerIPtr& splicer);
+QueuedTasks createMediaConnectTasks(const SessionWrapperPtr& session,
+ const MediaConnectorIPtr& peer, const MediaSplicerIPtr& splicer);
//
// TODO: This needs to register the streams with an active threaded mixing element.
//
@@ -428,6 +440,10 @@ public:
{
}
+ //
+ // This operation is called internally when the operations for initializing a MediaConnectorBuilder instance have
+ // completed.
+ //
MediaConnectorPtr createConnector(const SessionWrapperPtr& session, const MediaConnectorBuilderPtr& data)
{
MediaConnectorIPtr connector(new MediaConnectorI(data, mBridgeId, session->id(), mReplicator, mLogger));
@@ -440,6 +456,10 @@ public:
return connector;
}
+ //
+ // Called through the external MediaSplicer interface, this initiates a series of operations to establish the media
+ // interconnections so a session can communicate on the bridge.
+ //
void connect(const SessionWrapperPtr& session)
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
@@ -469,7 +489,12 @@ public:
existing = mSessions.back().connector;
existing->clearConnections();
}
- ExecutorPtr taskExecutor(new Executor(createMediaConnectTasks(session, existing, this)));
+
+ //
+ // We do not bother tracking the executor for now. A tidy shutdown would probably want to clean this up.
+ // An alternative is to pass back the queued tasks to the caller and let them start and stop the process.
+ //
+ ExecutorPtr taskExecutor(new Executor(createMediaConnectTasks(session, existing, this), mLogger));
taskExecutor->start();
}
@@ -667,6 +692,7 @@ class GetMediaSession : public QueuedTask
{
public:
GetMediaSession(const SessionWrapperPtr& session, const MediaConnectorBuilderPtr& materials) :
+ QueuedTask("GetMediaSession"),
mSession(session),
mMaterials(materials)
{
@@ -700,6 +726,7 @@ class GetSinks : public QueuedTask
{
public:
GetSinks(const MediaConnectorBuilderPtr& materials) :
+ QueuedTask("GetSinks"),
mMaterials(materials)
{
}
@@ -730,6 +757,7 @@ class GetSources : public QueuedTask
{
public:
GetSources(const MediaConnectorBuilderPtr& materials) :
+ QueuedTask("GetSources"),
mMaterials(materials)
{
}
@@ -761,6 +789,7 @@ class GetCompatiblePairings : public QueuedTask
{
public:
GetCompatiblePairings(const MediaSplicerIPtr& splicer, const MediaConnectorBuilderPtr& materials) :
+ QueuedTask("GetCompatiblePairings"),
mSplicer(splicer),
mMaterials(materials)
{
@@ -788,6 +817,7 @@ class MakeConnections : public QueuedTask
{
public:
MakeConnections(const MediaConnectorBuilderPtr& materials) :
+ QueuedTask("MakeConnections"),
mMaterials(materials),
mIncomingPairings(mMaterials->incomingPairings.size(), 0),
mOutgoingPairings(mMaterials->outgoingPairings.size(), 0),
@@ -966,6 +996,7 @@ class CreateAndRegisterConnector : public QueuedTask
public:
CreateAndRegisterConnector(const SessionWrapperPtr& session, const MediaSplicerIPtr& splicer,
const MediaConnectorBuilderPtr& materials) :
+ QueuedTask("CreateAndRegisterConnector"),
mSession(session),
mSplicer(splicer),
mMaterials(materials)
diff --git a/src/SessionWrapper.cpp b/src/SessionWrapper.cpp
index d7b4271..8688c4e 100644
--- a/src/SessionWrapper.cpp
+++ b/src/SessionWrapper.cpp
@@ -102,6 +102,7 @@ class RemoveBridgeTask : public QueuedTask
{
public:
RemoveBridgeTask(const SessionWrapperPtr& session, const SessionListenerPrx& listener) :
+ QueuedTask("RemoveBridgeTask"),
mSession(session),
mSessionListener(listener)
{
@@ -123,6 +124,9 @@ protected:
void failed(const Ice::Exception& ex)
{
+ //
+ // TODO: Log exception.
+ //
mListener->failed();
}
@@ -150,6 +154,7 @@ class SessionStopTask : public QueuedTask
public:
SessionStopTask(const SessionWrapperPtr& session,
const ResponseCodePtr& code) :
+ QueuedTask("SessionStopTask"),
mSession(session),
mCode(code)
{
@@ -172,6 +177,9 @@ protected:
void failed(const Ice::Exception&)
{
+ //
+ // TODO: Log exception.
+ //
mListener->failed();
}
@@ -199,6 +207,7 @@ class SetStateTask : public QueuedTask
public:
SetStateTask(const SessionWrapperPtr& session,
const AsteriskSCF::Bridge::V1::BridgedSessionState newState) :
+ QueuedTask("SetStateTask"),
mSession(session),
mState(newState)
{
@@ -239,6 +248,9 @@ protected:
void failed(const Ice::Exception&)
{
+ //
+ // TODO: Log exception.
+ //
mListener->failed();
}
@@ -250,6 +262,7 @@ class ConnectMediaTask : public QueuedTask
{
public:
ConnectMediaTask(const SessionWrapperPtr& session) :
+ QueuedTask("ConnectMediaTask"),
mSession(session)
{
}
@@ -309,7 +322,7 @@ SessionWrapper::SessionWrapper(const BridgedSessionPtr& session,
mLogger(logger),
mId(mSession->key),
mSplicer(splicer),
- mActivities(new Executor)
+ mActivities(new Executor(mLogger))
{
}
@@ -482,13 +495,13 @@ string SessionWrapper::id()
void SessionWrapper::setup()
{
mLogger(Debug) << FUNLOG << ": starting setup of connected session " << mId;
- mActivities->append(new QueueableExecutor(createSetupTasks(this)));
+ mActivities->append(new QueueableExecutor(createSetupTasks(this), mLogger));
}
void SessionWrapper::shutdown(const SessionListenerPrx& listener, const ResponseCodePtr& code)
{
mLogger(Debug) << FUNLOG << ": beginning shutdown of " << mId;
- QueuedTaskPtr shutdownRunner(new QueueableExecutor(createShutdownTasks(this, listener, code)));
+ QueuedTaskPtr shutdownRunner(new QueueableExecutor(createShutdownTasks(this, listener, code), mLogger));
//
// TODO: determine if the pending activites should be shutdown first.
//
diff --git a/src/Tasks.h b/src/Tasks.h
index f046cbc..06cc867 100644
--- a/src/Tasks.h
+++ b/src/Tasks.h
@@ -18,6 +18,7 @@
#include <Ice/Ice.h>
#include <boost/thread/locks.hpp>
#include <list>
+#include <AsteriskSCF/logger.h>
namespace AsteriskSCF
{
@@ -43,6 +44,7 @@ class QueuedTask : virtual public IceUtil::Shared
void operator=(const QueuedTask&);
public:
+
virtual ~QueuedTask() {}
//
@@ -89,11 +91,25 @@ public:
mListener = listener;
}
+ std::string name()
+ {
+ return mName;
+ }
+
protected:
TaskListenerPtr mListener;
-
- QueuedTask() {} // for those derived classes that don't require arguments.
+ std::string mName;
+
+ QueuedTask() :
+ mName("<unnamed>")
+ {
+ }
+ QueuedTask(const std::string& name) :
+ mName(name)
+ {
+ }
+
virtual bool executeImpl() = 0;
//
@@ -113,13 +129,15 @@ typedef std::list<QueuedTaskPtr> QueuedTasks;
class Executor : virtual public TaskListener
{
public:
- Executor() :
+ Executor(const AsteriskSCF::System::Logging::Logger& logger) :
+ mLogger(logger),
mStopped(true)
{
}
- Executor(const QueuedTasks& tasks) :
+ Executor(const QueuedTasks& tasks, const AsteriskSCF::System::Logging::Logger& logger) :
mTasks(tasks),
+ mLogger(logger),
mStopped(true)
{
for (QueuedTasks::iterator i = mTasks.begin(); i != mTasks.end(); ++i)
@@ -130,19 +148,15 @@ public:
void start()
{
- QueuedTaskPtr t;
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
- if (mStopped)
+ if (!mStopped)
{
- t = mTasks.front();
- mStopped = false;
+ return;
}
+ mStopped = false;
}
- if (t && t->execute())
- {
- succeeded();
- }
+ succeeded();
}
void stop()
@@ -159,49 +173,39 @@ public:
//
void succeeded()
{
- while (mTasks.size() > 0)
+ QueuedTaskPtr current = popNextTask();
+ while (current)
{
- QueuedTaskPtr t;
- {
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- if (!mStopped)
- {
- mTasks.pop_front();
- if (mTasks.size() == 0)
- {
- mStopped = true;
- break;
- }
- t = mTasks.front();
- }
- }
- if (t)
+ mLogger(AsteriskSCF::System::Logging::Debug) << ": executing " << current->name();
+
+ if (!current->execute())
{
- //
- // TODO: this is too easy to get wrong, should revisit.
- //
- // If execute returns false, the queue should suspend until it gets started again
- // by some outside agent. If a task returns true, it should not call succeeded.
- //
- if (!t->execute())
- {
- break;
- }
+ break;
}
+ current = popNextTask();
}
}
+
+ QueuedTaskPtr popNextTask()
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ if (mTasks.empty() || mStopped)
+ {
+ return 0;
+ }
+ QueuedTaskPtr t = mTasks.front();
+ mTasks.pop_front();
+ return t;
+ }
void failed()
{
- while (mTasks.size() > 0)
+ QueuedTaskPtr current = popNextTask();
+ while (current)
{
- QueuedTaskPtr i;
- {
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- i = mTasks.front();
- mTasks.pop_front();
- }
- i->fail();
+ mLogger(AsteriskSCF::System::Logging::Debug) << ": failing " << current->name();
+ current->fail();
+ current = popNextTask();
}
}
@@ -210,16 +214,12 @@ public:
//
// Note that a request that is in progress will not be destroyed.
//
- while (mTasks.size() > 0)
+ QueuedTaskPtr current = popNextTask();
+ while (current)
{
-
- QueuedTaskPtr i;
- {
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- i = mTasks.front();
- mTasks.pop_front();
- }
- i->destroy();
+ mLogger(AsteriskSCF::System::Logging::Debug) << ": destroying " << current->name();
+ current->destroy();
+ current = popNextTask();
}
}
@@ -249,6 +249,7 @@ public:
protected:
boost::shared_mutex mLock;
QueuedTasks mTasks;
+ AsteriskSCF::System::Logging::Logger mLogger;
bool mStopped;
};
typedef IceUtil::Handle<Executor> ExecutorPtr;
@@ -259,12 +260,13 @@ public:
//
// Useful for derived classes that can only do initialization in the constructor body.
//
- QueueableExecutor()
+ QueueableExecutor(const AsteriskSCF::System::Logging::Logger& logger) :
+ Executor(logger)
{
}
- QueueableExecutor(const QueuedTasks& tasks) :
- Executor(tasks)
+ QueueableExecutor(const QueuedTasks& tasks, const AsteriskSCF::System::Logging::Logger& logger) :
+ Executor(tasks, logger)
{
}
-----------------------------------------------------------------------
--
asterisk-scf/integration/bridging.git
More information about the asterisk-scf-commits
mailing list