[asterisk-scf-commits] asterisk-scf/integration/media_rtp_pjmedia.git branch "retry_deux" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Mon Apr 9 16:27:15 CDT 2012
branch "retry_deux" has been updated
via 48cccf1cb79f3a76c2dc572996486f7e667ad664 (commit)
from cb5c5931ba2a559c17ba6fb47356af22524c9a0e (commit)
Summary of changes:
src/Component.cpp | 68 ++++++--
src/ICETransport.cpp | 80 +++++----
src/RTPConfiguration.cpp | 129 ++++++++------
src/RTPSession.cpp | 342 +++++++++++++++++++++++++----------
src/RTPSink.cpp | 94 ++++++++--
src/RTPSource.cpp | 28 ++--
src/RTPStateReplicatorListener.cpp | 47 +++--
src/RTPTelephonyEventSink.cpp | 40 ++++-
src/RTPTelephonyEventSink.h | 5 +-
src/RTPTelephonyEventSource.cpp | 2 +
test/TestRTPpjmedia.cpp | 90 +++++++++-
11 files changed, 658 insertions(+), 267 deletions(-)
- Log -----------------------------------------------------------------
commit 48cccf1cb79f3a76c2dc572996486f7e667ad664
Author: David M. Lee <dlee at digium.com>
Date: Fri Apr 6 15:46:43 2012 -0500
Retry logic, thready safety, formatting.
diff --git a/src/Component.cpp b/src/Component.cpp
index 673af44..55bb668 100644
--- a/src/Component.cpp
+++ b/src/Component.cpp
@@ -1,7 +1,7 @@
/*
* Asterisk SCF -- An open-source communications framework.
*
- * Copyright (C) 2010, Digium, Inc.
+ * Copyright (C) 2010-2012, Digium, Inc.
*
* See http://www.asterisk.org for more information about
* the Asterisk SCF project. Please do not directly contact
@@ -24,39 +24,45 @@
#include <boost/shared_ptr.hpp>
#include <AsteriskSCF/Operations/OperationContext.h>
+#include <AsteriskSCF/Component/Component.h>
#include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.h>
+#include <AsteriskSCF/Discovery/SmartProxy.h>
+#include <AsteriskSCF/Logger.h>
+#include <AsteriskSCF/Logger/IceLogger.h>
#include <AsteriskSCF/Media/MediaIf.h>
#include <AsteriskSCF/Media/RTP/MediaRTPIf.h>
-#include <AsteriskSCF/System/Component/ConfigurationIf.h>
-#include <AsteriskSCF/Logger/IceLogger.h>
-#include <AsteriskSCF/Logger.h>
-#include <AsteriskSCF/Discovery/SmartProxy.h>
-#include <AsteriskSCF/Component/Component.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
#include <AsteriskSCF/PJLIB/ThreadHook.h>
+#include <AsteriskSCF/System/Component/ConfigurationIf.h>
+#include "PJMEDIAEnvironment.h"
+#include "RTPConfiguration.h"
+#include "RTPConfigurationIf.h"
#include "RTPReplicationContext.h"
#include "RTPSession.h"
#include "RTPStateReplicator.h"
-#include "RTPConfiguration.h"
-#include "RTPConfigurationIf.h"
-#include "PJMEDIAEnvironment.h"
-using namespace std;
+using namespace AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1;
using namespace AsteriskSCF::Core::Discovery::V1;
-using namespace AsteriskSCF::Media::V1;
+using namespace AsteriskSCF::Discovery;
using namespace AsteriskSCF::Media::RTP::V1;
+using namespace AsteriskSCF::Media::V1;
+using namespace AsteriskSCF::Operations;
+using namespace AsteriskSCF::PJMEDIARTP;
using namespace AsteriskSCF::Replication::MediaRTPPJMEDIA::V1;
-using namespace AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1;
-using namespace AsteriskSCF::System::Configuration::V1;
+using namespace AsteriskSCF::Replication;
using namespace AsteriskSCF::System::Component::V1;
+using namespace AsteriskSCF::System::Configuration::V1;
using namespace AsteriskSCF::System::Logging;
-using namespace AsteriskSCF::Discovery;
-using namespace AsteriskSCF::Replication;
-using namespace AsteriskSCF::PJMEDIARTP;
+using namespace std;
namespace
{
Logger lg = getLoggerFactory().getLogger("AsteriskSCF.MediaRTP");
+
+typedef ContextResultData<RTPSessionPrx> AllocateResultData;
+typedef boost::shared_ptr<AllocateResultData> AllocateResultDataPtr;
+
}
static const string ReplicaServiceId("MediaRTPReplica");
@@ -88,6 +94,7 @@ public:
}
private:
+ OperationContextCachePtr mOperationContextCache;
Ice::ObjectAdapterPtr mAdapter;
PJMEDIAEnvironmentPtr mEnvironment;
RTPReplicationContextPtr mReplicationContext;
@@ -252,6 +259,7 @@ void Component::onResume()
RTPMediaServiceImpl::RTPMediaServiceImpl(const Ice::ObjectAdapterPtr& adapter,
const RTPReplicationContextPtr& replicationContext,
const ConfigurationServiceImplPtr& configurationService) :
+ mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
mAdapter(adapter),
mEnvironment(PJMEDIAEnvironment::create(adapter->getCommunicator()->getProperties(), configurationService)),
mReplicationContext(replicationContext),
@@ -263,13 +271,24 @@ RTPMediaServiceImpl::RTPMediaServiceImpl(const Ice::ObjectAdapterPtr& adapter,
* Implementation of the allocate method as defined in MediaRTPIf.ice
*/
RTPSessionPrx RTPMediaServiceImpl::allocate(
- const AsteriskSCF::System::V1::OperationContextPtr&,
+ const AsteriskSCF::System::V1::OperationContextPtr& context,
const RTPServiceLocatorParamsPtr& params,
const RTPOptionsPtr& options,
RTPAllocationOutputsPtr& outputs,
const Ice::Current&)
{
- return AsteriskSCF::PJMEDIARTP::RTPSession::create(
+ std::pair<bool, AllocateResultDataPtr> data =
+ getContextSync<AllocateResultDataPtr>(mOperationContextCache, context);
+
+ if (data.first)
+ {
+ // retry detected
+ return data.second->getResult();
+ }
+
+ try
+ {
+ RTPSessionPrx r = AsteriskSCF::PJMEDIARTP::RTPSession::create(
mAdapter,
IceUtil::generateUUID(),
params,
@@ -278,6 +297,19 @@ RTPSessionPrx RTPMediaServiceImpl::allocate(
mConfigurationService,
options,
outputs);
+ data.second->setResult(r);
+ return r;
+ }
+ catch (const std::exception& e)
+ {
+ data.second->setException(e);
+ throw;
+ }
+ catch (...)
+ {
+ data.second->setException();
+ throw;
+ }
}
void Component::onPreInitialize()
diff --git a/src/ICETransport.cpp b/src/ICETransport.cpp
index 9553281..468bb30 100755
--- a/src/ICETransport.cpp
+++ b/src/ICETransport.cpp
@@ -14,39 +14,38 @@
* at the top of the source tree.
*/
-#include "ICETransport.h"
-#include "PJUtil.h"
-
-#include <pjmedia.h>
#include <pjlib.h>
+#include <pjmedia.h>
#include <pjnath.h>
-#include <AsteriskSCF/System/ExceptionsIf.h>
#include <map>
+#include <sstream>
+
#include <boost/thread.hpp>
-#include <boost/thread/shared_mutex.hpp>
-#include <AsteriskSCF/System/NAT/NATTraversalIf.h>
#include <Ice/Ice.h>
-#include <sstream>
-#include <AsteriskSCF/Logger.h>
#include <IceUtil/UUID.h>
+#include <AsteriskSCF/Logger.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
+#include <AsteriskSCF/System/ExceptionsIf.h>
+#include <AsteriskSCF/System/NAT/NATTraversalIf.h>
+
+#include "ICETransport.h"
+#include "PJUtil.h"
+
+using namespace AsteriskSCF::Helpers;
+using namespace AsteriskSCF::Operations;
using namespace AsteriskSCF::PJMEDIARTP;
-using namespace AsteriskSCF::System::V1;
using namespace AsteriskSCF::PJUtil;
-using namespace std;
-using namespace AsteriskSCF::Helpers;
using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::System::NAT::V1;
+using namespace AsteriskSCF::System::V1;
+using namespace std;
namespace
{
Logger logger = getLoggerFactory().getLogger("AsteriskSCF.MediaRTP");
-}
-
-namespace
-{
class ICEAgentImpl : public InteractiveConnectionAgent
{
@@ -54,6 +53,7 @@ public:
ICEAgentImpl(const Ice::ObjectAdapterPtr& adapter, const Ice::Identity& id, const PJMEDIAEnvironmentPtr& env,
const PJMEDIAEndpointPtr& ep) :
+ mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
mAdapter(adapter),
mId(id),
mShuttingDown(false),
@@ -84,11 +84,22 @@ public:
return mRole;
}
+ typedef AMDContextResultData<CandidatePtr, AMD_InteractiveConnectionAgent_negotiatePtr> NegotiateContextData;
+ typedef boost::shared_ptr<NegotiateContextData> NegotiateContextDataPtr;
+
void negotiate_async(const AMD_InteractiveConnectionAgent_negotiatePtr& callback,
- const AsteriskSCF::System::V1::OperationContextPtr&,
+ const AsteriskSCF::System::V1::OperationContextPtr& context,
const string& hostname, Ice::Int port, const CandidateSeq& candidates,
const Ice::Current&)
{
+ NegotiateContextDataPtr data =
+ getContext<NegotiateContextDataPtr>(mOperationContextCache, context, callback);
+
+ if (!data)
+ {
+ // retry detected
+ return;
+ }
boost::unique_lock<boost::shared_mutex> lock(mLock);
stateCheck();
@@ -100,12 +111,12 @@ public:
// TODO: are we going to support cancellable negotiations.
//
}
- mCurrentNegotiation = callback;
+ mCurrentNegotiation = data->getProxy();
//
- // So how this works is we create a remote SDP and call pjmedia_transport_start() easy peasy. (Same deal
+ // So how this works is we create a remote SDP and call pjmedia_transport_start() easy peasy. (Same deal
//
- pjmedia_sdp_session* remoteSDPSession =
+ pjmedia_sdp_session* remoteSDPSession =
static_cast<pjmedia_sdp_session*>(pj_pool_zalloc(mEnv->memoryPool(), sizeof(pjmedia_sdp_session)));
@@ -179,7 +190,7 @@ public:
//
// I was concerned about the fact that for a given SIP session, there might be multiple media
- // streams and multiple candidates. I'm not sure that its actually too much of an issue even
+ // streams and multiple candidates. I'm not sure that its actually too much of an issue even
// if multiple media types are muxed on a single ICE negotiated flow, but there will need to be
// some redesign to pull in the multiple media streams associated with the session. For the moment
// we will operation under the premise that we are dealing with a single media stream.
@@ -192,7 +203,7 @@ public:
{
CandidatePtr candidate = *i;
ostringstream os;
- os << "candidate:" << candidate->foundation << ' ' << candidate->componentId << " UDP " <<
+ os << "candidate:" << candidate->foundation << ' ' << candidate->componentId << " UDP " <<
candidate->priority << ' ' << candidate->mappedAddress << ' ' << candidate->mappedPort << " typ ";
string hostType;
switch (candidate->type)
@@ -217,7 +228,7 @@ public:
}
string t = os.str();
pj_str_t candidateStr = pj_str(const_cast<char*>(t.c_str()));
- pjmedia_sdp_attr* newAttribute = pjmedia_sdp_attr_create(mEnv->memoryPool(),
+ pjmedia_sdp_attr* newAttribute = pjmedia_sdp_attr_create(mEnv->memoryPool(),
"candidate", &candidateStr);
pjmedia_sdp_attr_add(¤tMedia->attr_count, currentMedia->attr, newAttribute);
}
@@ -426,7 +437,7 @@ public:
candidateObj->mappedAddress = candidateObj->baseAddress;
candidateObj->mappedPort = candidateObj->basePort;
candidateObj->baseAddress = baseAddress;
- candidateObj->basePort = basePort;
+ candidateObj->basePort = basePort;
}
else
{
@@ -494,6 +505,7 @@ public:
private:
boost::shared_mutex mLock;
+ OperationContextCachePtr mOperationContextCache;
Ice::ObjectAdapterPtr mAdapter;
Ice::Identity mId;
bool mShuttingDown;
@@ -554,7 +566,7 @@ boost::shared_mutex ICECallbackAdapter::mLock;
// invoked before we get a chance to add the transport. The solution to that is to allow an entry to be created when
// the ICE completion callback arrives and there isn't a table entry. When the addEntry runs, it will see the entry and
// simply update the appropriate field.
-//
+//
void ICECallbackAdapter::addEntry(pjmedia_transport* transport, const ICETransportPtr& callback)
{
bool alreadyKnown = false;
@@ -627,7 +639,7 @@ void ICECallbackAdapter::onICEComplete(pjmedia_transport* transport, pj_ice_stra
{
//
// AFAICT, only PJ_ICE_STRANS_OP_NEGOTIATION should get here.
- //
+ //
switch (operation)
{
case PJ_ICE_STRANS_OP_INIT:
@@ -685,18 +697,18 @@ void ICECallbackAdapter::onICEComplete(pjmedia_transport* transport, pj_ice_stra
case PJ_ICE_STRANS_OP_KEEP_ALIVE:
//
// Keep alive has successfully completed. FWICT this should not get here.
- //
+ //
break;
- };
+ }
}
-}
+} // anonymous namespace
ICETransport::~ICETransport()
{
//
// TODO : cleanup ICE transport, the transport itself is closed by the parent class.
- //
+ //
ICECallbackAdapter::removeEntry(mTransport);
}
@@ -729,7 +741,7 @@ void ICETransport::onSetupComplete(pjmedia_transport* transport, int status)
{
//
// Address has changed! We need to let Session listeners know!
- // TODO!
+ // TODO!
//
pj_memcpy(mLastKnownAddr.get(), &info.sock_info.rtp_addr_name, sizeof(pj_sockaddr));
}
@@ -739,7 +751,7 @@ void ICETransport::onSetupComplete(pjmedia_transport* transport, int status)
mMonitor.notify_one();
}
-AddressPtr ICETransport::localAddress()
+AddressPtr ICETransport::localAddress()
{
boost::unique_lock<boost::mutex> lock(mLock);
if (mLocalAddress)
@@ -753,7 +765,7 @@ AddressPtr ICETransport::localAddress()
return mLocalAddress;
}
-AddressPtr ICETransport::remoteAddress()
+AddressPtr ICETransport::remoteAddress()
{
boost::unique_lock<boost::mutex> lock(mLock);
return mRemoteAddress;
@@ -792,7 +804,7 @@ void ICETransport::start()
PJICECallbackPtr callback(new pjmedia_ice_cb);
callback->on_ice_complete = &ICECallbackAdapter::onICEComplete;
NATModulePtr natModule = NATModule::create(mConfig, mEndpoint);
- pj_status_t result = pjmedia_ice_create(mEndpoint->endpoint(), "ASCF_ICE_MEDIA", (mEnableRTCP ? 2 : 1),
+ pj_status_t result = pjmedia_ice_create(mEndpoint->endpoint(), "ASCF_ICE_MEDIA", (mEnableRTCP ? 2 : 1),
natModule->configuration(), callback.get(), &t);
if (fail(result))
{
diff --git a/src/RTPConfiguration.cpp b/src/RTPConfiguration.cpp
index 69d3fc8..178bf22 100644
--- a/src/RTPConfiguration.cpp
+++ b/src/RTPConfiguration.cpp
@@ -1,7 +1,7 @@
/*
* Asterisk SCF -- An open-source communications framework.
*
- * Copyright (C) 2011, Digium, Inc.
+ * Copyright (C) 2011-2012, Digium, Inc.
*
* See http://www.asterisk.org for more information about
* the Asterisk SCF project. Please do not directly contact
@@ -14,19 +14,22 @@
* at the top of the source tree.
*/
-#include "RTPConfigurationIf.h"
-#include "RTPConfiguration.h"
#include <IceUtil/UUID.h>
-#include <AsteriskSCF/System/Component/ConfigurationIf.h>
#include <boost/thread.hpp>
-#include <boost/thread/shared_mutex.hpp>
-using namespace AsteriskSCF::System::Configuration::V1;
+#include <AsteriskSCF/System/Component/ConfigurationIf.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
+
+#include "RTPConfigurationIf.h"
+#include "RTPConfiguration.h"
+
+using namespace AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1;
+using namespace AsteriskSCF::Operations;
using namespace AsteriskSCF::PJMEDIARTP;
+using namespace AsteriskSCF::System::Configuration::V1;
using namespace std;
-using namespace AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1;
/**
* Implementation of the configuration service.
@@ -34,16 +37,17 @@ using namespace AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1;
class ConfigurationServiceServant : virtual public ConfigurationServiceImpl
{
public:
+ ConfigurationServiceServant() : mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)) {}
/**
* AsteriskSCF::System::Configuration::V1 interface. Slice to C++ mapping.
*/
AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq
getConfiguration(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&, const Ice::Current&);
-
+
AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq
getConfigurationAll(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&, const Ice::Current&);
-
+
AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq
getConfigurationGroups(const Ice::Current&);
@@ -90,12 +94,12 @@ public:
{
mICEConfig = newConfig;
}
-
+
void replaceConfig(const NATConfigPtr& newConfig)
{
mNATConfig = newConfig;
}
-
+
void replaceConfig(const SRTPConfigurationPtr& newConfig)
{
mSRTPConfig = newConfig;
@@ -121,7 +125,7 @@ public:
//
if (mGeneralGroup)
{
- ConfigurationItemDict::const_iterator item =
+ ConfigurationItemDict::const_iterator item =
mGeneralGroup->configurationItems.find(EnableSRTPItemName);
if (item != mGeneralGroup->configurationItems.end())
{
@@ -150,6 +154,7 @@ public:
}
private:
+ OperationContextCachePtr mOperationContextCache;
/**
* General RTP configuration
*/
@@ -223,7 +228,7 @@ ConfigurationGroupSeq ConfigurationServiceServant::getConfiguration(
void visitRTPICEConfigurationGroup(const RTPICEConfigurationGroupPtr& group)
{
RTPICEConfigurationGroupPtr currentGroup = mImpl->getICEConfigurationGroup();
-
+
if (!currentGroup)
{
return;
@@ -233,19 +238,19 @@ ConfigurationGroupSeq ConfigurationServiceServant::getConfiguration(
returnedGroup->configurationItems);
mGroups.push_back(returnedGroup);
}
-
+
ConfigurationServiceServantPtr mImpl;
ConfigurationGroupSeq& mGroups;
};
-
+
ConfigurationGroupSeq newGroups;
RTPConfigurationGroupVisitorPtr v = new GroupVisitor(this, newGroups);
-
+
for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
{
(*group)->visit(v);
}
-
+
return newGroups;
}
@@ -259,7 +264,7 @@ ConfigurationGroupSeq ConfigurationServiceServant::getConfigurationAll(
mImpl(impl), mGroups(visitorGroups)
{
}
-
+
private:
void visitRTPGeneralGroup(const ::AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1::RTPGeneralGroupPtr&)
{
@@ -278,21 +283,21 @@ ConfigurationGroupSeq ConfigurationServiceServant::getConfigurationAll(
mGroups.push_back(g);
}
}
-
+
ConfigurationServiceServantPtr mImpl;
ConfigurationGroupSeq& mGroups;
};
-
+
ConfigurationGroupSeq newGroups;
RTPConfigurationGroupVisitorPtr v = new GroupVisitor(this, newGroups);
boost::shared_lock<boost::shared_mutex> lock(mLock);
-
+
for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
{
(*group)->visit(v);
}
-
+
return newGroups;
}
@@ -300,7 +305,7 @@ ConfigurationGroupSeq ConfigurationServiceServant::getConfigurationGroups(const
{
ConfigurationGroupSeq groups;
boost::shared_lock<boost::shared_mutex> lock(mLock);
-
+
if (mGeneralGroup)
{
groups.push_back(new RTPGeneralGroup);
@@ -309,21 +314,21 @@ ConfigurationGroupSeq ConfigurationServiceServant::getConfigurationGroups(const
{
groups.push_back(new RTPICEConfigurationGroup);
}
-
+
return groups;
}
-void ConfigurationServiceServant::setConfiguration(const AsteriskSCF::System::V1::OperationContextPtr&, const ConfigurationGroupSeq& groups,
+void ConfigurationServiceServant::setConfiguration(const AsteriskSCF::System::V1::OperationContextPtr& context, const ConfigurationGroupSeq& groups,
const Ice::Current&)
{
class GroupVisitor : public RTPConfigurationGroupVisitor
{
public:
- GroupVisitor(const ConfigurationServiceServantPtr& impl) :
+ GroupVisitor(const ConfigurationServiceServantPtr& impl) :
mImpl(impl)
{
}
-
+
private:
/**
* Helper function which performs serial number checking of items
@@ -340,22 +345,22 @@ void ConfigurationServiceServant::setConfiguration(const AsteriskSCF::System::V1
{
continue;
}
-
+
ConfigurationItemDict::const_iterator localItem = localItems.find(item->first);
-
+
if (localItem == localItems.end())
{
// This is a new item so serial checking does not apply
continue;
}
-
+
if (item->second->serialNumber < localItem->second->serialNumber)
{
throw SerialConflict(group, item->second);
}
}
}
-
+
void visitRTPGeneralGroup(const ::AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1::RTPGeneralGroupPtr& group)
{
RTPGeneralGroupPtr g = mImpl->getGeneralGroup();
@@ -367,7 +372,7 @@ void ConfigurationServiceServant::setConfiguration(const AsteriskSCF::System::V1
{
performSerialCheck(group->configurationItems, g->configurationItems, group);
}
-
+
for (ConfigurationItemDict::const_iterator item = group->configurationItems.begin();
item != group->configurationItems.end();
++item)
@@ -415,7 +420,7 @@ void ConfigurationServiceServant::setConfiguration(const AsteriskSCF::System::V1
mImpl->replaceConfig(ICEConfiguration::create(mMaxCandidates, mMaxCalls));
}
}
-
+
void visitSTUNServerItem(const STUNServerItemPtr& item)
{
@@ -454,11 +459,11 @@ void ConfigurationServiceServant::setConfiguration(const AsteriskSCF::System::V1
mMaxCandidates = item->maxCandidates;
}
}
-
+
private:
ConfigurationServiceServantPtr mImpl;
-
+
bool mCreateRTPICEConfig;
bool mCreateNATConfig;
@@ -481,7 +486,7 @@ void ConfigurationServiceServant::setConfiguration(const AsteriskSCF::System::V1
}
RTPConfigurationItemVisitorPtr v(new Visitor(mImpl));
-
+
for (ConfigurationItemDict::const_iterator item = group->configurationItems.begin();
item != group->configurationItems.end();
++item)
@@ -495,7 +500,13 @@ void ConfigurationServiceServant::setConfiguration(const AsteriskSCF::System::V1
ConfigurationServiceServantPtr mImpl;
};
-
+
+ if (!mOperationContextCache->addOperationContext(context))
+ {
+ // retry detected
+ return;
+ }
+
RTPConfigurationGroupVisitorPtr v = new GroupVisitor(this);
boost::unique_lock<boost::shared_mutex> lock(mLock);
@@ -506,13 +517,13 @@ void ConfigurationServiceServant::setConfiguration(const AsteriskSCF::System::V1
}
void ConfigurationServiceServant::removeConfigurationItems(
- const AsteriskSCF::System::V1::OperationContextPtr&,
+ const AsteriskSCF::System::V1::OperationContextPtr& context,
const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq& groups, const Ice::Current&)
{
class GroupVisitor : public RTPConfigurationGroupVisitor
{
public:
- GroupVisitor(const ConfigurationServiceServantPtr& impl) :
+ GroupVisitor(const ConfigurationServiceServantPtr& impl) :
mImpl(impl)
{
}
@@ -536,7 +547,7 @@ void ConfigurationServiceServant::removeConfigurationItems(
localItems.erase(localItem);
}
}
-
+
void visitRTPGeneralGroup(const ::AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1::RTPGeneralGroupPtr& group)
{
RTPGeneralGroupPtr g = mImpl->getGeneralGroup();
@@ -551,7 +562,7 @@ void ConfigurationServiceServant::removeConfigurationItems(
//
// The Visitor in this case interprets the provided information
// to decide what it needs to do. This type of thing might be better moved
- // into a separate helper class (along with the set operations), but
+ // into a separate helper class (along with the set operations), but
// we'll leave it here for now to avoid inconsistencies with the
// rest of the configuration implementation.
//
@@ -566,7 +577,7 @@ void ConfigurationServiceServant::removeConfigurationItems(
mResetICEConfig(false)
{
}
-
+
~Visitor()
{
if (mDisable)
@@ -602,7 +613,7 @@ void ConfigurationServiceServant::removeConfigurationItems(
mImpl->replaceConfig(ICEConfigurationPtr());
}
}
-
+
void visitSTUNServerItem(const STUNServerItemPtr& item)
{
if (item)
@@ -634,7 +645,7 @@ void ConfigurationServiceServant::removeConfigurationItems(
mResetICEConfig = true;
}
}
-
+
private:
ConfigurationServiceServantPtr mImpl;
bool mRemoveSTUNServer;
@@ -646,21 +657,27 @@ void ConfigurationServiceServant::removeConfigurationItems(
};
RTPICEConfigurationGroupPtr g = mImpl->getICEConfigurationGroup();
-
+
if (g)
{
RTPConfigurationItemVisitorPtr v(new Visitor(mImpl));
removeItems(v, group->configurationItems, g->configurationItems);
}
}
-
+
private:
ConfigurationServiceServantPtr mImpl;
};
-
+
+ if (!mOperationContextCache->addOperationContext(context))
+ {
+ // retry detected
+ return;
+ }
+
RTPConfigurationGroupVisitorPtr v = new GroupVisitor(this);
boost::unique_lock<boost::shared_mutex> lock(mLock);
-
+
for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
{
(*group)->visit(v);
@@ -668,17 +685,17 @@ void ConfigurationServiceServant::removeConfigurationItems(
}
void ConfigurationServiceServant::removeConfigurationGroups(
- const AsteriskSCF::System::V1::OperationContextPtr&,
+ const AsteriskSCF::System::V1::OperationContextPtr& context,
const ConfigurationGroupSeq& groups, const Ice::Current&)
{
class GroupVisitor : public RTPConfigurationGroupVisitor
{
public:
- GroupVisitor(const ConfigurationServiceServantPtr& impl) :
+ GroupVisitor(const ConfigurationServiceServantPtr& impl) :
mImpl(impl)
{
}
-
+
private:
void visitRTPGeneralGroup(const ::AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1::RTPGeneralGroupPtr&)
{
@@ -692,11 +709,17 @@ void ConfigurationServiceServant::removeConfigurationGroups(
ConfigurationServiceServantPtr mImpl;
};
-
+
+ if (!mOperationContextCache->addOperationContext(context))
+ {
+ // retry detected
+ return;
+ }
+
RTPConfigurationGroupVisitorPtr v = new GroupVisitor(this);
boost::unique_lock<boost::shared_mutex> lock(mLock);
-
+
for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
{
(*group)->visit(v);
diff --git a/src/RTPSession.cpp b/src/RTPSession.cpp
index 9d5f5ce..41f9a3c 100755
--- a/src/RTPSession.cpp
+++ b/src/RTPSession.cpp
@@ -1,7 +1,7 @@
/*
* Asterisk SCF -- An open-source communications framework.
*
- * Copyright (C) 2010, Digium, Inc.
+ * Copyright (C) 2010-2012, Digium, Inc.
*
* See http://www.asterisk.org for more information about
* the Asterisk SCF project. Please do not directly contact
@@ -32,22 +32,24 @@
#include <IceUtil/UUID.h>
#include <AsteriskSCF/Media/MediaIf.h>
-#include <AsteriskSCF/Media/RTP/MediaRTPIf.h>
#include <AsteriskSCF/Media/RTP/MediaRTCPIf.h>
-#include <AsteriskSCF/System/Component/ReplicaIf.h>
-#include <AsteriskSCF/SessionCommunications/SessionCommunicationsIf.h>
+#include <AsteriskSCF/Media/RTP/MediaRTPIf.h>
#include <AsteriskSCF/Operations/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
+#include <AsteriskSCF/SessionCommunications/SessionCommunicationsIf.h>
+#include <AsteriskSCF/System/Component/ReplicaIf.h>
-using namespace std;
using namespace AsteriskSCF::Core::Discovery::V1;
-using namespace AsteriskSCF::Media::V1;
+using namespace AsteriskSCF::Discovery;
using namespace AsteriskSCF::Media::RTP::V1;
+using namespace AsteriskSCF::Media::V1;
using namespace AsteriskSCF::Media;
-using namespace AsteriskSCF::Replication::MediaRTPPJMEDIA::V1;
-using namespace AsteriskSCF::System::Component::V1;
-using namespace AsteriskSCF::Discovery;
+using namespace AsteriskSCF::Operations;
using namespace AsteriskSCF::PJMEDIARTP;
+using namespace AsteriskSCF::Replication::MediaRTPPJMEDIA::V1;
using namespace AsteriskSCF::SessionCommunications::V1;
+using namespace AsteriskSCF::System::Component::V1;
+using namespace std;
/**
* RTCP Information Interface implementation.
@@ -55,7 +57,8 @@ using namespace AsteriskSCF::SessionCommunications::V1;
class RTCPInformationImpl : public RTCP::V1::Information
{
public:
- RTCPInformationImpl(pjmedia_rtcp_stat *general, pjmedia_rtcp_stream_stat *stream) :
+ RTCPInformationImpl(pjmedia_rtcp_stat *general, pjmedia_rtcp_stream_stat *stream, const OperationContextCachePtr& cache) :
+ mOperationContextCache(cache),
mGeneralStatistics(general), mStreamStatistics(stream) { }
RTCP::V1::StatisticsPtr getStatistics(const Ice::Current&)
@@ -89,14 +92,24 @@ public:
return statistics;
}
- void addListener(const AsteriskSCF::System::V1::OperationContextPtr&, const RTCP::V1::InformationListenerPrx& listener, const Ice::Current&)
+ void addListener(const AsteriskSCF::System::V1::OperationContextPtr& context, const RTCP::V1::InformationListenerPrx& listener, const Ice::Current&)
{
+ if (!mOperationContextCache->addOperationContext(context))
+ {
+ // retry detected
+ return;
+ }
boost::unique_lock<boost::shared_mutex> lock(mLock);
mListeners.push_back(listener);
}
- void removeListener(const AsteriskSCF::System::V1::OperationContextPtr&, const RTCP::V1::InformationListenerPrx& listener, const Ice::Current&)
+ void removeListener(const AsteriskSCF::System::V1::OperationContextPtr& context, const RTCP::V1::InformationListenerPrx& listener, const Ice::Current&)
{
+ if (!mOperationContextCache->addOperationContext(context))
+ {
+ // retry detected
+ return;
+ }
boost::unique_lock<boost::shared_mutex> lock(mLock);
mListeners.erase(std::remove(mListeners.begin(), mListeners.end(), listener), mListeners.end());
}
@@ -116,6 +129,8 @@ private:
*/
boost::shared_mutex mLock;
+ OperationContextCachePtr mOperationContextCache;
+
/**
* Listeners present.
*/
@@ -143,22 +158,22 @@ typedef IceUtil::Handle<RTCPInformationImpl> RTCPInformationImplPtr;
class RTPSessionImpl : public AsteriskSCF::Media::RTP::V1::SRTPSession
{
public:
- RTPSessionImpl(const Ice::ObjectAdapterPtr&,
- const string& id,
- const PJMEDIAEnvironmentPtr& env,
- const AsteriskSCF::Media::RTP::V1::RTPServiceLocatorParamsPtr& params,
- const RTPReplicationContextPtr& replicationContext,
- const ConfigurationServiceImplPtr&);
-
+ RTPSessionImpl(const Ice::ObjectAdapterPtr&,
+ const string& id,
+ const PJMEDIAEnvironmentPtr& env,
+ const AsteriskSCF::Media::RTP::V1::RTPServiceLocatorParamsPtr& params,
+ const RTPReplicationContextPtr& replicationContext,
+ const ConfigurationServiceImplPtr&);
+
RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter,
- const string& sessionIdentity,
- const PJMEDIAEnvironmentPtr& env,
- Ice::Int port,
- const AsteriskSCF::Media::V1::FormatSeq& formats,
- bool isIPv6,
- bool srtp,
- const RTPReplicationContextPtr& replicationContext,
- const ConfigurationServiceImplPtr& configurationServant);
+ const string& sessionIdentity,
+ const PJMEDIAEnvironmentPtr& env,
+ Ice::Int port,
+ const AsteriskSCF::Media::V1::FormatSeq& formats,
+ bool isIPv6,
+ bool srtp,
+ const RTPReplicationContextPtr& replicationContext,
+ const ConfigurationServiceImplPtr& configurationServant);
~RTPSessionImpl();
@@ -169,15 +184,15 @@ public:
AsteriskSCF::Media::V1::StreamSinkSeq getSinks(const Ice::Current&);
std::string getId(const Ice::Current&);
void setCookies(const AsteriskSCF::Media::V1::SessionCookieDict& cookies);
- void setCookies(const AsteriskSCF::System::V1::OperationContextPtr&,
- const AsteriskSCF::Media::V1::SessionCookies& cookies,
+ void setCookies(const AsteriskSCF::System::V1::OperationContextPtr&,
+ const AsteriskSCF::Media::V1::SessionCookies& cookies,
const Ice::Current&);
- void getCookies_async(const AsteriskSCF::Media::V1::AMD_Session_getCookiesPtr &cb,
- const AsteriskSCF::Media::V1::SessionCookies& cookiesToGet,
+ void getCookies_async(const AsteriskSCF::Media::V1::AMD_Session_getCookiesPtr &cb,
+ const AsteriskSCF::Media::V1::SessionCookies& cookiesToGet,
const Ice::Current&);
void removeCookies(
const AsteriskSCF::System::V1::OperationContextPtr&,
- const AsteriskSCF::Media::V1::SessionCookies& cookies,
+ const AsteriskSCF::Media::V1::SessionCookies& cookies,
const Ice::Current&);
void useRTCP(bool, const Ice::Current&);
@@ -191,7 +206,7 @@ public:
void start(const AsteriskSCF::System::V1::OperationContextPtr&, const string& suiteName, const string& keyInfo, bool enableAuthentication, bool enableEncryption, const Ice::Current&);
/**
- * Internal methods.
+ * Internal methods.
*/
AsteriskSCF::Media::V1::FormatSeq getFormats();
void setRemoteDetails(const std::string& address, Ice::Int port);
@@ -199,7 +214,7 @@ public:
int getPayload(const AsteriskSCF::Media::V1::FormatPtr& mediaformat);
/**
- * Accessors for the source and sink servants.
+ * Accessors for the source and sink servants.
*/
StreamSourceRTPImplPtr getSourceServant();
StreamSinkRTPImplPtr getSinkServant();
@@ -246,6 +261,8 @@ private:
boost::shared_mutex mLock;
+ OperationContextCachePtr mOperationContextCache;
+
/**
* Instance of the session adapter to be passed to sinks, sources, configuration, etc.
*/
@@ -312,7 +329,7 @@ private:
*/
RTPSessionStateItemPtr mSessionStateItem;
- // Context for state replication for this component.
+ // Context for state replication for this component.
RTPReplicationContextPtr mReplicationContext;
/**
@@ -353,7 +370,7 @@ public:
/**
* Constructor for this implementation.
*/
- RTCPSessionImpl(const RTPSessionImplPtr& session) : mSession(session) { }
+ RTCPSessionImpl(const RTPSessionImplPtr& session, const OperationContextCachePtr& cache) : mOperationContetCache(cache), mSession(session) { }
/**
* Method used to retrieve the port our RTCP session is listening on.
@@ -368,12 +385,35 @@ public:
return pj_sockaddr_get_port(&transportInfo.sock_info.rtcp_addr_name);
}
- void setRemoteDetails(const AsteriskSCF::System::V1::OperationContextPtr&, const std::string& address, Ice::Int port, const Ice::Current&)
+ void setRemoteDetails(const AsteriskSCF::System::V1::OperationContextPtr& context, const std::string& address, Ice::Int port, const Ice::Current&)
{
- mSession->setRemoteRtcpDetails(address, port);
+ ContextDataPtr data = checkAndThrow(mOperationContetCache, context);
+ if (!data)
+ {
+ // retry detected
+ return;
+ }
+
+ try
+ {
+ mSession->setRemoteRtcpDetails(address, port);
+ data->setCompleted();
+ }
+ catch (const std::exception& e)
+ {
+ data->setException(e);
+ throw;
+ }
+ catch (...)
+ {
+ data->setException();
+ throw;
+ }
}
private:
+ OperationContextCachePtr mOperationContetCache;
+
/**
* Pointer to the RTP session.
*/
@@ -393,7 +433,7 @@ public:
{
}
- void replicateState(const RTPStreamSinkStateItemPtr& sinkStateItem)
+ void replicateState(const RTPStreamSinkStateItemPtr& sinkStateItem)
{
mServant->replicateState(0, sinkStateItem, 0, 0, 0);
}
@@ -413,12 +453,12 @@ public:
mServant->replicateState(0, 0, 0, 0, item);
}
- AsteriskSCF::Media::V1::FormatPtr getFormat(int payload)
+ AsteriskSCF::Media::V1::FormatPtr getFormat(int payload)
{
return mServant->getFormat(payload);
}
- int getPayload(const AsteriskSCF::Media::V1::FormatPtr& format)
+ int getPayload(const AsteriskSCF::Media::V1::FormatPtr& format)
{
return mServant->getPayload(format);
}
@@ -470,7 +510,8 @@ RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter,
const PJMEDIAEnvironmentPtr& env,
const RTPServiceLocatorParamsPtr& params,
const RTPReplicationContextPtr& replicationContext,
- const ConfigurationServiceImplPtr& configurationService) :
+ const ConfigurationServiceImplPtr& configurationService) :
+ mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
mEnvironment(env),
mEndpoint(PJMEDIAEndpoint::create(env)),
mId(id),
@@ -598,8 +639,8 @@ std::string RTPSessionImpl::getId(const Ice::Current&)
return mId;
}
-/**
- * Local implementation.
+/**
+ * Local implementation.
*/
void RTPSessionImpl::setCookies(const AsteriskSCF::Media::V1::SessionCookieDict& cookieMap)
{
@@ -607,35 +648,56 @@ void RTPSessionImpl::setCookies(const AsteriskSCF::Media::V1::SessionCookieDict&
mSessionStateItem->cookies = cookieMap;
}
-/**
- * Support for the corresponding API call.
+/**
+ * Support for the corresponding API call.
*/
void RTPSessionImpl::setCookies(
- const AsteriskSCF::System::V1::OperationContextPtr&,
- const AsteriskSCF::Media::V1::SessionCookies& cookies,
+ const AsteriskSCF::System::V1::OperationContextPtr& context,
+ const AsteriskSCF::Media::V1::SessionCookies& cookies,
const Ice::Current&)
{
- { // scope the lock
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- for (AsteriskSCF::Media::V1::SessionCookies::const_iterator i = cookies.begin();
- i != cookies.end(); ++i)
+ ContextDataPtr data = checkAndThrow(mOperationContextCache, context);
+ if (!data)
+ {
+ // retry detected
+ return;
+ }
+
+ try
+ {
+ { // scope the lock
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ for (AsteriskSCF::Media::V1::SessionCookies::const_iterator i = cookies.begin();
+ i != cookies.end(); ++i)
+ {
+ mSessionStateItem->cookies[(*i)->ice_id()] = (*i);
+ }
+ }
+
+ if (mReplicationContext->isReplicating() == true)
{
- mSessionStateItem->cookies[(*i)->ice_id()] = (*i);
+ replicateState(mSessionStateItem, 0, 0, 0, 0);
}
+ data->setCompleted();
}
-
- if (mReplicationContext->isReplicating() == true)
+ catch (const std::exception& e)
{
- replicateState(mSessionStateItem, 0, 0, 0, 0);
+ data->setException(e);
+ throw;
+ }
+ catch (...)
+ {
+ data->setException();
+ throw;
}
}
-/**
- * Implementation of the corresponding API call.
+/**
+ * Implementation of the corresponding API call.
*/
void RTPSessionImpl::getCookies_async(
const AsteriskSCF::Media::V1::AMD_Session_getCookiesPtr &cb,
- const AsteriskSCF::Media::V1::SessionCookies& cookiesToGet,
+ const AsteriskSCF::Media::V1::SessionCookies& cookiesToGet,
const Ice::Current&)
{
AsteriskSCF::Media::V1::SessionCookies results;
@@ -658,26 +720,47 @@ void RTPSessionImpl::getCookies_async(
cb->ice_response(results);
}
-/**
- * Implementation of the corresponding API call.
+/**
+ * Implementation of the corresponding API call.
*/
void RTPSessionImpl::removeCookies(
- const AsteriskSCF::System::V1::OperationContextPtr&,
- const AsteriskSCF::Media::V1::SessionCookies& cookies,
+ const AsteriskSCF::System::V1::OperationContextPtr& context,
+ const AsteriskSCF::Media::V1::SessionCookies& cookies,
const Ice::Current&)
{
- { // scope the lock
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- for (AsteriskSCF::Media::V1::SessionCookies::const_iterator i = cookies.begin();
- i != cookies.end(); ++i)
+ ContextDataPtr data = checkAndThrow(mOperationContextCache, context);
+ if (!data)
+ {
+ // retry detected
+ return;
+ }
+
+ try
+ {
+ { // scope the lock
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ for (AsteriskSCF::Media::V1::SessionCookies::const_iterator i = cookies.begin();
+ i != cookies.end(); ++i)
+ {
+ mSessionStateItem->cookies.erase((*i)->ice_id());
+ }
+ }
+
+ if (mReplicationContext->isReplicating() == true)
{
- mSessionStateItem->cookies.erase((*i)->ice_id());
+ replicateState(mSessionStateItem, 0, 0, 0, 0);
}
+ data->setCompleted();
}
-
- if (mReplicationContext->isReplicating() == true)
+ catch (const std::exception& e)
{
- replicateState(mSessionStateItem, 0, 0, 0, 0);
+ data->setException(e);
+ throw;
+ }
+ catch (...)
+ {
+ data->setException();
+ throw;
}
}
@@ -717,34 +800,97 @@ void RTPSessionImpl::release(const Ice::Current&)
/**
* Implementation of the associatePayloads method as defined in MediaRTPIf.ice
*/
-void RTPSessionImpl::associatePayloads(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::Media::RTP::V1::PayloadMap& mappings, const Ice::Current&)
+void RTPSessionImpl::associatePayloads(const AsteriskSCF::System::V1::OperationContextPtr& context, const AsteriskSCF::Media::RTP::V1::PayloadMap& mappings, const Ice::Current&)
{
- mSessionStateItem->payloadstoFormats = mappings;
- associatePayloadsImpl(mappings);
+ ContextDataPtr data = checkAndThrow(mOperationContextCache, context);
+ if (!data)
+ {
+ // retry detected
+ return;
+ }
- // Only the session has changed so push a single update out for it
- replicateState(mSessionStateItem, 0, 0, 0, 0);
+ try
+ {
+ mSessionStateItem->payloadstoFormats = mappings;
+ associatePayloadsImpl(mappings);
+
+ // Only the session has changed so push a single update out for it
+ replicateState(mSessionStateItem, 0, 0, 0, 0);
+ data->setCompleted();
+ }
+ catch (const std::exception& e)
+ {
+ data->setException(e);
+ throw;
+ }
+ catch (...)
+ {
+ data->setException();
+ throw;
+ }
}
-void RTPSessionImpl::setOptions(const AsteriskSCF::System::V1::OperationContextPtr&, const string& suiteName, const string& key, bool enableAuthentication, bool enableEncryption, const Ice::Current&)
+void RTPSessionImpl::setOptions(const AsteriskSCF::System::V1::OperationContextPtr& context, const string& suiteName, const string& key, bool enableAuthentication, bool enableEncryption, const Ice::Current&)
{
- SRTPTransportPtr srtpTransport(boost::dynamic_pointer_cast<SRTPTransport>(mTransport));
- if (!srtpTransport)
+ ContextDataPtr data = checkAndThrow(mOperationContextCache, context);
+ if (!data)
+ {
+ // retry detected
+ return;
+ }
+
+ try
{
- throw SRTPUnavailable();
+ SRTPTransportPtr srtpTransport(boost::dynamic_pointer_cast<SRTPTransport>(mTransport));
+ if (!srtpTransport)
+ {
+ throw SRTPUnavailable();
+ }
+ srtpTransport->setOptions(suiteName, key, enableAuthentication, enableEncryption);
+ data->setCompleted();
+ }
+ catch (const std::exception& e)
+ {
+ data->setException(e);
+ throw;
+ }
+ catch (...)
+ {
+ data->setException();
+ throw;
}
- srtpTransport->setOptions(suiteName, key, enableAuthentication, enableEncryption);
}
-void RTPSessionImpl::start(const AsteriskSCF::System::V1::OperationContextPtr&, const string& suiteName, const string& key, bool enableAuthentication, bool enableEncryption, const Ice::Current&)
+void RTPSessionImpl::start(const AsteriskSCF::System::V1::OperationContextPtr& context, const string& suiteName, const string& key, bool enableAuthentication, bool enableEncryption, const Ice::Current&)
{
- SRTPTransportPtr srtpTransport(boost::dynamic_pointer_cast<SRTPTransport>(mTransport));
- if (!srtpTransport)
+ ContextDataPtr data = checkAndThrow(mOperationContextCache, context);
+ if (!data)
+ {
+ // retry detected
+ return;
+ }
+
+ try
{
- throw SRTPUnavailable();
+ SRTPTransportPtr srtpTransport(boost::dynamic_pointer_cast<SRTPTransport>(mTransport));
+ if (!srtpTransport)
+ {
+ throw SRTPUnavailable();
+ }
+ srtpTransport->start(suiteName, key, enableAuthentication, enableEncryption);
+ data->setCompleted();
+ }
+ catch (const std::exception& e)
+ {
+ data->setException(e);
+ throw;
+ }
+ catch (...)
+ {
+ data->setException();
+ throw;
}
- srtpTransport->start(suiteName, key, enableAuthentication, enableEncryption);
}
/**
@@ -935,7 +1081,7 @@ void RTPSessionImpl::replicateState(
return;
}
- mReplicationContext->getReplicator().tryOneWay()->setState(AsteriskSCF::Operations::createContext(), items);
+ mReplicationContext->getReplicator().tryOneWay()->setState(createContext(), items);
}
/**
@@ -972,7 +1118,7 @@ void RTPSessionImpl::removeState(const RTPSessionStateItemPtr& session, const RT
return;
}
- mReplicationContext->getReplicator().tryOneWay()->removeState(AsteriskSCF::Operations::createContext(), items);
+ mReplicationContext->getReplicator().tryOneWay()->removeState(createContext(), items);
}
void RTPSessionImpl::associatePayloadsImpl(const AsteriskSCF::Media::RTP::V1::PayloadMap& mappings)
@@ -1041,9 +1187,9 @@ RTPSessionPrx RTPSessionImpl::activate(
outputs->eventSinks.push_back(mTelephonyEventSinkPrx);
}
- mRtcpSessionInterface = new RTCPSessionImpl(this);
- mReceiverReport = new RTCPInformationImpl(&mRtcpSession.stat, &mRtcpSession.stat.rx);
- mSenderReport = new RTCPInformationImpl(&mRtcpSession.stat, &mRtcpSession.stat.tx);
+ mRtcpSessionInterface = new RTCPSessionImpl(this, mOperationContextCache);
+ mReceiverReport = new RTCPInformationImpl(&mRtcpSession.stat, &mRtcpSession.stat.rx, mOperationContextCache);
+ mSenderReport = new RTCPInformationImpl(&mRtcpSession.stat, &mRtcpSession.stat.tx, mOperationContextCache);
mStreamSourceProxy = StreamSourceRTPPrx::uncheckedCast(mAdapter->add(mStreamSource, sourceId));
mStreamSinkProxy = StreamSinkRTPPrx::uncheckedCast(mAdapter->add(mStreamSink, sinkId));
mAdapter->addFacet(mRtcpSessionInterface, id, RTCP::V1::SessionFacet);
@@ -1156,8 +1302,8 @@ private:
RTPSessionPrx AsteriskSCF::PJMEDIARTP::RTPSession::create(const Ice::ObjectAdapterPtr& adapter,
const std::string& id, const RTPServiceLocatorParamsPtr& params,
- const PJMEDIAEnvironmentPtr& environment,
- const RTPReplicationContextPtr& replicationContext,
+ const PJMEDIAEnvironmentPtr& environment,
+ const RTPReplicationContextPtr& replicationContext,
const ConfigurationServiceImplPtr& configuration,
const RTPOptionsPtr& options,
RTPAllocationOutputsPtr& outputs)
@@ -1168,18 +1314,18 @@ RTPSessionPrx AsteriskSCF::PJMEDIARTP::RTPSession::create(const Ice::ObjectAdapt
}
ReplicationAdapterPtr AsteriskSCF::PJMEDIARTP::RTPSession::create(const Ice::ObjectAdapterPtr& adapter,
- const PJMEDIAEnvironmentPtr& environment,
+ const PJMEDIAEnvironmentPtr& environment,
const RTPSessionStateItemPtr& item,
- const RTPReplicationContextPtr& replicationContext,
+ const RTPReplicationContextPtr& replicationContext,
const ConfigurationServiceImplPtr& configuration,
const RTPOptionsPtr& options,
RTPAllocationOutputsPtr& outputs)
{
- RTPSessionImplPtr servant(new RTPSessionImpl(adapter,
- adapter->getCommunicator()->identityToString(item->sessionIdentity),
+ RTPSessionImplPtr servant(new RTPSessionImpl(adapter,
+ adapter->getCommunicator()->identityToString(item->sessionIdentity),
environment,
item->port, item->formats, item->ipv6, item->srtp,
- replicationContext,
+ replicationContext,
configuration));
servant->setCookies(item->cookies);
diff --git a/src/RTPSink.cpp b/src/RTPSink.cpp
index 8347a86..16faffb 100755
--- a/src/RTPSink.cpp
+++ b/src/RTPSink.cpp
@@ -1,7 +1,7 @@
/*
* Asterisk SCF -- An open-source communications framework.
*
- * Copyright (C) 2010, Digium, Inc.
+ * Copyright (C) 2010-2012, Digium, Inc.
*
* See http://www.asterisk.org for more information about
* the Asterisk SCF project. Please do not directly contact
@@ -26,24 +26,28 @@
#include "RTPStateReplicationIf.h"
#include "RTPTelephonyEventSink.h"
+#include <boost/asio/detail/socket_ops.hpp>
+
#include <Ice/Ice.h>
#include <IceUtil/UUID.h>
-#include <boost/asio/detail/socket_ops.hpp>
#include <pjlib.h>
#include <pjmedia.h>
#include <AsteriskSCF/Media/MediaIf.h>
#include <AsteriskSCF/Media/RTP/MediaRTPIf.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
#include <AsteriskSCF/System/Component/ReplicaIf.h>
-using namespace std;
using namespace AsteriskSCF::Core::Discovery::V1;
-using namespace AsteriskSCF::Media::V1;
using namespace AsteriskSCF::Media::RTP::V1;
+using namespace AsteriskSCF::Media::V1;
+using namespace AsteriskSCF::Operations;
using namespace AsteriskSCF::PJMEDIARTP;
using namespace AsteriskSCF::Replication::MediaRTPPJMEDIA::V1;
using namespace AsteriskSCF::SessionCommunications::V1;
+using namespace std;
/**
* Private implementation details for the StreamSinkRTPImpl class.
@@ -55,10 +59,14 @@ public:
* Constructor for our StreamSinkRTPImplPriv class.
*/
StreamSinkRTPImplPriv(
- const SessionAdapterPtr& sessionAdapter,
+ const SessionAdapterPtr& sessionAdapter,
const PJMEDIATransportPtr& transport,
const std::string&);
+ boost::shared_mutex mMutex;
+
+ OperationContextCachePtr mOperationContextCache;
+
/**
* A structure containing outgoing pjmedia session data.
*/
@@ -91,11 +99,12 @@ public:
* Constructor for the StreamSinkRTPImplPriv class.
*/
StreamSinkRTPImplPriv::StreamSinkRTPImplPriv(
- const SessionAdapterPtr& session,
+ const SessionAdapterPtr& session,
const PJMEDIATransportPtr& transport,
const string& sessionId) :
- mSessionAdapter(session), mTransport(transport),
- mSinkStateItem(new RTPStreamSinkStateItem),
+ mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
+ mSessionAdapter(session), mTransport(transport),
+ mSinkStateItem(new RTPStreamSinkStateItem),
mSessionId(sessionId)
{
pjmedia_rtp_session_init(&mOutgoingSession, 0, pj_rand());
@@ -117,6 +126,7 @@ StreamSinkRTPImpl::StreamSinkRTPImpl(
TelephonyEventSinkPrx StreamSinkRTPImpl::createTelephonyEventSink(Ice::ObjectAdapterPtr& adapter, const Ice::Identity& id)
{
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
mImpl->mTelephonyEventSink =
new RTPTelephonyEventSink(
&mImpl->mOutgoingSession,
@@ -129,6 +139,7 @@ TelephonyEventSinkPrx StreamSinkRTPImpl::createTelephonyEventSink(Ice::ObjectAda
RTPTelephonyEventSinkPtr StreamSinkRTPImpl::getTelephonyEventSink()
{
+ boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
return mImpl->mTelephonyEventSink;
}
@@ -137,6 +148,7 @@ RTPTelephonyEventSinkPtr StreamSinkRTPImpl::getTelephonyEventSink()
*/
void StreamSinkRTPImpl::write(const AsteriskSCF::Media::V1::FrameSeq& frames, const Ice::Current&)
{
+// boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
// Don't even bother if no remote address information is present
if (mImpl->mSinkStateItem->remoteAddress.empty() || !mImpl->mSinkStateItem->remotePort)
{
@@ -213,9 +225,16 @@ void StreamSinkRTPImpl::write(const AsteriskSCF::Media::V1::FrameSeq& frames, co
* Implementation of the setSource method as defined in MediaIf.ice
*/
void StreamSinkRTPImpl::setSource(
- const AsteriskSCF::System::V1::OperationContextPtr&,
+ const AsteriskSCF::System::V1::OperationContextPtr& context,
const AsteriskSCF::Media::V1::StreamSourcePrx& source, const Ice::Current&)
{
+ if (!mImpl->mOperationContextCache->addOperationContext(context))
+ {
+ std::clog << "!!! RETRY !!!\n";
+ // retry detected
+ return;
+ }
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
mImpl->mSinkStateItem->source = source;
mImpl->mSessionAdapter->replicateState(mImpl->mSinkStateItem);
@@ -226,6 +245,7 @@ void StreamSinkRTPImpl::setSource(
*/
AsteriskSCF::Media::V1::StreamSourcePrx StreamSinkRTPImpl::getSource(const Ice::Current&)
{
+ boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
return mImpl->mSinkStateItem->source;
}
@@ -234,6 +254,7 @@ AsteriskSCF::Media::V1::StreamSourcePrx StreamSinkRTPImpl::getSource(const Ice::
*/
AsteriskSCF::Media::V1::FormatSeq StreamSinkRTPImpl::getFormats(const Ice::Current&)
{
+ boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
return mImpl->mSessionAdapter->getFormats();
}
@@ -242,6 +263,7 @@ AsteriskSCF::Media::V1::FormatSeq StreamSinkRTPImpl::getFormats(const Ice::Curre
*/
std::string StreamSinkRTPImpl::getId(const Ice::Current&)
{
+ boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
/* For now utilize the id of the session */
return mImpl->mSessionId;
}
@@ -249,21 +271,45 @@ std::string StreamSinkRTPImpl::getId(const Ice::Current&)
/**
* Implementation of the setRemoteDetails method as defined in MediaRTPIf.ice
*/
-void StreamSinkRTPImpl::setRemoteDetails(const AsteriskSCF::System::V1::OperationContextPtr&,
+void StreamSinkRTPImpl::setRemoteDetails(const AsteriskSCF::System::V1::OperationContextPtr& context,
const string& address, Ice::Int port, const Ice::Current&)
{
- /* This method is essentially a passthru to the RTPSourceImpl. It takes care of
- * actually attaching the transport.
- */
- mImpl->mSessionAdapter->setRemoteDetails(address, port);
+ ContextDataPtr data = checkAndThrow(mImpl->mOperationContextCache, context);
- /* We do store it though in case we have not yet received a packet from the remote side but
- * are asked for the remote address. It is also stored for replication purposes.
- */
- mImpl->mSinkStateItem->remoteAddress = address;
- mImpl->mSinkStateItem->remotePort = port;
+ if (!data)
+ {
+ std::clog << "!!! RETRY !!!\n";
+ // retry detected
+ return;
+ }
- mImpl->mSessionAdapter->replicateState(mImpl->mSinkStateItem);
+ try
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
+ /* This method is essentially a passthru to the RTPSourceImpl. It takes care of
+ * actually attaching the transport.
+ */
+ mImpl->mSessionAdapter->setRemoteDetails(address, port);
+
+ /* We do store it though in case we have not yet received a packet from the remote side but
+ * are asked for the remote address. It is also stored for replication purposes.
+ */
+ mImpl->mSinkStateItem->remoteAddress = address;
+ mImpl->mSinkStateItem->remotePort = port;
+
+ mImpl->mSessionAdapter->replicateState(mImpl->mSinkStateItem);
+ data->setCompleted();
+ }
+ catch (const std::exception& e)
+ {
+ data->setException(e);
+ throw;
+ }
+ catch (...)
+ {
+ data->setException();
+ throw;
+ }
}
/**
@@ -271,6 +317,7 @@ void StreamSinkRTPImpl::setRemoteDetails(const AsteriskSCF::System::V1::Operatio
*/
std::string StreamSinkRTPImpl::getRemoteAddress(const Ice::Current&)
{
+ boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
if (mImpl->mTransport && mImpl->mTransport->remoteAddress())
{
string address = mImpl->mTransport->remoteAddress()->hostname();
@@ -284,6 +331,7 @@ std::string StreamSinkRTPImpl::getRemoteAddress(const Ice::Current&)
*/
Ice::Int StreamSinkRTPImpl::getRemotePort(const Ice::Current&)
{
+ boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
if (mImpl->mTransport && mImpl->mTransport->remoteAddress())
{
int port = mImpl->mTransport->remoteAddress()->port();
@@ -297,11 +345,13 @@ Ice::Int StreamSinkRTPImpl::getRemotePort(const Ice::Current&)
*/
RTPStreamSinkStateItemPtr StreamSinkRTPImpl::getStateItem()
{
+ boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
return mImpl->mSinkStateItem;
}
RTPTelephonyEventSinkStateItemPtr StreamSinkRTPImpl::getTelephonyEventSinkStateItem()
{
+ boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
if (mImpl->mTelephonyEventSink)
{
return mImpl->mTelephonyEventSink->getStateItem();
@@ -311,17 +361,20 @@ RTPTelephonyEventSinkStateItemPtr StreamSinkRTPImpl::getTelephonyEventSinkStateI
void StreamSinkRTPImpl::setRemoteDetailsImpl(const std::string& host, Ice::Int port)
{
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
mImpl->mSinkStateItem->remoteAddress = host;
mImpl->mSinkStateItem->remotePort = port;
}
void StreamSinkRTPImpl::setSourceImpl(const AsteriskSCF::Media::V1::StreamSourcePrx& proxy)
{
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
mImpl->mSinkStateItem->source = proxy;
}
Ice::ByteSeq StreamSinkRTPImpl::encodeAudioPayload(const FramePayloadPtr& toEncode, const AudioFormatPtr& audioFormat)
{
+ // mImpl->mMutex should already be locked
if (audioFormat->sampleSize == 8)
{
ByteSeqPayloadPtr bytePayload = ByteSeqPayloadPtr::dynamicCast(toEncode);
@@ -354,6 +407,7 @@ Ice::ByteSeq StreamSinkRTPImpl::encodeAudioPayload(const FramePayloadPtr& toEnco
Ice::ByteSeq StreamSinkRTPImpl::encodeVideoPayload(const FramePayloadPtr& toEncode, const VideoFormatPtr&)
{
+ // no shared state, no need to lock
ByteSeqPayloadPtr bytePayload = ByteSeqPayloadPtr::dynamicCast(toEncode);
if (!bytePayload)
diff --git a/src/RTPSource.cpp b/src/RTPSource.cpp
index 20a00d0..a1cd040 100644
--- a/src/RTPSource.cpp
+++ b/src/RTPSource.cpp
@@ -84,15 +84,15 @@ typedef boost::shared_ptr<ThreadDescWrapper> ThreadDescWrapperPtr;
class RtcpTransmission : public IceUtil::TimerTask
{
public:
- RtcpTransmission(const SessionAdapterPtr& sessionAdapter,
- const StreamSinkRTPPrx& sink,
+ RtcpTransmission(const SessionAdapterPtr& sessionAdapter,
+ const StreamSinkRTPPrx& sink,
const PJMEDIATransportPtr& transport,
- const ThreadDescWrapperPtr& threadDesciptor)
- : mSessionAdapter(sessionAdapter),
- mSink(sink),
+ const ThreadDescWrapperPtr& threadDesciptor)
+ : mSessionAdapter(sessionAdapter),
+ mSink(sink),
mTransport(transport),
mThreadDescriptor(threadDesciptor)
- {
+ {
}
void runTimerTask()
@@ -161,7 +161,7 @@ public:
/**
* Constructor for our StreamSourceRTPImplPriv class.
*/
- StreamSourceRTPImplPriv(const SessionAdapterPtr& sessionAdapter,
+ StreamSourceRTPImplPriv(const SessionAdapterPtr& sessionAdapter,
const PJMEDIATransportPtr& transport,
const string& parentSessionId,
const StreamSourceRTPPrx& source,
@@ -225,13 +225,13 @@ public:
/**
* Constructor for the StreamSourceRTPImplPriv class.
*/
-StreamSourceRTPImplPriv::StreamSourceRTPImplPriv(const SessionAdapterPtr& session,
- const PJMEDIATransportPtr& transport,
+StreamSourceRTPImplPriv::StreamSourceRTPImplPriv(const SessionAdapterPtr& session,
+ const PJMEDIATransportPtr& transport,
const string& sessionId,
const StreamSourceRTPPrx& source,
const StreamSinkRTPPrx& sink) :
- mSessionAdapter(session), mTransport(transport),
- mSourceStateItem(new RTPStreamSourceStateItem),
+ mSessionAdapter(session), mTransport(transport),
+ mSourceStateItem(new RTPStreamSourceStateItem),
mSessionId(sessionId),
mSource(source),
mSink(sink),
@@ -282,6 +282,7 @@ RTPTelephonyEventSourcePtr StreamSourceRTPImpl::getTelephonyEventSource()
*/
void StreamSourceRTPImpl::addSink(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&)
{
+ // naturally idempotent; no retry logic needed
boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
// Do not allow the same sink to be added multiple times
@@ -304,6 +305,7 @@ void StreamSourceRTPImpl::addSink(const AsteriskSCF::System::V1::OperationContex
void StreamSourceRTPImpl::removeSink(const AsteriskSCF::System::V1::OperationContextPtr&,
const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&)
{
+ // naturally idempotent; no retry logic needed
boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
mImpl->mSourceStateItem->sinks.erase(std::remove(mImpl->mSourceStateItem->sinks.begin(),
@@ -345,7 +347,7 @@ void StreamSourceRTPImpl::requestFormat(
const AsteriskSCF::System::V1::OperationContextPtr&,
const AsteriskSCF::Media::V1::FormatPtr&, const Ice::Current&)
{
- // We do not currently support switching formats.
+ // We do not currently support switching formats.
throw MediaFormatSwitchException();
}
@@ -521,7 +523,7 @@ static void receiveRTCP(void *userdata, void *packet, pj_ssize_t size)
pjmedia_rtcp_rx_rtcp(source->mImpl->mSessionAdapter->getRtcpSession(), packet, size);
- std::vector<AsteriskSCF::Media::RTCP::V1::InformationListenerPrx> listeners =
+ std::vector<AsteriskSCF::Media::RTCP::V1::InformationListenerPrx> listeners =
source->mImpl->mSessionAdapter->getReceiverReportListeners();
if (listeners.empty())
diff --git a/src/RTPStateReplicatorListener.cpp b/src/RTPStateReplicatorListener.cpp
index 03498de..8e43755 100644
--- a/src/RTPStateReplicatorListener.cpp
+++ b/src/RTPStateReplicatorListener.cpp
@@ -1,7 +1,7 @@
/*
* Asterisk SCF -- An open-source communications framework.
*
- * Copyright (C) 2010, Digium, Inc.
+ * Copyright (C) 2010-2012, Digium, Inc.
*
* See http://www.asterisk.org for more information about
* the Asterisk SCF project. Please do not directly contact
@@ -19,18 +19,19 @@
#include <IceUtil/UUID.h>
#include <boost/thread.hpp>
-#include <boost/shared_ptr.hpp>
#include <AsteriskSCF/System/Component/ReplicaIf.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
#include "RTPStateReplicator.h"
#include "ReplicationAdapter.h"
#include "RTPSession.h"
-using namespace std;
using namespace AsteriskSCF::Media::RTP::V1;
+using namespace AsteriskSCF::Operations;
using namespace AsteriskSCF::PJMEDIARTP;
using namespace AsteriskSCF::Replication::MediaRTPPJMEDIA::V1;
+using namespace std;
class RTPStateReplicatorItem
{
@@ -62,33 +63,36 @@ private:
struct RTPStateReplicatorListenerImpl
{
public:
- RTPStateReplicatorListenerImpl(const Ice::ObjectAdapterPtr& adapter,
+ RTPStateReplicatorListenerImpl(const Ice::ObjectAdapterPtr& adapter,
const PJMEDIAEnvironmentPtr& env,
const RTPGeneralStateItemPtr& generalState,
const RTPReplicationContextPtr& replicationContext,
const ConfigurationServiceImplPtr& configurationService)
- : mId(IceUtil::generateUUID()),
- mAdapter(adapter),
- mEnvironment(env),
+ : mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
+ mId(IceUtil::generateUUID()),
+ mAdapter(adapter),
+ mEnvironment(env),
mGeneralState(generalState),
mReplicationContext(replicationContext),
mConfigurationService(configurationService) {}
-
+
void removeStateNoticeImpl(const Ice::StringSeq& itemKeys)
{
+ // naturally idempotent; no retry logic needed
+ boost::mutex::scoped_lock lock(mMutex);
for (Ice::StringSeq::const_iterator key = itemKeys.begin(); key != itemKeys.end(); ++key)
{
// Just erasing this from the map will cause the destructor to actually shut things down
mStateItems.erase((*key));
}
}
-
- void setStateNoticeImpl(const RTPStateItemSeq& items)
+
+ void setStateNoticeImpl(const AsteriskSCF::System::V1::OperationContextPtr& context, const RTPStateItemSeq& items)
{
class visitor : public AsteriskSCF::Replication::MediaRTPPJMEDIA::V1::RTPStateItemVisitor
{
public:
- visitor(RTPStateReplicatorListenerImpl *impl) : mImpl(impl)
+ visitor(RTPStateReplicatorListenerImpl *impl) : mImpl(impl)
{
}
@@ -99,7 +103,7 @@ public:
{
mImpl->mGeneralState->serviceManagement = item->serviceManagement;
}
-
+
void visitRTPSessionStateItem(const RTPSessionStateItemPtr &item)
{
map<string, boost::shared_ptr<RTPStateReplicatorItem> >::iterator i = mImpl->mStateItems.find(item->sessionId);
@@ -121,7 +125,7 @@ public:
{
localitem = i->second;
}
-
+
//
// TODO: This appears to happen in testing on occasion. Should verify if this should be
// expected.
@@ -131,7 +135,7 @@ public:
localitem->getSession()->update(item);
}
}
-
+
void visitRTPStreamSinkStateItem(const RTPStreamSinkStateItemPtr &item)
{
map<string, boost::shared_ptr<RTPStateReplicatorItem> >::iterator i =
@@ -141,7 +145,7 @@ public:
i->second->getSession()->update(item);
}
}
-
+
void visitRTPStreamSourceStateItem(const RTPStreamSourceStateItemPtr &item)
{
map<string, boost::shared_ptr<RTPStateReplicatorItem> >::iterator i =
@@ -174,6 +178,13 @@ public:
};
+ if (!mOperationContextCache->addOperationContext(context))
+ {
+ // retry detected
+ return;
+ }
+
+ boost::mutex::scoped_lock lock(mMutex);
AsteriskSCF::Replication::MediaRTPPJMEDIA::V1::RTPStateItemVisitorPtr v = new visitor(this);
for (RTPStateItemSeq::const_iterator item = items.begin(); item != items.end(); ++item)
@@ -182,6 +193,8 @@ public:
}
}
+ boost::mutex mMutex;
+ OperationContextCachePtr mOperationContextCache;
string mId;
map<string, boost::shared_ptr<RTPStateReplicatorItem> > mStateItems;
Ice::ObjectAdapterPtr mAdapter;
@@ -205,9 +218,9 @@ void RTPStateReplicatorListenerI::stateRemoved(const AsteriskSCF::System::V1::Op
mImpl->removeStateNoticeImpl(itemKeys);
}
-void RTPStateReplicatorListenerI::stateSet(const AsteriskSCF::System::V1::OperationContextPtr&, const RTPStateItemSeq& items, const Ice::Current&)
+void RTPStateReplicatorListenerI::stateSet(const AsteriskSCF::System::V1::OperationContextPtr& context, const RTPStateItemSeq& items, const Ice::Current&)
{
- mImpl->setStateNoticeImpl(items);
+ mImpl->setStateNoticeImpl(context, items);
}
bool RTPStateReplicatorListenerI::operator==(const RTPStateReplicatorListenerI &rhs)
diff --git a/src/RTPTelephonyEventSink.cpp b/src/RTPTelephonyEventSink.cpp
index 75045a0..ecca74d 100755
--- a/src/RTPTelephonyEventSink.cpp
+++ b/src/RTPTelephonyEventSink.cpp
@@ -1,7 +1,7 @@
/*
* Asterisk SCF -- An open-source communications framework.
*
- * Copyright (C) 2011, Digium, Inc.
+ * Copyright (C) 2011-2012, Digium, Inc.
*
* See http://www.asterisk.org for more information about
* the Asterisk SCF project. Please do not directly contact
@@ -17,6 +17,7 @@
#include "RTPTelephonyEventSink.h"
#include <AsteriskSCF/Media/Formats/OtherFormatsIf.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
#include <IceUtil/UUID.h>
#include <pjmedia.h>
@@ -41,13 +42,14 @@ const pj_uint16_t Duration = 160;
using namespace AsteriskSCF::SessionCommunications::V1;
using namespace AsteriskSCF::PJMEDIARTP;
using namespace AsteriskSCF::Replication::MediaRTPPJMEDIA::V1;
+using namespace AsteriskSCF::Operations;
RTPTelephonyEventSink::RTPTelephonyEventSink(
pjmedia_rtp_session *session,
const PJMEDIATransportPtr& transport,
const SessionAdapterPtr& sessionAdapter,
const std::string& sessionId)
- : mSession(session), mTransport(transport), mSessionAdapter(sessionAdapter), mStateItem(new RTPTelephonyEventSinkStateItem)
+ : mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)), mSession(session), mTransport(transport), mSessionAdapter(sessionAdapter), mStateItem(new RTPTelephonyEventSinkStateItem)
{
mStateItem->sessionId = sessionId;
mStateItem->key = IceUtil::generateUUID();
@@ -56,15 +58,27 @@ RTPTelephonyEventSink::RTPTelephonyEventSink(
void RTPTelephonyEventSink::write_async(
const AMD_TelephonyEventSink_writePtr& cb,
- const AsteriskSCF::System::V1::OperationContextPtr&,
+ const AsteriskSCF::System::V1::OperationContextPtr& context,
const TelephonyEventPtr& event,
const Ice::Current&)
{
+ typedef AMDContextData<AMD_TelephonyEventSink_writePtr> Data;
+ typedef boost::shared_ptr<Data> DataPtr;
+ DataPtr data = getContext<DataPtr>(mOperationContextCache, context, cb);
+
+ if (!data)
+ {
+ // retry detected
+ return;
+ }
+
+ boost::mutex::scoped_lock lock(mMutex);
+
BeginDTMFEventPtr beginDTMF;
EndDTMFEventPtr endDTMF;
ContinueDTMFEventPtr continueDTMF;
FlashEventPtr flash;
-
+
pjmedia_rtp_dtmf_event payload;
bool setMarker = false;
bool tsRollover = false;
@@ -166,24 +180,36 @@ void RTPTelephonyEventSink::write_async(
mSessionAdapter->replicateState(mStateItem);
}
- cb->ice_response();
+ data->getProxy()->ice_response();
}
void RTPTelephonyEventSink::setSource_async(
const AMD_TelephonyEventSink_setSourcePtr& cb,
- const AsteriskSCF::System::V1::OperationContextPtr&,
+ const AsteriskSCF::System::V1::OperationContextPtr& context,
const TelephonyEventSourcePrx& source,
const Ice::Current&)
{
+ typedef AMDContextData<AMD_TelephonyEventSink_setSourcePtr> Data;
+ typedef boost::shared_ptr<Data> DataPtr;
+ DataPtr data = getContext<DataPtr>(mOperationContextCache, context, cb);
+
+ if (!data)
+ {
+ // retry detected
+ return;
+ }
+
+ boost::mutex::scoped_lock lock(mMutex);
mStateItem->source = source;
mSessionAdapter->replicateState(mStateItem);
- cb->ice_response();
+ data->getProxy()->ice_response();
}
void RTPTelephonyEventSink::getSource_async(
const AMD_TelephonyEventSink_getSourcePtr& cb,
const Ice::Current&)
{
+ boost::mutex::scoped_lock lock(mMutex);
cb->ice_response(mStateItem->source);
}
diff --git a/src/RTPTelephonyEventSink.h b/src/RTPTelephonyEventSink.h
index 4a73e80..de6ba11 100755
... 182 lines suppressed ...
--
asterisk-scf/integration/media_rtp_pjmedia.git
More information about the asterisk-scf-commits
mailing list