[asterisk-scf-commits] asterisk-scf/release/media_rtp_pjmedia.git branch "master" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Tue May 8 17:28:02 CDT 2012
branch "master" has been updated
via 29b69ef13c04cb6aae14e39911edae18c3f54f8f (commit)
from 69ac9b67a06a178901a56686b0d28028d5d98194 (commit)
Summary of changes:
config/RTP.config | 2 +-
config/test_component.conf | 4 +
config/test_component_v6.conf | 5 +
config/test_rtp_ice.conf | 11 +-
.../MediaRTPPJMEDIA/RTPStateReplicationIf.ice | 13 +-
src/Component.cpp | 118 +++++--
src/ICETransport.cpp | 97 ++++--
src/RTPConfiguration.cpp | 136 +++++---
src/RTPSession.cpp | 377 ++++++++++++++------
src/RTPSink.cpp | 99 ++++--
src/RTPSink.h | 6 +-
src/RTPSource.cpp | 44 ++-
src/RTPSource.h | 7 +-
src/RTPStateReplicator.h | 4 +-
src/RTPStateReplicatorListener.cpp | 112 +++++--
src/RTPTelephonyEventSink.cpp | 38 ++-
src/RTPTelephonyEventSink.h | 7 +-
src/RTPTelephonyEventSource.cpp | 7 +-
src/RTPTelephonyEventSource.h | 2 +
test/CMakeLists.txt | 3 +
test/TestRTPICE.cpp | 43 ++-
test/TestRTPpjmedia.cpp | 182 ++++++++--
22 files changed, 950 insertions(+), 367 deletions(-)
- Log -----------------------------------------------------------------
commit 29b69ef13c04cb6aae14e39911edae18c3f54f8f
Author: Ken Hunt <ken.hunt at digium.com>
Date: Tue May 8 11:40:51 2012 -0500
Changes for new retry logic.
diff --git a/config/RTP.config b/config/RTP.config
index 47b975d..30e0289 100644
--- a/config/RTP.config
+++ b/config/RTP.config
@@ -12,7 +12,7 @@ endport=20000
workerthreadcount=4
# IPv4 address we should bind sessions to
-#ipv4bind=
+ipv4bind=127.0.0.1
# IPv6 address we should bind sessions to
#ipv6bind=
diff --git a/config/test_component.conf b/config/test_component.conf
index 8b7b63b..377d264 100644
--- a/config/test_component.conf
+++ b/config/test_component.conf
@@ -3,6 +3,7 @@
#
# Icebox Configuration
#
+Ice.ThreadPool.Client.Size=4
IceBox.InheritProperties=1
IceBox.LoadOrder=ServiceDiscovery,RTPStateReplicator,MediaServiceRTP,MediaRTPpjmediaTest
@@ -15,6 +16,9 @@ Ice.Override.Timeout=5000
IceBox.Service.ServiceDiscovery=ServiceLocator:create
+# For unit test we run without state replication.
+ServiceDiscovery.Standalone = true
+
ServiceDiscovery.IceStorm.InstanceName=ServiceDiscovery
ServiceDiscovery.IceStorm.TopicManager.Endpoints=tcp -h 127.0.0.1 -p 4421
diff --git a/config/test_component_v6.conf b/config/test_component_v6.conf
index 6c991c6..8daa140 100644
--- a/config/test_component_v6.conf
+++ b/config/test_component_v6.conf
@@ -4,6 +4,8 @@
# Icebox Configuration
#
+Ice.ThreadPool.Client.Size=4
+
IceBox.InheritProperties=1
IceBox.LoadOrder=ServiceDiscovery,RTPStateReplicator,MediaServiceRTP,MediaRTPpjmediaTest
@@ -15,6 +17,9 @@ Ice.Override.Timeout=5000
IceBox.Service.ServiceDiscovery=ServiceLocator:create
+# For unit test we run without state replication.
+ServiceDiscovery.Standalone=true
+
ServiceDiscovery.IceStorm.InstanceName=ServiceDiscovery
ServiceDiscovery.IceStorm.TopicManager.Endpoints=tcp -h 127.0.0.1 -p 4421
diff --git a/config/test_rtp_ice.conf b/config/test_rtp_ice.conf
index 4edbeca..68fbee7 100644
--- a/config/test_rtp_ice.conf
+++ b/config/test_rtp_ice.conf
@@ -4,6 +4,7 @@
# Icebox Configuration
#
RTPConfiguration.Name=rtpoice
+Ice.ThreadPool.Client.Size=4
IceBox.InheritProperties=1
IceBox.LoadOrder=ServiceDiscovery,MediaRTPpjmedia,MediaRTPpjmediaTest
@@ -24,10 +25,11 @@ LocatorService.Proxy=LocatorService:tcp -h 127.0.0.1 -p 4411
IceBox.Service.MediaRTPpjmedia=MediaRTPPJMEDIA:create
+MediaRTPpjmedia.ServiceName=test
+
# Adapter parameters for this component
-MediaRTPpjmediaAdapter.Endpoints=default -h 127.0.0.1
-MediaRTPpjmediaAdapterLocal.Endpoints=default -h 127.0.0.1
-MediaRTPpjmediaAdapterLogger.Endpoints=default -h 127.0.0.1
+MediaRTPpjmedia.ServiceAdapter.Endpoints=default -h 127.0.0.1 -p 4471
+MediaRTPpjmedia.BackplaneAdapter.Endpoints=default -h 127.0.0.1 -p 4472
# A proxy to the service locator management service
ServiceLocatorManagementProxy=LocatorServiceManagement:tcp -h 127.0.0.1 -p 4422
@@ -47,6 +49,9 @@ IceBox.Service.MediaRTPpjmediaTest=MediaRTPPJMEDIAIceTest:create
IceBox.Service.ServiceDiscovery=ServiceLocator:create
+# For unit test we run without state replication.
+ServiceDiscovery.Standalone = true
+
ServiceDiscovery.IceStorm.InstanceName=ServiceDiscovery
ServiceDiscovery.IceStorm.TopicManager.Endpoints=tcp -h 127.0.0.1 -p 4421
diff --git a/slice/AsteriskSCF/Replication/MediaRTPPJMEDIA/RTPStateReplicationIf.ice b/slice/AsteriskSCF/Replication/MediaRTPPJMEDIA/RTPStateReplicationIf.ice
index 1bdd788..4d7099f 100644
--- a/slice/AsteriskSCF/Replication/MediaRTPPJMEDIA/RTPStateReplicationIf.ice
+++ b/slice/AsteriskSCF/Replication/MediaRTPPJMEDIA/RTPStateReplicationIf.ice
@@ -23,6 +23,7 @@
#include <AsteriskSCF/Media/RTP/MediaRTPIf.ice>
#include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.ice>
#include <AsteriskSCF/System/Component/ConfigurationIf.ice>
+#include <AsteriskSCF/System/OperationsIf.ice>
module AsteriskSCF
{
@@ -53,16 +54,16 @@ module V1
interface RTPStateReplicatorListener
{
- void stateRemoved(Ice::StringSeq itemKeys);
- void stateSet(RTPStateItemSeq items);
+ void stateRemoved(AsteriskSCF::System::V1::OperationContext operationContext, Ice::StringSeq itemKeys);
+ void stateSet(AsteriskSCF::System::V1::OperationContext operationContext, RTPStateItemSeq items);
};
interface RTPStateReplicator
{
- void addListener(RTPStateReplicatorListener *listener);
- void removeListener(RTPStateReplicatorListener *listener);
- void setState (RTPStateItemSeq items);
- void removeState(Ice::StringSeq items);
+ void addListener(AsteriskSCF::System::V1::OperationContext operationContext, RTPStateReplicatorListener *listener);
+ void removeListener(AsteriskSCF::System::V1::OperationContext operationContext, RTPStateReplicatorListener *listener);
+ void setState (AsteriskSCF::System::V1::OperationContext operationContext, RTPStateItemSeq items);
+ void removeState(AsteriskSCF::System::V1::OperationContext operationContext, Ice::StringSeq items);
idempotent RTPStateItemSeq getState(Ice::StringSeq itemKeys);
idempotent RTPStateItemSeq getAllState();
};
diff --git a/src/Component.cpp b/src/Component.cpp
index 43d1166..e1956dc 100644
--- a/src/Component.cpp
+++ b/src/Component.cpp
@@ -1,7 +1,7 @@
/*
* Asterisk SCF -- An open-source communications framework.
*
- * Copyright (C) 2010, Digium, Inc.
+ * Copyright (C) 2010-2012, Digium, Inc.
*
* See http://www.asterisk.org for more information about
* the Asterisk SCF project. Please do not directly contact
@@ -22,40 +22,47 @@
#include <IceUtil/UUID.h>
#include <boost/shared_ptr.hpp>
+#include <AsteriskSCF/Operations/OperationContext.h>
+#include <AsteriskSCF/Component/Component.h>
#include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.h>
+#include <AsteriskSCF/Discovery/SmartProxy.h>
+#include <AsteriskSCF/Logger.h>
+#include <AsteriskSCF/Logger/IceLogger.h>
#include <AsteriskSCF/Media/MediaIf.h>
#include <AsteriskSCF/Media/RTP/MediaRTPIf.h>
-#include <AsteriskSCF/System/Component/ConfigurationIf.h>
-#include <AsteriskSCF/Logger/IceLogger.h>
-#include <AsteriskSCF/Logger.h>
-#include <AsteriskSCF/Discovery/SmartProxy.h>
-#include <AsteriskSCF/Component/Component.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
#include <AsteriskSCF/PJLIB/ThreadHook.h>
+#include <AsteriskSCF/System/Component/ConfigurationIf.h>
+#include "PJMEDIAEnvironment.h"
+#include "RTPConfiguration.h"
+#include "RTPConfigurationIf.h"
#include "RTPReplicationContext.h"
#include "RTPSession.h"
#include "RTPStateReplicator.h"
-#include "RTPConfiguration.h"
-#include "RTPConfigurationIf.h"
-#include "PJMEDIAEnvironment.h"
-using namespace std;
+using namespace AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1;
using namespace AsteriskSCF::Core::Discovery::V1;
-using namespace AsteriskSCF::Media::V1;
+using namespace AsteriskSCF::Discovery;
using namespace AsteriskSCF::Media::RTP::V1;
+using namespace AsteriskSCF::Media::V1;
+using namespace AsteriskSCF::Operations;
+using namespace AsteriskSCF::PJMEDIARTP;
using namespace AsteriskSCF::Replication::MediaRTPPJMEDIA::V1;
-using namespace AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1;
-using namespace AsteriskSCF::System::Configuration::V1;
+using namespace AsteriskSCF::Replication;
using namespace AsteriskSCF::System::Component::V1;
+using namespace AsteriskSCF::System::Configuration::V1;
using namespace AsteriskSCF::System::Logging;
-using namespace AsteriskSCF::Discovery;
-using namespace AsteriskSCF::Replication;
-using namespace AsteriskSCF::PJMEDIARTP;
+using namespace std;
namespace
{
Logger lg = getLoggerFactory().getLogger("AsteriskSCF.MediaRTP");
+
+typedef ContextResultData<RTPSessionPrx> AllocateResultData;
+typedef boost::shared_ptr<AllocateResultData> AllocateResultDataPtr;
+
}
static const string ReplicaServiceId("MediaRTPReplica");
@@ -73,10 +80,11 @@ public:
const ConfigurationServiceImplPtr&);
RTPSessionPrx allocate(
- const RTPServiceLocatorParamsPtr&,
- const RTPOptionsPtr&,
- RTPAllocationOutputsPtr&,
- const Ice::Current&);
+ const AsteriskSCF::System::V1::OperationContextPtr&,
+ const RTPServiceLocatorParamsPtr&,
+ const RTPOptionsPtr&,
+ RTPAllocationOutputsPtr&,
+ const Ice::Current&);
pj_pool_factory *getPoolFactory() { return mEnvironment->poolFactory(); };
@@ -86,6 +94,7 @@ public:
}
private:
+ OperationContextCachePtr mOperationContextCache;
Ice::ObjectAdapterPtr mAdapter;
PJMEDIAEnvironmentPtr mEnvironment;
RTPReplicationContextPtr mReplicationContext;
@@ -202,6 +211,7 @@ private:
virtual void onPreInitialize();
virtual void onStop();
virtual void onStart();
+ virtual void onActivated();
// Other base Component overrides
virtual void prepareBackplaneServicesForDiscovery();
@@ -236,12 +246,12 @@ private:
void Component::onSuspend()
{
- mGeneralState->serviceManagement->suspend();
+ mGeneralState->serviceManagement->suspend(AsteriskSCF::Operations::createContext());
}
void Component::onResume()
{
- mGeneralState->serviceManagement->unsuspend();
+ mGeneralState->serviceManagement->unsuspend(AsteriskSCF::Operations::createContext());
}
/**
@@ -250,6 +260,7 @@ void Component::onResume()
RTPMediaServiceImpl::RTPMediaServiceImpl(const Ice::ObjectAdapterPtr& adapter,
const RTPReplicationContextPtr& replicationContext,
const ConfigurationServiceImplPtr& configurationService) :
+ mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
mAdapter(adapter),
mEnvironment(PJMEDIAEnvironment::create(adapter->getCommunicator()->getProperties(), configurationService)),
mReplicationContext(replicationContext),
@@ -261,12 +272,24 @@ RTPMediaServiceImpl::RTPMediaServiceImpl(const Ice::ObjectAdapterPtr& adapter,
* Implementation of the allocate method as defined in MediaRTPIf.ice
*/
RTPSessionPrx RTPMediaServiceImpl::allocate(
- const RTPServiceLocatorParamsPtr& params,
- const RTPOptionsPtr& options,
- RTPAllocationOutputsPtr& outputs,
- const Ice::Current&)
+ const AsteriskSCF::System::V1::OperationContextPtr& context,
+ const RTPServiceLocatorParamsPtr& params,
+ const RTPOptionsPtr& options,
+ RTPAllocationOutputsPtr& outputs,
+ const Ice::Current&)
{
- return AsteriskSCF::PJMEDIARTP::RTPSession::create(
+ std::pair<bool, AllocateResultDataPtr> data =
+ getContextSync<AllocateResultDataPtr>(mOperationContextCache, context);
+
+ if (data.first)
+ {
+ // retry detected
+ return data.second->getResult();
+ }
+
+ try
+ {
+ RTPSessionPrx r = AsteriskSCF::PJMEDIARTP::RTPSession::create(
mAdapter,
IceUtil::generateUUID(),
params,
@@ -275,6 +298,19 @@ RTPSessionPrx RTPMediaServiceImpl::allocate(
mConfigurationService,
options,
outputs);
+ data.second->setResult(r);
+ return r;
+ }
+ catch (const std::exception& e)
+ {
+ data.second->setException(e);
+ throw;
+ }
+ catch (...)
+ {
+ data.second->setException();
+ throw;
+ }
}
void Component::onPreInitialize()
@@ -356,7 +392,7 @@ void Component::createPrimaryServices()
if (rtpReplicationContext->isActive() == true)
{
- getServiceLocatorManagement()->addCompare(getName() + ".RTP.Comparator", mRTPMediaComparatorServicePrx);
+ getServiceLocatorManagement()->addCompare(AsteriskSCF::Operations::createContext(), getName() + ".RTP.Comparator", mRTPMediaComparatorServicePrx);
}
}
@@ -413,7 +449,7 @@ void Component::findRemoteServices()
// replicator service.
ConfigurationReplicatorPrx configurationReplicator = ConfigurationReplicatorPrx::checkedCast(
rtpReplicationContext->getReplicator().initialize(), ReplicatorFacet);
- configurationReplicator->registerConfigurationService(mConfigurationServicePrx);
+ configurationReplicator->registerConfigurationService(AsteriskSCF::Operations::createContext(), mConfigurationServicePrx);
}
catch (const std::exception& e)
@@ -466,7 +502,7 @@ void Component::listenToStateReplicators()
// Are we in standby mode?
if (rtpReplicationContext->getState() == STANDBY_IN_REPLICA_GROUP)
{
- rtpReplicationContext->getReplicator()->addListener(mReplicatorListenerProxy);
+ rtpReplicationContext->getReplicator()->addListener(AsteriskSCF::Operations::createContext(), mReplicatorListenerProxy);
mListeningToReplicator = true;
}
}
@@ -494,7 +530,7 @@ void Component::stopListeningToStateReplicators()
try
{
- rtpReplicationContext->getReplicator()->removeListener(mReplicatorListenerProxy);
+ rtpReplicationContext->getReplicator()->removeListener(AsteriskSCF::Operations::createContext(), mReplicatorListenerProxy);
mListeningToReplicator = false;
}
catch (const Ice::Exception& e)
@@ -532,7 +568,19 @@ void Component::onRegisterPrimaryServices()
}
mGeneralState->serviceManagement = mRTPMediaServiceRegistration->getServiceManagement();
- mGeneralState->serviceManagement->addLocatorParams(mRTPOverIceLocatorParams, getName() + ".RTP.Comparator");
+ mGeneralState->serviceManagement->addLocatorParams(AsteriskSCF::Operations::createContext(),
+ mRTPOverIceLocatorParams, getName() + ".RTP.Comparator");
+}
+
+void Component::onActivated()
+{
+ RTPReplicationContextPtr rtpReplicationContext =
+ boost::static_pointer_cast<RTPReplicationContext>(getReplicationContext());
+
+ RTPStateItemSeq items;
+ items.push_back(mGeneralState);
+ RTPStateReplicatorPrx oneway = RTPStateReplicatorPrx::uncheckedCast(rtpReplicationContext->getReplicator()->ice_oneway());
+ oneway->setState(AsteriskSCF::Operations::createContext(), items);
}
void Component::onStart()
@@ -545,7 +593,7 @@ void Component::onStart()
RTPStateItemSeq items;
items.push_back(mGeneralState);
RTPStateReplicatorPrx oneway = RTPStateReplicatorPrx::uncheckedCast(rtpReplicationContext->getReplicator()->ice_oneway());
- oneway->setState(items);
+ oneway->setState(AsteriskSCF::Operations::createContext(), items);
}
}
@@ -553,10 +601,10 @@ void Component::onStop()
{
if (getReplicationContext()->isActive() == true)
{
- mGeneralState->serviceManagement->unregister();
+ mGeneralState->serviceManagement->unregister(AsteriskSCF::Operations::createContext());
}
- getServiceLocatorManagement()->removeCompare(getName() + ".RTP.Comparator");
+ getServiceLocatorManagement()->removeCompare(AsteriskSCF::Operations::createContext(), getName() + ".RTP.Comparator");
}
extern "C"
diff --git a/src/ICETransport.cpp b/src/ICETransport.cpp
index 994b1ad..b36c996 100644
--- a/src/ICETransport.cpp
+++ b/src/ICETransport.cpp
@@ -14,39 +14,38 @@
* at the top of the source tree.
*/
-#include "ICETransport.h"
-#include "PJUtil.h"
-
-#include <pjmedia.h>
#include <pjlib.h>
+#include <pjmedia.h>
#include <pjnath.h>
-#include <AsteriskSCF/System/ExceptionsIf.h>
#include <map>
+#include <sstream>
+
#include <boost/thread.hpp>
-#include <boost/thread/shared_mutex.hpp>
-#include <AsteriskSCF/System/NAT/NATTraversalIf.h>
#include <Ice/Ice.h>
-#include <sstream>
-#include <AsteriskSCF/Logger.h>
#include <IceUtil/UUID.h>
+#include <AsteriskSCF/Logger.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
+#include <AsteriskSCF/System/ExceptionsIf.h>
+#include <AsteriskSCF/System/NAT/NATTraversalIf.h>
+
+#include "ICETransport.h"
+#include "PJUtil.h"
+
+using namespace AsteriskSCF::Helpers;
+using namespace AsteriskSCF::Operations;
using namespace AsteriskSCF::PJMEDIARTP;
-using namespace AsteriskSCF::System::V1;
using namespace AsteriskSCF::PJUtil;
-using namespace std;
-using namespace AsteriskSCF::Helpers;
using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::System::NAT::V1;
+using namespace AsteriskSCF::System::V1;
+using namespace std;
namespace
{
Logger logger = getLoggerFactory().getLogger("AsteriskSCF.MediaRTP");
-}
-
-namespace
-{
class ICEAgentImpl : public InteractiveConnectionAgent
{
@@ -54,6 +53,7 @@ public:
ICEAgentImpl(const Ice::ObjectAdapterPtr& adapter, const Ice::Identity& id, const PJMEDIAEnvironmentPtr& env,
const PJMEDIAEndpointPtr& ep) :
+ mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
mAdapter(adapter),
mId(id),
mShuttingDown(false),
@@ -84,10 +84,22 @@ public:
return mRole;
}
+ typedef AMDContextResultData<CandidatePtr, AMD_InteractiveConnectionAgent_negotiatePtr> NegotiateContextData;
+ typedef boost::shared_ptr<NegotiateContextData> NegotiateContextDataPtr;
+
void negotiate_async(const AMD_InteractiveConnectionAgent_negotiatePtr& callback,
- const string& hostname, Ice::Int port, const CandidateSeq& candidates,
- const Ice::Current&)
+ const AsteriskSCF::System::V1::OperationContextPtr& context,
+ const string& hostname, Ice::Int port, const CandidateSeq& candidates,
+ const Ice::Current&)
{
+ NegotiateContextDataPtr data =
+ getContext<NegotiateContextData>(mOperationContextCache, context, callback);
+
+ if (!data)
+ {
+ // retry detected
+ return;
+ }
boost::unique_lock<boost::shared_mutex> lock(mLock);
stateCheck();
@@ -99,12 +111,12 @@ public:
// TODO: are we going to support cancellable negotiations.
//
}
- mCurrentNegotiation = callback;
+ mCurrentNegotiation = data->getProxy();
//
- // So how this works is we create a remote SDP and call pjmedia_transport_start() easy peasy. (Same deal
+ // So how this works is we create a remote SDP and call pjmedia_transport_start() easy peasy. (Same deal
//
- pjmedia_sdp_session* remoteSDPSession =
+ pjmedia_sdp_session* remoteSDPSession =
static_cast<pjmedia_sdp_session*>(pj_pool_zalloc(mEnv->memoryPool(), sizeof(pjmedia_sdp_session)));
@@ -178,7 +190,7 @@ public:
//
// I was concerned about the fact that for a given SIP session, there might be multiple media
- // streams and multiple candidates. I'm not sure that its actually too much of an issue even
+ // streams and multiple candidates. I'm not sure that its actually too much of an issue even
// if multiple media types are muxed on a single ICE negotiated flow, but there will need to be
// some redesign to pull in the multiple media streams associated with the session. For the moment
// we will operation under the premise that we are dealing with a single media stream.
@@ -191,7 +203,7 @@ public:
{
CandidatePtr candidate = *i;
ostringstream os;
- os << "candidate:" << candidate->foundation << ' ' << candidate->componentId << " UDP " <<
+ os << "candidate:" << candidate->foundation << ' ' << candidate->componentId << " UDP " <<
candidate->priority << ' ' << candidate->mappedAddress << ' ' << candidate->mappedPort << " typ ";
string hostType;
switch (candidate->type)
@@ -216,7 +228,7 @@ public:
}
string t = os.str();
pj_str_t candidateStr = pj_str(const_cast<char*>(t.c_str()));
- pjmedia_sdp_attr* newAttribute = pjmedia_sdp_attr_create(mEnv->memoryPool(),
+ pjmedia_sdp_attr* newAttribute = pjmedia_sdp_attr_create(mEnv->memoryPool(),
"candidate", &candidateStr);
pjmedia_sdp_attr_add(&currentMedia->attr_count, currentMedia->attr, newAttribute);
}
@@ -425,7 +437,7 @@ public:
candidateObj->mappedAddress = candidateObj->baseAddress;
candidateObj->mappedPort = candidateObj->basePort;
candidateObj->baseAddress = baseAddress;
- candidateObj->basePort = basePort;
+ candidateObj->basePort = basePort;
}
else
{
@@ -493,6 +505,7 @@ public:
private:
boost::shared_mutex mLock;
+ OperationContextCachePtr mOperationContextCache;
Ice::ObjectAdapterPtr mAdapter;
Ice::Identity mId;
bool mShuttingDown;
@@ -553,7 +566,7 @@ boost::shared_mutex ICECallbackAdapter::mLock;
// invoked before we get a chance to add the transport. The solution to that is to allow an entry to be created when
// the ICE completion callback arrives and there isn't a table entry. When the addEntry runs, it will see the entry and
// simply update the appropriate field.
-//
+//
void ICECallbackAdapter::addEntry(pjmedia_transport* transport, const ICETransportPtr& callback)
{
bool alreadyKnown = false;
@@ -626,7 +639,7 @@ void ICECallbackAdapter::onICEComplete(pjmedia_transport* transport, pj_ice_stra
{
//
// AFAICT, only PJ_ICE_STRANS_OP_NEGOTIATION should get here.
- //
+ //
switch (operation)
{
case PJ_ICE_STRANS_OP_INIT:
@@ -684,18 +697,18 @@ void ICECallbackAdapter::onICEComplete(pjmedia_transport* transport, pj_ice_stra
case PJ_ICE_STRANS_OP_KEEP_ALIVE:
//
// Keep alive has successfully completed. FWICT this should not get here.
- //
+ //
break;
- };
+ }
}
-}
+} // anonymous namespace
ICETransport::~ICETransport()
{
//
// TODO : cleanup ICE transport, the transport itself is closed by the parent class.
- //
+ //
ICECallbackAdapter::removeEntry(mTransport);
}
@@ -728,7 +741,7 @@ void ICETransport::onSetupComplete(pjmedia_transport* transport, int status)
{
//
// Address has changed! We need to let Session listeners know!
- // TODO!
+ // TODO!
//
pj_memcpy(mLastKnownAddr.get(), &info.sock_info.rtp_addr_name, sizeof(pj_sockaddr));
}
@@ -738,21 +751,33 @@ void ICETransport::onSetupComplete(pjmedia_transport* transport, int status)
mMonitor.notify_one();
}
-AddressPtr ICETransport::localAddress()
+AddressPtr ICETransport::localAddress()
{
boost::unique_lock<boost::mutex> lock(mLock);
if (mLocalAddress)
{
return mLocalAddress;
}
+ //
+ // Retry check for local address for max of 2.5 seconds, then proceed as if it was unknown.
+ //
for (size_t i = 0; i < 5 && !mLocalAddress; ++i)
{
- mMonitor.wait(lock);
+ try
+ {
+ mMonitor.timed_wait(lock, boost::posix_time::milliseconds(500));
+ }
+ catch (const boost::thread_interrupted&)
+ {
+ }
+ catch (const boost::thread_resource_error&)
+ {
+ }
}
return mLocalAddress;
}
-AddressPtr ICETransport::remoteAddress()
+AddressPtr ICETransport::remoteAddress()
{
boost::unique_lock<boost::mutex> lock(mLock);
return mRemoteAddress;
@@ -791,7 +816,7 @@ void ICETransport::start()
PJICECallbackPtr callback(new pjmedia_ice_cb);
callback->on_ice_complete = &ICECallbackAdapter::onICEComplete;
NATModulePtr natModule = NATModule::create(mConfig, mEndpoint);
- pj_status_t result = pjmedia_ice_create(mEndpoint->endpoint(), "ASCF_ICE_MEDIA", (mEnableRTCP ? 2 : 1),
+ pj_status_t result = pjmedia_ice_create(mEndpoint->endpoint(), "ASCF_ICE_MEDIA", (mEnableRTCP ? 2 : 1),
natModule->configuration(), callback.get(), &t);
if (fail(result))
{
diff --git a/src/RTPConfiguration.cpp b/src/RTPConfiguration.cpp
index 3c5650a..178bf22 100644
--- a/src/RTPConfiguration.cpp
+++ b/src/RTPConfiguration.cpp
@@ -1,7 +1,7 @@
/*
* Asterisk SCF -- An open-source communications framework.
*
- * Copyright (C) 2011, Digium, Inc.
+ * Copyright (C) 2011-2012, Digium, Inc.
*
* See http://www.asterisk.org for more information about
* the Asterisk SCF project. Please do not directly contact
@@ -14,19 +14,22 @@
* at the top of the source tree.
*/
-#include "RTPConfigurationIf.h"
-#include "RTPConfiguration.h"
#include <IceUtil/UUID.h>
-#include <AsteriskSCF/System/Component/ConfigurationIf.h>
#include <boost/thread.hpp>
-#include <boost/thread/shared_mutex.hpp>
-using namespace AsteriskSCF::System::Configuration::V1;
+#include <AsteriskSCF/System/Component/ConfigurationIf.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
+
+#include "RTPConfigurationIf.h"
+#include "RTPConfiguration.h"
+
+using namespace AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1;
+using namespace AsteriskSCF::Operations;
using namespace AsteriskSCF::PJMEDIARTP;
+using namespace AsteriskSCF::System::Configuration::V1;
using namespace std;
-using namespace AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1;
/**
* Implementation of the configuration service.
@@ -34,24 +37,25 @@ using namespace AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1;
class ConfigurationServiceServant : virtual public ConfigurationServiceImpl
{
public:
+ ConfigurationServiceServant() : mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)) {}
/**
* AsteriskSCF::System::Configuration::V1 interface. Slice to C++ mapping.
*/
AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq
getConfiguration(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&, const Ice::Current&);
-
+
AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq
getConfigurationAll(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&, const Ice::Current&);
-
+
AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq
getConfigurationGroups(const Ice::Current&);
- void setConfiguration(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&, const Ice::Current&);
+ void setConfiguration(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&, const Ice::Current&);
- void removeConfigurationItems(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&,
+ void removeConfigurationItems(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&,
const Ice::Current&);
- void removeConfigurationGroups(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&,
+ void removeConfigurationGroups(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&,
const Ice::Current&);
/**
@@ -90,12 +94,12 @@ public:
{
mICEConfig = newConfig;
}
-
+
void replaceConfig(const NATConfigPtr& newConfig)
{
mNATConfig = newConfig;
}
-
+
void replaceConfig(const SRTPConfigurationPtr& newConfig)
{
mSRTPConfig = newConfig;
@@ -121,7 +125,7 @@ public:
//
if (mGeneralGroup)
{
- ConfigurationItemDict::const_iterator item =
+ ConfigurationItemDict::const_iterator item =
mGeneralGroup->configurationItems.find(EnableSRTPItemName);
if (item != mGeneralGroup->configurationItems.end())
{
@@ -150,6 +154,7 @@ public:
}
private:
+ OperationContextCachePtr mOperationContextCache;
/**
* General RTP configuration
*/
@@ -223,7 +228,7 @@ ConfigurationGroupSeq ConfigurationServiceServant::getConfiguration(
void visitRTPICEConfigurationGroup(const RTPICEConfigurationGroupPtr& group)
{
RTPICEConfigurationGroupPtr currentGroup = mImpl->getICEConfigurationGroup();
-
+
if (!currentGroup)
{
return;
@@ -233,19 +238,19 @@ ConfigurationGroupSeq ConfigurationServiceServant::getConfiguration(
returnedGroup->configurationItems);
mGroups.push_back(returnedGroup);
}
-
+
ConfigurationServiceServantPtr mImpl;
ConfigurationGroupSeq& mGroups;
};
-
+
ConfigurationGroupSeq newGroups;
RTPConfigurationGroupVisitorPtr v = new GroupVisitor(this, newGroups);
-
+
for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
{
(*group)->visit(v);
}
-
+
return newGroups;
}
@@ -259,7 +264,7 @@ ConfigurationGroupSeq ConfigurationServiceServant::getConfigurationAll(
mImpl(impl), mGroups(visitorGroups)
{
}
-
+
private:
void visitRTPGeneralGroup(const ::AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1::RTPGeneralGroupPtr&)
{
@@ -278,21 +283,21 @@ ConfigurationGroupSeq ConfigurationServiceServant::getConfigurationAll(
mGroups.push_back(g);
}
}
-
+
ConfigurationServiceServantPtr mImpl;
ConfigurationGroupSeq& mGroups;
};
-
+
ConfigurationGroupSeq newGroups;
RTPConfigurationGroupVisitorPtr v = new GroupVisitor(this, newGroups);
boost::shared_lock<boost::shared_mutex> lock(mLock);
-
+
for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
{
(*group)->visit(v);
}
-
+
return newGroups;
}
@@ -300,7 +305,7 @@ ConfigurationGroupSeq ConfigurationServiceServant::getConfigurationGroups(const
{
ConfigurationGroupSeq groups;
boost::shared_lock<boost::shared_mutex> lock(mLock);
-
+
if (mGeneralGroup)
{
groups.push_back(new RTPGeneralGroup);
@@ -309,21 +314,21 @@ ConfigurationGroupSeq ConfigurationServiceServant::getConfigurationGroups(const
{
groups.push_back(new RTPICEConfigurationGroup);
}
-
+
return groups;
}
-void ConfigurationServiceServant::setConfiguration(const ConfigurationGroupSeq& groups,
+void ConfigurationServiceServant::setConfiguration(const AsteriskSCF::System::V1::OperationContextPtr& context, const ConfigurationGroupSeq& groups,
const Ice::Current&)
{
class GroupVisitor : public RTPConfigurationGroupVisitor
{
public:
- GroupVisitor(const ConfigurationServiceServantPtr& impl) :
+ GroupVisitor(const ConfigurationServiceServantPtr& impl) :
mImpl(impl)
{
}
-
+
private:
/**
* Helper function which performs serial number checking of items
@@ -340,22 +345,22 @@ void ConfigurationServiceServant::setConfiguration(const ConfigurationGroupSeq&
{
continue;
}
-
+
ConfigurationItemDict::const_iterator localItem = localItems.find(item->first);
-
+
if (localItem == localItems.end())
{
// This is a new item so serial checking does not apply
continue;
}
-
+
if (item->second->serialNumber < localItem->second->serialNumber)
{
throw SerialConflict(group, item->second);
}
}
}
-
+
void visitRTPGeneralGroup(const ::AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1::RTPGeneralGroupPtr& group)
{
RTPGeneralGroupPtr g = mImpl->getGeneralGroup();
@@ -367,7 +372,7 @@ void ConfigurationServiceServant::setConfiguration(const ConfigurationGroupSeq&
{
performSerialCheck(group->configurationItems, g->configurationItems, group);
}
-
+
for (ConfigurationItemDict::const_iterator item = group->configurationItems.begin();
item != group->configurationItems.end();
++item)
@@ -415,7 +420,7 @@ void ConfigurationServiceServant::setConfiguration(const ConfigurationGroupSeq&
mImpl->replaceConfig(ICEConfiguration::create(mMaxCandidates, mMaxCalls));
}
}
-
+
void visitSTUNServerItem(const STUNServerItemPtr& item)
{
@@ -454,11 +459,11 @@ void ConfigurationServiceServant::setConfiguration(const ConfigurationGroupSeq&
mMaxCandidates = item->maxCandidates;
}
}
-
+
private:
ConfigurationServiceServantPtr mImpl;
-
+
bool mCreateRTPICEConfig;
bool mCreateNATConfig;
@@ -481,7 +486,7 @@ void ConfigurationServiceServant::setConfiguration(const ConfigurationGroupSeq&
}
RTPConfigurationItemVisitorPtr v(new Visitor(mImpl));
-
+
for (ConfigurationItemDict::const_iterator item = group->configurationItems.begin();
item != group->configurationItems.end();
++item)
@@ -495,7 +500,13 @@ void ConfigurationServiceServant::setConfiguration(const ConfigurationGroupSeq&
ConfigurationServiceServantPtr mImpl;
};
-
+
+ if (!mOperationContextCache->addOperationContext(context))
+ {
+ // retry detected
+ return;
+ }
+
RTPConfigurationGroupVisitorPtr v = new GroupVisitor(this);
boost::unique_lock<boost::shared_mutex> lock(mLock);
@@ -506,12 +517,13 @@ void ConfigurationServiceServant::setConfiguration(const ConfigurationGroupSeq&
}
void ConfigurationServiceServant::removeConfigurationItems(
+ const AsteriskSCF::System::V1::OperationContextPtr& context,
const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq& groups, const Ice::Current&)
{
class GroupVisitor : public RTPConfigurationGroupVisitor
{
public:
- GroupVisitor(const ConfigurationServiceServantPtr& impl) :
+ GroupVisitor(const ConfigurationServiceServantPtr& impl) :
mImpl(impl)
{
}
@@ -535,7 +547,7 @@ void ConfigurationServiceServant::removeConfigurationItems(
localItems.erase(localItem);
}
}
-
+
void visitRTPGeneralGroup(const ::AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1::RTPGeneralGroupPtr& group)
{
RTPGeneralGroupPtr g = mImpl->getGeneralGroup();
@@ -550,7 +562,7 @@ void ConfigurationServiceServant::removeConfigurationItems(
//
// The Visitor in this case interprets the provided information
// to decide what it needs to do. This type of thing might be better moved
- // into a separate helper class (along with the set operations), but
+ // into a separate helper class (along with the set operations), but
// we'll leave it here for now to avoid inconsistencies with the
// rest of the configuration implementation.
//
@@ -565,7 +577,7 @@ void ConfigurationServiceServant::removeConfigurationItems(
mResetICEConfig(false)
{
}
-
+
~Visitor()
{
if (mDisable)
@@ -601,7 +613,7 @@ void ConfigurationServiceServant::removeConfigurationItems(
mImpl->replaceConfig(ICEConfigurationPtr());
}
}
-
+
void visitSTUNServerItem(const STUNServerItemPtr& item)
{
if (item)
@@ -633,7 +645,7 @@ void ConfigurationServiceServant::removeConfigurationItems(
mResetICEConfig = true;
}
}
-
+
private:
ConfigurationServiceServantPtr mImpl;
bool mRemoveSTUNServer;
@@ -645,37 +657,45 @@ void ConfigurationServiceServant::removeConfigurationItems(
};
RTPICEConfigurationGroupPtr g = mImpl->getICEConfigurationGroup();
-
+
if (g)
{
RTPConfigurationItemVisitorPtr v(new Visitor(mImpl));
removeItems(v, group->configurationItems, g->configurationItems);
}
}
-
+
private:
ConfigurationServiceServantPtr mImpl;
};
-
+
+ if (!mOperationContextCache->addOperationContext(context))
+ {
+ // retry detected
+ return;
+ }
+
RTPConfigurationGroupVisitorPtr v = new GroupVisitor(this);
boost::unique_lock<boost::shared_mutex> lock(mLock);
-
+
for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
{
(*group)->visit(v);
}
}
-void ConfigurationServiceServant::removeConfigurationGroups(const ConfigurationGroupSeq& groups, const Ice::Current&)
+void ConfigurationServiceServant::removeConfigurationGroups(
+ const AsteriskSCF::System::V1::OperationContextPtr& context,
+ const ConfigurationGroupSeq& groups, const Ice::Current&)
{
class GroupVisitor : public RTPConfigurationGroupVisitor
{
public:
- GroupVisitor(const ConfigurationServiceServantPtr& impl) :
+ GroupVisitor(const ConfigurationServiceServantPtr& impl) :
mImpl(impl)
{
}
-
+
private:
void visitRTPGeneralGroup(const ::AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1::RTPGeneralGroupPtr&)
{
@@ -689,11 +709,17 @@ void ConfigurationServiceServant::removeConfigurationGroups(const ConfigurationG
ConfigurationServiceServantPtr mImpl;
};
-
+
+ if (!mOperationContextCache->addOperationContext(context))
+ {
+ // retry detected
+ return;
+ }
+
RTPConfigurationGroupVisitorPtr v = new GroupVisitor(this);
boost::unique_lock<boost::shared_mutex> lock(mLock);
-
+
for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
{
(*group)->visit(v);
diff --git a/src/RTPSession.cpp b/src/RTPSession.cpp
index 4cccbb1..d3fe04c 100644
--- a/src/RTPSession.cpp
+++ b/src/RTPSession.cpp
@@ -1,7 +1,7 @@
/*
* Asterisk SCF -- An open-source communications framework.
*
- * Copyright (C) 2010, Digium, Inc.
+ * Copyright (C) 2010-2012, Digium, Inc.
*
* See http://www.asterisk.org for more information about
* the Asterisk SCF project. Please do not directly contact
@@ -32,21 +32,24 @@
#include <IceUtil/UUID.h>
#include <AsteriskSCF/Media/MediaIf.h>
-#include <AsteriskSCF/Media/RTP/MediaRTPIf.h>
#include <AsteriskSCF/Media/RTP/MediaRTCPIf.h>
-#include <AsteriskSCF/System/Component/ReplicaIf.h>
+#include <AsteriskSCF/Media/RTP/MediaRTPIf.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
#include <AsteriskSCF/SessionCommunications/SessionCommunicationsIf.h>
+#include <AsteriskSCF/System/Component/ReplicaIf.h>
-using namespace std;
using namespace AsteriskSCF::Core::Discovery::V1;
-using namespace AsteriskSCF::Media::V1;
+using namespace AsteriskSCF::Discovery;
using namespace AsteriskSCF::Media::RTP::V1;
+using namespace AsteriskSCF::Media::V1;
using namespace AsteriskSCF::Media;
-using namespace AsteriskSCF::Replication::MediaRTPPJMEDIA::V1;
-using namespace AsteriskSCF::System::Component::V1;
-using namespace AsteriskSCF::Discovery;
+using namespace AsteriskSCF::Operations;
using namespace AsteriskSCF::PJMEDIARTP;
+using namespace AsteriskSCF::Replication::MediaRTPPJMEDIA::V1;
using namespace AsteriskSCF::SessionCommunications::V1;
+using namespace AsteriskSCF::System::Component::V1;
+using namespace std;
/**
* RTCP Information Interface implementation.
@@ -54,7 +57,8 @@ using namespace AsteriskSCF::SessionCommunications::V1;
class RTCPInformationImpl : public RTCP::V1::Information
{
public:
- RTCPInformationImpl(pjmedia_rtcp_stat *general, pjmedia_rtcp_stream_stat *stream) :
+ RTCPInformationImpl(pjmedia_rtcp_stat *general, pjmedia_rtcp_stream_stat *stream, const OperationContextCachePtr& cache) :
+ mOperationContextCache(cache),
mGeneralStatistics(general), mStreamStatistics(stream) { }
RTCP::V1::StatisticsPtr getStatistics(const Ice::Current&)
@@ -88,14 +92,24 @@ public:
return statistics;
}
- void addListener(const RTCP::V1::InformationListenerPrx& listener, const Ice::Current&)
+ void addListener(const AsteriskSCF::System::V1::OperationContextPtr& context, const RTCP::V1::InformationListenerPrx& listener, const Ice::Current&)
{
+ if (!mOperationContextCache->addOperationContext(context))
+ {
+ // retry detected
+ return;
+ }
boost::unique_lock<boost::shared_mutex> lock(mLock);
mListeners.push_back(listener);
}
- void removeListener(const RTCP::V1::InformationListenerPrx& listener, const Ice::Current&)
+ void removeListener(const AsteriskSCF::System::V1::OperationContextPtr& context, const RTCP::V1::InformationListenerPrx& listener, const Ice::Current&)
{
+ if (!mOperationContextCache->addOperationContext(context))
+ {
+ // retry detected
+ return;
+ }
boost::unique_lock<boost::shared_mutex> lock(mLock);
mListeners.erase(std::remove(mListeners.begin(), mListeners.end(), listener), mListeners.end());
}
@@ -115,6 +129,8 @@ private:
*/
boost::shared_mutex mLock;
+ OperationContextCachePtr mOperationContextCache;
+
/**
* Listeners present.
*/
@@ -142,22 +158,22 @@ typedef IceUtil::Handle<RTCPInformationImpl> RTCPInformationImplPtr;
class RTPSessionImpl : public AsteriskSCF::Media::RTP::V1::SRTPSession
{
public:
- RTPSessionImpl(const Ice::ObjectAdapterPtr&,
- const string& id,
- const PJMEDIAEnvironmentPtr& env,
- const AsteriskSCF::Media::RTP::V1::RTPServiceLocatorParamsPtr& params,
- const RTPReplicationContextPtr& replicationContext,
- const ConfigurationServiceImplPtr&);
-
+ RTPSessionImpl(const Ice::ObjectAdapterPtr&,
+ const string& id,
+ const PJMEDIAEnvironmentPtr& env,
+ const AsteriskSCF::Media::RTP::V1::RTPServiceLocatorParamsPtr& params,
+ const RTPReplicationContextPtr& replicationContext,
+ const ConfigurationServiceImplPtr&);
+
RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter,
- const string& sessionIdentity,
- const PJMEDIAEnvironmentPtr& env,
- Ice::Int port,
- const AsteriskSCF::Media::V1::FormatSeq& formats,
- bool isIPv6,
- bool srtp,
- const RTPReplicationContextPtr& replicationContext,
- const ConfigurationServiceImplPtr& configurationServant);
+ const string& sessionIdentity,
+ const PJMEDIAEnvironmentPtr& env,
+ Ice::Int port,
+ const AsteriskSCF::Media::V1::FormatSeq& formats,
+ bool isIPv6,
+ bool srtp,
+ const RTPReplicationContextPtr& replicationContext,
+ const ConfigurationServiceImplPtr& configurationServant);
~RTPSessionImpl();
@@ -168,26 +184,29 @@ public:
AsteriskSCF::Media::V1::StreamSinkSeq getSinks(const Ice::Current&);
std::string getId(const Ice::Current&);
void setCookies(const AsteriskSCF::Media::V1::SessionCookieDict& cookies);
- void setCookies(const AsteriskSCF::Media::V1::SessionCookies& cookies,
- const Ice::Current&);
- void getCookies_async(const AsteriskSCF::Media::V1::AMD_Session_getCookiesPtr &cb,
- const AsteriskSCF::Media::V1::SessionCookies& cookiesToGet,
+ void setCookies(const AsteriskSCF::System::V1::OperationContextPtr&,
+ const AsteriskSCF::Media::V1::SessionCookies& cookies,
+ const Ice::Current&);
+ void getCookies_async(const AsteriskSCF::Media::V1::AMD_Session_getCookiesPtr &cb,
+ const AsteriskSCF::Media::V1::SessionCookies& cookiesToGet,
const Ice::Current&);
- void removeCookies(const AsteriskSCF::Media::V1::SessionCookies& cookies,
- const Ice::Current&);
+ void removeCookies(
+ const AsteriskSCF::System::V1::OperationContextPtr&,
+ const AsteriskSCF::Media::V1::SessionCookies& cookies,
+ const Ice::Current&);
void useRTCP(bool, const Ice::Current&);
void release(const Ice::Current&);
- void associatePayloads(const AsteriskSCF::Media::RTP::V1::PayloadMap&, const Ice::Current&);
+ void associatePayloads(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::Media::RTP::V1::PayloadMap&, const Ice::Current&);
/**
* AsteriskSCF::Media::V1::SRTPSession implementation.
*/
- void setOptions(const string& suiteName, const string& keyInfo, bool enableAuthentication, bool enableEncryption, const Ice::Current&);
- void start(const string& suiteName, const string& keyInfo, bool enableAuthentication, bool enableEncryption, const Ice::Current&);
+ void setOptions(const AsteriskSCF::System::V1::OperationContextPtr&, const string& suiteName, const string& keyInfo, bool enableAuthentication, bool enableEncryption, const Ice::Current&);
+ void start(const AsteriskSCF::System::V1::OperationContextPtr&, const string& suiteName, const string& keyInfo, bool enableAuthentication, bool enableEncryption, const Ice::Current&);
/**
- * Internal methods.
+ * Internal methods.
*/
AsteriskSCF::Media::V1::FormatSeq getFormats();
void setRemoteDetails(const std::string& address, Ice::Int port);
@@ -195,7 +214,7 @@ public:
int getPayload(const AsteriskSCF::Media::V1::FormatPtr& mediaformat);
/**
- * Accessors for the source and sink servants.
+ * Accessors for the source and sink servants.
*/
StreamSourceRTPImplPtr getSourceServant();
StreamSinkRTPImplPtr getSinkServant();
@@ -242,6 +261,8 @@ private:
boost::shared_mutex mLock;
+ OperationContextCachePtr mOperationContextCache;
+
/**
* Instance of the session adapter to be passed to sinks, sources, configuration, etc.
*/
@@ -308,7 +329,7 @@ private:
*/
RTPSessionStateItemPtr mSessionStateItem;
- // Context for state replication for this component.
+ // Context for state replication for this component.
RTPReplicationContextPtr mReplicationContext;
/**
@@ -333,6 +354,8 @@ private:
TelephonyEventSourcePrx mTelephonyEventSourcePrx;
TelephonyEventSinkPrx mTelephonyEventSinkPrx;
+
+ AsteriskSCF::Media::V1::SessionCookieDict mCookies;
};
/**
@@ -349,7 +372,7 @@ public:
/**
* Constructor for this implementation.
*/
- RTCPSessionImpl(const RTPSessionImplPtr& session) : mSession(session) { }
+ RTCPSessionImpl(const RTPSessionImplPtr& session, const OperationContextCachePtr& cache) : mOperationContetCache(cache), mSession(session) { }
/**
* Method used to retrieve the port our RTCP session is listening on.
@@ -364,12 +387,35 @@ public:
return pj_sockaddr_get_port(&transportInfo.sock_info.rtcp_addr_name);
}
- void setRemoteDetails(const std::string& address, Ice::Int port, const Ice::Current&)
+ void setRemoteDetails(const AsteriskSCF::System::V1::OperationContextPtr& context, const std::string& address, Ice::Int port, const Ice::Current&)
{
- mSession->setRemoteRtcpDetails(address, port);
+ ContextDataPtr data = checkAndThrow(mOperationContetCache, context);
+ if (!data)
+ {
+ // retry detected
+ return;
+ }
+
+ try
+ {
+ mSession->setRemoteRtcpDetails(address, port);
+ data->setCompleted();
+ }
+ catch (const std::exception& e)
+ {
+ data->setException(e);
+ throw;
+ }
+ catch (...)
+ {
+ data->setException();
+ throw;
+ }
}
private:
+ OperationContextCachePtr mOperationContetCache;
+
/**
* Pointer to the RTP session.
*/
@@ -389,7 +435,7 @@ public:
{
}
- void replicateState(const RTPStreamSinkStateItemPtr& sinkStateItem)
+ void replicateState(const RTPStreamSinkStateItemPtr& sinkStateItem)
{
mServant->replicateState(0, sinkStateItem, 0, 0, 0);
}
@@ -409,12 +455,12 @@ public:
mServant->replicateState(0, 0, 0, 0, item);
}
- AsteriskSCF::Media::V1::FormatPtr getFormat(int payload)
+ AsteriskSCF::Media::V1::FormatPtr getFormat(int payload)
{
return mServant->getFormat(payload);
}
- int getPayload(const AsteriskSCF::Media::V1::FormatPtr& format)
+ int getPayload(const AsteriskSCF::Media::V1::FormatPtr& format)
{
return mServant->getPayload(format);
}
@@ -466,7 +512,8 @@ RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter,
const PJMEDIAEnvironmentPtr& env,
const RTPServiceLocatorParamsPtr& params,
const RTPReplicationContextPtr& replicationContext,
- const ConfigurationServiceImplPtr& configurationService) :
+ const ConfigurationServiceImplPtr& configurationService) :
+ mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
mEnvironment(env),
mEndpoint(PJMEDIAEndpoint::create(env)),
mId(id),
@@ -594,42 +641,71 @@ std::string RTPSessionImpl::getId(const Ice::Current&)
return mId;
}
-/**
- * Local implementation.
+/**
+ * Local implementation.
*/
void RTPSessionImpl::setCookies(const AsteriskSCF::Media::V1::SessionCookieDict& cookieMap)
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
- mSessionStateItem->cookies = cookieMap;
+ mCookies = cookieMap;
+
+ if (mSessionStateItem)
+ {
+ mSessionStateItem->cookies = cookieMap;
+ }
}
-/**
- * Support for the corresponding API call.
+/**
+ * Support for the corresponding API call.
*/
-void RTPSessionImpl::setCookies(const AsteriskSCF::Media::V1::SessionCookies& cookies,
- const Ice::Current&)
+void RTPSessionImpl::setCookies(
+ const AsteriskSCF::System::V1::OperationContextPtr& context,
+ const AsteriskSCF::Media::V1::SessionCookies& cookies,
+ const Ice::Current&)
{
- { // scope the lock
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- for (AsteriskSCF::Media::V1::SessionCookies::const_iterator i = cookies.begin();
- i != cookies.end(); ++i)
+ ContextDataPtr data = checkAndThrow(mOperationContextCache, context);
+ if (!data)
+ {
+ // retry detected
+ return;
+ }
+
+ try
+ {
+ { // scope the lock
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ for (AsteriskSCF::Media::V1::SessionCookies::const_iterator i = cookies.begin();
+ i != cookies.end(); ++i)
+ {
+ mCookies[(*i)->ice_id()] = (*i);
+ }
+ }
+
+ if (mReplicationContext->isReplicating() == true)
{
- mSessionStateItem->cookies[(*i)->ice_id()] = (*i);
+ mSessionStateItem->cookies = mCookies;
+ replicateState(mSessionStateItem, 0, 0, 0, 0);
}
+ data->setCompleted();
}
-
- if (mReplicationContext->isReplicating() == true)
+ catch (const std::exception& e)
{
- replicateState(mSessionStateItem, 0, 0, 0, 0);
+ data->setException(e);
+ throw;
+ }
+ catch (...)
+ {
+ data->setException();
+ throw;
}
}
-/**
- * Implementation of the corresponding API call.
+/**
+ * Implementation of the corresponding API call.
*/
void RTPSessionImpl::getCookies_async(
const AsteriskSCF::Media::V1::AMD_Session_getCookiesPtr &cb,
- const AsteriskSCF::Media::V1::SessionCookies& cookiesToGet,
+ const AsteriskSCF::Media::V1::SessionCookies& cookiesToGet,
const Ice::Current&)
{
AsteriskSCF::Media::V1::SessionCookies results;
@@ -639,9 +715,9 @@ void RTPSessionImpl::getCookies_async(
i != cookiesToGet.end();
++i)
{
- AsteriskSCF::Media::V1::SessionCookieDict::const_iterator cookie = mSessionStateItem->cookies.find((*i)->ice_id());
+ AsteriskSCF::Media::V1::SessionCookieDict::const_iterator cookie = mCookies.find((*i)->ice_id());
- if (cookie == mSessionStateItem->cookies.end())
+ if (cookie == mCookies.end())
{
continue;
}
@@ -652,24 +728,48 @@ void RTPSessionImpl::getCookies_async(
cb->ice_response(results);
}
-/**
- * Implementation of the corresponding API call.
+/**
+ * Implementation of the corresponding API call.
*/
-void RTPSessionImpl::removeCookies(const AsteriskSCF::Media::V1::SessionCookies& cookies,
- const Ice::Current&)
+void RTPSessionImpl::removeCookies(
+ const AsteriskSCF::System::V1::OperationContextPtr& context,
+ const AsteriskSCF::Media::V1::SessionCookies& cookies,
+ const Ice::Current&)
{
- { // scope the lock
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- for (AsteriskSCF::Media::V1::SessionCookies::const_iterator i = cookies.begin();
- i != cookies.end(); ++i)
+ ContextDataPtr data = checkAndThrow(mOperationContextCache, context);
+ if (!data)
+ {
+ // retry detected
+ return;
+ }
+
+ try
+ {
+ { // scope the lock
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ for (AsteriskSCF::Media::V1::SessionCookies::const_iterator i = cookies.begin();
+ i != cookies.end(); ++i)
+ {
+ mCookies.erase((*i)->ice_id());
+ }
+ }
+
+ if (mReplicationContext->isReplicating() == true)
{
- mSessionStateItem->cookies.erase((*i)->ice_id());
+ mSessionStateItem->cookies = mCookies;
+ replicateState(mSessionStateItem, 0, 0, 0, 0);
}
+ data->setCompleted();
}
-
- if (mReplicationContext->isReplicating() == true)
+ catch (const std::exception& e)
{
- replicateState(mSessionStateItem, 0, 0, 0, 0);
+ data->setException(e);
+ throw;
+ }
+ catch (...)
+ {
+ data->setException();
+ throw;
}
}
@@ -709,34 +809,97 @@ void RTPSessionImpl::release(const Ice::Current&)
/**
* Implementation of the associatePayloads method as defined in MediaRTPIf.ice
*/
-void RTPSessionImpl::associatePayloads(const AsteriskSCF::Media::RTP::V1::PayloadMap& mappings, const Ice::Current&)
+void RTPSessionImpl::associatePayloads(const AsteriskSCF::System::V1::OperationContextPtr& context, const AsteriskSCF::Media::RTP::V1::PayloadMap& mappings, const Ice::Current&)
{
- mSessionStateItem->payloadstoFormats = mappings;
- associatePayloadsImpl(mappings);
+ ContextDataPtr data = checkAndThrow(mOperationContextCache, context);
+ if (!data)
+ {
+ // retry detected
+ return;
+ }
- // Only the session has changed so push a single update out for it
- replicateState(mSessionStateItem, 0, 0, 0, 0);
+ try
+ {
+ mSessionStateItem->payloadstoFormats = mappings;
+ associatePayloadsImpl(mappings);
+
+ // Only the session has changed so push a single update out for it
+ replicateState(mSessionStateItem, 0, 0, 0, 0);
+ data->setCompleted();
+ }
+ catch (const std::exception& e)
+ {
+ data->setException(e);
+ throw;
+ }
+ catch (...)
+ {
+ data->setException();
+ throw;
+ }
}
-void RTPSessionImpl::setOptions(const string& suiteName, const string& key, bool enableAuthentication, bool enableEncryption, const Ice::Current&)
+void RTPSessionImpl::setOptions(const AsteriskSCF::System::V1::OperationContextPtr& context, const string& suiteName, const string& key, bool enableAuthentication, bool enableEncryption, const Ice::Current&)
{
- SRTPTransportPtr srtpTransport(boost::dynamic_pointer_cast<SRTPTransport>(mTransport));
- if (!srtpTransport)
+ ContextDataPtr data = checkAndThrow(mOperationContextCache, context);
+ if (!data)
{
- throw SRTPUnavailable();
+ // retry detected
+ return;
+ }
+
+ try
+ {
+ SRTPTransportPtr srtpTransport(boost::dynamic_pointer_cast<SRTPTransport>(mTransport));
+ if (!srtpTransport)
+ {
+ throw SRTPUnavailable();
+ }
+ srtpTransport->setOptions(suiteName, key, enableAuthentication, enableEncryption);
+ data->setCompleted();
+ }
+ catch (const std::exception& e)
+ {
+ data->setException(e);
+ throw;
+ }
+ catch (...)
+ {
+ data->setException();
+ throw;
}
- srtpTransport->setOptions(suiteName, key, enableAuthentication, enableEncryption);
}
-void RTPSessionImpl::start(const string& suiteName, const string& key, bool enableAuthentication, bool enableEncryption, const Ice::Current&)
+void RTPSessionImpl::start(const AsteriskSCF::System::V1::OperationContextPtr& context, const string& suiteName, const string& key, bool enableAuthentication, bool enableEncryption, const Ice::Current&)
{
- SRTPTransportPtr srtpTransport(boost::dynamic_pointer_cast<SRTPTransport>(mTransport));
- if (!srtpTransport)
+ ContextDataPtr data = checkAndThrow(mOperationContextCache, context);
+ if (!data)
{
- throw SRTPUnavailable();
+ // retry detected
+ return;
+ }
+
+ try
+ {
+ SRTPTransportPtr srtpTransport(boost::dynamic_pointer_cast<SRTPTransport>(mTransport));
+ if (!srtpTransport)
+ {
+ throw SRTPUnavailable();
+ }
+ srtpTransport->start(suiteName, key, enableAuthentication, enableEncryption);
+ data->setCompleted();
+ }
+ catch (const std::exception& e)
+ {
+ data->setException(e);
+ throw;
+ }
+ catch (...)
+ {
+ data->setException();
+ throw;
}
- srtpTransport->start(suiteName, key, enableAuthentication, enableEncryption);
}
/**
@@ -927,7 +1090,7 @@ void RTPSessionImpl::replicateState(
return;
}
- mReplicationContext->getReplicator().tryOneWay()->setState(items);
+ mReplicationContext->getReplicator().tryOneWay()->setState(createContext(), items);
}
/**
@@ -964,7 +1127,7 @@ void RTPSessionImpl::removeState(const RTPSessionStateItemPtr& session, const RT
return;
}
- mReplicationContext->getReplicator().tryOneWay()->removeState(items);
+ mReplicationContext->getReplicator().tryOneWay()->removeState(createContext(), items);
}
void RTPSessionImpl::associatePayloadsImpl(const AsteriskSCF::Media::RTP::V1::PayloadMap& mappings)
@@ -1033,9 +1196,9 @@ RTPSessionPrx RTPSessionImpl::activate(
outputs->eventSinks.push_back(mTelephonyEventSinkPrx);
}
- mRtcpSessionInterface = new RTCPSessionImpl(this);
- mReceiverReport = new RTCPInformationImpl(&mRtcpSession.stat, &mRtcpSession.stat.rx);
- mSenderReport = new RTCPInformationImpl(&mRtcpSession.stat, &mRtcpSession.stat.tx);
+ mRtcpSessionInterface = new RTCPSessionImpl(this, mOperationContextCache);
+ mReceiverReport = new RTCPInformationImpl(&mRtcpSession.stat, &mRtcpSession.stat.rx, mOperationContextCache);
+ mSenderReport = new RTCPInformationImpl(&mRtcpSession.stat, &mRtcpSession.stat.tx, mOperationContextCache);
mStreamSourceProxy = StreamSourceRTPPrx::uncheckedCast(mAdapter->add(mStreamSource, sourceId));
mStreamSinkProxy = StreamSinkRTPPrx::uncheckedCast(mAdapter->add(mStreamSink, sinkId));
mAdapter->addFacet(mRtcpSessionInterface, id, RTCP::V1::SessionFacet);
@@ -1146,10 +1309,13 @@ private:
RTPSessionImplPtr mImpl;
};
+/**
+ * Factory method used by active component.
+ */
RTPSessionPrx AsteriskSCF::PJMEDIARTP::RTPSession::create(const Ice::ObjectAdapterPtr& adapter,
const std::string& id, const RTPServiceLocatorParamsPtr& params,
- const PJMEDIAEnvironmentPtr& environment,
- const RTPReplicationContextPtr& replicationContext,
+ const PJMEDIAEnvironmentPtr& environment,
+ const RTPReplicationContextPtr& replicationContext,
const ConfigurationServiceImplPtr& configuration,
const RTPOptionsPtr& options,
RTPAllocationOutputsPtr& outputs)
@@ -1159,19 +1325,22 @@ RTPSessionPrx AsteriskSCF::PJMEDIARTP::RTPSession::create(const Ice::ObjectAdapt
return servant->activate(id, options, outputs);
}
+/**
+ * Factory method used by state replicator.
+ */
ReplicationAdapterPtr AsteriskSCF::PJMEDIARTP::RTPSession::create(const Ice::ObjectAdapterPtr& adapter,
- const PJMEDIAEnvironmentPtr& environment,
+ const PJMEDIAEnvironmentPtr& environment,
const RTPSessionStateItemPtr& item,
- const RTPReplicationContextPtr& replicationContext,
+ const RTPReplicationContextPtr& replicationContext,
const ConfigurationServiceImplPtr& configuration,
const RTPOptionsPtr& options,
RTPAllocationOutputsPtr& outputs)
{
- RTPSessionImplPtr servant(new RTPSessionImpl(adapter,
- adapter->getCommunicator()->identityToString(item->sessionIdentity),
+ RTPSessionImplPtr servant(new RTPSessionImpl(adapter,
+ adapter->getCommunicator()->identityToString(item->sessionIdentity),
environment,
item->port, item->formats, item->ipv6, item->srtp,
- replicationContext,
+ replicationContext,
configuration));
servant->setCookies(item->cookies);
diff --git a/src/RTPSink.cpp b/src/RTPSink.cpp
index 9ea743c..72c3f5f 100644
--- a/src/RTPSink.cpp
+++ b/src/RTPSink.cpp
@@ -1,7 +1,7 @@
/*
* Asterisk SCF -- An open-source communications framework.
*
- * Copyright (C) 2010, Digium, Inc.
+ * Copyright (C) 2010-2012, Digium, Inc.
*
* See http://www.asterisk.org for more information about
* the Asterisk SCF project. Please do not directly contact
@@ -26,24 +26,28 @@
#include "RTPStateReplicationIf.h"
#include "RTPTelephonyEventSink.h"
+#include <boost/asio/detail/socket_ops.hpp>
+
#include <Ice/Ice.h>
#include <IceUtil/UUID.h>
-#include <boost/asio/detail/socket_ops.hpp>
#include <pjlib.h>
#include <pjmedia.h>
#include <AsteriskSCF/Media/MediaIf.h>
#include <AsteriskSCF/Media/RTP/MediaRTPIf.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
#include <AsteriskSCF/System/Component/ReplicaIf.h>
-using namespace std;
using namespace AsteriskSCF::Core::Discovery::V1;
-using namespace AsteriskSCF::Media::V1;
using namespace AsteriskSCF::Media::RTP::V1;
+using namespace AsteriskSCF::Media::V1;
+using namespace AsteriskSCF::Operations;
using namespace AsteriskSCF::PJMEDIARTP;
using namespace AsteriskSCF::Replication::MediaRTPPJMEDIA::V1;
using namespace AsteriskSCF::SessionCommunications::V1;
+using namespace std;
/**
* Private implementation details for the StreamSinkRTPImpl class.
@@ -55,10 +59,14 @@ public:
* Constructor for our StreamSinkRTPImplPriv class.
*/
StreamSinkRTPImplPriv(
- const SessionAdapterPtr& sessionAdapter,
+ const SessionAdapterPtr& sessionAdapter,
const PJMEDIATransportPtr& transport,
const std::string&);
+ boost::shared_mutex mMutex;
+
+ OperationContextCachePtr mOperationContextCache;
+
/**
* A structure containing outgoing pjmedia session data.
*/
@@ -84,18 +92,19 @@ public:
/**
* The parent session's id.
*/
- string mSessionId;
+ const string mSessionId;
};
/**
* Constructor for the StreamSinkRTPImplPriv class.
*/
StreamSinkRTPImplPriv::StreamSinkRTPImplPriv(
- const SessionAdapterPtr& session,
+ const SessionAdapterPtr& session,
const PJMEDIATransportPtr& transport,
const string& sessionId) :
- mSessionAdapter(session), mTransport(transport),
- mSinkStateItem(new RTPStreamSinkStateItem),
+ mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
+ mSessionAdapter(session), mTransport(transport),
+ mSinkStateItem(new RTPStreamSinkStateItem),
mSessionId(sessionId)
{
pjmedia_rtp_session_init(&mOutgoingSession, 0, pj_rand());
@@ -117,6 +126,7 @@ StreamSinkRTPImpl::StreamSinkRTPImpl(
TelephonyEventSinkPrx StreamSinkRTPImpl::createTelephonyEventSink(Ice::ObjectAdapterPtr& adapter, const Ice::Identity& id)
{
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
mImpl->mTelephonyEventSink =
new RTPTelephonyEventSink(
&mImpl->mOutgoingSession,
@@ -129,6 +139,7 @@ TelephonyEventSinkPrx StreamSinkRTPImpl::createTelephonyEventSink(Ice::ObjectAda
RTPTelephonyEventSinkPtr StreamSinkRTPImpl::getTelephonyEventSink()
{
+ boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
return mImpl->mTelephonyEventSink;
}
@@ -137,6 +148,7 @@ RTPTelephonyEventSinkPtr StreamSinkRTPImpl::getTelephonyEventSink()
*/
void StreamSinkRTPImpl::write(const AsteriskSCF::Media::V1::FrameSeq& frames, const Ice::Current&)
{
+// boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
// Don't even bother if no remote address information is present
if (mImpl->mSinkStateItem->remoteAddress.empty() || !mImpl->mSinkStateItem->remotePort)
{
@@ -212,8 +224,17 @@ void StreamSinkRTPImpl::write(const AsteriskSCF::Media::V1::FrameSeq& frames, co
/**
* Implementation of the setSource method as defined in MediaIf.ice
*/
-void StreamSinkRTPImpl::setSource(const AsteriskSCF::Media::V1::StreamSourcePrx& source, const Ice::Current&)
+void StreamSinkRTPImpl::setSource(
+ const AsteriskSCF::System::V1::OperationContextPtr& context,
+ const AsteriskSCF::Media::V1::StreamSourcePrx& source, const Ice::Current&)
{
+ if (!mImpl->mOperationContextCache->addOperationContext(context))
+ {
+ std::clog << "!!! RETRY !!!\n";
+ // retry detected
+ return;
+ }
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
mImpl->mSinkStateItem->source = source;
mImpl->mSessionAdapter->replicateState(mImpl->mSinkStateItem);
@@ -224,6 +245,7 @@ void StreamSinkRTPImpl::setSource(const AsteriskSCF::Media::V1::StreamSourcePrx&
*/
AsteriskSCF::Media::V1::StreamSourcePrx StreamSinkRTPImpl::getSource(const Ice::Current&)
{
+ boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
return mImpl->mSinkStateItem->source;
}
@@ -232,6 +254,7 @@ AsteriskSCF::Media::V1::StreamSourcePrx StreamSinkRTPImpl::getSource(const Ice::
*/
AsteriskSCF::Media::V1::FormatSeq StreamSinkRTPImpl::getFormats(const Ice::Current&)
{
+ boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
return mImpl->mSessionAdapter->getFormats();
}
@@ -240,6 +263,7 @@ AsteriskSCF::Media::V1::FormatSeq StreamSinkRTPImpl::getFormats(const Ice::Curre
*/
std::string StreamSinkRTPImpl::getId(const Ice::Current&)
{
+ // Immutable; no lock needed
/* For now utilize the id of the session */
return mImpl->mSessionId;
}
@@ -247,20 +271,45 @@ std::string StreamSinkRTPImpl::getId(const Ice::Current&)
/**
* Implementation of the setRemoteDetails method as defined in MediaRTPIf.ice
*/
-void StreamSinkRTPImpl::setRemoteDetails(const string& address, Ice::Int port, const Ice::Current&)
+void StreamSinkRTPImpl::setRemoteDetails(const AsteriskSCF::System::V1::OperationContextPtr& context,
+ const string& address, Ice::Int port, const Ice::Current&)
{
- /* This method is essentially a passthru to the RTPSourceImpl. It takes care of
- * actually attaching the transport.
- */
- mImpl->mSessionAdapter->setRemoteDetails(address, port);
+ ContextDataPtr data = checkAndThrow(mImpl->mOperationContextCache, context);
- /* We do store it though in case we have not yet received a packet from the remote side but
- * are asked for the remote address. It is also stored for replication purposes.
- */
- mImpl->mSinkStateItem->remoteAddress = address;
- mImpl->mSinkStateItem->remotePort = port;
+ if (!data)
+ {
+ std::clog << "!!! RETRY !!!\n";
+ // retry detected
+ return;
+ }
- mImpl->mSessionAdapter->replicateState(mImpl->mSinkStateItem);
+ try
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
+ /* This method is essentially a passthru to the RTPSourceImpl. It takes care of
+ * actually attaching the transport.
+ */
+ mImpl->mSessionAdapter->setRemoteDetails(address, port);
+
+ /* We do store it though in case we have not yet received a packet from the remote side but
+ * are asked for the remote address. It is also stored for replication purposes.
+ */
+ mImpl->mSinkStateItem->remoteAddress = address;
+ mImpl->mSinkStateItem->remotePort = port;
+
+ mImpl->mSessionAdapter->replicateState(mImpl->mSinkStateItem);
+ data->setCompleted();
+ }
+ catch (const std::exception& e)
+ {
+ data->setException(e);
+ throw;
+ }
+ catch (...)
+ {
+ data->setException();
+ throw;
+ }
}
/**
@@ -268,6 +317,7 @@ void StreamSinkRTPImpl::setRemoteDetails(const string& address, Ice::Int port, c
*/
std::string StreamSinkRTPImpl::getRemoteAddress(const Ice::Current&)
{
+ boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
if (mImpl->mTransport && mImpl->mTransport->remoteAddress())
{
string address = mImpl->mTransport->remoteAddress()->hostname();
@@ -281,6 +331,7 @@ std::string StreamSinkRTPImpl::getRemoteAddress(const Ice::Current&)
*/
Ice::Int StreamSinkRTPImpl::getRemotePort(const Ice::Current&)
{
+ boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
if (mImpl->mTransport && mImpl->mTransport->remoteAddress())
{
int port = mImpl->mTransport->remoteAddress()->port();
@@ -294,11 +345,13 @@ Ice::Int StreamSinkRTPImpl::getRemotePort(const Ice::Current&)
*/
RTPStreamSinkStateItemPtr StreamSinkRTPImpl::getStateItem()
{
+ boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
return mImpl->mSinkStateItem;
}
RTPTelephonyEventSinkStateItemPtr StreamSinkRTPImpl::getTelephonyEventSinkStateItem()
{
+ boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
if (mImpl->mTelephonyEventSink)
{
return mImpl->mTelephonyEventSink->getStateItem();
@@ -308,17 +361,20 @@ RTPTelephonyEventSinkStateItemPtr StreamSinkRTPImpl::getTelephonyEventSinkStateI
void StreamSinkRTPImpl::setRemoteDetailsImpl(const std::string& host, Ice::Int port)
{
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
mImpl->mSinkStateItem->remoteAddress = host;
mImpl->mSinkStateItem->remotePort = port;
}
void StreamSinkRTPImpl::setSourceImpl(const AsteriskSCF::Media::V1::StreamSourcePrx& proxy)
{
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
mImpl->mSinkStateItem->source = proxy;
}
Ice::ByteSeq StreamSinkRTPImpl::encodeAudioPayload(const FramePayloadPtr& toEncode, const AudioFormatPtr& audioFormat)
{
+ // mImpl->mMutex should already be locked
if (audioFormat->sampleSize == 8)
{
ByteSeqPayloadPtr bytePayload = ByteSeqPayloadPtr::dynamicCast(toEncode);
@@ -351,6 +407,7 @@ Ice::ByteSeq StreamSinkRTPImpl::encodeAudioPayload(const FramePayloadPtr& toEnco
Ice::ByteSeq StreamSinkRTPImpl::encodeVideoPayload(const FramePayloadPtr& toEncode, const VideoFormatPtr&)
{
+ // no shared state, no need to lock
... 1175 lines suppressed ...
--
asterisk-scf/release/media_rtp_pjmedia.git
More information about the asterisk-scf-commits
mailing list