[asterisk-scf-commits] asterisk-scf/integration/bridging.git branch "bridge-replication" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Thu Feb 24 14:47:22 CST 2011
branch "bridge-replication" has been updated
via 3a967cb0590297d35cf9fa290512d32bf2423799 (commit)
from 840b5d0a42573922616e25bc3324fb23e08066c7 (commit)
Summary of changes:
src/BridgeImpl.cpp | 783 +++++---------------
src/BridgeImpl.h | 13 +-
src/BridgeManagerImpl.cpp | 4 +-
src/BridgeReplicatorIf.ice | 86 ++-
src/BridgeReplicatorStateListenerI.cpp | 57 ++-
src/CMakeLists.txt | 6 +
src/DebugUtil.h | 7 +
src/SessionCollection.cpp | 158 ++++
src/SessionCollection.h | 120 +++
src/SessionListener.cpp | 140 ++++
...eplicatorStateListenerI.h => SessionListener.h} | 22 +-
src/SessionOperations.cpp | 122 +++
src/SessionOperations.h | 202 +++++
src/SessionWrapper.cpp | 156 ++++
src/SessionWrapper.h | 134 ++++
15 files changed, 1380 insertions(+), 630 deletions(-)
create mode 100644 src/SessionCollection.cpp
create mode 100644 src/SessionCollection.h
create mode 100644 src/SessionListener.cpp
copy src/{BridgeReplicatorStateListenerI.h => SessionListener.h} (65%)
create mode 100644 src/SessionOperations.cpp
create mode 100644 src/SessionOperations.h
create mode 100644 src/SessionWrapper.cpp
create mode 100644 src/SessionWrapper.h
- Log -----------------------------------------------------------------
commit 3a967cb0590297d35cf9fa290512d32bf2423799
Author: Brent Eagles <beagles at digium.com>
Date: Thu Feb 24 17:00:41 2011 -0330
Some fairly large refactorings to better support smaller granularity in the replication, as
well as to make it easier to bring asynchronous operations in.
This is an intermediate commit to track changes in progress; the changes have not been tested and
aren't quite ready for review.
Amongst the refactorings:
- The collection of sessions is no longer directly managed by the bridge object itself,
but by a SessionCollection object.
- The session listener has been separated from the bridge object source code file and
now has very little direct interaction with the bridge servant. Operations that
manipulate the sessions on the bridge are done by accessing the SessionCollection.
- The collective operations on the sessions have been moved into a separate file
and have been largely templatized.
- The SessionCollection has a template visitSessions operation that, combined with
the session operations mentioned above, implements the large scale bridge related
operations.
- Sessions that have been added to the bridge result in the creation of a SessionWrapper
(it's really a proxy or adapter of sorts, I hate naming). Once created and connected,
most operations that affect its state occur directly to it as opposed to going through it.
Sadly, at the moment this means locks throughout, but perhaps with the recent
thread pool developments, this can be greatly improved.
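
The visitor arrangement described above can be sketched roughly as follows. This is a
minimal, self-contained illustration and not the code from this commit: Session,
SessionState, the add() method and countActiveSessions() are hypothetical stand-ins, and
the choice to copy the session list under a lock before visiting is an assumption. Only
the names visitSessions, CountIfOperation, Negate and IfStateCriteria are taken from the
diff, and their real definitions may differ.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <string>
#include <vector>

// Hypothetical stand-ins for the bridge's session types.
enum class SessionState { Added, Connected, Disconnected, Done };

struct Session
{
    std::string id;
    SessionState state;
};

// Owns the sessions and exposes a templated visit operation.
class SessionCollection
{
public:
    void add(const Session& s)
    {
        std::lock_guard<std::mutex> lock(mLock);
        mSessions.push_back(s);
    }

    // Applies op to a snapshot of the sessions (assumption: the copy is
    // taken under the lock so op runs without holding it).
    template <typename Op>
    void visitSessions(Op& op)
    {
        std::vector<Session> snapshot;
        {
            std::lock_guard<std::mutex> lock(mLock);
            snapshot = mSessions;
        }
        for (const Session& s : snapshot)
        {
            op(s);
        }
    }

private:
    std::mutex mLock;
    std::vector<Session> mSessions;
};

// Criteria: true when the session is in the given state.
struct IfStateCriteria
{
    explicit IfStateCriteria(SessionState s) : mState(s) {}
    bool operator()(const Session& s) const { return s.state == mState; }
    SessionState mState;
};

// Criteria adapter: inverts the wrapped criteria.
template <typename Criteria>
struct Negate
{
    explicit Negate(const Criteria& c) : mCriteria(c) {}
    bool operator()(const Session& s) const { return !mCriteria(s); }
    Criteria mCriteria;
};

// Operation: counts the sessions that satisfy the criteria.
template <typename Criteria>
class CountIfOperation
{
public:
    explicit CountIfOperation(const Criteria& c) : mCriteria(c), mCount(0) {}
    void operator()(const Session& s)
    {
        if (mCriteria(s))
        {
            ++mCount;
        }
    }
    std::size_t count() const { return mCount; }

private:
    Criteria mCriteria;
    std::size_t mCount;
};

// Counts the sessions that are not yet in the Done state.
std::size_t countActiveSessions(SessionCollection& sessions)
{
    Negate<IfStateCriteria> notDone{IfStateCriteria(SessionState::Done)};
    CountIfOperation<Negate<IfStateCriteria>> counter(notDone);
    sessions.visitSessions(counter);
    return counter.count();
}
```

Keeping the criteria and the operation as separate small types means a new bridge-wide
operation only needs a new functor; the collection and its locking stay untouched.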
diff --git a/src/BridgeImpl.cpp b/src/BridgeImpl.cpp
index 8e0badb..7267165 100644
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@ -22,10 +22,14 @@
#include <boost/thread/locks.hpp>
#include <memory>
#include <algorithm>
-#include "MediaSplicer.h"
+#include <set>
#include "ServiceUtil.h"
#include "DebugUtil.h"
+#include "SessionWrapper.h"
+#include "SessionOperations.h"
+#include "SessionListener.h"
+
using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::SessionCommunications::V1;
using namespace AsteriskSCF::BridgeService;
@@ -35,97 +39,23 @@ using namespace std;
/**
*
- * NOTE: Code must be reviewed/refactored for exception safety/consistency.
- * Operations involving establishing media connections may or may not be
- * catastrophic. An example of a non-catastrophic situation might be a media
- * allocation operation that might immediately fail for transient reasons but
- * can be initialized in the background in a relatively timely fashion (of
- * course this would depend on the context).
+ * NOTE: Once asynchronous invocations began to be added to the mix, several shortcomings of both the initial
+ * implementation of the bridge and its replication became apparent. In the previous implementation, the bridge
+ * operations were very "serial" when it came to adding and removing sessions. When converting these operations to
+ * AMD/AMI, the serial approach completely falls apart. Especially when replication is thrown into the mix. For example,
+ * in a typical AMD/AMI scenario, the setBridge() and media allocation operations would occur as AMI requests. The
+ * apparent outcome will be that the participants will "appear" on the call in the order in which the operations
+ * complete (i.e. randomly). Indeed it is possible that some will ultimately fail and some sessions might actually be
+ * removed from the bridge before they are able to complete. Reconciling the completely asynchronous nature to an
+ * implementation is just too awful. As asynchronous support in bridging is the driving factor behind these changes, the
+ * refactoring for replication could have occurred in that development branch. However, replication is a separate task
+ * and it is desirable to have a *relevant* review of it independently of asynchronous support.
*
*/
namespace
{
-
-/**
- *
- * A helper class implemented as a wrapper for a BridgeStateItem.
- *
- **/
-class SessionWrapper
-{
-public:
-
- SessionWrapper(const BridgedSessionPtr& s) :
- mSession(s)
- {
- }
-
- bool isConnected()
- {
- return mSession->currentState == BridgedSessionState::Connected;
- }
-
- void connect()
- {
- if (mSession->currentState == BridgedSessionState::Connected)
- {
- return;
- }
-
- mSession->session->connect();
- mSession->currentState = BridgedSessionState::Connected;
- }
-
- void ring()
- {
- if (mSession->currentState == BridgedSessionState::Connected)
- {
- return;
- }
- mSession->session->ring();
- }
-
- void setConnected()
- {
- mSession->currentState = BridgedSessionState::Connected;
- }
-
- SessionPrx getSession() const
- {
- return mSession->session;
- }
-
- void setConnector(const MediaConnectorPtr& connector)
- {
- mConnector = connector;
- }
-
- void disconnect()
- {
- if (mConnector)
- {
- try
- {
- mConnector->unplug();
- }
- catch (const Ice::Exception&)
- {
- //
- // There are several valid reasons why this might occur, so we'll ignore it and move on.
- //
- }
- mConnector = 0;
- }
- mSession->currentState = BridgedSessionState::Disconnected;
- }
-
-private:
- BridgedSessionPtr mSession;
- MediaConnectorPtr mConnector;
-};
-
/**
*
* BridgeImpl is a reference implmentation of the AsteriskSCF::Bridging::V1::Bridge
@@ -135,7 +65,7 @@ private:
class BridgeImpl : virtual public BridgeServant
{
public:
- BridgeImpl(const Ice::ObjectAdapterPtr& objAdapter,
+ BridgeImpl(const string& name, const Ice::ObjectAdapterPtr& objAdapter,
const vector<BridgeListenerPrx>& listeners,
const BridgeListenerMgrPtr& listenerMgr,
const ReplicatorSmartPrx& replicator,
@@ -169,45 +99,74 @@ public:
BridgeStateItemPtr getState();
void updateState(const BridgeStateItemPtr& state);
+ void updateState(const BridgedSessionPtr& sessionState);
void activate();
string id();
+ SessionCollectionPtr sessions();
- void sessionConnected(const SessionPrx& session);
- size_t sessionStopped(const SessionPrx& session, const ResponseCodePtr& response);
-
-
-
- vector<BridgedSessionPtr> currentSessions();
void spawnShutdown();
private:
boost::shared_mutex mLock;
+ //
+ // True if not a standby object.
+ //
bool mActivated;
+
+ //
+ // The replicated state.
+ //
BridgeStateItemPtr mState;
- MediaSplicer mSplicer;
+ //
+ // Helper class for dealing with the sessions.
+ //
+ SessionCollectionPtr mSessions;
+ //
+ // TODO: Move this out.. this doesn't belong with the bridge implementation any longer.
+ //
+ MediaSplicer mSplicer;
+
const string mName;
Ice::ObjectAdapterPtr mObjAdapter;
BridgeListenerMgrPtr mListeners;
ReplicatorSmartPrx mReplicator;
+
+ //
+ // The bridge's callback implementation for the sessions that get added to it.
+ //
SessionListenerPtr mSessionListener;
SessionListenerPrx mSessionListenerPrx;
+
+ //
+ // A proxy to this bridge. Used when publishing events.
+ //
BridgePrx mPrx;
+
+ //
+ // The logger object.
+ //
Logger mLogger;
+ //
+ // TODO: check to see if this is really needed. If so, it should really be done by a thread pool.
+ //
IceUtil::Handle<IceUtil::Thread> mShutdownThread;
void statePreCheck();
void update();
+ void updateNoLock();
};
typedef IceUtil::Handle<BridgeImpl> BridgeImplPtr;
-
-void checkSessions(const SessionSeq& sessions)
+//
+// simply checks for nulls at this point.
+//
+static void checkSessions(const SessionSeq& sessions)
{
Ice::LongSeq invalidIndexes;
Ice::Long index = 0;
@@ -229,227 +188,9 @@ void checkSessions(const SessionSeq& sessions)
//
static const string TopicPrefix("AsteriskSCF.Bridge.");
-//
-// TODO:
-// Operations that are performed on all bridge sessions might be better done as AMI requests.
-//
-//
-// Functor to support using for_each on shutdown.
-//
-class ShutdownImpl : public unary_function<BridgedSessionPtr, void>
-{
-public:
- ShutdownImpl(const SessionListenerPrx& listener,
- const ResponseCodePtr& response,
- const Logger& logger) :
- mListener(listener),
- mResponse(response),
- mLogger(logger)
- {
- }
-
- void operator()(const BridgedSessionPtr& b)
- {
- try
- {
- SessionWrapper s(b);
- s.getSession()->removeBridge(mListener);
- s.disconnect();
- s.getSession()->stop(mResponse);
- }
- catch (const Ice::ObjectNotExistException& ex)
- {
- mLogger(Debug) << FUNLOG << ": " << ex.what();
- mNonExistent.push_back(b);
- }
- catch (const Ice::Exception& ex)
- {
- mLogger(Debug) << FUNLOG << ": " << ex.what();
- }
- }
-
- const vector<BridgedSessionPtr>& nonExistentObjects()
- {
- return mNonExistent;
- }
-private:
- SessionListenerPrx mListener;
- ResponseCodePtr mResponse;
- vector<BridgedSessionPtr> mNonExistent;
- Logger mLogger;
-};
-
-class RingImpl : public unary_function<BridgedSessionPtr, void>
-{
-public:
- RingImpl(const SessionPrx& exclude, const Logger& logger) :
- mExclude(exclude->ice_getIdentity()),
- mLogger(logger)
- {
- }
-
- void operator()(const BridgedSessionPtr& b)
- {
- SessionWrapper s(b);
- if (s.getSession()->ice_getIdentity() != mExclude)
- {
- try
- {
- s.ring();
- }
- catch (const Ice::ObjectNotExistException& ex)
- {
- mNonExistent.push_back(b);
- mLogger(Debug) << FUNLOG << ": " << ex.what();
- }
- catch (const Ice::Exception& ex)
- {
- mLogger(Debug) << FUNLOG << ": " << ex.what();
- }
- }
- }
-
- const vector<BridgedSessionPtr>& nonExistentObjects()
- {
- return mNonExistent;
- }
-
-private:
- Ice::Identity mExclude;
- vector<BridgedSessionPtr> mNonExistent;
- Logger mLogger;
-};
-
-class ConnectImpl : public unary_function<BridgedSessionPtr, void>
-{
-public:
- ConnectImpl(const SessionPrx& exclude, const Logger& logger) :
- mExclude(exclude->ice_getIdentity()),
- mLogger(logger)
- {
- }
-
- void operator()(BridgedSessionPtr& b)
- {
- SessionWrapper s(b);
- if (s.getSession()->ice_getIdentity() != mExclude)
- {
- try
- {
- s.connect();
- }
- catch (const Ice::ObjectNotExistException& ex)
- {
- mNonExistent.push_back(b);
- mLogger(Debug) << FUNLOG << ": " << ex.what();
- }
- catch (const Ice::Exception& ex)
- {
- mLogger(Debug) << FUNLOG << ": " << ex.what();
- }
- }
- }
-
- const vector<BridgedSessionPtr>& nonExistentObjects()
- {
- return mNonExistent;
- }
-
-private:
- Ice::Identity mExclude;
- vector<BridgedSessionPtr> mNonExistent;
- Logger mLogger;
-};
-
-class FindImpl : public unary_function<BridgedSessionPtr, bool>
-{
-public:
- FindImpl(const SessionPrx& prx) :
- mId(prx->ice_getIdentity())
- {
- }
-
- bool operator()(const BridgedSessionPtr& b)
- {
- SessionWrapper s(b);
- return s.getSession()->ice_getIdentity() == mId;
- }
-private:
- Ice::Identity mId;
-};
-
-//
-// For events that require modification to the bridge, we use helper methods on the bridge itself.
-// For events result in distribution to the bridge sessions, we copy the current sessions and
-// run the calls from the listener itself.
-//
-class SessionListenerImpl : public SessionListener
-{
-public:
- SessionListenerImpl(const BridgeImplPtr& b, const Logger& logger) :
- mBridge(b),
- mLogger(logger)
- {
- }
-
- void connected(const SessionPrx& source, const Ice::Current&)
- {
- try
- {
- mBridge->sessionConnected(source);
- }
- catch (const Ice::Exception& ex)
- {
- mLogger(Debug) << FUNLOG << ": " << ex.what();
- throw;
- }
- vector<BridgedSessionPtr> sessions(mBridge->currentSessions());
- for_each(sessions.begin(), sessions.end(), ConnectImpl(source, mLogger));
- }
-
- void flashed(const SessionPrx& source, const Ice::Current&)
- {
- }
-
- void held(const SessionPrx& source, const Ice::Current&)
- {
- }
-
- void progressing(const SessionPrx& source,
- const ResponseCodePtr& response, const Ice::Current&)
- {
- }
-
- void ringing(const SessionPrx& source, const Ice::Current&)
- {
- vector<BridgedSessionPtr> sessions(mBridge->currentSessions());
- if (sessions.size() > 0)
- {
- for_each(sessions.begin(), sessions.end(), RingImpl(source, mLogger));
- }
- }
-
- void stopped(const SessionPrx& source, const ResponseCodePtr& response, const Ice::Current& current)
- {
- size_t endpointCount = mBridge->sessionStopped(source, response);
- if (endpointCount < 2)
- {
- mBridge->spawnShutdown();
- }
- }
-
- void unheld(const SessionPrx& source, const Ice::Current&)
- {
- }
-
-private:
- BridgeImplPtr mBridge;
- Logger mLogger;
-};
-
} // End of anonymous namespace
-BridgeImpl::BridgeImpl(const Ice::ObjectAdapterPtr& adapter,
+BridgeImpl::BridgeImpl(const string& name, const Ice::ObjectAdapterPtr& adapter,
const vector<BridgeListenerPrx>& listeners,
const BridgeListenerMgrPtr& listenerMgr,
const ReplicatorSmartPrx& replicator,
@@ -457,10 +198,12 @@ BridgeImpl::BridgeImpl(const Ice::ObjectAdapterPtr& adapter,
const Logger& logger) :
mActivated(false),
mState(state),
+ mSessions(new SessionCollection(adapter->getCommunicator(), name, replicator, logger)),
+ mName(name),
mObjAdapter(adapter),
mListeners(listenerMgr),
mReplicator(replicator),
- mSessionListener(new SessionListenerImpl(this, logger)),
+ mSessionListener(createSessionListener(mSessions, logger)),
mLogger(logger)
{
for (vector<BridgeListenerPrx>::const_iterator i = listeners.begin();
@@ -486,88 +229,71 @@ void BridgeImpl::addSessions(const SessionSeq& sessions, const Ice::Current& cur
return;
}
checkSessions(sessions);
+ statePreCheck();
+ mLogger(Debug) << FUNLOG << ": adding " << sessions.size() << " sessions to "
+ << current.adapter->getCommunicator()->identityToString(current.id);
+
SessionSeq addedSessions;
+ for (SessionSeq::const_iterator i = sessions.begin(); i != sessions.end(); ++i)
{
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- statePreCheck();
- mLogger(Debug) << FUNLOG << ": adding " << sessions.size() << " sessions to "
- << current.adapter->getCommunicator()->identityToString(current.id);
- for (SessionSeq::const_iterator i = sessions.begin();
- i != sessions.end(); ++i)
+ try
{
- //
- // TODO: how do we want to handle sessions that have already been added to the bridge. Its pretty much
- // impossible to guard against race conditions where multiple call paths might want to add a session
- // more than once for some reason. We should probably just log it and move on.
- //
- vector<BridgedSessionPtr>::iterator j =
- find_if(mState->bridgedSessions.begin(), mState->bridgedSessions.end(), FindImpl(*i));
- if (j != mState->bridgedSessions.end())
- {
- mLogger(Debug) << FUNLOG << ": " << (*i)->ice_toString()
- << " is already registered with this bridge.";
- continue;
- }
-
- SessionInfoPtr info;
- try
+ SessionWrapperPtr session = mSessions->addSession(*i);
+ if (session)
{
- RetryPolicy policy(5, 500);
- //
- // canRetry should never return false since we throw ourselves out of this loop. But
- // we'll do it here in case we decide to do something else.
- //
- while (policy.canRetry())
+ SessionInfoPtr info;
+ try
{
- try
- {
- info = (*i)->setBridge(mPrx, mSessionListenerPrx);
- break;
- }
- catch (const Ice::ConnectionLostException&)
+ RetryPolicy policy(5, 500);
+ //
+ // canRetry should never return false since we throw ourselves out of this loop. But
+ // we'll do it here in case we decide to do something else.
+ //
+ while (policy.canRetry())
{
- if (!policy.retry())
+ try
{
- throw;
+ //
+ // TODO : AMI!
+ //
+ info = (*i)->setBridge(mPrx, mSessionListenerPrx);
+ addedSessions.push_back(*i);
+ break;
+ }
+ catch (const Ice::ConnectionLostException&)
+ {
+ if (!policy.retry())
+ {
+ throw;
+ }
}
}
}
+ catch (const Ice::Exception& ex)
+ {
+ mLogger(Debug) << FUNLOG << ": " << (*i)->ice_toString() << " threw " << ex.what()
+ << " continuing";
+ }
+ if (info->currentState != "ready")
+ {
+ //
+ // We setup media.
+ // TODO: AMI should come into play here.
+ //
+ session->setConnector(mSplicer.connect(*i, mLogger));
+ }
}
- catch (const Ice::Exception& ex)
- {
- mLogger(Debug) << FUNLOG << ": " << (*i)->ice_toString() << " threw " << ex.what()
- << " continuing";
- }
- //
- // We need to define these states! Especially the ones that define when start is called or not.
- //
- BridgedSessionPtr newSession(new BridgedSession);
- newSession->session = *i;
- if (info->currentState == "ready")
- {
- mLogger(Debug) << FUNLOG << ": " << (*i)->ice_toString()
- << " current state is ready (not yet connected), not establishing media connections.";
- newSession->currentState = BridgedSessionState::Added;
- }
- else
- {
- mLogger(Debug) << FUNLOG << ": " << (*i)->ice_toString()
- << " media is expected to be establishing plugging media into bridge.";
-
- SessionWrapper s(newSession);
- s.setConnector(mSplicer.connect(*i, mLogger));
- newSession->currentState = BridgedSessionState::Connected;
- }
- mState->bridgedSessions.push_back(newSession);
-
- addedSessions.push_back(*i);
}
- update();
+ catch (const Ice::Exception& ex)
+ {
+ }
}
+
if (addedSessions.size())
{
mListeners->sessionsAdded(addedSessions);
}
+ update();
}
void BridgeImpl::removeSessions(
@@ -579,69 +305,55 @@ void BridgeImpl::removeSessions(
}
checkSessions(sessions);
SessionSeq removedSessions;
+ statePreCheck();
+ for (SessionSeq::const_iterator i = sessions.begin();
+ i != sessions.end(); ++i)
{
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- statePreCheck();
- for (SessionSeq::const_iterator i = sessions.begin();
- i != sessions.end(); ++i)
+ SessionWrapperPtr session = mSessions->getSession(*i);
+ if (session)
{
- vector<BridgedSessionPtr>::iterator j = find_if(mState->bridgedSessions.begin(),
- mState->bridgedSessions.end(), FindImpl(*i));
- if (j != mState->bridgedSessions.end())
- {
- try
- {
- (*j)->session->removeBridge(mSessionListenerPrx);
- }
- catch (const Ice::Exception& ex)
- {
- mLogger(Info) << ": removingthe bridge from " << (*j)->session << " threw "
- << ex.what();
- }
- SessionWrapper(*j).disconnect();
- mState->bridgedSessions.erase(j);
- removedSessions.push_back(*i);
- }
- else
+ try
{
//
- // TODO: how do we want to handle sessions that aren't there.
- // Its pretty much impossible to guard against race conditions
- // where an expected session may no longer be present (device
- // hung up, etc). In other words, this should probably be
- // expected and maybe just logged.
+ // TODO: AMI.
//
+ (*i)->removeBridge(mSessionListenerPrx);
}
+ catch (const Ice::Exception& ex)
+ {
+ mLogger(Info) << ": removing the bridge from " << (*i) << " threw "
+ << ex.what();
+ }
+ session->disconnect();
+ removedSessions.push_back(*i);
}
- update();
}
if (removedSessions.size() != 0)
{
mListeners->sessionsRemoved(removedSessions);
}
- //
- // TODO: Should be policy driven.
- //
- if (mState->bridgedSessions.size() < 2)
+
+ CountIfOperation< Negate <IfStateCriteria> > counter(IfStateCriteria(BridgedSessionState::Done));
+ mSessions->visitSessions(counter);
+ if (counter.count() < 2)
{
spawnShutdown();
}
+ update();
}
SessionSeq BridgeImpl::listSessions(const Ice::Current& current)
{
- mLogger(Debug) << FUNLOG << ":" << objectIdFromCurrent(current);
- boost::shared_lock<boost::shared_mutex> lock(mLock);
+ mLogger(Debug) << FUNLOG << ":" << objectIdFromCurrent(current);
statePreCheck();
- mLogger(Debug) << FUNLOG << ": working with " << mState->bridgedSessions.size() << " sessions." ;
- SessionSeq result;
- for (vector<BridgedSessionPtr>::const_iterator i = mState->bridgedSessions.begin(); i != mState->bridgedSessions.end();
- ++i)
- {
- result.push_back((*i)->session);
- }
- return result;
+ set< BridgedSessionState > excludedStates;
+ excludedStates.insert(BridgedSessionState::Done);
+ excludedStates.insert(BridgedSessionState::Disconnected);
+ Negate< IfInCriteria< set< BridgedSessionState >, StateMemberSelector > > whereNot(excludedStates);
+ SelectOperation< Negate< IfInCriteria< set< BridgedSessionState >, StateMemberSelector > >, SessionPrxSelector, SessionSeq> select(whereNot);
+ mSessions->visitSessions(select);
+ return select.results();
}
void BridgeImpl::shutdown(const Ice::Current& current)
@@ -650,7 +362,7 @@ void BridgeImpl::shutdown(const Ice::Current& current)
// When shutting down, the bridge makes a copy of its current state and unlocks, proceeding with
// no other internal locks.
//
- mLogger(Debug) << FUNLOG << ":" << objectIdFromCurrent(current);
+ mLogger(Debug) << FUNLOG << ":" << objectIdFromCurrent(current);
boost::unique_lock<boost::shared_mutex> lock(mLock);
if (mState->runningState == ShuttingDown)
{
@@ -666,14 +378,8 @@ void BridgeImpl::shutdown(const Ice::Current& current)
mListeners->stopping();
- //
- // TODO: Response code for termination messages for bridges shutting down should come from configuration
- //
- if (mState->bridgedSessions.size() > 0)
- {
- for_each(mState->bridgedSessions.begin(), mState->bridgedSessions.end(),
- ShutdownImpl(mSessionListenerPrx, new ResponseCode, mLogger));
- }
+ ShutdownSessionOperation shutdownOp(mSessionListenerPrx, new ResponseCode, mLogger);
+ mSessions->visitSessions(shutdownOp);
mLogger(Info) << objectIdFromCurrent(current) << ": is shutdown." ;
mListeners->stopped();
@@ -684,7 +390,8 @@ void BridgeImpl::shutdown(const Ice::Current& current)
//
mObjAdapter->remove(mSessionListenerPrx->ice_getIdentity());
mSessionListener = 0;
- update();
+
+ updateNoLock();
}
void BridgeImpl::destroy(const Ice::Current& current)
@@ -710,58 +417,51 @@ void BridgeImpl::destroy(const Ice::Current& current)
// Remove references to the session listener implementation.
//
mObjAdapter->remove(mSessionListenerPrx->ice_getIdentity());
- update();
+ updateNoLock();
}
void BridgeImpl::addListener(const BridgeListenerPrx& listener, const Ice::Current&)
{
mListeners->addListener(listener);
- boost::shared_lock<boost::shared_mutex> lock(mLock);
update();
}
void BridgeImpl::removeListener(const BridgeListenerPrx& listener, const Ice::Current&)
{
mListeners->removeListener(listener);
- boost::shared_lock<boost::shared_mutex> lock(mLock);
update();
}
void BridgeImpl::replaceSession(const SessionPrx& oldSession, const SessionSeq& newSessions, const Ice::Current& current)
{
mLogger(Debug) << FUNLOG << ":" << objectIdFromCurrent(current);
- BridgedSessionPtr toRemove;
- vector<BridgedSessionPtr> newMembers;
checkSessions(newSessions);
+ statePreCheck();
+
+ SessionWrapperPtr session = mSessions->getSession(oldSession);
+ if (session)
{
- //
- // TODO:Need to find a way to make this as exception safe as possible!
- //
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- statePreCheck();
-
- vector<BridgedSessionPtr>::iterator i =
- find_if(mState->bridgedSessions.begin(), mState->bridgedSessions.end(), FindImpl(oldSession));
-
- //
- // We need to disconnect the media while the bridge is locked because the media splicer does not currently
- // have any mutex protection of its own.
- //
- if (i != mState->bridgedSessions.end())
+ try
{
- mLogger(Debug) << FUNLOG << ": found session to replace : " << oldSession->ice_toString();
- toRemove = *i;
- SessionWrapper s(toRemove);
- s.disconnect();
- mState->bridgedSessions.erase(i);
+ //
+ // TODO: AMI.
+ //
+ oldSession->removeBridge(mSessionListenerPrx);
}
- //
- // We don't do anything else with the new members just yet. We are
- // going to release the lock and continue with things.
- //
+ catch (const Ice::Exception& ex)
+ {
+ mLogger(Info) << ": removingthe bridge from " << oldSession << " threw "
+ << ex.what();
+ }
+ session->disconnect();
+ SessionSeq removed;
+ removed.push_back(oldSession);
+ mListeners->sessionsRemoved(removed);
+ update();
}
- vector<SessionInfoPtr> infoSeq;
+
+ SessionSeq added;
for (SessionSeq::const_iterator i = newSessions.begin(); i != newSessions.end(); ++i)
{
try
@@ -775,11 +475,18 @@ void BridgeImpl::replaceSession(const SessionPrx& oldSession, const SessionSeq&
{
try
{
- infoSeq.push_back((*i)->setBridge(mPrx, mSessionListenerPrx));
- BridgedSessionPtr s(new BridgedSession);
- s->session = *i;
- s->currentState = BridgedSessionState::Added;
- newMembers.push_back(s);
+ SessionInfoPtr info = (*i)->setBridge(mPrx, mSessionListenerPrx);
+ SessionWrapperPtr session = mSessions->addSession(*i);
+ if (info->currentState != "ready")
+ {
+ //
+ // We setup media.
+ // TODO: AMI should come into play here.
+ //
+ session->setConnector(mSplicer.connect(*i, mLogger));
+ }
+ added.push_back(*i);
+
break;
}
catch (const Ice::ConnectionLostException&)
@@ -803,70 +510,12 @@ void BridgeImpl::replaceSession(const SessionPrx& oldSession, const SessionSeq&
mLogger(Info) << FUNLOG << ": setting the bridge on " << *i << " threw " << ex.what();
}
}
- assert(infoSeq.size() == newMembers.size());
-
- {
- //
- // Now that is all over with, let's add them to the bridge's list.
- //
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- for (vector<BridgedSessionPtr>::iterator j = newMembers.begin(); j != newMembers.end() ; ++j)
- {
- mState->bridgedSessions.push_back(*j);
- }
- update();
- }
-
- //
- // The call on the session to disassociate it from the bridge is deferred
- // until after the lock on the bridge has been released.
- //
- if (toRemove)
- {
- RetryPolicy policy(5, 500);
- //
- // canRetry should never return false since we throw ourselves out of this loop. But
- // we'll do it here in case we decide to do something else.
- //
- while (policy.canRetry())
- {
- try
- {
- toRemove->session->removeBridge(mSessionListenerPrx);
- break;
- }
- catch (const NotBridged&)
- {
- mLogger(Info) << FUNLOG <<
- ": removeBridge on session being replaced threw a `NotBridged' exception";
- break;
- }
- catch (const Ice::ConnectionLostException&)
- {
- if (!policy.retry())
- {
- throw;
- }
- }
- catch (const Ice::Exception& ex)
- {
- mLogger(Info) << FUNLOG << ": removeBridge resulted in : " << ex.what();
- break;
- }
- }
- }
//
// Now update the listeners.
//
- SessionSeq sessions(newSessions);
- mListeners->sessionsAdded(sessions);
- if (toRemove)
- {
- sessions.clear();
- sessions.push_back(oldSession);
- mListeners->sessionsRemoved(sessions);
- }
+ mListeners->sessionsAdded(added);
+ update();
}
bool BridgeImpl::destroyed()
@@ -929,6 +578,11 @@ string BridgeImpl::id()
return mState->bridgeId;
}
+SessionCollectionPtr BridgeImpl::sessions()
+{
+ return mSessions;
+}
+
void BridgeImpl::activate()
{
mLogger(Debug) << FUNLOG;
@@ -936,63 +590,6 @@ void BridgeImpl::activate()
mActivated = true;
}
-void BridgeImpl::sessionConnected(const SessionPrx& session)
-{
- mLogger(Debug) << FUNLOG << ": session connected " << session->ice_toString() ;
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- vector<BridgedSessionPtr>::iterator i =
- find_if(mState->bridgedSessions.begin(), mState->bridgedSessions.end(), FindImpl(session));
-#ifndef _NDEBUG
- if (i == mState->bridgedSessions.end())
- {
- mLogger(Debug) << FUNLOG << ": did not find " << session->ice_toString();
- }
-#endif
- if (i != mState->bridgedSessions.end())
- {
- SessionWrapper s(*i);
- s.setConnected();
- s.setConnector(mSplicer.connect(session, mLogger));
- }
-}
-
-size_t BridgeImpl::sessionStopped(const SessionPrx& session, const ResponseCodePtr& response)
-{
- mLogger(Debug) << FUNLOG << ": session terminated from " << session->ice_toString() ;
- try
- {
- session->removeBridge(mSessionListenerPrx);
- }
- catch (Ice::ObjectNotExistException& ex)
- {
- mLogger(Debug) << FUNLOG << ":" << ex.what();
- }
- catch (Ice::Exception& ex)
- {
- mLogger(Debug) << FUNLOG << ":" << ex.what();
- throw;
- }
-
- BridgedSessionPtr b;
- size_t sizeAfter;
- {
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- vector<BridgedSessionPtr>::iterator i =
- find_if(mState->bridgedSessions.begin(), mState->bridgedSessions.end(), FindImpl(session));
- if (i != mState->bridgedSessions.end())
- {
- b = *i;
- mState->bridgedSessions.erase(i);
- }
- sizeAfter = mState->bridgedSessions.size();
- }
- if (b)
- {
- SessionWrapper(b).disconnect();
- }
-
- return sizeAfter;
-}
namespace
{
@@ -1029,16 +626,17 @@ void BridgeImpl::spawnShutdown()
}
}
-vector<BridgedSessionPtr> BridgeImpl::currentSessions()
+void BridgeImpl::update()
{
- boost::shared_lock<boost::shared_mutex> lock(mLock);
- return mState->bridgedSessions;
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ updateNoLock();
}
-void BridgeImpl::update()
+void BridgeImpl::updateNoLock()
{
if (mActivated)
{
+ ++mState->serial;
ReplicatedStateItemSeq seq;
seq.push_back(getState());
mReplicator->setState(seq);
@@ -1047,6 +645,8 @@ void BridgeImpl::update()
void BridgeImpl::statePreCheck()
{
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+
if (mState == ShuttingDown)
{
mLogger(Debug) << FUNLOG << ": called when shutting down." ;
@@ -1060,7 +660,7 @@ void BridgeImpl::statePreCheck()
}
IceUtil::Handle<AsteriskSCF::BridgeService::BridgeServant>
-AsteriskSCF::BridgeService::BridgeServant::create(const Ice::ObjectAdapterPtr& objectAdapter,
+AsteriskSCF::BridgeService::BridgeServant::create(const string& name, const Ice::ObjectAdapterPtr& objectAdapter,
const vector<BridgeListenerPrx>& listeners,
const AsteriskSCF::BridgeService::BridgeListenerMgrPtr& listenerMgr,
const ReplicatorSmartPrx& replicator,
@@ -1068,13 +668,14 @@ AsteriskSCF::BridgeService::BridgeServant::create(const Ice::ObjectAdapterPtr& o
{
BridgeStateItemPtr state(new AsteriskSCF::Bridge::V1::BridgeStateItem);
state->runningState = Running;
+ state->serial = SerialCounterStart;
//
// TODO: "replicate" is the only replication policy currently supported by the bridge service.
// In the future it may be possible for the bridge replica to reconstruct its media operations
// allowing localized resources to be used.
//
state->mediaReplicationPolicy = MediaOperationReplicationPolicy::Replicate;
- return new BridgeImpl(objectAdapter, listeners, listenerMgr, replicator, state, logger);
+ return new BridgeImpl(name, objectAdapter, listeners, listenerMgr, replicator, state, logger);
}
IceUtil::Handle<AsteriskSCF::BridgeService::BridgeServant>
@@ -1085,5 +686,5 @@ AsteriskSCF::BridgeService::BridgeServant::create(const Ice::ObjectAdapterPtr& o
const AsteriskSCF::Bridge::V1::BridgeStateItemPtr& state)
{
logger(Debug) << FUNLOG << ": creating replica for " << state->bridgeId;
- return new BridgeImpl(objectAdapter, state->listeners, listenerMgr, replicator, state, logger);
+ return new BridgeImpl(state->bridgeId, objectAdapter, state->listeners, listenerMgr, replicator, state, logger);
}
diff --git a/src/BridgeImpl.h b/src/BridgeImpl.h
index d20be7a..3e3c865 100644
--- a/src/BridgeImpl.h
+++ b/src/BridgeImpl.h
@@ -21,6 +21,7 @@
#include "BridgeReplicatorIf.h"
#include "BridgeListenerMgr.h"
#include "BridgeServiceConfig.h"
+#include "SessionCollection.h"
namespace AsteriskSCF
{
@@ -95,7 +96,6 @@ public:
**/
virtual void activate(const AsteriskSCF::SessionCommunications::V1::BridgePrx& proxy) = 0;
-
/**
*
* Replication helper methods
@@ -109,12 +109,21 @@ public:
virtual std::string id() = 0;
+ virtual SessionCollectionPtr sessions() = 0;
+
+ /**
+ *
+ * Internal implementation detail interface.
+ *
+ **/
+
/**
*
* Factory method for creating bridge servant instances.
*
**/
- static IceUtil::Handle<BridgeServant> create(const Ice::ObjectAdapterPtr& objectAdapter,
+ static IceUtil::Handle<BridgeServant> create(const std::string& name,
+ const Ice::ObjectAdapterPtr& objectAdapter,
const std::vector<AsteriskSCF::SessionCommunications::V1::BridgeListenerPrx>& listeners,
const AsteriskSCF::BridgeService::BridgeListenerMgrPtr& listenerMgr,
const ReplicatorSmartPrx& replicator,
diff --git a/src/BridgeManagerImpl.cpp b/src/BridgeManagerImpl.cpp
index 008c8df..3f6b4a7 100644
--- a/src/BridgeManagerImpl.cpp
+++ b/src/BridgeManagerImpl.cpp
@@ -156,6 +156,7 @@ BridgeManagerImpl::BridgeManagerImpl(const Ice::ObjectAdapterPtr& adapter, const
mListeners = new BridgeManagerListenerMgr(mAdapter->getCommunicator(), mName, mSourceProxy);
mState->runningState = ServiceState::Running;
mState->key = name;
+ mState->serial = SerialCounterStart;
}
BridgeManagerImpl::~BridgeManagerImpl()
@@ -185,7 +186,7 @@ BridgePrx BridgeManagerImpl::createBridge(const SessionSeq& sessions,
listeners.push_back(listener);
}
- BridgeServantPtr bridge = BridgeServant::create(mAdapter, listeners, mgr, mReplicator, mLogger);
+ BridgeServantPtr bridge = BridgeServant::create(stringId, mAdapter, listeners, mgr, mReplicator, mLogger);
Ice::ObjectPrx obj = mAdapter->add(bridge, id);
mLogger(Info) << objectIdFromCurrent(current) << ": creating new bridge " << obj->ice_toString() << "." ;
@@ -426,6 +427,7 @@ void BridgeManagerImpl::update()
{
if (mActivated)
{
+ ++mState->serial;
ReplicatedStateItemSeq seq;
seq.push_back(getState());
mReplicator->setState(seq);
diff --git a/src/BridgeReplicatorIf.ice b/src/BridgeReplicatorIf.ice
index 5412b52..0fa5dfe 100644
--- a/src/BridgeReplicatorIf.ice
+++ b/src/BridgeReplicatorIf.ice
@@ -33,7 +33,34 @@ const string StateReplicatorDiscoveryCategory = "BridgeReplicator";
/**
*
- * XXX
+ * It might seem a little odd to define this constant in Slice, but it needs to be
+ * specified and used everywhere serial numbers are considered. The initial serial
+ * number is offset from 0 by some arbitrary yet large enough number so that when
+ * the serial wraps, it can be detected without examining previous values and making
+ * some sort of decision based on a delta (which would also be arbitrary).
+ *
+ **/
+const int SerialCounterStart = 100;
+
+/**
+ *
+ * Base class for replicated state.
+ *
+ * Some notes on "serial":
+ * The "serial" member was added after it was realized that in a highly asynchronous environment with
+ * active replication, it is possible for operations to get lost or duplicated.
+ *
+ **/
+class ReplicatedStateItem
+{
+ string key; /* unique identifier for this state item */
+ long serial; /* a version identifier */
+};
+sequence<ReplicatedStateItem> ReplicatedStateItemSeq;
+
+/**
+ *
+ * TODO: Use!
*
*/
class BridgeStateReplicatorParams extends AsteriskSCF::Core::Discovery::V1::ServiceLocatorParams
@@ -62,12 +89,11 @@ enum MediaOperationReplicationPolicy
* to media sources and sinks in the bridge.
*
**/
-class MediaPairing
+class MediaPairing extends ReplicatedStateItem
{
AsteriskSCF::Media::V1::StreamSource* source;
AsteriskSCF::Media::V1::StreamSink* sink;
};
-
/**
*
* A session may have more than one source an sink connected for this session.
@@ -75,6 +101,15 @@ class MediaPairing
**/
sequence<MediaPairing> MediaPairingSeq;
+class SessionPairing extends ReplicatedStateItem
+{
+ string bridgeKey;
+ string sessionKeyA;
+ string sessionKeyB;
+
+ MediaPairingSeq mediaPairings;
+};
+
/**
*
* A session that has been added to the bridge changes "state" relative to the bridge.
@@ -85,8 +120,9 @@ enum BridgedSessionState
{
Added, /* Session has been added to the bridge, but is not yet connected to the call. */
Connected, /* Session is bridged and connected. */
- Disconnected /* Session has been disconnected and is in the process of being removed
+ Disconnected, /* Session has been disconnected and is in the process of being removed
from the bridge. */
+ Done /* Session object is no longer available and any resources related to it should be released */
};
/**
@@ -95,7 +131,7 @@ enum BridgedSessionState
* than just the SessionPrx.
*
**/
-class BridgedSession
+class BridgedSession extends ReplicatedStateItem
{
/**
* The session proxy itself. Must not be null!
@@ -108,20 +144,21 @@ class BridgedSession
BridgedSessionState currentState;
/**
- * The media pairings associated with this bridged session.
+ * Key to the child session pairing
**/
- MediaPairingSeq mediaPairings;
-};
-sequence<BridgedSession> BridgedSessionSeq;
+ string sessionPairingKey;
-/**
- * Base class for replicated state.
- **/
-class ReplicatedStateItem
-{
- string key; /* unique identifier for this state item */
+ /**
+ * The parent bridge.
+ **/
+ string bridgeId;
+
+ /**
+ * Tracks the order in which the items were added.
+ */
+ long orderAdded;
};
-sequence<ReplicatedStateItem> ReplicatedStateItemSeq;
+sequence<BridgedSession> BridgedSessionSeq;
/**
* Bridge service operational states.
@@ -151,9 +188,22 @@ class BridgeManagerStateItem extends ReplicatedStateItem
**/
class BridgeStateItem extends ReplicatedStateItem
{
- string bridgeId; /* the id for the bridge object is included here
- as all items are relative to a bridge */
+ /**
+ * The ID for the bridge object is included here as all items are relative
+ * to a bridge.
+ **/
+ string bridgeId;
+
+ /**
+ * The current activation state of the bridge. NOT the same
+ * as the Component running state.
+ **/
ServiceState runningState;
+
+ /**
+ * These items are replicated separately, but mirrored here.
+ * TODO: see if they can be completely removed.
+ **/
BridgedSessionSeq bridgedSessions;
BridgeListenerSeq listeners;
MediaOperationReplicationPolicy mediaReplicationPolicy;
diff --git a/src/BridgeReplicatorStateListenerI.cpp b/src/BridgeReplicatorStateListenerI.cpp
index 7e9f609..7cef728 100644
--- a/src/BridgeReplicatorStateListenerI.cpp
+++ b/src/BridgeReplicatorStateListenerI.cpp
@@ -53,11 +53,28 @@ public:
{
for (ReplicatedStateItemSeq::const_iterator i = items.begin(); i != items.end(); ++i)
{
- //
- // I don't really have a use for these hash table items at the moment.
- //
- mItems[(*i)->key] = *i;
-
+ map<string, ReplicatedStateItemPtr>::iterator entry = mItems.find((*i)->key);
+ ReplicatedStateItemPtr existingItem;
+ if (entry != mItems.end())
+ {
+
+ //
+ // Look at serial numbers and indicate an out of sequence update. We should
+ // ignore out of sequence updates.
+ //
+ if ((entry->second->serial > (*i)->serial) && (*i)->serial > SerialCounterStart)
+ {
+ mLogger(Error) << "Update serial number for " << (*i)->key << " out of sequence! " <<
+ (*i)->serial << " updating " << entry->second->serial;
+ continue;
+ }
+ existingItem = entry->second;
+ }
+ else
+ {
+ mItems[(*i)->key] = *i;
+ }
+
BridgeManagerStateItemPtr managerItem = BridgeManagerStateItemPtr::dynamicCast((*i));
if (managerItem)
{
@@ -89,13 +106,43 @@ public:
// We could break here if we could be sure that there were no other updates.
//
}
+
if (!found)
{
+ if (existingItem)
+ {
+ mLogger(Error) << "Replica listener has a bridge object that the bridge manager does not know about. This likely indicates an error and should be investigated.";
+ }
mManager->createBridgeReplica(bridgeItem);
}
continue;
}
+ BridgedSessionPtr bridgedSessionItem = BridgedSessionPtr::dynamicCast((*i));
+ if (bridgedSessionItem)
+ {
+ vector<BridgeServantPtr> bridges = mManager->getBridges();
+ bool found = false;
+ for (vector<BridgeServantPtr>::iterator b = bridges.begin(); b != bridges.end(); ++b)
+ {
+ if ((*b) && (*b)->id() == bridgedSessionItem->bridgeId)
+ {
+ SessionCollectionPtr sessions = (*b)->sessions();
+ sessions->replicaUpdate(bridgedSessionItem);
+ found = true;
+ }
+ //
+ // We could break here if we could be sure that there were no other updates.
+ //
+ }
+ if (!found)
+ {
+ mLogger(Error) << "received an update for a session on a bridge that does not exist!";
+ }
+
+ continue;
+ }
+
mLogger(Info) << "Bridge replicator service received an unrecognized replication item.";
}
}
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 695cf10..4685e05 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -11,6 +11,12 @@ asterisk_scf_component_add_file(bridgeservice BridgeManagerListenerMgr.cpp)
asterisk_scf_component_add_file(bridgeservice BridgeReplicatorStateListenerI.cpp)
asterisk_scf_component_add_file(bridgeservice BridgeManagerImpl.h)
asterisk_scf_component_add_file(bridgeservice BridgeManagerImpl.cpp)
+asterisk_scf_component_add_file(bridgeservice SessionWrapper.cpp)
+asterisk_scf_component_add_file(bridgeservice SessionWrapper.h)
+asterisk_scf_component_add_file(bridgeservice SessionCollection.cpp)
+asterisk_scf_component_add_file(bridgeservice SessionCollection.h)
+asterisk_scf_component_add_file(bridgeservice SessionOperations.cpp)
+asterisk_scf_component_add_file(bridgeservice SessionOperations.h)
asterisk_scf_component_add_file(bridgeservice MediaSplicer.h)
asterisk_scf_component_add_file(bridgeservice MediaSplicer.cpp)
asterisk_scf_component_add_slice(bridgeservice ./BridgeReplicatorIf.ice)
diff --git a/src/DebugUtil.h b/src/DebugUtil.h
index 9afb360..7c9d565 100644
--- a/src/DebugUtil.h
+++ b/src/DebugUtil.h
@@ -38,6 +38,7 @@ std::ostream& dumpState(std::ostream& os, const std::string& prefix,
}
os << prefix << "Id: " << comm->identityToString(session->session->ice_getIdentity()) << '\n';
+ os << prefix << "Serial: " << session->serial << "\n";
os << prefix << "State: ";
switch (session->currentState)
{
@@ -53,6 +54,10 @@ std::ostream& dumpState(std::ostream& os, const std::string& prefix,
default:
os << "(invalid)\n";
}
+#if 0
+ //
+ // XXX: media pairings are probably going to occur elsewhere, update when that's decided.
+ //
int index = 0;
for (AsteriskSCF::Bridge::V1::MediaPairingSeq::const_iterator i = session->mediaPairings.begin();
i != session->mediaPairings.end(); ++i)
@@ -65,6 +70,7 @@ std::ostream& dumpState(std::ostream& os, const std::string& prefix,
comm->identityToString((*i)->sink->ice_getIdentity()) << '\n';
++index;
}
+#endif
return os;
#else
@@ -84,6 +90,7 @@ std::ostream& dumpState(std::ostream& os, const AsteriskSCF::Bridge::V1::BridgeS
}
os << "BRIDGE STATE DUMP <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n";
os << "key : " << state->key << "\nbridge id : " << state->bridgeId << "\n";
+ os << "serial : " << state->serial << "\n";
os << "state : ";
switch (state->runningState)
{
diff --git a/src/SessionCollection.cpp b/src/SessionCollection.cpp
new file mode 100644
index 0000000..cf26a6d
--- /dev/null
+++ b/src/SessionCollection.cpp
@@ -0,0 +1,158 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#include "SessionCollection.h"
+
+using namespace AsteriskSCF::System::Logging;
+using namespace AsteriskSCF::SessionCommunications::V1;
+using namespace AsteriskSCF::BridgeService;
+using namespace AsteriskSCF::Bridge::V1;
+using namespace AsteriskSCF;
+using namespace std;
+
+SessionCollection::SessionCollection(const Ice::CommunicatorPtr& comm, const string& bridgeId, const ReplicatorSmartPrx& replicator,
+ const Logger& logger) :
+ mCommunicator(comm),
+ mReplicator(replicator),
+ mLogger(logger),
+ mBridgeId(bridgeId),
+ mSessionCounter(0)
+{
+}
+
+SessionWrapperPtr SessionCollection::getSession(const SessionPrx& session)
+{
+ if (session)
+ {
+ return getSession(mCommunicator->identityToString(session->ice_getIdentity()));
+ }
+ return 0;
+}
+
+SessionWrapperPtr SessionCollection::getSession(const std::string& id)
+{
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+ SessionMap::iterator i = mMap.find(id);
+ if (i != mMap.end())
+ {
+ return i->second;
+ }
+ return 0;
+}
+
+SessionWrapperPtr SessionCollection::addSession(const SessionPrx& session)
+{
+ //
+ // A nil session should never get here.
+ //
+ assert(session);
+
+ string key = mCommunicator->identityToString(session->ice_getIdentity());
+
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ SessionMap::iterator i = mMap.find(key);
+ if (i != mMap.end())
+ {
+ mLogger(Info) << ": session was already added, possible out of order or duplicate call: " << key;
+ return 0;
+ }
+
+ BridgedSessionPtr bridgedSession(new BridgedSession);
+ bridgedSession->key = key;
+ bridgedSession->serial = SerialCounterStart;
+ bridgedSession->session = session;
+ bridgedSession->currentState = BridgedSessionState::Added;
+ bridgedSession->sessionPairingKey = "";
+ bridgedSession->bridgeId = mBridgeId;
+ bridgedSession->orderAdded = mSessionCounter;
+
+ SessionWrapperPtr newSession(new SessionWrapper(bridgedSession, mReplicator, mLogger));
+ mMap[key] = newSession;
+ ++mSessionCounter;
+ return newSession;
+}
+
+bool SessionCollection::hasSession(const std::string& id)
+{
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+ SessionMap::iterator i = mMap.find(id);
+ return (i != mMap.end());
+}
+
+bool SessionCollection::hasSession(const SessionPrx& prx)
+{
+ if (!prx)
+ {
+ return false;
+ }
+ return hasSession(mCommunicator->identityToString(prx->ice_getIdentity()));
+}
+
+
+SessionSeq SessionCollection::getSessionSeq()
+{
+ SessionSeq result;
+ reap();
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+ for (SessionMap::const_iterator i = mMap.begin(); i != mMap.end(); ++i)
+ {
+ result.push_back(i->second->getSession());
+ }
+ return result;
+}
+
+size_t SessionCollection::size()
+{
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+ return mMap.size();
+}
+
+void SessionCollection::reap()
+{
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ for (SessionMap::iterator i = mMap.begin(); i != mMap.end();)
+ {
+ if (i->second->isDestroyed())
+ {
+ mMap.erase(i++);
+ }
+ else
+ {
+ ++i;
+ }
+ }
+}
+
+void SessionCollection::replicaUpdate(const BridgedSessionPtr& session)
+{
+ SessionWrapperPtr updater;
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ SessionMap::iterator i = mMap.find(session->key);
+ if (i == mMap.end())
+ {
+ mMap[session->key] = new SessionWrapper(session, mReplicator, mLogger);
+ }
+ else
+ {
+ updater = i->second;
+ }
+ }
+ if (updater)
+ {
+ updater->update(session);
+ }
+}
diff --git a/src/SessionCollection.h b/src/SessionCollection.h
new file mode 100644
index 0000000..6090a8f
--- /dev/null
+++ b/src/SessionCollection.h
@@ -0,0 +1,120 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#pragma once
+
+#include <AsteriskSCF/SessionCommunications/SessionCommunicationsIf.h>
+#include <AsteriskSCF/logger.h>
+#include <boost/thread/locks.hpp>
+#include <map>
+#include <string>
+#include "BridgeServiceConfig.h"
+#include "SessionWrapper.h"
+
+namespace AsteriskSCF
+{
+namespace Logging
+{
+class Logger;
+}
+namespace BridgeService
+{
+
+typedef std::map<std::string, SessionWrapperPtr> SessionMap;
+
+/**
+ *
+ * The session collection is shared amongst multiple active entities so it has been made a separate reference counted
+ * item with no "back-references". This should help avoid cyclic references. While this is not a replicated object,
+ * it does create replicated objects and will cause updates to occur.
+ *
+ **/
+class SessionCollection : public IceUtil::Shared
+{
+public:
+
+ SessionCollection(const Ice::CommunicatorPtr& communicator, const std::string& bridgeId,
+ const ReplicatorSmartPrx& replicator, const AsteriskSCF::System::Logging::Logger& logger);
+
+ /**
+ * Obtains the SessionWrapper instance for the specified proxy.
+ **/
+ SessionWrapperPtr getSession(const AsteriskSCF::SessionCommunications::V1::SessionPrx& session);
+
+ /**
+ * Obtains the SessionWrapper instance for the given id (usually the Session proxy's object id).
+ **/
+ SessionWrapperPtr getSession(const std::string& id);
+
+ /**
+ *
+ * A note on the add/update operations: addSession does not permit re-insertion of an existing session and will treat
+ * it as an error. updateSession will not automatically add a session if it does not exist and will also
+ * treat this as an error. If we candy-machine these interfaces it makes things quite complex and doesn't
+ * afford the application the opportunity to respond to possible errors. If an add/update *can* happen
+ * out of order, the invoking logic should respond to the errors and call the necessary operation. If such
+ * situations arise, it admittedly could get a little awkward but perhaps a third method that is more
+ * forgiving would be appropriate in that case, e.g. an addOrUpdateSession method.
+ *
+ **/
+
+ /**
+ * Adds a session proxy to the collection and returns the SessionWrapper. This will also take care of
+ * creating a BridgedSession object.
+ **/
+ SessionWrapperPtr addSession(const AsteriskSCF::SessionCommunications::V1::SessionPrx& session);
+
+ /**
+ * Both of these functions check to see if a session exists in the collection.
+ **/
+ bool hasSession(const std::string& id);
+ bool hasSession(const AsteriskSCF::SessionCommunications::V1::SessionPrx& session);
+
+ /**
+ * Return a Bridge service compatible sequence of sessions.
+ **/
+ AsteriskSCF::SessionCommunications::V1::SessionSeq getSessionSeq();
+
+ template <typename Func>
+ void visitSessions(Func& op)
+ {
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+ for (SessionMap::iterator i = mMap.begin(); i != mMap.end(); ++i)
+ {
+ op(i->second);
+ }
+ }
+
+ size_t size();
+
+ void reap();
+
+ void replicaUpdate(const AsteriskSCF::Bridge::V1::BridgedSessionPtr& bridgedSession);
+
+private:
+
+ boost::shared_mutex mLock;
+ SessionMap mMap;
+ Ice::CommunicatorPtr mCommunicator;
+ ReplicatorSmartPrx mReplicator;
+ AsteriskSCF::System::Logging::Logger mLogger;
+ std::string mBridgeId;
+ long mSessionCounter;
+
+};
+
+typedef IceUtil::Handle<SessionCollection> SessionCollectionPtr;
+} /* End of namespace BridgeService */
+} /* End of namespace AsteriskSCF */
diff --git a/src/SessionListener.cpp b/src/SessionListener.cpp
new file mode 100644
index 0000000..ec58c22
--- /dev/null
+++ b/src/SessionListener.cpp
@@ -0,0 +1,140 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#include "SessionListener.h"
+#include "ServiceUtil.h"
+#include "SessionOperations.h"
+
+using namespace AsteriskSCF::System::Logging;
+using namespace AsteriskSCF::SessionCommunications::V1;
+using namespace AsteriskSCF::BridgeService;
+using namespace AsteriskSCF::Bridge::V1;
+using namespace AsteriskSCF;
+using namespace std;
+
+namespace
+{
+
+//
+// For events that require modification to the bridge, we use helper methods on the bridge itself.
+// For events that result in distribution to the bridge sessions, we copy the current sessions and
+// run the calls from the listener itself.
+//
+class SessionListenerImpl : public SessionListener
+{
+public:
+ SessionListenerImpl(const SessionCollectionPtr& b, const Logger& logger) :
+ mSessions(b),
+ mLogger(logger)
+ {
+ }
+
+ void connected(const SessionPrx& source, const Ice::Current&)
+ {
+ string proxyString = source->ice_toString();
+ mLogger(Debug) << FUNLOG << ": session connected " << proxyString;
+
+ try
+ {
+ SessionWrapperPtr session = mSessions->getSession(source);
+ if (!session)
+ {
+ mLogger(Info) << "Attempt to respond to connected notification for session with proxy "
+ << proxyString << " that does not match known sessions. Possible out of order operations.";
+ return;
+ }
+
+ //
+ // XXX- media pairings.
+ //
+ session->connect();
+ mSessions->visitSessions(ConnectSessionOperation(source, mLogger));
+ }
+ catch (const Ice::Exception& ex)
+ {
+ mLogger(Debug) << FUNLOG << ": " << ex.what();
+ throw;
+ }
+ }
+
+ void flashed(const SessionPrx& source, const Ice::Current&)
+ {
+ }
+
+ void held(const SessionPrx& source, const Ice::Current&)
+ {
+ }
+
+ void progressing(const SessionPrx& source,
+ const ResponseCodePtr& response, const Ice::Current&)
+ {
+ }
+
+ void ringing(const SessionPrx& source, const Ice::Current&)
+ {
+ mSessions->visitSessions(RingSessionOperation(source, mLogger));
+ }
+
+ void stopped(const SessionPrx& source, const ResponseCodePtr& response, const Ice::Current& current)
+ {
+ string proxyString = source->ice_toString();
+ mLogger(Debug) << FUNLOG << ": session stopped " << proxyString;
+
+ SessionWrapperPtr session = mSessions->getSession(source);
+ try
+ {
+ if (!session)
+ {
+ mLogger(Info) << "Attempt to respond to stopped notification for session with proxy "
+ << proxyString << " that does not match known sessions. Possible out of order operations.";
+ return;
+ }
+
+ //
+ // We don't actually have the proxy for this object, but we can create one on the fly.
+ //
+ SessionListenerPrx listenerPrx = SessionListenerPrx::uncheckedCast(current.adapter->createProxy(current.id));
+ source->removeBridge(listenerPrx);
+ session->disconnect();
+ }
+ catch (const Ice::ObjectNotExistException& ex)
+ {
+ session->destroy();
+ mLogger(Debug) << FUNLOG << ": " << ex.what();
+ }
+ catch (const Ice::Exception& ex)
+ {
+ mLogger(Debug) << FUNLOG << ": " << ex.what();
+ throw;
+ }
+ }
+
+ void unheld(const SessionPrx& source, const Ice::Current&)
+ {
+ }
+
+private:
+ SessionCollectionPtr mSessions;
+ Logger mLogger;
+};
+
+}
+
+SessionListenerPtr AsteriskSCF::BridgeService::createSessionListener(const SessionCollectionPtr& sessions,
+ const Logger& logger)
+{
+ return new SessionListenerImpl(sessions, logger);
+}
diff --git a/src/SessionListener.h b/src/SessionListener.h
new file mode 100644
index 0000000..3a0fc73
--- /dev/null
+++ b/src/SessionListener.h
@@ -0,0 +1,38 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#pragma once
+
+#include <AsteriskSCF/SessionCommunications/SessionCommunicationsIf.h>
+#include "SessionCollection.h"
+
+namespace AsteriskSCF
+{
+namespace System
+{
+namespace Logging
+{
+class Logger;
+}
+}
+namespace BridgeService
+{
+
+AsteriskSCF::SessionCommunications::V1::SessionListenerPtr createSessionListener(const SessionCollectionPtr& sessions,
+ const AsteriskSCF::System::Logging::Logger& logger);
+
+
+} /* End of namespace BridgeService */
+} /* End of namespace AsteriskSCF */
diff --git a/src/SessionOperations.cpp b/src/SessionOperations.cpp
new file mode 100644
index 0000000..279bc96
--- /dev/null
+++ b/src/SessionOperations.cpp
@@ -0,0 +1,122 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#include "SessionOperations.h"
+
+using namespace AsteriskSCF::System::Logging;
+using namespace AsteriskSCF::SessionCommunications::V1;
+using namespace AsteriskSCF::BridgeService;
+using namespace AsteriskSCF::Bridge::V1;
+using namespace AsteriskSCF;
+using namespace std;
+
+ConnectSessionOperation::ConnectSessionOperation(const SessionPrx& exclude, const Logger& logger) :
+ mExclude(exclude->ice_getIdentity()),
+ mLogger(logger)
+{
+}
+
+void ConnectSessionOperation::operator()(const SessionWrapperPtr& s)
+{
+ if (s->getSession()->ice_getIdentity() != mExclude)
+ {
+ try
+ {
+ //
+ // TODO: AMI!
+ //
+ s->connect();
+ }
+ catch (const Ice::ObjectNotExistException& ex)
+ {
... 590 lines suppressed ...
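
The serial handling described in the BridgeReplicatorIf.ice comments and checked in BridgeReplicatorStateListenerI.cpp can be sketched in isolation as below. This is only an illustration under stated assumptions, not the code from the commit: Item and applyUpdate are hypothetical stand-ins for the Slice-generated ReplicatedStateItem and the listener loop, and storing the accepted update back into the map is an assumption of this sketch (the diff above keeps the existing entry).

```cpp
#include <cassert>
#include <map>
#include <string>

// Same value as the Slice constant in the diff; redefined here so the sketch
// compiles on its own.
const long SerialCounterStart = 100;

// Hypothetical stand-in for ReplicatedStateItem (key plus serial only).
struct Item
{
    std::string key;
    long serial;
};

// Mirrors the condition used by the listener: an update is treated as out of
// sequence only if it is older than the stored copy AND its serial is above
// the start value. A serial at or below SerialCounterStart is let through,
// which is how a freshly started or wrapped counter is accepted.
bool isOutOfSequence(const Item& stored, const Item& update)
{
    return stored.serial > update.serial && update.serial > SerialCounterStart;
}

// Returns true if the update was accepted, false if it was ignored.
bool applyUpdate(std::map<std::string, Item>& items, const Item& update)
{
    std::map<std::string, Item>::iterator entry = items.find(update.key);
    if (entry != items.end() && isOutOfSequence(entry->second, update))
    {
        return false;
    }
    // Assumption of this sketch: remember the newest accepted copy.
    items[update.key] = update;
    return true;
}
```

With a stored serial of 103, an update carrying 101 would be ignored, while an update carrying 100 would be accepted because it is not above SerialCounterStart.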
--
asterisk-scf/integration/bridging.git