[asterisk-scf-commits] asterisk-scf/integration/bridging.git branch "bridge-replication" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Thu Mar 3 13:20:57 CST 2011
branch "bridge-replication" has been updated
via 007048196f9c9f1f19f35126961804a9928e4312 (commit)
via d44c4a263724b7ce6900103bf24dcc0b89a0bd8f (commit)
from 381d74f1d554b08eaa22db45062fd709b35a4ce5 (commit)
Summary of changes:
config/test_bridging.conf.in | 20 ++-
src/BridgeImpl.cpp | 9 +-
src/BridgeReplicatorIf.ice | 12 +-
src/BridgeReplicatorStateListenerI.cpp | 42 +++++-
src/DebugUtil.h | 17 --
src/MediaSplicer.cpp | 253 +++++++++++++++++++++++++++-----
src/MediaSplicer.h | 19 ++-
src/Service.cpp | 2 +-
src/SessionCollection.cpp | 12 +-
src/SessionCollection.h | 3 +-
src/SessionListener.cpp | 4 +-
src/SessionWrapper.cpp | 34 ++++-
src/SessionWrapper.h | 5 +-
test/TestBridging.cpp | 3 +-
test/UnitTests.cpp | 42 +++---
15 files changed, 355 insertions(+), 122 deletions(-)
- Log -----------------------------------------------------------------
commit 007048196f9c9f1f19f35126961804a9928e4312
Author: Brent Eagles <beagles at digium.com>
Date: Thu Mar 3 15:50:30 2011 -0330
Remove some cruft (debug statements, commented out code)
diff --git a/src/DebugUtil.h b/src/DebugUtil.h
index b691050..ba06db1 100644
--- a/src/DebugUtil.h
+++ b/src/DebugUtil.h
@@ -54,23 +54,6 @@ std::ostream& dumpState(std::ostream& os, const std::string& prefix,
default:
os << "(invalid)\n";
}
-#if 0
- //
- // XXX: media pairings are probably going to occur elsewhere, update when that's decided.
- //
- int index = 0;
- for (AsteriskSCF::Bridge::V1::MediaPairingSeq::const_iterator i = session->mediaPairings.begin();
- i != session->mediaPairings.end(); ++i)
- {
- if (index == 0)
- {
- os << prefix << "Pairings:\n";
- }
- os << prefix << "\t" << index << ". source: " << comm->identityToString((*i)->source->ice_getIdentity()) << " to sink : " <<
- comm->identityToString((*i)->sink->ice_getIdentity()) << '\n';
- ++index;
- }
-#endif
return os;
#else
diff --git a/test/TestBridging.cpp b/test/TestBridging.cpp
index 2ca5c88..7ae5176 100644
--- a/test/TestBridging.cpp
+++ b/test/TestBridging.cpp
@@ -652,8 +652,6 @@ public:
BridgeSeq bridges = mgrPrx2->listBridges();
BridgeSeq bridges2 = mgrPrx->listBridges();
BOOST_CHECK(bridges.size() == bridges2.size()); // XXX
- cout << __LINE__ << " XXX " << bridges2.size() << endl;
- cout << __LINE__ << " XXX should be " << bridges.size() << endl;
mgrPrx->addListener(listenerPrx);
bridge = mgrPrx->createBridge(sessions, 0);
servant->wait(5000);
commit d44c4a263724b7ce6900103bf24dcc0b89a0bd8f
Author: Brent Eagles <beagles at digium.com>
Date: Thu Mar 3 15:44:13 2011 -0330
- Fix some cleanup bugs on finalized state.
- Fix media pairing replication.
- Refactored the MediaSplicer object out of the Bridge Object. Now belongs
the SessionCollection.
- Added some addditional tracing.
- Fixed a test suite bug.
- Fixed a race condition where an externally initiated connected callback
can cause a disconnected session to be reborn (disconnection is a
terminal state for bridged sessions)
- Updated test configuration file.
diff --git a/config/test_bridging.conf.in b/config/test_bridging.conf.in
index 8b16e1b..461f245 100644
--- a/config/test_bridging.conf.in
+++ b/config/test_bridging.conf.in
@@ -1,22 +1,27 @@
AsteriskSCFIceStorm.InstanceName=AsteriskSCFIceStorm
AsteriskSCFIceStorm.TopicManager.Endpoints=default -p 55555
+AsteriskSCFIceStorm.TopicManager.ThreadPool.Size=4
AsteriskSCFIceStorm.Publish.Endpoints=default -p 55556
+AsteriskSCFIceStorm.Publish.ThreadPool.Size=4
AsteriskSCFIceStorm.Transient=1
+TopicManager.Proxy=AsteriskSCFIceStorm/TopicManager:default -p 55555
LoggerAdapter.Endpoints=default
AsteriskSCF.LoggingService.Endpoints=default
+AsteriskSCF.LoggingService.ThreadPool.Size=4
AsteriskSCF.LoggingClient.Endpoints=default
+AsteriskSCF.LoggingClient.ThreadPool.Size=4
AsteriskSCF.TestChannelService.Endpoints=default -p 55560
+AsteriskSCF.TestChannelService.ThreadPool.Size=4
-LocatorService.Proxy=LocatorService:default -p 55558
ServiceLocatorManagementAdapter.Endpoints=tcp -p 55557
+ServiceLocatorManagementAdapter.ThreadPool.Size=4
ServiceLocatorAdapter.Endpoints=tcp -p 55558
-
+ServiceLocatorAdapter.ThreadPool.Size=4
+LocatorService.Proxy=LocatorService:default -p 55558
ServiceLocatorManagementProxy=LocatorServiceManagement:tcp -p 55557
-TopicManager.Proxy=AsteriskSCFIceStorm/TopicManager:default -p 55555
-
IceBox.InheritProperties=1
IceBox.Service.Logger=${logger_bindir}/server/src at logging-service:createLoggingService
IceBox.Service.Replicator=../src at BridgeReplicator:create
@@ -25,7 +30,7 @@ IceBox.Service.TestBridge2=../src/@bridgeservice:create
IceBox.Service.TestServiceLocator=${service_locator_bindir}/src at service_locator:create
IceBox.Service.TestChannel=${test_channel_bindir}/src at test_channel:create
IceBox.Service.TestDriver=../test at bridge_component_test:create
-
+
TestBridge.InstanceName=TestBridge
TestBridge.BridgeService.Endpoints=default -p 55561
TestBridge.BridgeService.ThreadPool.Size=4
@@ -39,6 +44,7 @@ TestBridge2.BridgeService.ThreadPool.Size=4
TestBridge2.Proxy=BridgeManager:default -p 55572
TestBridge2.BridgeServiceInternal.Endpoints=default -p 65462
TestBridge2.BridgeServiceInternal.ThreadPool.Size=4
+TestBridge2.StateReplicatorListener=yes
Replicator.InstanceName=Replicator
Replicator.BridgeReplicator.Endpoints=default -p 63242
@@ -47,9 +53,11 @@ Replicator.BridgeReplicator.ThreadPool.Size=4
TestChannel.Proxy=TestChannel:default -p 55560
TestUtilAdapter.Endpoints=default -p 55562
+TestUtilAdapter.ThreadPool.Size=4
Commands.Proxy=TestChannel.Locator.Commands:default -p 55560
-IceBox.LoadOrder=TestServiceLocator Logger TestBridge TestChannel TestDriver
+IceBox.LoadOrder=TestServiceLocator Logger Replicator TestBridge TestBridge2 TestChannel TestDriver
IceBox.ServiceManager.Endpoints=default -p 56000
+IceBox.ServiceManager.ThreadPool.Size=4
IceBoxMgr.Proxy=IceBox/ServiceManager:default -p 56000
diff --git a/src/BridgeImpl.cpp b/src/BridgeImpl.cpp
index d647416..4dd7969 100644
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@ -126,11 +126,6 @@ private:
//
SessionCollectionPtr mSessions;
- //
- // TODO: Move this out.. this doesn't belong with the bridge implementation any longer.
- //
- MediaSplicer mSplicer;
-
const string mName;
Ice::ObjectAdapterPtr mObjAdapter;
@@ -289,7 +284,7 @@ void BridgeImpl::addSessions(const SessionSeq& sessions, const Ice::Current& cur
// We setup media.
// TODO: AMI should come into play here.
//
- session->setConnector(mSplicer.connect(*i, mLogger));
+ session->setupMedia();
}
}
}
@@ -551,7 +546,7 @@ void BridgeImpl::replaceSession(const SessionPrx& oldSession, const SessionSeq&
// We setup media.
// TODO: AMI should come into play here.
//
- session->setConnector(mSplicer.connect(*i, mLogger));
+ session->setupMedia();
}
added.push_back(*i);
diff --git a/src/BridgeReplicatorIf.ice b/src/BridgeReplicatorIf.ice
index e31c153..691f0e2 100644
--- a/src/BridgeReplicatorIf.ice
+++ b/src/BridgeReplicatorIf.ice
@@ -89,7 +89,7 @@ enum MediaOperationReplicationPolicy
* to media sources and sinks in the bridge.
*
**/
-class MediaPairing extends ReplicatedStateItem
+class MediaPairing
{
AsteriskSCF::Media::V1::StreamSource* source;
AsteriskSCF::Media::V1::StreamSink* sink;
@@ -104,10 +104,10 @@ sequence<MediaPairing> MediaPairingSeq;
class SessionPairing extends ReplicatedStateItem
{
string bridgeKey;
- string sessionKeyA;
- string sessionKeyB;
-
- MediaPairingSeq mediaPairings;
+ string sessionKey;
+ AsteriskSCF::Media::V1::Session* mediaSession;
+ MediaPairingSeq incomingMediaPairings;
+ MediaPairingSeq outgoingMediaPairings;
};
/**
@@ -144,7 +144,7 @@ class BridgedSession extends ReplicatedStateItem
BridgedSessionState currentState;
/**
- * Key to the child session pairing
+ * Key to the child session pairing. TODO!
**/
string sessionPairingKey;
diff --git a/src/BridgeReplicatorStateListenerI.cpp b/src/BridgeReplicatorStateListenerI.cpp
index c5060d0..29e5d2c 100644
--- a/src/BridgeReplicatorStateListenerI.cpp
+++ b/src/BridgeReplicatorStateListenerI.cpp
@@ -45,10 +45,13 @@ public:
{
for (Ice::StringSeq::const_iterator k = itemKeys.begin(); k != itemKeys.end(); ++k)
{
+
map<string, ReplicatedStateItemPtr>::iterator entry = mItems.find((*k));
if (entry != mItems.end())
{
ReplicatedStateItemPtr item = entry->second;
+ mLogger(Debug) << " received removal of " << (*k) << ": a " << item->ice_id();
+
mItems.erase(entry);
BridgedSessionPtr bridgedSessionItem = BridgedSessionPtr::dynamicCast(item);
if (bridgedSessionItem)
@@ -98,6 +101,10 @@ public:
continue;
}
+ //
+ // Session pairings are cleaned up by way of sessions going away.
+ //
+
///
// The bridge manager isn't really removable.
//
@@ -109,6 +116,8 @@ public:
{
for (ReplicatedStateItemSeq::const_iterator i = items.begin(); i != items.end(); ++i)
{
+ mLogger(Debug) << " received update " << (*i)->serial << " for " << (*i)->key << " (a " <<
+ (*i)->ice_id() << ")";
map<string, ReplicatedStateItemPtr>::iterator entry = mItems.find((*i)->key);
ReplicatedStateItemPtr existingItem;
if (entry != mItems.end())
@@ -207,7 +216,38 @@ public:
continue;
}
-
+
+ SessionPairingPtr sessionPairing = SessionPairingPtr::dynamicCast((*i));
+ if (sessionPairing)
+ {
+ vector<BridgeServantPtr> bridges = mManager->getBridges();
+ bool found = false;
+ for (vector<BridgeServantPtr>::iterator b = bridges.begin(); b != bridges.end(); ++b)
+ {
+ if ((*b) && (*b)->id() == sessionPairing->bridgeKey)
+ {
+ SessionWrapperPtr session = (*b)->sessions()->getSession(sessionPairing->sessionKey);
+ if (session)
+ {
+ session->updateMedia(sessionPairing);
+ }
+ //
+ // Keep the session list clean.
+ //
+ found = true;
+ }
+ //
+ // We could break here if we could be sure that there were no other updates.
+ //
+ }
+ if (!found)
+ {
+ mLogger(Error) << "received an update for a session on a bridge that does not exist!";
+ }
+
+ continue;
+ }
+
BridgeListenerStateItemPtr bridgeListener = BridgeListenerStateItemPtr::dynamicCast((*i));
if (bridgeListener)
{
diff --git a/src/MediaSplicer.cpp b/src/MediaSplicer.cpp
index fe49d5d..3268d98 100644
--- a/src/MediaSplicer.cpp
+++ b/src/MediaSplicer.cpp
@@ -17,11 +17,16 @@
#include <IceUtil/Shared.h>
#include <IceUtil/Handle.h>
#include "BridgeServiceConfig.h"
+#include "BridgeReplicatorIf.h"
+#include <string>
+#include <vector>
using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::Media::V1;
+using namespace AsteriskSCF::Bridge::V1;
using namespace std;
+
namespace AsteriskSCF
{
namespace BridgeService
@@ -32,8 +37,8 @@ namespace BridgeService
class MediaSplicerI;
class MedixMixerPtr;
-typedef pair<StreamSinkPrx, StreamSourcePrx> OutgoingPairing;
-typedef pair<StreamSourcePrx, StreamSinkPrx> IncomingPairing;
+typedef pair<AsteriskSCF::Media::V1::StreamSinkPrx, AsteriskSCF::Media::V1::StreamSourcePrx> OutgoingPairing;
+typedef pair<AsteriskSCF::Media::V1::StreamSourcePrx, AsteriskSCF::Media::V1::StreamSinkPrx> IncomingPairing;
//
// TODO: These proxies could use some retry properties added to them...
@@ -43,18 +48,27 @@ typedef pair<StreamSourcePrx, StreamSinkPrx> IncomingPairing;
class MediaConnectorI : public MediaConnector
{
public:
-
- MediaConnectorI(const vector<OutgoingPairing>& outgoing, const std::vector<IncomingPairing>& incoming,
- const MediaConnectorPtr& peer,
- const Logger& logger):
+ MediaConnectorI(const AsteriskSCF::Media::V1::SessionPrx& media,
+ const vector<OutgoingPairing>& outgoing, const vector<IncomingPairing>& incoming,
+ const MediaConnectorPtr& peer,
+ const string& bridgeId,
+ const string& sessionId,
+ const ReplicatorSmartPrx& replicator,
+ const Logger& logger):
+ mMedia(media),
mOutgoing(outgoing),
mIncoming(incoming),
mPeer(peer),
mConnected(true),
+ mBridgeId(bridgeId),
+ mSessionId(sessionId),
+ mRepCounter(SerialCounterStart),
+ mReplicator(replicator),
mLogger(logger)
{
mLogger(Debug) << FUNLOG << ": setting up a media connector with " << outgoing.size()
<< " outgoing pairings and " << incoming.size() << " incoming pairings.";
+ mKey = mBridgeId + mSessionId + "media";
for (vector<OutgoingPairing>::iterator i = mOutgoing.begin(); i != mOutgoing.end(); ++i)
{
i->second->setSink(i->first);
@@ -67,6 +81,29 @@ public:
}
}
+ MediaConnectorI(const SessionPairingPtr& pairing,
+ const MediaConnectorPtr& peer,
+ const ReplicatorSmartPrx& replicator,
+ const Logger& logger) :
+ mMedia(pairing->mediaSession),
+ mPeer(peer),
+ mConnected(true),
+ mBridgeId(pairing->bridgeKey),
+ mSessionId(pairing->sessionKey),
+ mRepCounter(pairing->serial),
+ mReplicator(replicator),
+ mLogger(logger)
+ {
+ for (MediaPairingSeq::iterator i = pairing->outgoingMediaPairings.begin(); i != pairing->outgoingMediaPairings.end(); ++i)
+ {
+ mOutgoing.push_back(OutgoingPairing((*i)->sink, (*i)->source));
+ }
+ for (MediaPairingSeq::iterator i = pairing->incomingMediaPairings.begin(); i != pairing->incomingMediaPairings.end(); ++i)
+ {
+ mIncoming.push_back(IncomingPairing((*i)->source, (*i)->sink));
+ }
+ }
+
void unplug()
{
mLogger(Debug) << FUNLOG << ": called.";
@@ -76,7 +113,7 @@ public:
mLogger(Debug) << FUNLOG << ": we did not have any media. Contacting the peer connector and telling it to disconnect!";
MediaConnectorPtr peer;
{
- IceUtil::Mutex::Lock lock(mMutex);
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
peer = mPeer;
mPeer = 0;
}
@@ -90,7 +127,7 @@ public:
mLogger(Debug) << FUNLOG << ": media connections unplugged. Let's forget about our peer connector now!";
MediaConnectorPtr peer;
{
- IceUtil::Mutex::Lock lock(mMutex);
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
peer = mPeer;
mPeer = 0;
}
@@ -101,18 +138,31 @@ public:
mPeer = 0;
}
mLogger(Debug) << FUNLOG << ": finished unplugging.";
+ if (mReplicator)
+ {
+ try
+ {
+ Ice::StringSeq keys;
+ keys.push_back(mKey);
+ mReplicator->removeState(keys);
+ }
+ catch (const std::exception& ex)
+ {
+ mLogger(Debug) << FUNLOG << ": exception " << ex.what();
+ }
+ }
}
bool isConnected()
{
- IceUtil::Mutex::Lock lock(mMutex);
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
return mConnected;
}
void clearConnections()
{
mLogger(Debug) << FUNLOG << ": clearing connections.";
- IceUtil::Mutex::Lock lock(mMutex);
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
mOutgoing.clear();
mIncoming.clear();
}
@@ -120,10 +170,76 @@ public:
void update(const MediaConnectorPtr& peer, const vector<OutgoingPairing>& outgoing, const std::vector<IncomingPairing>& incoming)
{
mLogger(Debug) << FUNLOG << ": establishing a new peer connection.";
- IceUtil::Mutex::Lock lock(mMutex);
- mPeer = peer;
- mOutgoing = outgoing;
- mIncoming = incoming;
+ SessionPairingPtr update;
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ mPeer = peer;
+ mOutgoing = outgoing;
+ mIncoming = incoming;
+ update = createUpdate();
+ }
+ pushUpdate(update);
+ }
+
+ SessionPairingPtr createUpdate()
+ {
+ if (mReplicator)
+ {
+ SessionPairingPtr update(new SessionPairing);
+ update->mediaSession = mMedia;
+ update->bridgeKey = mBridgeId;
+ update->sessionKey = mSessionId;
+ update->key = mKey;
+ update->serial = ++mRepCounter;
+ for (vector<OutgoingPairing>::iterator i = mOutgoing.begin(); i != mOutgoing.end(); ++i)
+ {
+ update->outgoingMediaPairings.push_back(new MediaPairing(i->second, i->first));
+ }
+ for (vector<IncomingPairing>::iterator i = mIncoming.begin(); i != mIncoming.end(); ++i)
+ {
+ update->incomingMediaPairings.push_back(new MediaPairing(i->first, i->second));
+ }
+ return update;
+ }
+ return 0;
+ }
+
+ void pushUpdate(const SessionPairingPtr& update)
+ {
+ if (mReplicator && update)
+ {
+ try
+ {
+ ReplicatedStateItemSeq seq;
+ seq.push_back(update);
+ mReplicator->setState(seq);
+ }
+ catch (const std::exception& ex)
+ {
+ mLogger(Debug) << ": exception " << ex.what();
+ }
+ }
+ }
+
+ void initialUpdate()
+ {
+ pushUpdate(createUpdate());
+ }
+
+ void update(const SessionPairingPtr& update)
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ mRepCounter = update->serial;
+ mOutgoing.clear();
+ mIncoming.clear();
+ for (MediaPairingSeq::iterator i = update->outgoingMediaPairings.begin(); i != update->outgoingMediaPairings.end(); ++i)
+ {
+ mOutgoing.push_back(OutgoingPairing((*i)->sink, (*i)->source));
+ }
+ for (MediaPairingSeq::iterator i = update->incomingMediaPairings.begin(); i != update->incomingMediaPairings.end(); ++i)
+ {
+ mIncoming.push_back(IncomingPairing((*i)->source, (*i)->sink));
+ }
}
bool disconnectMedia()
@@ -131,14 +247,17 @@ public:
mLogger(Debug) << FUNLOG << ": unplugging sinks and sources.";
vector<OutgoingPairing> outgoing;
vector<IncomingPairing> incoming;
+ SessionPairingPtr update;
{
- IceUtil::Mutex::Lock lock(mMutex);
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
outgoing = mOutgoing;
mOutgoing.clear();
incoming = mIncoming;
mIncoming.clear();
mConnected = false;
+ update = createUpdate();
}
+ pushUpdate(update);
if (outgoing.size() == 0 && incoming.size() == 0)
{
return false;
@@ -193,12 +312,23 @@ public:
return true;
}
+ string id()
+ {
+ return mKey;
+ }
+
private:
- IceUtil::Mutex mMutex;
+ boost::shared_mutex mLock;
+ AsteriskSCF::Media::V1::SessionPrx mMedia;
vector<OutgoingPairing> mOutgoing;
vector<IncomingPairing> mIncoming;
MediaConnectorPtr mPeer;
bool mConnected;
+ string mBridgeId;
+ string mSessionId;
+ string mKey;
+ long mRepCounter;
+ ReplicatorSmartPrx mReplicator;
Logger mLogger;
};
@@ -212,18 +342,25 @@ class MediaSplicerI : public IceUtil::Shared
public:
- MediaConnectorPtr connect(const AsteriskSCF::SessionCommunications::V1::SessionPrx& session,
- const Logger& logger)
+ MediaSplicerI(const Ice::CommunicatorPtr& communicator, const string& bridgeId, const ReplicatorSmartPrx& replicator, const Logger& logger) :
+ mCommunicator(communicator),
+ mBridgeId(bridgeId),
+ mReplicator(replicator),
+ mLogger(logger)
+ {
+ }
+
+ MediaConnectorPtr connect(const AsteriskSCF::SessionCommunications::V1::SessionPrx& session)
{
- SessionPrx media = session->getMediaSession();
+ AsteriskSCF::Media::V1::SessionPrx media = session->getMediaSession();
if (!media)
{
- logger(Debug) << FUNLOG << ": no media available!";
+ mLogger(Debug) << FUNLOG << ": no media available!";
return 0;
}
- IceUtil::Mutex::Lock lock(mMutex);
- MediaConnectorPtr result;
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ MediaConnectorIPtr result;
//
// Loop through looking for an identical media session, also reaping disconnected connectors as we go.
@@ -233,20 +370,21 @@ public:
{
if (!i->connector->isConnected())
{
- logger(Debug) << FUNLOG << ": reaping a connector";
+ mLogger(Debug) << FUNLOG << ": reaping a connector";
MediaSessions::iterator t = i;
i = mSessions.erase(t);
continue;
}
if (i->mediaSession == media)
{
- result = i->connector;
+ result = MediaConnectorIPtr::dynamicCast(i->connector);
+ assert(result);
}
++i;
}
if (result)
{
- logger(Debug) << FUNLOG << ": found an existing matching connector, returning that!";
+ mLogger(Debug) << FUNLOG << ": found an existing matching connector, returning that!";
return result;
}
@@ -273,10 +411,11 @@ public:
vector<OutgoingPairing> outgoingPairings(findCompatiblePairings(sinks));
vector<IncomingPairing> incomingPairings(findCompatiblePairings(sources));
-
- result = new MediaConnectorI(outgoingPairings, incomingPairings, existing, logger);
- logger(Debug) << FUNLOG << ": established connections, returning connector object";
+ string sessionId = mCommunicator->identityToString(session->ice_getIdentity());
+ result = new MediaConnectorI(media, outgoingPairings, incomingPairings, existing, mBridgeId, sessionId, mReplicator, mLogger);
+ mLogger(Debug) << FUNLOG << ": established connections, returning connector object";
mSessions.push_back(MediaSessionStruct(media, result));
+ result->initialUpdate();
//
// PRE-ALPHA-ONLY code. When it's a two party bridge and the sessions are added or removed separately,
@@ -290,9 +429,42 @@ public:
return result;
}
+ MediaConnectorPtr createReplica(const SessionPairingPtr& pairings)
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ MediaConnectorIPtr result;
+
+ MediaSessions::iterator i = mSessions.begin();
+ while (i != mSessions.end())
+ {
+ if (!i->connector->isConnected())
+ {
+ mLogger(Debug) << FUNLOG << ": reaping a connector";
+ MediaSessions::iterator t = i;
+ i = mSessions.erase(t);
+ continue;
+ }
+ ++i;
+ }
+
+ MediaConnectorPtr existing;
+ if (mSessions.size() == 1)
+ {
+ existing = mSessions.back().connector;
+ existing->clearConnections();
+ }
+ result = new MediaConnectorI(pairings, existing, mReplicator, mLogger);
+ mSessions.push_back(MediaSessionStruct(pairings->mediaSession, result));
+ return result;
+ }
+
private:
- IceUtil::Mutex mMutex;
+ boost::shared_mutex mLock;
+ Ice::CommunicatorPtr mCommunicator;
+ string mBridgeId;
+ ReplicatorSmartPrx mReplicator;
+ Logger mLogger;
struct MediaSessionStruct
{
@@ -436,24 +608,27 @@ private:
}
};
-typedef IceUtil::Handle<MediaSplicerI> MediaSplicerPtr;
-
} // End of namespace BridgeService
} // End of namespace AsteriskSCF
-AsteriskSCF::BridgeService::MediaSplicer::MediaSplicer() :
- mImpl(new MediaSplicerI)
+using namespace AsteriskSCF::BridgeService;
+
+MediaSplicer::MediaSplicer(const Ice::CommunicatorPtr& comm, const std::string& bridgeId, const ReplicatorSmartPrx& replicator,
+ const Logger& logger) :
+ mImpl(new MediaSplicerI(comm, bridgeId, replicator, logger))
+{
+}
+
+MediaSplicer::~MediaSplicer()
{
}
-AsteriskSCF::BridgeService::MediaSplicer::~MediaSplicer()
+MediaConnectorPtr MediaSplicer::connect(const AsteriskSCF::SessionCommunications::V1::SessionPrx& session)
{
- delete mImpl;
+ return mImpl->connect(session);
}
-AsteriskSCF::BridgeService::MediaConnectorPtr AsteriskSCF::BridgeService::MediaSplicer::connect(
- const AsteriskSCF::SessionCommunications::V1::SessionPrx& session,
- const AsteriskSCF::System::Logging::Logger& logger)
+MediaConnectorPtr MediaSplicer::createReplica(const SessionPairingPtr& pairings)
{
- return mImpl->connect(session, logger);
+ return mImpl->createReplica(pairings);
}
diff --git a/src/MediaSplicer.h b/src/MediaSplicer.h
index 776c3b5..08443d8 100644
--- a/src/MediaSplicer.h
+++ b/src/MediaSplicer.h
@@ -20,6 +20,8 @@
#include <AsteriskSCF/SessionCommunications/SessionCommunicationsIf.h>
#include <AsteriskSCF/logger.h>
#include <memory>
+#include "BridgeServiceConfig.h"
+#include "BridgeReplicatorIf.h"
namespace AsteriskSCF
{
@@ -33,22 +35,29 @@ public:
virtual bool isConnected() = 0;
virtual void clearConnections() = 0;
virtual bool disconnectMedia() = 0;
+
+ virtual void update(const AsteriskSCF::Bridge::V1::SessionPairingPtr& update) = 0;
};
typedef IceUtil::Handle<MediaConnector> MediaConnectorPtr;
class MediaSplicerI;
-class MediaSplicer
+class MediaSplicer : public IceUtil::Shared
{
public:
- MediaSplicer();
+ MediaSplicer(const Ice::CommunicatorPtr& comm, const std::string& bridgeId, const ReplicatorSmartPrx& replicator,
+ const AsteriskSCF::System::Logging::Logger& logger);
~MediaSplicer();
- MediaConnectorPtr connect(const AsteriskSCF::SessionCommunications::V1::SessionPrx& session,
- const AsteriskSCF::System::Logging::Logger& logger);
+ MediaConnectorPtr connect(const AsteriskSCF::SessionCommunications::V1::SessionPrx& session);
+
+ MediaConnectorPtr createReplica(const AsteriskSCF::Bridge::V1::SessionPairingPtr& pairings);
private:
- MediaSplicerI* mImpl;
+ boost::shared_ptr<MediaSplicerI> mImpl;
};
+
+typedef IceUtil::Handle<MediaSplicer> MediaSplicerPtr;
+
} // End of namespace BridgeService
} // End of namespace AsteriskSCF
diff --git a/src/Service.cpp b/src/Service.cpp
index 078ae7a..660f18b 100644
--- a/src/Service.cpp
+++ b/src/Service.cpp
@@ -213,7 +213,7 @@ void BridgingApp::start(const std::string& name, const Ice::CommunicatorPtr& com
bool onStandby = false;
if (replicator)
{
- onStandby = communicator->getProperties()->getPropertyWithDefault(adapterName + "StateReplicatorListener", "no") == "yes";
+ onStandby = communicator->getProperties()->getPropertyWithDefault(name + ".StateReplicatorListener", "no") == "yes";
}
if (onStandby)
{
diff --git a/src/SessionCollection.cpp b/src/SessionCollection.cpp
index cfd8ac7..e59e6e6 100644
--- a/src/SessionCollection.cpp
+++ b/src/SessionCollection.cpp
@@ -40,7 +40,8 @@ SessionCollection::SessionCollection(const Ice::CommunicatorPtr& comm, const str
mReplicator(replicator),
mLogger(logger),
mBridgeId(bridgeId),
- mSessionCounter(0)
+ mSessionCounter(0),
+ mSplicer(new MediaSplicer(comm, bridgeId, replicator, logger))
{
}
@@ -90,7 +91,7 @@ SessionWrapperPtr SessionCollection::addSession(const SessionPrx& session)
bridgedSession->bridgeId = mBridgeId;
bridgedSession->orderAdded = mSessionCounter;
- SessionWrapperPtr newSession(new SessionWrapper(bridgedSession, mReplicator, mLogger));
+ SessionWrapperPtr newSession(new SessionWrapper(bridgedSession, mSplicer, mReplicator, mLogger));
mMap[key] = newSession;
++mSessionCounter;
return newSession;
@@ -125,7 +126,10 @@ SessionSeq SessionCollection::getSessionSeq()
IfInCriteria<set<BridgedSessionState>, StateMemberSelector> active(states);
SelectOperation< IfInCriteria<set<BridgedSessionState>, StateMemberSelector>, SessionPrxSelector, SessionSeq>
selectIf(active);
- return visitSessions(selectIf).results();
+ visitSessions(selectIf);
+ SessionSeq results(selectIf.results());
+ mLogger(Debug) << FUNLOG << " " << results.size();
+ return results;
}
size_t SessionCollection::size()
@@ -183,7 +187,7 @@ void SessionCollection::replicaUpdate(const BridgedSessionPtr& session)
SessionMap::iterator i = mMap.find(session->key);
if (i == mMap.end())
{
- mMap[session->key] = new SessionWrapper(session, mReplicator, mLogger);
+ mMap[session->key] = new SessionWrapper(session, mSplicer, mReplicator, mLogger);
}
else
{
diff --git a/src/SessionCollection.h b/src/SessionCollection.h
index a93f7c2..cf95815 100644
--- a/src/SessionCollection.h
+++ b/src/SessionCollection.h
@@ -22,6 +22,7 @@
#include <string>
#include "BridgeServiceConfig.h"
#include "SessionWrapper.h"
+#include "MediaSplicer.h"
namespace AsteriskSCF
{
@@ -129,7 +130,7 @@ private:
AsteriskSCF::System::Logging::Logger mLogger;
std::string mBridgeId;
long mSessionCounter;
-
+ MediaSplicerPtr mSplicer;
};
typedef IceUtil::Handle<SessionCollection> SessionCollectionPtr;
diff --git a/src/SessionListener.cpp b/src/SessionListener.cpp
index 3f0312b..280c49e 100644
--- a/src/SessionListener.cpp
+++ b/src/SessionListener.cpp
@@ -57,10 +57,8 @@ public:
return;
}
- //
- // XXX- media pairings.
- //
session->setConnected();
+ session->setupMedia();
ConnectSessionOperation connector(source, mLogger);
mSessions->visitSessions(connector);
}
diff --git a/src/SessionWrapper.cpp b/src/SessionWrapper.cpp
index e5eeb5e..21bca8d 100644
--- a/src/SessionWrapper.cpp
+++ b/src/SessionWrapper.cpp
@@ -25,12 +25,14 @@ using namespace AsteriskSCF::Bridge::V1;
using namespace AsteriskSCF;
using namespace std;
-SessionWrapper::SessionWrapper(const BridgedSessionPtr& session, const ReplicatorSmartPrx& replicator,
+SessionWrapper::SessionWrapper(const BridgedSessionPtr& session,
+ const MediaSplicerPtr& splicer, const ReplicatorSmartPrx& replicator,
const Logger& logger) :
mSession(session),
mReplicator(replicator),
mLogger(logger),
- mId(mSession->key)
+ mId(mSession->key),
+ mSplicer(splicer)
{
}
@@ -62,13 +64,11 @@ void SessionWrapper::ring()
bool SessionWrapper::setConnected()
{
+ mLogger(Debug) << FUNLOG << " for " << mId;
BridgedSessionPtr update;
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
- //
- // TODO: Going from done or disconnected to connected is probably a bug. Investigate!
- //
- if (mSession->currentState != BridgedSessionState::Connected)
+ if (mSession->currentState == BridgedSessionState::Added)
{
mSession->currentState = BridgedSessionState::Connected;
update = createUpdate();
@@ -90,8 +90,9 @@ SessionPrx SessionWrapper::getSession() const
return mSession->session;
}
-void SessionWrapper::setConnector(const MediaConnectorPtr& connector)
+void SessionWrapper::setupMedia()
{
+ mLogger(Debug) << FUNLOG << " for " << mId;
BridgedSessionPtr update;
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
@@ -99,7 +100,7 @@ void SessionWrapper::setConnector(const MediaConnectorPtr& connector)
{
if (!mConnector)
{
- mConnector = connector;
+ mConnector = mSplicer->connect(mSession->session);
}
mSession->currentState = BridgedSessionState::Connected;
update = createUpdate();
@@ -108,8 +109,23 @@ void SessionWrapper::setConnector(const MediaConnectorPtr& connector)
pushUpdate(update);
}
+void SessionWrapper::updateMedia(const SessionPairingPtr& pairings)
+{
+ mLogger(Debug) << FUNLOG << " for " << mId;
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ if (mConnector)
+ {
+ mConnector->update(pairings);
+ }
+ else
+ {
+ mConnector = mSplicer->createReplica(pairings);
+ }
+}
+
void SessionWrapper::disconnect()
{
+ mLogger(Debug) << FUNLOG << ": disconnecting " << mId;
BridgedSessionPtr update;
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
@@ -126,6 +142,7 @@ void SessionWrapper::disconnect()
void SessionWrapper::update(const BridgedSessionPtr& update)
{
+ mLogger(Debug) << FUNLOG << ": for " << mId;
boost::unique_lock<boost::shared_mutex> lock(mLock);
if (mSession->serial < update->serial || update->serial < SerialCounterStart)
{
@@ -140,6 +157,7 @@ void SessionWrapper::update(const BridgedSessionPtr& update)
void SessionWrapper::destroy()
{
+ mLogger(Debug) << FUNLOG << ": for " << mId;
BridgedSessionPtr update;
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
diff --git a/src/SessionWrapper.h b/src/SessionWrapper.h
index e8e2d66..e11da7c 100644
--- a/src/SessionWrapper.h
+++ b/src/SessionWrapper.h
@@ -30,6 +30,7 @@ class SessionWrapper : public IceUtil::Shared
{
public:
SessionWrapper(const AsteriskSCF::Bridge::V1::BridgedSessionPtr& session,
+ const MediaSplicerPtr& splicer,
const AsteriskSCF::BridgeService::ReplicatorSmartPrx& replicator,
const AsteriskSCF::System::Logging::Logger& logger);
@@ -83,7 +84,8 @@ public:
* with this session.
* Initiates replication.
**/
- void setConnector(const MediaConnectorPtr& connector);
+ void setupMedia();
+ void updateMedia(const AsteriskSCF::Bridge::V1::SessionPairingPtr& pairings);
/**
* Frees connection related resources.
@@ -118,6 +120,7 @@ private:
MediaConnectorPtr mConnector;
AsteriskSCF::System::Logging::Logger mLogger;
std::string mId;
+ MediaSplicerPtr mSplicer;
/**
* Sends changes to the replication service. This should never occur
diff --git a/test/TestBridging.cpp b/test/TestBridging.cpp
index 46cd1f1..2ca5c88 100644
--- a/test/TestBridging.cpp
+++ b/test/TestBridging.cpp
@@ -574,8 +574,7 @@ public:
dumplog(log);
sessions = bridge->listSessions();
- BOOST_CHECK(sessions.size() == 2); // XXX
- BOOST_CHECK(sessions.back() == b); // XXX
+ BOOST_CHECK(sessions.size() == 2);
bridge->shutdown();
}
@@ -653,6 +652,8 @@ public:
BridgeSeq bridges = mgrPrx2->listBridges();
BridgeSeq bridges2 = mgrPrx->listBridges();
BOOST_CHECK(bridges.size() == bridges2.size()); // XXX
+ cout << __LINE__ << " XXX " << bridges2.size() << endl;
+ cout << __LINE__ << " XXX should be " << bridges.size() << endl;
mgrPrx->addListener(listenerPrx);
bridge = mgrPrx->createBridge(sessions, 0);
servant->wait(5000);
diff --git a/test/UnitTests.cpp b/test/UnitTests.cpp
index c57517f..d14012a 100644
--- a/test/UnitTests.cpp
+++ b/test/UnitTests.cpp
@@ -144,7 +144,7 @@ private:
BOOST_FIXTURE_TEST_CASE(testIfState, Fixture)
{
BridgedSessionPtr letter(createBridgedSession(BridgedSessionState::Added));
- SessionWrapperPtr wrapper(new SessionWrapper(letter, AsteriskSCF::BridgeService::ReplicatorSmartPrx(), getLogger()));
+ SessionWrapperPtr wrapper(new SessionWrapper(letter, 0, AsteriskSCF::BridgeService::ReplicatorSmartPrx(), getLogger()));
IfStateCriteria a(BridgedSessionState::Added);
IfStateCriteria b(BridgedSessionState::Connected);
BOOST_CHECK(a(wrapper));
@@ -154,7 +154,7 @@ BOOST_FIXTURE_TEST_CASE(testIfState, Fixture)
BOOST_FIXTURE_TEST_CASE(testIfInCrriteria, Fixture)
{
BridgedSessionPtr letter(createBridgedSession(BridgedSessionState::Added));
- SessionWrapperPtr wrapper(new SessionWrapper(letter, AsteriskSCF::BridgeService::ReplicatorSmartPrx(), getLogger()));
+ SessionWrapperPtr wrapper(new SessionWrapper(letter, 0, AsteriskSCF::BridgeService::ReplicatorSmartPrx(), getLogger()));
set<BridgedSessionState> yesVals;
yesVals.insert(BridgedSessionState::Added);
yesVals.insert(BridgedSessionState::Connected);
@@ -170,7 +170,7 @@ BOOST_FIXTURE_TEST_CASE(testIfInCrriteria, Fixture)
BOOST_FIXTURE_TEST_CASE(testNegation, Fixture)
{
BridgedSessionPtr letter(createBridgedSession(BridgedSessionState::Added));
- SessionWrapperPtr wrapper(new SessionWrapper(letter, AsteriskSCF::BridgeService::ReplicatorSmartPrx(), getLogger()));
+ SessionWrapperPtr wrapper(new SessionWrapper(letter, 0, AsteriskSCF::BridgeService::ReplicatorSmartPrx(), getLogger()));
set<BridgedSessionState> yesVals;
yesVals.insert(BridgedSessionState::Added);
yesVals.insert(BridgedSessionState::Connected);
@@ -193,15 +193,15 @@ BOOST_FIXTURE_TEST_CASE(testCountIf, Fixture)
//
AsteriskSCF::BridgeService::ReplicatorSmartPrx rep;
vector<SessionWrapperPtr> testData;
- testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Added), rep, getLogger()));
- testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Added), rep, getLogger()));
- testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Disconnected), rep, getLogger()));
- testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Connected), rep, getLogger()));
- testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Done), rep, getLogger()));
- testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Added), rep, getLogger()));
- testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Disconnected), rep, getLogger()));
- testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Done), rep, getLogger()));
- testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Connected), rep, getLogger()));
+ testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Added), 0, rep, getLogger()));
+ testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Added), 0, rep, getLogger()));
+ testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Disconnected), 0, rep, getLogger()));
+ testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Connected), 0, rep, getLogger()));
+ testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Done), 0, rep, getLogger()));
+ testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Added), 0, rep, getLogger()));
+ testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Disconnected), 0, rep, getLogger()));
+ testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Done), 0, rep, getLogger()));
+ testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Connected), 0, rep, getLogger()));
//
// Now a variety of test criteria.
@@ -250,17 +250,17 @@ BOOST_FIXTURE_TEST_CASE(testSelect, Fixture)
//
AsteriskSCF::BridgeService::ReplicatorSmartPrx rep;
vector<SessionWrapperPtr> testData;
- testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Added, "a"), rep, getLogger()));
- testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Added, "b"), rep, getLogger()));
- testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Disconnected, "a"),
+ testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Added, "a"), 0, rep, getLogger()));
+ testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Added, "b"), 0, rep, getLogger()));
+ testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Disconnected, "a"), 0,
rep, getLogger()));
- testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Connected, "c"), rep, getLogger()));
- testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Done, "a"), rep, getLogger()));
- testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Added, "b"), rep, getLogger()));
- testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Disconnected, "c"), rep,
+ testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Connected, "c"), 0, rep, getLogger()));
+ testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Done, "a"), 0, rep, getLogger()));
+ testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Added, "b"), 0, rep, getLogger()));
+ testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Disconnected, "c"), 0, rep,
getLogger()));
- testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Done, "a"), rep, getLogger()));
- testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Connected, "b"), rep, getLogger()));
+ testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Done, "a"), 0, rep, getLogger()));
+ testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Connected, "b"), 0, rep, getLogger()));
//
// Now a variety of test criteria.
-----------------------------------------------------------------------
--
asterisk-scf/integration/bridging.git
More information about the asterisk-scf-commits
mailing list