[asterisk-scf-commits] asterisk-scf/release/media_transport_udptl.git branch "master" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Tue May 8 17:28:14 CDT 2012
branch "master" has been updated
via 9e7a46fbcac528046354a88afceecf9b57c97284 (commit)
from d3eb271c5ee3d83f3e334dd728ff111625c85b5b (commit)
Summary of changes:
config/test_component.config | 8 +-
config/test_component_v6.config | 10 +-
config/test_udptl_ice.conf | 14 +-
.../Replication/UDPTL/UdptlStateReplicationIf.ice | 12 +-
src/Component.cpp | 86 ++++++++---
src/ICETransport.cpp | 68 ++++++---
src/UDPTLConfiguration.cpp | 27 ++-
src/UDPTLSession.cpp | 46 ++++--
src/UDPTLSink.cpp | 70 +++++++--
src/UDPTLSink.h | 8 +-
src/UDPTLSource.cpp | 9 +-
src/UDPTLSource.h | 6 +-
src/UdptlStateReplicator.h | 4 +-
src/UdptlStateReplicatorListener.cpp | 165 ++++++++++++--------
test/CMakeLists.txt | 4 +
test/TestMediaTransportUDPTL.cpp | 139 ++++++++++++++---
test/TestUDPTLICE.cpp | 59 ++++++-
17 files changed, 531 insertions(+), 204 deletions(-)
- Log -----------------------------------------------------------------
commit 9e7a46fbcac528046354a88afceecf9b57c97284
Author: Ken Hunt <ken.hunt at digium.com>
Date: Tue May 8 12:03:39 2012 -0500
Changes for new retry logic.
diff --git a/config/test_component.config b/config/test_component.config
index f7fc24e..8bbc9bd 100644
--- a/config/test_component.config
+++ b/config/test_component.config
@@ -3,6 +3,7 @@
#
# Icebox Configuration
#
+Ice.ThreadPool.Client.Size=4
IceBox.InheritProperties=1
IceBox.LoadOrder=ServiceDiscovery,UDPTLStateReplicator,MediaTransportUDPTL,TestMediaTransportUDPTL
@@ -28,11 +29,8 @@ LocatorService.Proxy=LocatorService:tcp -h 127.0.0.1 -p 4411
IceBox.Service.MediaTransportUDPTL=MediaTransportUDPTL:create
# Adapter parameters for this component
-MediaTransportUDPTLAdapter.Endpoints=default -h 127.0.0.1
-MediaTransportUDPTLAdapterLocal.Endpoints=default -h 127.0.0.1
-MediaTransportUDPTLAdapterLogger.Endpoints=default -h 127.0.0.1
MediaTransportUDPTL.ServiceAdapter.Endpoints=default -h 127.0.0.1
-MediaTransportUDPTL.BackplaneAdapter.Endpoints=default -h 127.0.0.1
+MediaTransportUDPTL.BackplaneAdapter.Endpoints=default -h 127.0.0.1
#
# TestMediaTransportUDPTL Configuration
@@ -46,6 +44,8 @@ IceBox.Service.TestMediaTransportUDPTL=MediaTransportUDPTLTest:create
IceBox.Service.ServiceDiscovery=ServiceLocator:create
+ServiceDiscovery.Standalone = true
+
ServiceDiscovery.IceStorm.InstanceName=ServiceDiscovery
ServiceDiscovery.IceStorm.TopicManager.Endpoints=default -h 127.0.0.1 -p 10000
ServiceDiscovery.IceStorm.Publish.Endpoints=tcp -h 127.0.0.1 -p 10001:udp -h 127.0.0.1 -p 10001
diff --git a/config/test_component_v6.config b/config/test_component_v6.config
index 2a04fed..ef809dd 100644
--- a/config/test_component_v6.config
+++ b/config/test_component_v6.config
@@ -3,7 +3,7 @@
#
# Icebox Configuration
#
-
+Ice.ThreadPool.Client.Size=4
IceBox.InheritProperties=1
IceBox.LoadOrder=ServiceDiscovery,UDPTLStateReplicator,MediaTransportUDPTL,TestMediaTransportUDPTL
@@ -28,11 +28,8 @@ LocatorService.Proxy=LocatorService:tcp -h 127.0.0.1 -p 4411
IceBox.Service.MediaTransportUDPTL=MediaTransportUDPTL:create
# Adapter parameters for this component
-MediaTransportUDPTLAdapter.Endpoints=default -h 127.0.0.1
-MediaTransportUDPTLAdapterLocal.Endpoints=default -h 127.0.0.1
-MediaTransportUDPTLAdapterLogger.Endpoints=default -h 127.0.0.1
MediaTransportUDPTL.ServiceAdapter.Endpoints=default -h 127.0.0.1
-MediaTransportUDPTL.BackplaneAdapter.Endpoints=default -h 127.0.0.1
+MediaTransportUDPTL.BackplaneAdapter.Endpoints=default -h 127.0.0.1
#
# TestMediaTransportUDPTL Configuration
@@ -46,6 +43,9 @@ IceBox.Service.TestMediaTransportUDPTL=MediaTransportUDPTLTestV6:create
IceBox.Service.ServiceDiscovery=ServiceLocator:create
+# For unit tests we run without state replication.
+ServiceDiscovery.Standalone = true
+
ServiceDiscovery.IceStorm.InstanceName=ServiceDiscovery
ServiceDiscovery.IceStorm.TopicManager.Endpoints=default -h 127.0.0.1 -p 10000
ServiceDiscovery.IceStorm.Publish.Endpoints=tcp -h 127.0.0.1 -p 10001:udp -h 127.0.0.1 -p 10001
diff --git a/config/test_udptl_ice.conf b/config/test_udptl_ice.conf
index 9e20ebd..f54c2ef 100644
--- a/config/test_udptl_ice.conf
+++ b/config/test_udptl_ice.conf
@@ -6,14 +6,18 @@
UDPTLConfiguration.Name=UDPTLStateReplicator
+Ice.Override.Timeout=5000
+Ice.ThreadPool.Client.Size=4
+
IceBox.InheritProperties=1
IceBox.LoadOrder=ServiceDiscovery,UDPTLStateReplicator,MediaTransportUDPTL,TestMediaTransportUDPTL
-Ice.Override.Timeout=5000
-
# RtpStateReplicator Configuration
IceBox.Service.UDPTLStateReplicator=UDPTLStateReplicator:create
+# For unit testing, we run without state replicator
+UDPTLStateReplicator.Standalone = true
+
# Adapter parameters for this component
UDPTLStateReplicator.Adapter.Endpoints=tcp -h 127.0.0.1:udp -h 127.0.0.1
@@ -30,9 +34,6 @@ LocatorService.Proxy=LocatorService:tcp -h 127.0.0.1 -p 4411
IceBox.Service.MediaTransportUDPTL=MediaTransportUDPTL:create
# Adapter parameters for this component
-MediaTransportUDPTLAdapter.Endpoints=default -h 127.0.0.1
-MediaTransportUDPTLAdapterLocal.Endpoints=default -h 127.0.0.1
-MediaTransportUDPTLAdapterLogger.Endpoints=default -h 127.0.0.1
MediaTransportUDPTL.ServiceAdapter.Endpoints=default -h 127.0.0.1
MediaTransportUDPTL.BackplaneAdapter.Endpoints=default -h 127.0.0.1
@@ -48,6 +49,9 @@ IceBox.Service.TestMediaTransportUDPTL=MediaTransportUDPTLIceTest:create
IceBox.Service.ServiceDiscovery=ServiceLocator:create
+# For unit testing we run without state replicator.
+ServiceDiscovery.Standalone = true
+
ServiceDiscovery.IceStorm.InstanceName=ServiceDiscovery
ServiceDiscovery.IceStorm.TopicManager.Endpoints=default -h 127.0.0.1 -p 10000
ServiceDiscovery.IceStorm.Publish.Endpoints=tcp -h 127.0.0.1 -p 10001:udp -h 127.0.0.1 -p 10001
diff --git a/slice/AsteriskSCF/Replication/UDPTL/UdptlStateReplicationIf.ice b/slice/AsteriskSCF/Replication/UDPTL/UdptlStateReplicationIf.ice
index d7e65f4..8fad508 100644
--- a/slice/AsteriskSCF/Replication/UDPTL/UdptlStateReplicationIf.ice
+++ b/slice/AsteriskSCF/Replication/UDPTL/UdptlStateReplicationIf.ice
@@ -54,16 +54,16 @@ module V1
interface UdptlStateReplicatorListener
{
- void stateRemoved(Ice::StringSeq itemKeys);
- void stateSet(UdptlStateItemSeq items);
+ idempotent void stateRemoved(AsteriskSCF::System::V1::OperationContext operationContext, Ice::StringSeq itemKeys);
+ idempotent void stateSet(AsteriskSCF::System::V1::OperationContext operationContext, UdptlStateItemSeq items);
};
interface UdptlStateReplicator
{
- void addListener(UdptlStateReplicatorListener *listener);
- void removeListener(UdptlStateReplicatorListener *listener);
- void setState (UdptlStateItemSeq items);
- void removeState(Ice::StringSeq items);
+ idempotent void addListener(AsteriskSCF::System::V1::OperationContext operationContext, UdptlStateReplicatorListener *listener);
+ idempotent void removeListener(AsteriskSCF::System::V1::OperationContext operationContext, UdptlStateReplicatorListener *listener);
+ idempotent void setState (AsteriskSCF::System::V1::OperationContext operationContext, UdptlStateItemSeq items);
+ idempotent void removeState(AsteriskSCF::System::V1::OperationContext operationContext, Ice::StringSeq items);
idempotent UdptlStateItemSeq getState(Ice::StringSeq itemKeys);
idempotent UdptlStateItemSeq getAllState();
};
diff --git a/src/Component.cpp b/src/Component.cpp
index 60dadb9..260e230 100644
--- a/src/Component.cpp
+++ b/src/Component.cpp
@@ -1,7 +1,7 @@
/*
* Asterisk SCF -- An open-source communications framework.
*
- * Copyright (C) 2011, Digium, Inc.
+ * Copyright (C) 2011-2012, Digium, Inc.
*
* See http://www.asterisk.org for more information about
* the Asterisk SCF project. Please do not directly contact
@@ -31,6 +31,9 @@
#include <AsteriskSCF/Logger.h>
#include <AsteriskSCF/Discovery/SmartProxy.h>
#include <AsteriskSCF/Component/Component.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
#include "UdptlReplicationContext.h"
#include "UDPTLSession.h"
@@ -40,25 +43,30 @@
#include "PJMediaEnvironment.h"
using namespace std;
+using namespace AsteriskSCF::Configuration::UDPTL::V1;
using namespace AsteriskSCF::Core::Discovery::V1;
-using namespace AsteriskSCF::Media::V1;
+using namespace AsteriskSCF::Discovery;
using namespace AsteriskSCF::Media::UDPTL::V1;
+using namespace AsteriskSCF::Media::V1;
+using namespace AsteriskSCF::Operations;
using namespace AsteriskSCF::Replication::UDPTL::V1;
-using namespace AsteriskSCF::Configuration::UDPTL::V1;
-using namespace AsteriskSCF::System::Configuration::V1;
+using namespace AsteriskSCF::Replication;
using namespace AsteriskSCF::System::Component::V1;
+using namespace AsteriskSCF::System::Configuration::V1;
using namespace AsteriskSCF::System::Logging;
-using namespace AsteriskSCF::Discovery;
-using namespace AsteriskSCF::Replication;
using namespace AsteriskSCF::UDPTL;
namespace
{
Logger lg = getLoggerFactory().getLogger("AsteriskSCF.MediaUDPTL");
-}
-static const string ReplicaServiceId("MediaUdptlReplica");
-static const string MediaServiceId("UDPTLMediaService");
+const string ReplicaServiceId("MediaUdptlReplica");
+const string MediaServiceId("UDPTLMediaService");
+
+typedef ContextResultData<UDPTLSessionPrx> UDPTLSessionPrxCookie;
+
+typedef boost::shared_ptr<UDPTLSessionPrxCookie> UDPTLSessionPrxCookiePtr;
+}
/**
* Implementation of the UDPTLMediaService interface as defined in MediaUDPTLIf.ice
@@ -69,6 +77,7 @@ public:
UDPTLMediaServiceImpl(const Ice::ObjectAdapterPtr& adapter,
const UdptlReplicationContextPtr& replicationContext,
const ConfigurationServiceImplPtr& configurationService) :
+ mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
mAdapter(adapter),
mEnvironment(PJMediaEnvironment::create(adapter->getCommunicator()->getProperties(), configurationService)),
mReplicationContext(replicationContext),
@@ -76,10 +85,34 @@ public:
{
};
- UDPTLSessionPrx allocate(const UDPTLServiceLocatorParamsPtr& params, const Ice::Current&)
+ UDPTLSessionPrx allocate(const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const UDPTLServiceLocatorParamsPtr& params, const Ice::Current&)
{
- return AsteriskSCF::UDPTL::UDPTLSession::create(mAdapter, IceUtil::generateUUID(), mEnvironment, params,
- mReplicationContext, mConfigurationService);
+ std::pair<bool, UDPTLSessionPrxCookiePtr> cacheHit =
+ getContextSync<UDPTLSessionPrxCookiePtr>(mOperationContextCache, operationContext);
+
+ if (cacheHit.first)
+ {
+ return cacheHit.second->getResult();
+ }
+
+ try
+ {
+ UDPTLSessionPrx r = AsteriskSCF::UDPTL::UDPTLSession::create(mAdapter, IceUtil::generateUUID(),
+ mEnvironment, params, mReplicationContext, mConfigurationService);
+ cacheHit.second->setResult(r);
+ return r;
+ }
+ catch (const std::exception& e)
+ {
+ cacheHit.second->setException(e);
+ throw;
+ }
+ catch (...)
+ {
+ cacheHit.second->setException();
+ throw;
+ }
};
pj_pool_factory *getPoolFactory() { return mEnvironment->poolFactory(); };
@@ -90,6 +123,7 @@ public:
}
private:
+ AsteriskSCF::Operations::OperationContextCachePtr mOperationContextCache;
Ice::ObjectAdapterPtr mAdapter;
PJMediaEnvironmentPtr mEnvironment;
UdptlReplicationContextPtr mReplicationContext;
@@ -127,6 +161,7 @@ private:
virtual void onPreInitialize();
virtual void onStop();
virtual void onStart();
+ virtual void onActivated();
// Other base Component overrides
virtual void prepareBackplaneServicesForDiscovery();
@@ -159,12 +194,12 @@ private:
void Component::onSuspend()
{
- mGeneralState->serviceManagement->suspend();
+ mGeneralState->serviceManagement->suspend(AsteriskSCF::Operations::createContext());
}
void Component::onResume()
{
- mGeneralState->serviceManagement->unsuspend();
+ mGeneralState->serviceManagement->unsuspend(AsteriskSCF::Operations::createContext());
}
/**
@@ -336,7 +371,7 @@ void Component::findRemoteServices()
// replicator service.
ConfigurationReplicatorPrx configurationReplicator = ConfigurationReplicatorPrx::checkedCast(
udptlReplicationContext->getReplicator().initialize(), ReplicatorFacet);
- configurationReplicator->registerConfigurationService(mConfigurationServicePrx);
+ configurationReplicator->registerConfigurationService(AsteriskSCF::Operations::createContext(), mConfigurationServicePrx);
}
catch (const std::exception& e)
@@ -389,7 +424,7 @@ void Component::listenToStateReplicators()
// Are we in standby mode?
if (udptlReplicationContext->getState() == STANDBY_IN_REPLICA_GROUP)
{
- udptlReplicationContext->getReplicator()->addListener(mReplicatorListenerProxy);
+ udptlReplicationContext->getReplicator()->addListener(AsteriskSCF::Operations::createContext(), mReplicatorListenerProxy);
mListeningToReplicator = true;
}
}
@@ -417,7 +452,7 @@ void Component::stopListeningToStateReplicators()
try
{
- udptlReplicationContext->getReplicator()->removeListener(mReplicatorListenerProxy);
+ udptlReplicationContext->getReplicator()->removeListener(AsteriskSCF::Operations::createContext(), mReplicatorListenerProxy);
mListeningToReplicator = false;
}
catch (const Ice::Exception& e)
@@ -455,7 +490,18 @@ void Component::onRegisterPrimaryServices()
}
mGeneralState->serviceManagement = mUdptlMediaServiceRegistration->getServiceManagement();
- mGeneralState->serviceManagement->addLocatorParams(mUdptlLocatorParams, "");
+ mGeneralState->serviceManagement->addLocatorParams(AsteriskSCF::Operations::createContext(), mUdptlLocatorParams, "");
+}
+
+void Component::onActivated()
+{
+ UdptlReplicationContextPtr udptlReplicationContext =
+ boost::static_pointer_cast<UdptlReplicationContext>(getReplicationContext());
+
+ UdptlStateItemSeq items;
+ items.push_back(mGeneralState);
+ UdptlStateReplicatorPrx oneway = UdptlStateReplicatorPrx::uncheckedCast(udptlReplicationContext->getReplicator()->ice_oneway());
+ oneway->setState(AsteriskSCF::Operations::createContext(), items);
}
void Component::onStart()
@@ -468,7 +514,7 @@ void Component::onStart()
UdptlStateItemSeq items;
items.push_back(mGeneralState);
UdptlStateReplicatorPrx oneway = UdptlStateReplicatorPrx::uncheckedCast(udptlReplicationContext->getReplicator()->ice_oneway());
- oneway->setState(items);
+ oneway->setState(AsteriskSCF::Operations::createContext(), items);
}
}
@@ -476,7 +522,7 @@ void Component::onStop()
{
if (getReplicationContext()->isActive() == true)
{
- mGeneralState->serviceManagement->unregister();
+ mGeneralState->serviceManagement->unregister(AsteriskSCF::Operations::createContext());
}
}
diff --git a/src/ICETransport.cpp b/src/ICETransport.cpp
index db2337d..99d15f6 100644
--- a/src/ICETransport.cpp
+++ b/src/ICETransport.cpp
@@ -1,7 +1,7 @@
/*
* Asterisk SCF -- An open-source communications framework.
*
- * Copyright (C) 2010, Digium, Inc.
+ * Copyright (C) 2010,2012, Digium, Inc.
*
* See http://www.asterisk.org for more information about
* the Asterisk SCF project. Please do not directly contact
@@ -14,39 +14,39 @@
* at the top of the source tree.
*/
-#include "ICETransport.h"
-#include "PJUtil.h"
+#include <map>
+#include <sstream>
-#include <pjmedia.h>
-#include <pjlib.h>
-#include <pjnath.h>
+#include <Ice/Ice.h>
+#include <IceUtil/UUID.h>
-#include <AsteriskSCF/System/ExceptionsIf.h>
-#include <map>
#include <boost/thread.hpp>
#include <boost/thread/shared_mutex.hpp>
+#include <pjlib.h>
+#include <pjmedia.h>
+#include <pjnath.h>
+
+#include <AsteriskSCF/System/ExceptionsIf.h>
#include <AsteriskSCF/System/NAT/NATTraversalIf.h>
-#include <Ice/Ice.h>
-#include <sstream>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
#include <AsteriskSCF/Logger.h>
-#include <IceUtil/UUID.h>
-using namespace AsteriskSCF::UDPTL;
-using namespace AsteriskSCF::System::V1;
-using namespace AsteriskSCF::PJUtil;
-using namespace std;
+#include "ICETransport.h"
+#include "PJUtil.h"
+
using namespace AsteriskSCF::Helpers;
+using namespace AsteriskSCF::Operations;
+using namespace AsteriskSCF::PJUtil;
using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::System::NAT::V1;
+using namespace AsteriskSCF::System::V1;
+using namespace AsteriskSCF::UDPTL;
+using namespace std;
namespace
{
Logger logger = getLoggerFactory().getLogger("AsteriskSCF.MediaUDPTL");
-}
-
-namespace
-{
class ICEAgentImpl : public InteractiveConnectionAgent
{
@@ -54,6 +54,7 @@ public:
ICEAgentImpl(const Ice::ObjectAdapterPtr& adapter, const Ice::Identity& id, const PJMediaEnvironmentPtr& env,
const PJMediaEndpointPtr& ep) :
+ mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
mAdapter(adapter),
mId(id),
mShuttingDown(false),
@@ -85,10 +86,15 @@ public:
}
void negotiate_async(const AMD_InteractiveConnectionAgent_negotiatePtr& callback,
+ const AsteriskSCF::System::V1::OperationContextPtr& context,
const string& hostname, Ice::Int port, const CandidateSeq& candidates,
const Ice::Current&)
{
-
+ if (!mOperationContextCache->addOperationContext(context))
+ {
+ // retry detected; ignore
+ return;
+ }
boost::unique_lock<boost::shared_mutex> lock(mLock);
stateCheck();
if (mCurrentNegotiation)
@@ -493,6 +499,7 @@ public:
private:
boost::shared_mutex mLock;
+ OperationContextCachePtr mOperationContextCache;
Ice::ObjectAdapterPtr mAdapter;
Ice::Identity mId;
bool mShuttingDown;
@@ -703,6 +710,8 @@ void ICETransport::onSetupComplete(pjmedia_transport* transport, int status)
{
if (fail(status))
{
+ boost::unique_lock<boost::mutex> lock(mLock);
+ mMonitor.notify_all();
//
// TODO!
//
@@ -733,9 +742,9 @@ void ICETransport::onSetupComplete(pjmedia_transport* transport, int status)
pj_memcpy(mLastKnownAddr.get(), &info.sock_info.rtp_addr_name, sizeof(pj_sockaddr));
}
}
- boost::lock_guard<boost::mutex> lock(mLock);
+ boost::unique_lock<boost::mutex> lock(mLock);
mLocalAddress = fromInfo(info);
- mMonitor.notify_one();
+ mMonitor.notify_all();
}
AddressPtr ICETransport::localAddress()
@@ -745,9 +754,21 @@ AddressPtr ICETransport::localAddress()
{
return mLocalAddress;
}
+ //
+ // Retry check for local address for max of 2.5 seconds, then proceed as if it was unknown.
+ //
for (size_t i = 0; i < 5 && !mLocalAddress; ++i)
{
- mMonitor.wait(lock);
+ try
+ {
+ mMonitor.timed_wait(lock, boost::posix_time::milliseconds(500));
+ }
+ catch (const boost::thread_interrupted&)
+ {
+ }
+ catch (const boost::thread_resource_error&)
+ {
+ }
}
return mLocalAddress;
}
@@ -796,6 +817,7 @@ void ICETransport::start()
{
throw InternalInitializationException("Unable to create new ICE media transport");
}
+
ICECallbackAdapter::addEntry(t, shared_from_this());
mTransport = t;
mCallback = callback;
diff --git a/src/UDPTLConfiguration.cpp b/src/UDPTLConfiguration.cpp
index 4fe9af5..ce0f987 100644
--- a/src/UDPTLConfiguration.cpp
+++ b/src/UDPTLConfiguration.cpp
@@ -21,7 +21,6 @@
#include <AsteriskSCF/System/Component/ConfigurationIf.h>
#include <boost/thread.hpp>
-#include <boost/thread/shared_mutex.hpp>
#include "udptl.h"
@@ -49,12 +48,16 @@ public:
AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq
getConfigurationGroups(const Ice::Current&);
- void setConfiguration(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&, const Ice::Current&);
+ void setConfiguration(const AsteriskSCF::System::V1::OperationContextPtr&,
+ const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&, const Ice::Current&);
- void removeConfigurationItems(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&,
- const Ice::Current&);
- void removeConfigurationGroups(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&,
- const Ice::Current&);
+ void removeConfigurationItems(const AsteriskSCF::System::V1::OperationContextPtr&,
+ const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&,
+ const Ice::Current&);
+
+ void removeConfigurationGroups(const AsteriskSCF::System::V1::OperationContextPtr&,
+ const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&,
+ const Ice::Current&);
int getStartPort();
int getEndPort();
@@ -278,9 +281,12 @@ ConfigurationGroupSeq ConfigurationServiceServant::getConfigurationGroups(const
return groups;
}
-void ConfigurationServiceServant::setConfiguration(const ConfigurationGroupSeq& groups,
+void ConfigurationServiceServant::setConfiguration(
+ const AsteriskSCF::System::V1::OperationContextPtr&,
+ const ConfigurationGroupSeq& groups,
const Ice::Current&)
{
+ // config's serial number already handles idempotency
class GroupVisitor : public UdptlConfigurationGroupVisitor
{
public:
@@ -465,9 +471,10 @@ void ConfigurationServiceServant::setConfiguration(const ConfigurationGroupSeq&
}
}
-void ConfigurationServiceServant::removeConfigurationItems(
+void ConfigurationServiceServant::removeConfigurationItems(const AsteriskSCF::System::V1::OperationContextPtr&,
const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq& groups, const Ice::Current&)
{
+ // multiple removes would be naturally idempotent
class GroupVisitor : public UdptlConfigurationGroupVisitor
{
public:
@@ -619,8 +626,10 @@ void ConfigurationServiceServant::removeConfigurationItems(
}
}
-void ConfigurationServiceServant::removeConfigurationGroups(const ConfigurationGroupSeq& groups, const Ice::Current&)
+void ConfigurationServiceServant::removeConfigurationGroups(const AsteriskSCF::System::V1::OperationContextPtr&,
+ const ConfigurationGroupSeq& groups, const Ice::Current&)
{
+ // multiple removes would be naturally idempotent
class GroupVisitor : public UdptlConfigurationGroupVisitor
{
public:
diff --git a/src/UDPTLSession.cpp b/src/UDPTLSession.cpp
index b0203a5..6fb5934 100644
--- a/src/UDPTLSession.cpp
+++ b/src/UDPTLSession.cpp
@@ -1,7 +1,7 @@
/*
* Asterisk SCF -- An open-source communications framework.
*
- * Copyright (C) 2011, Digium, Inc.
+ * Copyright (C) 2011-2012, Digium, Inc.
*
* See http://www.asterisk.org for more information about
* the Asterisk SCF project. Please do not directly contact
@@ -33,18 +33,22 @@
#include <AsteriskSCF/Media/MediaIf.h>
#include <AsteriskSCF/Media/UDPTL/MediaUDPTLIf.h>
#include <AsteriskSCF/System/Component/ReplicaIf.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
#include "udptl.h"
-using namespace std;
using namespace AsteriskSCF::Core::Discovery::V1;
-using namespace AsteriskSCF::Media::V1;
+using namespace AsteriskSCF::Discovery;
using namespace AsteriskSCF::Media::UDPTL::V1;
+using namespace AsteriskSCF::Media::V1;
using namespace AsteriskSCF::Media;
+using namespace AsteriskSCF::Operations;
using namespace AsteriskSCF::Replication::UDPTL::V1;
using namespace AsteriskSCF::System::Component::V1;
-using namespace AsteriskSCF::Discovery;
+using namespace AsteriskSCF::System::V1;
using namespace AsteriskSCF::UDPTL;
+using namespace std;
/**
* Implementation of the UDPTLSession interface as defined in MediaUDPTLIf.ice
@@ -79,12 +83,12 @@ public:
AsteriskSCF::Media::V1::StreamSinkSeq getSinks(const Ice::Current&);
std::string getId(const Ice::Current&);
void setCookies(const AsteriskSCF::Media::V1::SessionCookieDict& cookies);
- void setCookies(const AsteriskSCF::Media::V1::SessionCookies& cookies,
+ void setCookies(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::Media::V1::SessionCookies& cookies,
const Ice::Current&);
void getCookies_async(const AsteriskSCF::Media::V1::AMD_Session_getCookiesPtr &cb,
const AsteriskSCF::Media::V1::SessionCookies& cookiesToGet,
const Ice::Current&);
- void removeCookies(const AsteriskSCF::Media::V1::SessionCookies& cookies,
+ void removeCookies(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::Media::V1::SessionCookies& cookies,
const Ice::Current&);
void release(const Ice::Current&);
@@ -113,6 +117,8 @@ private:
boost::shared_mutex mLock;
+ OperationContextCachePtr mOperationContextCache;
+
/**
* Instance of the session adapter to be passed to sinks, sources, configuration, etc.
*/
@@ -222,7 +228,8 @@ UDPTLSessionImpl::UDPTLSessionImpl(const Ice::ObjectAdapterPtr& adapter,
const PJMediaEnvironmentPtr& env,
const UDPTLServiceLocatorParamsPtr& params,
const UdptlReplicationContextPtr& replicationContext,
- const ConfigurationServiceImplPtr& configurationService) :
+ const ConfigurationServiceImplPtr& configurationService) :
+ mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
mEnvironment(env),
mEndpoint(PJMediaEndpoint::create(env)),
mId(id),
@@ -264,6 +271,7 @@ UDPTLSessionImpl::UDPTLSessionImpl(const Ice::ObjectAdapterPtr& adapter,
bool ipv6,
const UdptlReplicationContextPtr& replicationContext,
const ConfigurationServiceImplPtr& configurationService) :
+ mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
mEnvironment(env),
mEndpoint(PJMediaEndpoint::create(env)),
mId(sessionIdentity),
@@ -327,11 +335,17 @@ void UDPTLSessionImpl::setCookies(const AsteriskSCF::Media::V1::SessionCookieDic
/**
* Support for the corresponding API call.
*/
-void UDPTLSessionImpl::setCookies(const AsteriskSCF::Media::V1::SessionCookies& cookies,
- const Ice::Current&)
+void UDPTLSessionImpl::setCookies(const AsteriskSCF::System::V1::OperationContextPtr& context,
+ const AsteriskSCF::Media::V1::SessionCookies& cookies,
+ const Ice::Current&)
{
{ // scope the lock
boost::unique_lock<boost::shared_mutex> lock(mLock);
+ if (!mOperationContextCache->addOperationContext(context))
+ {
+ // retry detected
+ return;
+ }
for (AsteriskSCF::Media::V1::SessionCookies::const_iterator i = cookies.begin();
i != cookies.end(); ++i)
{
@@ -376,11 +390,17 @@ void UDPTLSessionImpl::getCookies_async(
/**
* Implementation of the corresponding API call.
*/
-void UDPTLSessionImpl::removeCookies(const AsteriskSCF::Media::V1::SessionCookies& cookies,
- const Ice::Current&)
+void UDPTLSessionImpl::removeCookies(const AsteriskSCF::System::V1::OperationContextPtr& context,
+ const AsteriskSCF::Media::V1::SessionCookies& cookies,
+ const Ice::Current&)
{
{ // scope the lock
boost::unique_lock<boost::shared_mutex> lock(mLock);
+ if (!mOperationContextCache->addOperationContext(context))
+ {
+ // retry detected
+ return;
+ }
for (AsteriskSCF::Media::V1::SessionCookies::const_iterator i = cookies.begin();
i != cookies.end(); ++i)
{
@@ -470,7 +490,7 @@ void UDPTLSessionImpl::replicateState(const UdptlSessionStateItemPtr& session, c
return;
}
- mReplicationContext->getReplicator().tryOneWay()->setState(items);
+ mReplicationContext->getReplicator().tryOneWay()->setState(AsteriskSCF::Operations::createContext(), items);
}
/**
@@ -507,7 +527,7 @@ void UDPTLSessionImpl::removeState(const UdptlSessionStateItemPtr& session, cons
return;
}
- mReplicationContext->getReplicator().tryOneWay()->removeState(items);
+ mReplicationContext->getReplicator().tryOneWay()->removeState(AsteriskSCF::Operations::createContext(), items);
}
UDPTLSessionPrx UDPTLSessionImpl::activate(const string& id)
diff --git a/src/UDPTLSink.cpp b/src/UDPTLSink.cpp
index 319e530..84bb021 100644
--- a/src/UDPTLSink.cpp
+++ b/src/UDPTLSink.cpp
@@ -1,7 +1,7 @@
/*
* Asterisk SCF -- An open-source communications framework.
*
- * Copyright (C) 2011, Digium, Inc.
+ * Copyright (C) 2011-2012, Digium, Inc.
*
* See http://www.asterisk.org for more information about
* the Asterisk SCF project. Please do not directly contact
@@ -20,22 +20,26 @@
#include <pjlib.h>
#include <pjmedia.h>
+#include <boost/thread.hpp>
+
#include <Ice/Ice.h>
#include <IceUtil/UUID.h>
#include <AsteriskSCF/Media/MediaIf.h>
#include <AsteriskSCF/Media/UDPTL/MediaUDPTLIf.h>
#include <AsteriskSCF/System/Component/ReplicaIf.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
#include "udptl.h"
-using namespace std;
using namespace AsteriskSCF::Core::Discovery::V1;
-using namespace AsteriskSCF::Media::V1;
using namespace AsteriskSCF::Media::UDPTL::V1;
+using namespace AsteriskSCF::Media::V1;
using namespace AsteriskSCF::Network::V1;
-using namespace AsteriskSCF::UDPTL;
+using namespace AsteriskSCF::Operations;
using namespace AsteriskSCF::Replication::UDPTL::V1;
+using namespace AsteriskSCF::UDPTL;
+using namespace std;
/**
* Private implementation details for the StreamSinkUDPTLImpl class.
@@ -46,11 +50,15 @@ public:
/**
* Constructor for our StreamSinkUDPTLImplPriv class.
*/
- StreamSinkUDPTLImplPriv(const SessionAdapterPtr& sessionAdapter,
+ StreamSinkUDPTLImplPriv(const SessionAdapterPtr& sessionAdapter,
const PJMediaTransportPtr& transport,
const std::string&,
struct udptl *udptl);
+ boost::shared_mutex mMutex;
+
+ OperationContextCachePtr mOperationContextCache;
+
/**
* A pointer to the UDPTL session we are associated with.
*/
@@ -80,13 +88,14 @@ public:
/**
* Constructor for the StreamSinkUDPTLImplPriv class.
*/
-StreamSinkUDPTLImplPriv::StreamSinkUDPTLImplPriv(const SessionAdapterPtr& session,
+StreamSinkUDPTLImplPriv::StreamSinkUDPTLImplPriv(const SessionAdapterPtr& session,
const PJMediaTransportPtr& transport,
const string& sessionId,
struct udptl *udptl) :
+ mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
mSessionAdapter(session),
mTransport(transport),
- mSinkStateItem(new UdptlStreamSinkStateItem),
+ mSinkStateItem(new UdptlStreamSinkStateItem),
mSessionId(sessionId),
mUdptl(udptl)
{
@@ -111,6 +120,7 @@ StreamSinkUDPTLImpl::StreamSinkUDPTLImpl(const SessionAdapterPtr& session,
*/
void StreamSinkUDPTLImpl::write(const AsteriskSCF::Media::V1::FrameSeq& frames, const Ice::Current&)
{
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
// Don't even bother if no remote address information is present
if (mImpl->mSinkStateItem->remoteAddress.empty() || !mImpl->mSinkStateItem->remotePort)
{
@@ -139,8 +149,10 @@ void StreamSinkUDPTLImpl::write(const AsteriskSCF::Media::V1::FrameSeq& frames,
/**
* Implementation of the setSource method as defined in MediaIf.ice
*/
-void StreamSinkUDPTLImpl::setSource(const AsteriskSCF::Media::V1::StreamSourcePrx& source, const Ice::Current&)
+void StreamSinkUDPTLImpl::setSource(const AsteriskSCF::System::V1::OperationContextPtr& context, const AsteriskSCF::Media::V1::StreamSourcePrx& source, const Ice::Current&)
{
+ // multiple sets naturally idempotent
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
mImpl->mSinkStateItem->source = source;
mImpl->mSessionAdapter->replicateState(mImpl->mSinkStateItem);
@@ -151,6 +163,7 @@ void StreamSinkUDPTLImpl::setSource(const AsteriskSCF::Media::V1::StreamSourcePr
*/
AsteriskSCF::Media::V1::StreamSourcePrx StreamSinkUDPTLImpl::getSource(const Ice::Current&)
{
+ boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
return mImpl->mSinkStateItem->source;
}
@@ -159,6 +172,7 @@ AsteriskSCF::Media::V1::StreamSourcePrx StreamSinkUDPTLImpl::getSource(const Ice
*/
AsteriskSCF::Media::V1::FormatSeq StreamSinkUDPTLImpl::getFormats(const Ice::Current&)
{
+ // no access to local state - no need to lock
FormatSeq formats;
return formats;
}
@@ -168,6 +182,7 @@ AsteriskSCF::Media::V1::FormatSeq StreamSinkUDPTLImpl::getFormats(const Ice::Cur
*/
std::string StreamSinkUDPTLImpl::getId(const Ice::Current&)
{
+ boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
/* For now utilize the id of the session */
return mImpl->mSessionId;
}
@@ -175,8 +190,17 @@ std::string StreamSinkUDPTLImpl::getId(const Ice::Current&)
/**
* Implementation of the setRemoteDetails method as defined in NetworkIf.ice
*/
-void StreamSinkUDPTLImpl::setRemoteDetails(const string& address, Ice::Int port, const Ice::Current&)
+void StreamSinkUDPTLImpl::setRemoteDetails(
+ const AsteriskSCF::System::V1::OperationContextPtr& context,
+ const string& address, Ice::Int port,
+ const Ice::Current&)
{
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
+ if (!mImpl->mOperationContextCache->addOperationContext(context))
+ {
+ // retry detected
+ return;
+ }
mImpl->mSessionAdapter->setRemoteDetails(address, port);
mImpl->mSinkStateItem->remoteAddress = address;
@@ -190,14 +214,21 @@ void StreamSinkUDPTLImpl::setRemoteDetails(const string& address, Ice::Int port,
*/
AddressInformation StreamSinkUDPTLImpl::getRemoteDetails(const Ice::Current&)
{
+ boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
return AddressInformation(mImpl->mSinkStateItem->remoteAddress, mImpl->mSinkStateItem->remotePort);
}
/**
* Implementation of the setFarMaxDatagram method as defined in MediaUDPTLIf.ice
*/
-void StreamSinkUDPTLImpl::setFarMaxDatagram(int datagram, const Ice::Current&)
+void StreamSinkUDPTLImpl::setFarMaxDatagram(const AsteriskSCF::System::V1::OperationContextPtr& context, int datagram, const Ice::Current&)
{
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
+ if (!mImpl->mOperationContextCache->addOperationContext(context))
+ {
+ // retry detected
+ return;
+ }
udptl_set_far_max_datagram(mImpl->mUdptl, datagram);
mImpl->mSinkStateItem->farMaxDatagram = datagram;
@@ -209,6 +240,7 @@ void StreamSinkUDPTLImpl::setFarMaxDatagram(int datagram, const Ice::Current&)
*/
Ice::Int StreamSinkUDPTLImpl::getFarMaxDatagram(const Ice::Current&)
{
+ boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
return udptl_get_far_max_datagram(mImpl->mUdptl);
}
@@ -217,14 +249,22 @@ Ice::Int StreamSinkUDPTLImpl::getFarMaxDatagram(const Ice::Current&)
*/
Ice::Int StreamSinkUDPTLImpl::getFarMaxIFP(const Ice::Current&)
{
+ boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
return udptl_get_far_max_ifp(mImpl->mUdptl);
}
/**
* Implementation of the setErrorCorrectionScheme method as defined in MediaUDPTLIf.ice
*/
-void StreamSinkUDPTLImpl::setErrorCorrectionScheme(ErrorCorrectionScheme scheme, const Ice::Current&)
+void StreamSinkUDPTLImpl::setErrorCorrectionScheme(
+ const AsteriskSCF::System::V1::OperationContextPtr& context, ErrorCorrectionScheme scheme, const Ice::Current&)
{
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
+ if (!mImpl->mOperationContextCache->addOperationContext(context))
+ {
+ // retry detected
+ return;
+ }
enum t38_ec_modes mode;
if (scheme == FEC)
@@ -251,6 +291,7 @@ void StreamSinkUDPTLImpl::setErrorCorrectionScheme(ErrorCorrectionScheme scheme,
*/
ErrorCorrectionScheme StreamSinkUDPTLImpl::getErrorCorrectionScheme(const Ice::Current&)
{
+ boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
enum t38_ec_modes mode = udptl_get_error_correction_scheme(mImpl->mUdptl);
if (mode == UDPTL_ERROR_CORRECTION_FEC)
@@ -272,27 +313,32 @@ ErrorCorrectionScheme StreamSinkUDPTLImpl::getErrorCorrectionScheme(const Ice::C
*/
UdptlStreamSinkStateItemPtr StreamSinkUDPTLImpl::getStateItem()
{
+ boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
return mImpl->mSinkStateItem;
}
void StreamSinkUDPTLImpl::setRemoteDetailsImpl(const std::string& host, Ice::Int port)
{
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
mImpl->mSinkStateItem->remoteAddress = host;
mImpl->mSinkStateItem->remotePort = port;
}
void StreamSinkUDPTLImpl::setSourceImpl(const AsteriskSCF::Media::V1::StreamSourcePrx& proxy)
{
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
mImpl->mSinkStateItem->source = proxy;
}
void StreamSinkUDPTLImpl::setFarMaxDatagramImpl(int datagram)
{
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
udptl_set_far_max_datagram(mImpl->mUdptl, datagram);
}
void StreamSinkUDPTLImpl::setErrorCorrectionSchemeImpl(ErrorCorrectionScheme scheme)
{
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
enum t38_ec_modes mode;
if (scheme == FEC)
@@ -309,4 +355,4 @@ void StreamSinkUDPTLImpl::setErrorCorrectionSchemeImpl(ErrorCorrectionScheme sch
}
udptl_set_error_correction_scheme(mImpl->mUdptl, mode);
-}
+}
diff --git a/src/UDPTLSink.h b/src/UDPTLSink.h
index 4357106..7bee459 100644
--- a/src/UDPTLSink.h
+++ b/src/UDPTLSink.h
@@ -35,17 +35,17 @@ public:
* AsteriskSCF::Media::UDPTL::V1::StreamSinkUDPTL implementation.
*/
void write(const AsteriskSCF::Media::V1::FrameSeq&, const Ice::Current&);
- void setSource(const AsteriskSCF::Media::V1::StreamSourcePrx&, const Ice::Current&);
+ void setSource(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::Media::V1::StreamSourcePrx&, const Ice::Current&);
AsteriskSCF::Media::V1::StreamSourcePrx getSource(const Ice::Current&);
AsteriskSCF::Media::V1::FormatSeq getFormats(const Ice::Current&);
std::string getId(const Ice::Current&);
- void setRemoteDetails(const std::string&, Ice::Int, const Ice::Current&);
+ void setRemoteDetails(const AsteriskSCF::System::V1::OperationContextPtr&, const std::string&, Ice::Int, const Ice::Current&);
AsteriskSCF::Network::V1::AddressInformation getRemoteDetails(const Ice::Current&);
- void setFarMaxDatagram(Ice::Int, const Ice::Current&);
+ void setFarMaxDatagram(const AsteriskSCF::System::V1::OperationContextPtr&, Ice::Int, const Ice::Current&);
Ice::Int getFarMaxDatagram(const Ice::Current&);
Ice::Int getFarMaxIFP(const Ice::Current&);
- void setErrorCorrectionScheme(AsteriskSCF::Media::UDPTL::V1::ErrorCorrectionScheme, const Ice::Current&);
+ void setErrorCorrectionScheme(const AsteriskSCF::System::V1::OperationContextPtr&, AsteriskSCF::Media::UDPTL::V1::ErrorCorrectionScheme, const Ice::Current&);
AsteriskSCF::Media::UDPTL::V1::ErrorCorrectionScheme getErrorCorrectionScheme(const Ice::Current&);
/**
diff --git a/src/UDPTLSource.cpp b/src/UDPTLSource.cpp
index 44ce10c..a1961bc 100644
--- a/src/UDPTLSource.cpp
+++ b/src/UDPTLSource.cpp
@@ -121,8 +121,9 @@ StreamSourceUDPTLImpl::StreamSourceUDPTLImpl(const SessionAdapterPtr& session,
/**
* Implementation of the addSink method as defined in MediaIf.ice
*/
-void StreamSourceUDPTLImpl::addSink(const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&)
+void StreamSourceUDPTLImpl::addSink(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&)
{
+ // naturally idempotent - no extra retry logic needed
boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
// Do not allow the same sink to be added multiple times
@@ -142,8 +143,9 @@ void StreamSourceUDPTLImpl::addSink(const AsteriskSCF::Media::V1::StreamSinkPrx&
/**
* Implementation of the removeSink method as defined in MediaIf.ice
*/
-void StreamSourceUDPTLImpl::removeSink(const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&)
+void StreamSourceUDPTLImpl::removeSink(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&)
{
+ // naturally idempotent - no extra retry logic needed
boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
mImpl->mSourceStateItem->sinks.erase(std::remove(mImpl->mSourceStateItem->sinks.begin(),
@@ -182,8 +184,9 @@ std::string StreamSourceUDPTLImpl::getId(const Ice::Current&)
/**
* Implementation of the requestFormat method as defined in MediaIf.ice
*/
-void StreamSourceUDPTLImpl::requestFormat(const AsteriskSCF::Media::V1::FormatPtr&, const Ice::Current&)
+void StreamSourceUDPTLImpl::requestFormat(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::Media::V1::FormatPtr&, const Ice::Current&)
{
+ // naturally idempotent - no extra retry logic needed
// We do not currently support switching formats.
throw MediaFormatSwitchException();
}
diff --git a/src/UDPTLSource.h b/src/UDPTLSource.h
index fede4a7..f17c98d 100644
--- a/src/UDPTLSource.h
+++ b/src/UDPTLSource.h
@@ -31,12 +31,12 @@ public:
const std::string& parentSessionId,
struct udptl *udptl);
- void addSink(const AsteriskSCF::Media::V1::StreamSinkPrx&, const Ice::Current&);
- void removeSink(const AsteriskSCF::Media::V1::StreamSinkPrx&, const Ice::Current&);
+ void addSink(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::Media::V1::StreamSinkPrx&, const Ice::Current&);
+ void removeSink(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::Media::V1::StreamSinkPrx&, const Ice::Current&);
AsteriskSCF::Media::V1::StreamSinkSeq getSinks(const Ice::Current&);
AsteriskSCF::Media::V1::FormatSeq getFormats(const Ice::Current&);
std::string getId(const Ice::Current&);
- void requestFormat(const AsteriskSCF::Media::V1::FormatPtr&, const Ice::Current&);
+ void requestFormat(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::Media::V1::FormatPtr&, const Ice::Current&);
AsteriskSCF::Network::V1::AddressInformation getLocalDetails(const Ice::Current&);
diff --git a/src/UdptlStateReplicator.h b/src/UdptlStateReplicator.h
index 845572a..cf665fb 100644
--- a/src/UdptlStateReplicator.h
+++ b/src/UdptlStateReplicator.h
@@ -47,8 +47,8 @@ public:
const UdptlReplicationContextPtr& replicationContext,
const ConfigurationServiceImplPtr&);
~UdptlStateReplicatorListenerI();
- void stateRemoved(const Ice::StringSeq&, const Ice::Current&);
- void stateSet(const AsteriskSCF::Replication::UDPTL::V1::UdptlStateItemSeq&, const Ice::Current&);
+ void stateRemoved(const AsteriskSCF::System::V1::OperationContextPtr&, const Ice::StringSeq&, const Ice::Current&);
+ void stateSet(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::Replication::UDPTL::V1::UdptlStateItemSeq&, const Ice::Current&);
bool operator==(const UdptlStateReplicatorListenerI &rhs);
private:
boost::shared_ptr<UdptlStateReplicatorListenerImpl> mImpl;
diff --git a/src/UdptlStateReplicatorListener.cpp b/src/UdptlStateReplicatorListener.cpp
index 5c34b68..82aed82 100644
--- a/src/UdptlStateReplicatorListener.cpp
+++ b/src/UdptlStateReplicatorListener.cpp
@@ -20,15 +20,18 @@
#include <boost/shared_ptr.hpp>
#include <AsteriskSCF/System/Component/ReplicaIf.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
#include "UdptlStateReplicator.h"
#include "ReplicationAdapter.h"
#include "UDPTLSession.h"
-using namespace std;
using namespace AsteriskSCF::Media::UDPTL::V1;
-using namespace AsteriskSCF::UDPTL;
+using namespace AsteriskSCF::Operations;
using namespace AsteriskSCF::Replication::UDPTL::V1;
+using namespace AsteriskSCF::System::V1;
+using namespace AsteriskSCF::UDPTL;
+using namespace std;
class UdptlStateReplicatorItem
{
@@ -36,7 +39,19 @@ public:
~UdptlStateReplicatorItem()
{
- mSession->destroy();
+ if (mSession)
+ {
+ try
+ {
+ mSession->destroy();
+ }
+ catch (...)
+ {
+ //
+ // Can't do much in a destructor!
+ //
+ }
+ }
}
//
@@ -61,93 +76,108 @@ struct UdptlStateReplicatorListenerImpl
{
public:
UdptlStateReplicatorListenerImpl(const Ice::ObjectAdapterPtr& adapter,
- const PJMediaEnvironmentPtr& env,
+ const PJMediaEnvironmentPtr& env,
const UdptlGeneralStateItemPtr& generalState,
const UdptlReplicationContextPtr& replicationContext,
const ConfigurationServiceImplPtr& configurationService)
- : mId(IceUtil::generateUUID()),
- mAdapter(adapter),
+ : mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
+ mId(IceUtil::generateUUID()),
+ mAdapter(adapter),
mEnvironment(env),
mGeneralState(generalState),
mReplicationContext(replicationContext),
mConfigurationService(configurationService) {}
-
- void removeStateNoticeImpl(const Ice::StringSeq& itemKeys)
+
+ void removeStateNoticeImpl(const AsteriskSCF::System::V1::OperationContextPtr& context, const Ice::StringSeq& itemKeys)
{
+ boost::mutex::scoped_lock lock(mMutex);
+ if (!mOperationContextCache->addOperationContext(context))
+ {
+ // retry detected
+ return;
+ }
for (Ice::StringSeq::const_iterator key = itemKeys.begin(); key != itemKeys.end(); ++key)
{
// Just erasing this from the map will cause the destructor to actually shut things down
mStateItems.erase((*key));
}
}
-
- void setStateNoticeImpl(const UdptlStateItemSeq& items)
- {
- class visitor : public AsteriskSCF::Replication::UDPTL::V1::UdptlStateItemVisitor
- {
- public:
- visitor(UdptlStateReplicatorListenerImpl *impl) : mImpl(impl)
- {
- }
- private:
- UdptlStateReplicatorListenerImpl *mImpl;
+ void setStateNoticeImpl(const AsteriskSCF::System::V1::OperationContextPtr& context, const UdptlStateItemSeq& items)
+ {
+ boost::mutex::scoped_lock lock(mMutex);
- void visitUdptlGeneralStateItem(const UdptlGeneralStateItemPtr &item)
+ if (!mOperationContextCache->addOperationContext(context))
{
- mImpl->mGeneralState->serviceManagement = item->serviceManagement;
+ // retry detected
+ return;
}
-
- void visitUdptlSessionStateItem(const UdptlSessionStateItemPtr &item)
- {
- map<string, boost::shared_ptr<UdptlStateReplicatorItem> >::iterator i = mImpl->mStateItems.find(item->sessionId);
- boost::shared_ptr<UdptlStateReplicatorItem> localitem;
- if (i == mImpl->mStateItems.end())
+ class visitor : public AsteriskSCF::Replication::UDPTL::V1::UdptlStateItemVisitor
+ {
+ public:
+ visitor(UdptlStateReplicatorListenerImpl *impl) : mImpl(impl)
{
- boost::shared_ptr<UdptlStateReplicatorItem> newitem(new UdptlStateReplicatorItem());
- localitem = newitem;
- mImpl->mStateItems.insert(make_pair(item->sessionId, newitem));
-
- localitem->setSession(
- AsteriskSCF::UDPTL::UDPTLSession::create(mImpl->mAdapter, mImpl->mEnvironment, item,
- mImpl->mReplicationContext, mImpl->mConfigurationService));
}
- else
+
+ private:
+ UdptlStateReplicatorListenerImpl *mImpl;
+
+ void visitUdptlGeneralStateItem(const UdptlGeneralStateItemPtr &item)
{
- localitem = i->second;
+ mImpl->mGeneralState->serviceManagement = item->serviceManagement;
}
-
- //
- // TODO: This appears to happen in testing on occasion. Should verify if this should be
- // expected.
- //
- if (localitem)
+
+ void visitUdptlSessionStateItem(const UdptlSessionStateItemPtr &item)
{
- localitem->getSession()->update(item);
+ map<string, boost::shared_ptr<UdptlStateReplicatorItem> >::iterator i = mImpl->mStateItems.find(item->sessionId);
+ boost::shared_ptr<UdptlStateReplicatorItem> localitem;
+
+ if (i == mImpl->mStateItems.end())
+ {
+ boost::shared_ptr<UdptlStateReplicatorItem> newitem(new UdptlStateReplicatorItem());
+ localitem = newitem;
+ mImpl->mStateItems.insert(make_pair(item->sessionId, newitem));
+
+ localitem->setSession(
+ AsteriskSCF::UDPTL::UDPTLSession::create(mImpl->mAdapter, mImpl->mEnvironment, item,
+ mImpl->mReplicationContext, mImpl->mConfigurationService));
+ }
+ else
+ {
+ localitem = i->second;
+ }
+
+ //
+ // TODO: This appears to happen in testing on occasion. Should verify if this should be
+ // expected.
+ //
+ if (localitem)
+ {
+ localitem->getSession()->update(item);
+ }
}
- }
-
- void visitUdptlStreamSinkStateItem(const UdptlStreamSinkStateItemPtr &item)
- {
- map<string, boost::shared_ptr<UdptlStateReplicatorItem> >::iterator i =
- mImpl->mStateItems.find(item->sessionId);
- if (i != mImpl->mStateItems.end())
+
+ void visitUdptlStreamSinkStateItem(const UdptlStreamSinkStateItemPtr &item)
{
- i->second->getSession()->update(item);
- }
- }
-
- void visitUdptlStreamSourceStateItem(const UdptlStreamSourceStateItemPtr &item)
- {
- map<string, boost::shared_ptr<UdptlStateReplicatorItem> >::iterator i =
+ map<string, boost::shared_ptr<UdptlStateReplicatorItem> >::iterator i =
mImpl->mStateItems.find(item->sessionId);
- if (i != mImpl->mStateItems.end())
+ if (i != mImpl->mStateItems.end())
+ {
+ i->second->getSession()->update(item);
+ }
+ }
+
+ void visitUdptlStreamSourceStateItem(const UdptlStreamSourceStateItemPtr &item)
{
- i->second->getSession()->update(item);
+ map<string, boost::shared_ptr<UdptlStateReplicatorItem> >::iterator i =
+ mImpl->mStateItems.find(item->sessionId);
+ if (i != mImpl->mStateItems.end())
+ {
+ i->second->getSession()->update(item);
+ }
}
- }
- };
+ };
AsteriskSCF::Replication::UDPTL::V1::UdptlStateItemVisitorPtr v = new visitor(this);
@@ -157,7 +187,9 @@ public:
}
}
- string mId;
+ boost::mutex mMutex;
+ OperationContextCachePtr mOperationContextCache;
+ const string mId;
map<string, boost::shared_ptr<UdptlStateReplicatorItem> > mStateItems;
Ice::ObjectAdapterPtr mAdapter;
PJMediaEnvironmentPtr mEnvironment;
@@ -177,17 +209,18 @@ UdptlStateReplicatorListenerI::~UdptlStateReplicatorListenerI()
{
}
-void UdptlStateReplicatorListenerI::stateRemoved(const Ice::StringSeq& itemKeys, const Ice::Current&)
+void UdptlStateReplicatorListenerI::stateRemoved(const AsteriskSCF::System::V1::OperationContextPtr& context, const Ice::StringSeq& itemKeys, const Ice::Current&)
{
- mImpl->removeStateNoticeImpl(itemKeys);
+ mImpl->removeStateNoticeImpl(context, itemKeys);
}
-void UdptlStateReplicatorListenerI::stateSet(const UdptlStateItemSeq& items, const Ice::Current&)
+void UdptlStateReplicatorListenerI::stateSet(const AsteriskSCF::System::V1::OperationContextPtr& context, const UdptlStateItemSeq& items, const Ice::Current&)
{
- mImpl->setStateNoticeImpl(items);
+ mImpl->setStateNoticeImpl(context, items);
}
bool UdptlStateReplicatorListenerI::operator==(const UdptlStateReplicatorListenerI &rhs)
{
+ // reading immutable variables; no lock needed
return mImpl->mId == rhs.mImpl->mId;
}
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 5644864..12a6e45 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -5,6 +5,7 @@ astscf_component_add_slices(MediaTransportUDPTLTest PROJECT AsteriskSCF/Replicat
astscf_component_add_boost_libraries(MediaTransportUDPTLTest unit_test_framework thread date_time)
astscf_component_add_slice_collection_libraries(MediaTransportUDPTLTest ASTSCF)
astscf_component_build_icebox(MediaTransportUDPTLTest)
+target_link_libraries(MediaTransportUDPTLTest ASTSCFIceUtilCpp)
astscf_component_init(MediaTransportUDPTLTestV6)
astscf_component_add_files(MediaTransportUDPTLTestV6 TestMediaTransportUDPTL.cpp)
@@ -13,6 +14,7 @@ astscf_component_add_boost_libraries(MediaTransportUDPTLTestV6 unit_test_framewo
astscf_component_add_slice_collection_libraries(MediaTransportUDPTLTestV6 ASTSCF)
astscf_component_build_icebox(MediaTransportUDPTLTestV6)
set_property(TARGET MediaTransportUDPTLTestV6 PROPERTY COMPILE_DEFINITIONS IPV6Test)
+target_link_libraries(MediaTransportUDPTLTestV6 ASTSCFIceUtilCpp)
astscf_component_init(MediaTransportUDPTLIceTest)
astscf_component_add_files(MediaTransportUDPTLIceTest TestUDPTLICE.cpp)
@@ -21,6 +23,8 @@ astscf_component_add_slices(MediaTransportUDPTLIceTest PROJECT AsteriskSCF/Confi
astscf_component_add_boost_libraries(MediaTransportUDPTLIceTest unit_test_framework thread date_time)
astscf_component_add_slice_collection_libraries(MediaTransportUDPTLIceTest ASTSCF)
astscf_component_build_icebox(MediaTransportUDPTLIceTest)
+target_link_libraries(MediaTransportUDPTLIceTest ASTSCFIceUtilCpp)
+
pjproject_link(MediaTransportUDPTLIceTest pjlib)
astscf_test_icebox(MediaTransportUDPTLTest config/test_component.config)
diff --git a/test/TestMediaTransportUDPTL.cpp b/test/TestMediaTransportUDPTL.cpp
index 4e6bba9..ba58567 100644
--- a/test/TestMediaTransportUDPTL.cpp
+++ b/test/TestMediaTransportUDPTL.cpp
@@ -35,6 +35,8 @@
#include <AsteriskSCF/Media/MediaIf.h>
#include <AsteriskSCF/Media/UDPTL/MediaUDPTLIf.h>
#include <AsteriskSCF/Media/NetworkIf.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
+#include <AsteriskSCF/System/Component/ReplicaIf.h>
#include "UdptlStateReplicationIf.h"
@@ -45,6 +47,7 @@ using namespace AsteriskSCF::Media::V1;
using namespace AsteriskSCF::Media::UDPTL::V1;
using namespace AsteriskSCF::Replication::UDPTL::V1;
using namespace AsteriskSCF::Network::V1;
+using namespace AsteriskSCF::System::Component::V1;
/**
* Test service, for loading into icebox
@@ -70,8 +73,8 @@ static ArgCacheType mCachedArgs;
class TestUdptlReplicatorListener : public UdptlStateReplicatorListener
{
public:
- void stateRemoved(const Ice::StringSeq&, const Ice::Current&);
- void stateSet(const UdptlStateItemSeq&, const Ice::Current&);
+ void stateRemoved(const AsteriskSCF::System::V1::OperationContextPtr&, const Ice::StringSeq&, const Ice::Current&);
+ void stateSet(const AsteriskSCF::System::V1::OperationContextPtr&, const UdptlStateItemSeq&, const Ice::Current&);
UdptlGeneralStateItemPtr mGeneral;
UdptlSessionStateItemPtr mSession;
@@ -145,11 +148,13 @@ public:
};
static SharedTestData Testbed;
-void TestUdptlReplicatorListener::stateRemoved(const Ice::StringSeq&, const Ice::Current&)
+void TestUdptlReplicatorListener::stateRemoved(const AsteriskSCF::System::V1::OperationContextPtr&,
+ const Ice::StringSeq&, const Ice::Current&)
{
}
-void TestUdptlReplicatorListener::stateSet(const UdptlStateItemSeq& items, const Ice::Current&)
+void TestUdptlReplicatorListener::stateSet(const AsteriskSCF::System::V1::OperationContextPtr&,
+ const UdptlStateItemSeq& items, const Ice::Current&)
{
class visitor : public AsteriskSCF::Replication::UDPTL::V1::UdptlStateItemVisitor
{
@@ -215,7 +220,8 @@ public:
/**
* Implementation of the setSource method which just stores the source away for later retrieval.
*/
- void setSource(const AsteriskSCF::Media::V1::StreamSourcePrx& source, const Ice::Current&)
+ void setSource(const AsteriskSCF::System::V1::OperationContextPtr&,
+ const AsteriskSCF::Media::V1::StreamSourcePrx& source, const Ice::Current&)
{
mSource = source;
}
@@ -283,9 +289,27 @@ struct GlobalIceFixture
Testbed.locator = ServiceLocatorPrx::checkedCast(Testbed.communicator->stringToProxy("LocatorService:tcp -h 127.0.0.1 -p 4411"));
- if (!Testbed.locator) {
+ if (!Testbed.locator)
+ {
throw "Invalid service locator proxy";
}
+
+ ServiceLocatorParamsPtr primaryReplicaParams(new ServiceLocatorParams);
+ primaryReplicaParams->category = "MediaUDPTLService.Replica";
+ primaryReplicaParams->service = "default";
+ primaryReplicaParams->id = "MediaTransportUDPTL";
+
+ ReplicaPrx mPrimaryReplica;
+ try
+ {
+ mPrimaryReplica = ReplicaPrx::uncheckedCast(Testbed.locator->locate(primaryReplicaParams));
+ }
+ catch(const AsteriskSCF::Core::Discovery::V1::ServiceNotFound&)
+ {
+ throw;
+ }
+
+ mPrimaryReplica->activate(AsteriskSCF::Operations::createContext());
}
catch (const Ice::Exception& ex)
{
@@ -378,7 +402,7 @@ BOOST_AUTO_TEST_CASE(AddListenertoStateReplicator)
try
{
- Testbed.mStateReplicator->addListener(Testbed.mListenerProxy);
+ Testbed.mStateReplicator->addListener(AsteriskSCF::Operations::createContext(), Testbed.mListenerProxy);
added = true;
}
@@ -424,7 +448,7 @@ BOOST_AUTO_TEST_CASE(AllocateUDPTLSession)
params->category = "udptl";
params->service = "";
#ifdef IPV6_TEST
- params->ipv6 = true;
+ params->ipv6 = true;
#endif
UDPTLMediaServicePrx service = UDPTLMediaServicePrx::uncheckedCast(Testbed.locator->locate(params));
@@ -432,7 +456,7 @@ BOOST_AUTO_TEST_CASE(AllocateUDPTLSession)
// You might think "geez, this should deadlock due to state replication" but no, we use one ways for that
boost::mutex::scoped_lock lock(Testbed.mLock);
- Testbed.session = service->allocate(params);
+ Testbed.session = service->allocate(AsteriskSCF::Operations::createContext(), params);
// Give the UDPTL component time to replicate this session
Testbed.mCondition.wait(lock);
@@ -699,11 +723,11 @@ BOOST_AUTO_TEST_CASE(ConfirmRemoteUDPTLAddressSetting)
StreamSinkUDPTLPrx sink = StreamSinkUDPTLPrx::checkedCast((*i));
#ifdef IPV6_TEST
- std::string address = "::1";
+ std::string address = "::1";
#else
- std::string address = "127.0.0.1";
+ std::string address = "127.0.0.1";
#endif
- sink->setRemoteDetails(address, 10000);
+ sink->setRemoteDetails(AsteriskSCF::Operations::createContext(), address, 10000);
AddressInformation info = sink->getRemoteDetails();
if (info.ipAddress == address && info.port == 10000)
@@ -739,7 +763,7 @@ BOOST_AUTO_TEST_CASE(ConfirmSinkSetting)
{
StreamSourceUDPTLPrx source = StreamSourceUDPTLPrx::checkedCast((*i));
- source->addSink(Testbed.sink);
+ source->addSink(AsteriskSCF::Operations::createContext(), Testbed.sink);
StreamSinkPrx sink = source->getSinks().front();
@@ -783,7 +807,7 @@ BOOST_AUTO_TEST_CASE(SetupUDPTLLoopback)
boost::mutex::scoped_lock lock(Testbed.mLock);
- sink->setRemoteDetails(info.ipAddress, info.port);
+ sink->setRemoteDetails(AsteriskSCF::Operations::createContext(), info.ipAddress, info.port);
looped = true;
@@ -815,7 +839,7 @@ BOOST_AUTO_TEST_CASE(SetFarMaxDatagram)
boost::mutex::scoped_lock lock(Testbed.mLock);
- sink->setFarMaxDatagram(1000);
+ sink->setFarMaxDatagram(AsteriskSCF::Operations::createContext(), 1000);
set = true;
@@ -847,7 +871,7 @@ BOOST_AUTO_TEST_CASE(SetErrorCorrectionScheme)
boost::mutex::scoped_lock lock(Testbed.mLock);
- sink->setErrorCorrectionScheme(AsteriskSCF::Media::UDPTL::V1::REDUNDANCY);
+ sink->setErrorCorrectionScheme(AsteriskSCF::Operations::createContext(), AsteriskSCF::Media::UDPTL::V1::REDUNDANCY);
set = true;
@@ -904,16 +928,16 @@ BOOST_AUTO_TEST_CASE(SetInvalidAddressFamilyAddress)
{
StreamSinkUDPTLPrx sink = StreamSinkUDPTLPrx::checkedCast((*i));
- /* No, these are not accidentally reversed. We want to try to set an IPv4 address
- * on an IPv6 sink and vice versa to make sure that an exception does get thrown.
- */
+ /* No, these are not accidentally reversed. We want to try to set an IPv4 address
+ * on an IPv6 sink and vice versa to make sure that an exception does get thrown.
+ */
#ifdef IPV6_TEST
- sink->setRemoteDetails("127.0.0.1", 10000);
+ sink->setRemoteDetails(AsteriskSCF::Operations::createContext(), "127.0.0.1", 10000);
#else
- sink->setRemoteDetails("::1", 10000);
+ sink->setRemoteDetails(AsteriskSCF::Operations::createContext(), "::1", 10000);
#endif
- set = true;
+ set = true;
}
}
catch (const Ice::Exception &e)
@@ -1013,6 +1037,77 @@ BOOST_AUTO_TEST_CASE(ReleaseUDPTLSession)
BOOST_CHECK(released);
}
+/**
+ * Ensure that multiple calls to allocate return different proxies.
+ */
+BOOST_AUTO_TEST_CASE(AllocateDoubleCalls)
+{
+ UDPTLServiceLocatorParamsPtr params = new UDPTLServiceLocatorParams();
+ params->category = "udptl";
+ params->service = "";
+
+ UDPTLMediaServicePrx service = UDPTLMediaServicePrx::uncheckedCast(Testbed.locator->locate(params));
+
+ UDPTLSessionPrx expected = service->allocate(AsteriskSCF::Operations::createContext(), params);
+ UDPTLSessionPrx actual = service->allocate(AsteriskSCF::Operations::createContext(), params);
+
+ BOOST_REQUIRE_NE(expected, actual);
+}
+
+/**
+ * Test retry logic in ::allocate
+ */
+BOOST_AUTO_TEST_CASE(AllocateRetry)
+{
+ UDPTLServiceLocatorParamsPtr params = new UDPTLServiceLocatorParams();
+ params->category = "udptl";
+ params->service = "";
+
+ UDPTLMediaServicePrx service = UDPTLMediaServicePrx::uncheckedCast(Testbed.locator->locate(params));
+ AsteriskSCF::System::V1::OperationContextPtr context = AsteriskSCF::Operations::createContext();
+
+ UDPTLSessionPrx expected = service->allocate(context, params);
+ UDPTLSessionPrx actual = service->allocate(context, params);
+
+ BOOST_REQUIRE_EQUAL(expected, actual);
+}
+
+/**
+ * Test retry logic in ::allocate, when it throws an exception
+ */
+BOOST_AUTO_TEST_CASE(ThrowingAllocateRetry)
+{
+ UDPTLOverICEServiceLocatorParamsPtr params = new UDPTLOverICEServiceLocatorParams();
+ params->category = "udptl";
+ params->service = "";
+
+ UDPTLMediaServicePrx service = UDPTLMediaServicePrx::uncheckedCast(Testbed.locator->locate(params));
+ AsteriskSCF::System::V1::OperationContextPtr context = AsteriskSCF::Operations::createContext();
+
+ // allocating for ICE when ICE isn't enabled should throw an exception
+ params->enableICE = true;
+
+ try
+ {
+ service->allocate(context, params);
+ BOOST_FAIL("Expected allocation to throw an exception");
+ }
+ catch (const SessionAllocationFailure&)
+ {
+ // expected
+ }
+
+ try
+ {
+ service->allocate(context, params);
+ BOOST_FAIL("Expected allocation to throw an exception");
+ }
+ catch (const SessionAllocationFailure&)
+ {
+ // expected
+ }
+}
+
void UDPTLTest::start(std::string const& name,
Ice::CommunicatorPtr const&,
Ice::StringSeq const& args)
diff --git a/test/TestUDPTLICE.cpp b/test/TestUDPTLICE.cpp
index 546d9a3..596f5bf 100644
--- a/test/TestUDPTLICE.cpp
+++ b/test/TestUDPTLICE.cpp
@@ -29,10 +29,12 @@
#include <boost/thread/condition.hpp>
#include <Ice/Ice.h>
#include <Ice/BuiltinSequences.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
#include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.h>
#include <AsteriskSCF/Media/MediaIf.h>
#include <AsteriskSCF/Media/UDPTL/MediaUDPTLIf.h>
+#include <AsteriskSCF/System/Component/ReplicaIf.h>
//
// An attempt to get some reasonable code coverage and verify that the basic *premise* of the functionality works as
@@ -48,6 +50,8 @@ using namespace AsteriskSCF::Media::UDPTL::V1;
using namespace AsteriskSCF::System::Configuration::V1;
using namespace AsteriskSCF::Replication::UDPTL::V1;
using namespace AsteriskSCF::Configuration::UDPTL::V1;
+using namespace AsteriskSCF::System::Component::V1;
+using namespace AsteriskSCF::Core::Discovery::V1;
namespace
{
@@ -55,11 +59,11 @@ namespace
class TestReplicatorListener : public UdptlStateReplicatorListener
{
public:
- void stateRemoved(const Ice::StringSeq&, const Ice::Current&)
+ void stateRemoved(const AsteriskSCF::System::V1::OperationContextPtr&, const Ice::StringSeq&, const Ice::Current&)
{
}
- void stateSet(const UdptlStateItemSeq&, const Ice::Current&)
+ void stateSet(const AsteriskSCF::System::V1::OperationContextPtr&, const UdptlStateItemSeq&, const Ice::Current&)
{
}
@@ -114,6 +118,32 @@ public:
BOOST_TEST_MESSAGE("Creating test fixture");
::boost::debug::detect_memory_leaks(false);
::boost::unit_test::unit_test_log.set_stream(cout);
+
+ IceEnvironment iceEnv;
+
+ ServiceLocatorPrx locator = ServiceLocatorPrx::checkedCast(iceEnv.getCommunicator()->stringToProxy("LocatorService:tcp -h 127.0.0.1 -p 4411"));
+
+ if (!locator)
+ {
+ throw "Invalid service locator proxy";
+ }
+
+ ServiceLocatorParamsPtr primaryReplicaParams(new ServiceLocatorParams);
+ primaryReplicaParams->category = "MediaUDPTLService.Replica";
+ primaryReplicaParams->service = "default";
+ primaryReplicaParams->id = "MediaTransportUDPTL";
+
+ ReplicaPrx mPrimaryReplica;
+ try
+ {
+ mPrimaryReplica = ReplicaPrx::uncheckedCast(locator->locate(primaryReplicaParams));
+ }
+ catch(const AsteriskSCF::Core::Discovery::V1::ServiceNotFound&)
+ {
+ throw;
+ }
+
+ mPrimaryReplica->activate(AsteriskSCF::Operations::createContext());
}
~TestFixture()
@@ -137,7 +167,22 @@ ConfigurationServicePrx locateConfigurationService(const ServiceLocatorPrx& loca
{
ServiceLocatorParamsPtr query = new ServiceLocatorParams;
query->category = ConfigurationDiscoveryCategory;
- return ConfigurationServicePrx::uncheckedCast(locator->locate(query));
+ // We need to synchronously setConfiguration, but the locate() returns the proxy to the
+ // configuration IceStorm topic, which is asynchronous. Instead, locateAll() and find the
+ // real ConfigurationServicePrx.
+ Ice::ObjectProxySeq objs = locator->locateAll(query);
+ for (Ice::ObjectProxySeq::iterator obj = objs.begin(); obj != objs.end(); ++obj)
+ {
+ try
+ {
+ return ConfigurationServicePrx::checkedCast(*obj);
+ }
+ catch (...)
+ {
+ // must be the IceStorm topic. try again
+ }
+ }
+ return 0;
}
ServiceLocatorPrx getLocator(const Ice::CommunicatorPtr& comm)
@@ -177,7 +222,7 @@ BOOST_AUTO_TEST_CASE(UdptlSessionWithICEEnabled)
ConfigurationGroupSeq s;
s.push_back(iceGroup);
- configPrx->setConfiguration(s);
+ configPrx->setConfiguration(AsteriskSCF::Operations::createContext(), s);
UDPTLMediaServicePrx servicePrx;
{
@@ -195,7 +240,7 @@ BOOST_AUTO_TEST_CASE(UdptlSessionWithICEEnabled)
query->enableICE = true;
query->enableTURN = false;
UDPTLSessionPrx sessionPrx;
- BOOST_REQUIRE_NO_THROW(sessionPrx = servicePrx->allocate(query));
+ BOOST_REQUIRE_NO_THROW(sessionPrx = servicePrx->allocate(AsteriskSCF::Operations::createContext(), query));
BOOST_REQUIRE(sessionPrx != 0);
sessionPrx->ice_ping(); // To silence unused arg warning.
sessionPrx->release();
@@ -240,14 +285,14 @@ BOOST_AUTO_TEST_CASE(UdptlSessionAllocationFailure)
iceGroup->configurationItems[UDPTLICETransportFlagsItemName] = iceFlags;
ConfigurationGroupSeq s;
s.push_back(iceGroup);
- BOOST_REQUIRE_NO_THROW(configPrx->setConfiguration(s));
+ BOOST_REQUIRE_NO_THROW(configPrx->setConfiguration(AsteriskSCF::Operations::createContext(), s));
try
{
UDPTLOverICEServiceLocatorParamsPtr query = new UDPTLOverICEServiceLocatorParams;
query->category = "udptl";
query->enableICE = true;
query->enableTURN = true;
- UDPTLSessionPrx sessionPrx = servicePrx->allocate(query);
+ UDPTLSessionPrx sessionPrx = servicePrx->allocate(AsteriskSCF::Operations::createContext(), query);
sessionPrx->ice_ping();
}
catch (const SessionAllocationFailure& ex)
-----------------------------------------------------------------------
--
asterisk-scf/release/media_transport_udptl.git
More information about the asterisk-scf-commits
mailing list