[asterisk-scf-commits] asterisk-scf/release/media_operations_core.git branch "master" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Tue May 8 17:27:45 CDT 2012
branch "master" has been updated
via 41f00c90b2bfa24ce7d8d79aa7b0a630693d4bc8 (commit)
from 9ef786e8925ddbf636727aff181f7af6c7806d35 (commit)
Summary of changes:
config/test_component.conf | 2 +
.../MediaOperationsCore/MediaOperationsCoreIf.ice | 17 +-
src/DSP.cpp | 4 +-
src/InbandTelephonyEvents.cpp | 112 ++++++++---
src/InbandTelephonyEvents.h | 3 +
src/MediaOperationFactoryImpl.h | 10 +-
src/MediaOperationStateReplicator.h | 12 +-
src/MediaOperationStateReplicatorListener.cpp | 21 ++-
src/MediaOperationsCore.cpp | 8 +-
src/TranslatorOperation.cpp | 6 +-
src/TranslatorOperationFactory.cpp | 83 ++++++---
src/TranslatorOperationFactory.h | 6 +
src/TranslatorSink.cpp | 22 ++-
src/TranslatorSink.h | 13 +-
src/TranslatorSource.cpp | 50 ++++-
src/TranslatorSource.h | 22 ++-
test/TestMediaOperations.cpp | 205 +++++++++++++------
test/TestStreamSink.cpp | 2 +-
test/TestStreamSink.h | 2 +-
test/TestStreamSource.cpp | 7 +-
test/TestStreamSource.h | 7 +-
21 files changed, 441 insertions(+), 173 deletions(-)
- Log -----------------------------------------------------------------
commit 41f00c90b2bfa24ce7d8d79aa7b0a630693d4bc8
Author: Ken Hunt <ken.hunt at digium.com>
Date: Tue May 8 11:39:36 2012 -0500
Changes for new retry logic.
diff --git a/config/test_component.conf b/config/test_component.conf
index 1305e3d..ae429d3 100644
--- a/config/test_component.conf
+++ b/config/test_component.conf
@@ -1,5 +1,6 @@
IceBox.InheritProperties=1
Ice.Warn.UnknownProperties=0
+Ice.ThreadPool.Client.Size=4
IceBox.InstanceName=IceBox
IceBox.ServiceManager.Endpoints=tcp -h 127.0.0.1 -p 10007
@@ -31,6 +32,7 @@ TopicManager.Proxy=ServiceDiscovery/TopicManager:tcp -h 127.0.0.1 -p 4421
ServiceDiscovery.IceStorm.InstanceName=ServiceDiscovery
+ServiceDiscovery.Standalone=true
ServiceDiscovery.IceStorm.TopicManager.Endpoints=tcp -h 127.0.0.1 -p 4421
ServiceDiscovery.IceStorm.Publish.Endpoints=tcp -h 127.0.0.1 -p 4423
ServiceDiscovery.IceStorm.Trace.TopicManager=2
diff --git a/slice/AsteriskSCF/Replication/MediaOperationsCore/MediaOperationsCoreIf.ice b/slice/AsteriskSCF/Replication/MediaOperationsCore/MediaOperationsCoreIf.ice
index 33e8497..6737d2e 100644
--- a/slice/AsteriskSCF/Replication/MediaOperationsCore/MediaOperationsCoreIf.ice
+++ b/slice/AsteriskSCF/Replication/MediaOperationsCore/MediaOperationsCoreIf.ice
@@ -20,6 +20,7 @@
#include <Ice/BuiltinSequences.ice>
#include <AsteriskSCF/Media/MediaIf.ice>
+#include <AsteriskSCF/System/OperationsIf.ice>
module AsteriskSCF
{
@@ -120,18 +121,18 @@ class InbandTelephonyEventOperationStateItem extends TranslatorMediaOperationSta
interface MediaOperationStateReplicatorListener
{
- void stateRemoved(Ice::StringSeq itemKeys);
- void stateRemovedForItems(MediaOperationStateItemSeq items);
- void stateSet(MediaOperationStateItemSeq items);
+ idempotent void stateRemoved(AsteriskSCF::System::V1::OperationContext operationContext, Ice::StringSeq itemKeys);
+ idempotent void stateRemovedForItems(AsteriskSCF::System::V1::OperationContext operationContext, MediaOperationStateItemSeq items);
+ idempotent void stateSet(AsteriskSCF::System::V1::OperationContext operationContext, MediaOperationStateItemSeq items);
};
interface MediaOperationStateReplicator
{
- void addListener(MediaOperationStateReplicatorListener *listener);
- void removeListener(MediaOperationStateReplicatorListener *listener);
- void setState (MediaOperationStateItemSeq items);
- void removeState(Ice::StringSeq items);
- void removeStateForItems(MediaOperationStateItemSeq items);
+ idempotent void addListener(AsteriskSCF::System::V1::OperationContext operationContext, MediaOperationStateReplicatorListener *listener);
+ idempotent void removeListener(AsteriskSCF::System::V1::OperationContext operationContext, MediaOperationStateReplicatorListener *listener);
+ idempotent void setState (AsteriskSCF::System::V1::OperationContext operationContext, MediaOperationStateItemSeq items);
+ idempotent void removeState(AsteriskSCF::System::V1::OperationContext operationContext, Ice::StringSeq items);
+ idempotent void removeStateForItems(AsteriskSCF::System::V1::OperationContext operationContext, MediaOperationStateItemSeq items);
idempotent MediaOperationStateItemSeq getState(Ice::StringSeq iteKeys);
idempotent MediaOperationStateItemSeq getAllState();
};
diff --git a/src/DSP.cpp b/src/DSP.cpp
index 9d868f0..8110e66 100644
--- a/src/DSP.cpp
+++ b/src/DSP.cpp
@@ -1,4 +1,6 @@
-#define _USE_MATH_DEFINES
+#ifdef _MSC_VER
+# define _USE_MATH_DEFINES
+#endif
#include <cmath>
#include "DSP.h"
diff --git a/src/InbandTelephonyEvents.cpp b/src/InbandTelephonyEvents.cpp
index 9a0ebd7..7dc02cf 100644
--- a/src/InbandTelephonyEvents.cpp
+++ b/src/InbandTelephonyEvents.cpp
@@ -1,7 +1,7 @@
/*
* Asterisk SCF -- An open-source communications framework.
*
- * Copyright (C) 2011, Digium, Inc.
+ * Copyright (C) 2011-2012, Digium, Inc.
*
* See http://www.asterisk.org for more information about
* the Asterisk SCF project. Please do not directly contact
@@ -18,7 +18,9 @@
#include <AsteriskSCF/SessionCommunications/TelephonyEventsIf.h>
#include <AsteriskSCF/Media/Formats/AudioFormatsIf.h>
#include <AsteriskSCF/Replication/MediaOperationsCore/MediaOperationsCoreIf.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
+#include <boost/thread.hpp>
#include <IceUtil/UUID.h>
#include "TranslatorOperation.h"
@@ -30,6 +32,7 @@ using namespace AsteriskSCF::Media::V1;
using namespace AsteriskSCF::SessionCommunications::V1;
using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::Media::Formats::Audio::V1;
+using namespace AsteriskSCF::Operations;
using namespace AsteriskSCF::Replication::MediaOperationsCore::V1;
namespace
@@ -51,7 +54,7 @@ private:
const std::string mFormatName;
};
-};
+} // anonymous namespace
namespace AsteriskSCF
{
@@ -64,13 +67,24 @@ class InbandTelephonyEventOperation : public TranslatorOperation
class InbandTelephonyEventSource : public TelephonyEventSource
{
public:
- InbandTelephonyEventSource(const Logger& logger)
- : mLogger(logger) { }
+ InbandTelephonyEventSource(const Logger& logger) :
+ mLogger(logger), mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)) { }
void addSinks_async(const AMD_TelephonyEventSource_addSinksPtr& cb,
- const TelephonyEventSinkSeq& sinks,
- const Ice::Current&)
+ const AsteriskSCF::System::V1::OperationContextPtr& context,
+ const TelephonyEventSinkSeq& sinks,
+ const Ice::Current&)
{
+ typedef AMDContextData<AMD_TelephonyEventSource_addSinksPtr> Data;
+ typedef Data::ptr_type DataPtr;
+ DataPtr data = getContext<Data>(mOperationContextCache, context, cb);
+ if (!data)
+ {
+ // retry detected
+ return;
+ }
+
+ boost::unique_lock<boost::shared_mutex> lock(mMutex);
for (TelephonyEventSinkSeq::const_iterator i = sinks.begin(); i != sinks.end(); ++i)
{
if (std::find_if(mSinks.begin(), mSinks.end(), IdentityComparePredicate<TelephonyEventSinkPrx>(*i)) == mSinks.end())
@@ -78,33 +92,53 @@ class InbandTelephonyEventOperation : public TranslatorOperation
mSinks.push_back(*i);
}
}
- cb->ice_response();
+ data->getProxy()->ice_response();
}
void removeSinks_async(const AMD_TelephonyEventSource_removeSinksPtr& cb,
- const TelephonyEventSinkSeq& sinks,
- const Ice::Current&)
+ const AsteriskSCF::System::V1::OperationContextPtr& context,
+ const TelephonyEventSinkSeq& sinks,
+ const Ice::Current&)
{
+ typedef AMDContextData<AMD_TelephonyEventSource_removeSinksPtr> Data;
+ typedef Data::ptr_type DataPtr;
+ DataPtr data = getContext<Data>(mOperationContextCache, context, cb);
+ if (!data)
+ {
+ // retry detected
+ return;
+ }
+
+ boost::unique_lock<boost::shared_mutex> lock(mMutex);
for (TelephonyEventSinkSeq::const_iterator i = sinks.begin(); i != sinks.end(); ++i)
{
mSinks.erase(remove_if(mSinks.begin(), mSinks.end(), IdentityComparePredicate<TelephonyEventSinkPrx>(*i)), mSinks.end());
}
- cb->ice_response();
+ data->getProxy()->ice_response();
}
void getSinks_async(const AMD_TelephonyEventSource_getSinksPtr& cb,
const Ice::Current&)
{
+ boost::shared_lock<boost::shared_mutex> lock(mMutex);
cb->ice_response(mSinks);
}
void distributeToSinks(const TelephonyEventPtr& event)
{
- for (TelephonyEventSinkSeq::iterator iter = mSinks.begin(); iter != mSinks.end(); ++iter)
+ // it's unwise to invoke remote operations while holding a mutex.
+ // copy the sink list while holding the mutex, then use the copy.
+ TelephonyEventSinkSeq sinks;
+ {
+ boost::shared_lock<boost::shared_mutex> lock(mMutex);
+ sinks = mSinks;
+ }
+
+ for (TelephonyEventSinkSeq::iterator iter = sinks.begin(); iter != sinks.end(); ++iter)
{
try
{
- (*iter)->write(event);
+ (*iter)->write(AsteriskSCF::Operations::createContext(), event);
}
catch (const std::exception&)
{
@@ -113,8 +147,10 @@ class InbandTelephonyEventOperation : public TranslatorOperation
}
}
private:
+ boost::shared_mutex mMutex;
TelephonyEventSinkSeq mSinks;
Logger mLogger;
+ OperationContextCachePtr mOperationContextCache;
};
typedef IceUtil::Handle<InbandTelephonyEventSource> InbandTelephonyEventSourcePtr;
@@ -235,7 +271,7 @@ class InbandTelephonyEventOperation : public TranslatorOperation
{
MediaOperationStateItemSeq items;
items.push_back(mStateItem);
- mReplicationContext->getReplicator()->setState(items);
+ mReplicationContext->getReplicator()->setState(AsteriskSCF::Operations::createContext(), items);
}
}
@@ -329,38 +365,62 @@ InbandTelephonyEventOperationFactory::InbandTelephonyEventOperationFactory(
: MediaOperationFactoryImpl(adapter,
logger,
replicationContext,
- "InbandTelephonyEventFactory")
+ "InbandTelephonyEventFactory"),
+ mOperationContextCache(Operations::OperationContextCache::create(DEFAULT_TTL_SECONDS, logger, ""))
{
mLocatorParams->category = MediaOperationDiscoveryCategory;
mLocatorParams->service = InbandTelephonyEventsDiscoveryCategory;
};
MediaOperationPrx InbandTelephonyEventOperationFactory::createMediaOperation(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
const StreamSourcePrx& source,
const StreamSinkPrx& sink,
const Ice::Current&)
{
- if (sink == 0 || source == 0)
+ std::pair<bool, CreateMediaOperationCookiePtr> cacheHit =
+ getContextSync<CreateMediaOperationCookiePtr>(mOperationContextCache, operationContext);
+
+ if (cacheHit.first)
{
- throw UnsupportedMediaFormatException();
+ return cacheHit.second->getResult();
}
- FormatSeq sourceFormats = source->getFormats();
- FormatSeq sinkFormats = sink->getFormats();
+ try
+ {
+ if (sink == 0 || source == 0)
+ {
+ throw UnsupportedMediaFormatException();
+ }
+
+ FormatSeq sourceFormats = source->getFormats();
+ FormatSeq sinkFormats = sink->getFormats();
- MediaOperationPrx proxy = tryFormat(sourceFormats, sinkFormats, SignedLinear8Name);
+ MediaOperationPrx proxy = tryFormat(sourceFormats, sinkFormats, SignedLinear8Name);
- if (!proxy)
+ if (!proxy)
+ {
+ proxy = tryFormat(sourceFormats, sinkFormats, SignedLinear16Name);
+ }
+
+ if (!proxy)
+ {
+ throw UnsupportedMediaFormatException();
+ }
+
+ cacheHit.second->setResult(proxy);
+ return proxy;
+ }
+ catch (const std::exception& e)
{
- proxy = tryFormat(sourceFormats, sinkFormats, SignedLinear16Name);
+ cacheHit.second->setException(e);
+ throw;
}
-
- if (!proxy)
+ catch (...)
{
- throw UnsupportedMediaFormatException();
+ cacheHit.second->setException();
+ throw;
}
-
- return proxy;
}
MediaOperationPrx InbandTelephonyEventOperationFactory::tryFormat(const FormatSeq& sourceFormats,
diff --git a/src/InbandTelephonyEvents.h b/src/InbandTelephonyEvents.h
index da2f636..e955424 100644
--- a/src/InbandTelephonyEvents.h
+++ b/src/InbandTelephonyEvents.h
@@ -20,6 +20,7 @@
#include <AsteriskSCF/Media/MediaOperationIf.h>
#include <AsteriskSCF/Media/MediaIf.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
namespace AsteriskSCF
{
@@ -36,6 +37,7 @@ public:
const MediaOperationReplicationContextPtr& replicationContext);
AsteriskSCF::Media::V1::MediaOperationPrx createMediaOperation(
+ const AsteriskSCF::System::V1::OperationContextPtr&,
const AsteriskSCF::Media::V1::StreamSourcePrx& source,
const AsteriskSCF::Media::V1::StreamSinkPrx& sink,
const Ice::Current&);
@@ -49,6 +51,7 @@ public:
*/
void createOrUpdateMediaOperation(const AsteriskSCF::Replication::MediaOperationsCore::V1::InbandTelephonyEventOperationStateItemPtr& item);
private:
+ Operations::OperationContextCachePtr mOperationContextCache;
/**
* Internal method to create an operation using a specific format
diff --git a/src/MediaOperationFactoryImpl.h b/src/MediaOperationFactoryImpl.h
index 53089aa..a39f709 100644
--- a/src/MediaOperationFactoryImpl.h
+++ b/src/MediaOperationFactoryImpl.h
@@ -1,7 +1,7 @@
/*
* Asterisk SCF -- An open-source communications framework.
*
- * Copyright (C) 2011, Digium, Inc.
+ * Copyright (C) 2011-2012, Digium, Inc.
*
* See http://www.asterisk.org for more information about
* the Asterisk SCF project. Please do not directly contact
@@ -17,9 +17,12 @@
#pragma once
#include <Ice/Ice.h>
-#include <AsteriskSCF/Media/MediaOperationIf.h>
+
#include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.h>
#include <AsteriskSCF/Logger.h>
+#include <AsteriskSCF/Media/MediaOperationIf.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
#include "MediaOperationReplicationContext.h"
@@ -71,5 +74,8 @@ protected:
typedef IceUtil::Handle<MediaOperationFactoryImpl> MediaOperationFactoryImplPtr;
+typedef Operations::ContextResultData<AsteriskSCF::Media::V1::MediaOperationPrx> CreateMediaOperationCookie;
+typedef boost::shared_ptr<CreateMediaOperationCookie> CreateMediaOperationCookiePtr;
+
} //end namespace MediaOperationsCore
} //end namespace AsteriskSCF
diff --git a/src/MediaOperationStateReplicator.h b/src/MediaOperationStateReplicator.h
index f8485d8..4260cbe 100644
--- a/src/MediaOperationStateReplicator.h
+++ b/src/MediaOperationStateReplicator.h
@@ -16,8 +16,9 @@
#pragma once
-#include <AsteriskSCF/Replication/StateReplicator.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
#include <AsteriskSCF/Replication/MediaOperationsCore/MediaOperationsCoreIf.h>
+#include <AsteriskSCF/Replication/StateReplicator.h>
namespace AsteriskSCF
{
@@ -36,10 +37,11 @@ class MediaOperationStateReplicatorListenerImpl : public AsteriskSCF::Replicatio
{
public:
MediaOperationStateReplicatorListenerImpl(const Ice::ObjectAdapterPtr& adapter);
- void stateRemoved(const Ice::StringSeq&, const Ice::Current&);
- void stateRemovedForItems(const AsteriskSCF::Replication::MediaOperationsCore::V1::MediaOperationStateItemSeq&, const Ice::Current&);
- void stateSet(const AsteriskSCF::Replication::MediaOperationsCore::V1::MediaOperationStateItemSeq&, const Ice::Current&);
+ void stateRemoved(const AsteriskSCF::System::V1::OperationContextPtr&, const Ice::StringSeq&, const Ice::Current&);
+ void stateRemovedForItems(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::Replication::MediaOperationsCore::V1::MediaOperationStateItemSeq&, const Ice::Current&);
+ void stateSet(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::Replication::MediaOperationsCore::V1::MediaOperationStateItemSeq&, const Ice::Current&);
private:
+ Operations::OperationContextCachePtr mOperationContextCache;
/**
* Note this is the "service" object adapter, and not the "backplane"
* adapter used for state replication.
@@ -47,7 +49,7 @@ private:
* We need this adapter in the state replication listener so that
* services can be added and removed as appropriate.
*/
- Ice::ObjectAdapterPtr mAdapter;
+ const Ice::ObjectAdapterPtr mAdapter;
};
typedef IceUtil::Handle<MediaOperationStateReplicatorListenerImpl> MediaOperationStateReplicatorListenerImplPtr;
diff --git a/src/MediaOperationStateReplicatorListener.cpp b/src/MediaOperationStateReplicatorListener.cpp
index d6e1d5c..5b00da6 100644
--- a/src/MediaOperationStateReplicatorListener.cpp
+++ b/src/MediaOperationStateReplicatorListener.cpp
@@ -30,11 +30,12 @@ namespace MediaOperationsCore
using namespace AsteriskSCF::Replication::MediaOperationsCore::V1;
MediaOperationStateReplicatorListenerImpl::MediaOperationStateReplicatorListenerImpl(const Ice::ObjectAdapterPtr& adapter)
- : mAdapter(adapter)
+ : mOperationContextCache(Operations::OperationContextCache::create(Operations::DEFAULT_TTL_SECONDS)), mAdapter(adapter)
{
}
void MediaOperationStateReplicatorListenerImpl::stateRemoved(
+ const AsteriskSCF::System::V1::OperationContextPtr&,
const Ice::StringSeq&,
const Ice::Current&)
{
@@ -42,9 +43,18 @@ void MediaOperationStateReplicatorListenerImpl::stateRemoved(
}
void MediaOperationStateReplicatorListenerImpl::stateRemovedForItems(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
const MediaOperationStateItemSeq& items,
const Ice::Current&)
{
+ // mAdapter is constant; mOperationContextCache is thread safe
+ // no further locking needed
+ if (!mOperationContextCache->addOperationContext(operationContext))
+ {
+ // repeated execution; ignore
+ return;
+ }
+
class Visitor : public MediaOperationStateItemVisitor
{
public:
@@ -80,9 +90,18 @@ void MediaOperationStateReplicatorListenerImpl::stateRemovedForItems(
}
void MediaOperationStateReplicatorListenerImpl::stateSet(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
const MediaOperationStateItemSeq& items,
const Ice::Current&)
{
+ // mAdapter is constant; mOperationContextCache is thread safe
+ // no further locking needed
+ if (!mOperationContextCache->addOperationContext(operationContext))
+ {
+ // repeated execution; ignore
+ return;
+ }
+
class Visitor : public MediaOperationStateItemVisitor
{
public:
diff --git a/src/MediaOperationsCore.cpp b/src/MediaOperationsCore.cpp
index 4dd9975..1296c34 100644
--- a/src/MediaOperationsCore.cpp
+++ b/src/MediaOperationsCore.cpp
@@ -234,13 +234,13 @@ private:
MediaOperationsComparePtr compare = new MediaOperationsCompare(iter->first);
ServiceLocatorParamsComparePrx compareProxy = ServiceLocatorParamsComparePrx::uncheckedCast(getServiceAdapter()->addWithUUID(compare));
- getServiceLocatorManagement()->addCompare(iter->first->getName(), compareProxy);
+ getServiceLocatorManagement()->addCompare(AsteriskSCF::Operations::createContext(), iter->first->getName(), compareProxy);
}
}
void onUnregisterPrimaryServices()
{
- getServiceLocatorManagement()->removeCompare(CompareGuid);
+ getServiceLocatorManagement()->removeCompare(AsteriskSCF::Operations::createContext(), CompareGuid);
}
void createReplicationStateListeners()
@@ -272,7 +272,7 @@ private:
{
if (context->getState() == STANDBY_IN_REPLICA_GROUP)
{
- context->getReplicator()->addListener(mReplicatorListenerProxy);
+ context->getReplicator()->addListener(AsteriskSCF::Operations::createContext(), mReplicatorListenerProxy);
mListeningToReplicator = true;
}
}
@@ -295,7 +295,7 @@ private:
try
{
- context->getReplicator()->removeListener(mReplicatorListenerProxy);
+ context->getReplicator()->removeListener(AsteriskSCF::Operations::createContext(), mReplicatorListenerProxy);
mListeningToReplicator = false;
}
catch (const Ice::Exception& e)
diff --git a/src/TranslatorOperation.cpp b/src/TranslatorOperation.cpp
index ea25865..10ba244 100644
--- a/src/TranslatorOperation.cpp
+++ b/src/TranslatorOperation.cpp
@@ -16,6 +16,8 @@
#include "TranslatorOperation.h"
+#include <AsteriskSCF/Operations/OperationContext.h>
+
namespace AsteriskSCF
{
@@ -75,7 +77,7 @@ void TranslatorOperation::setState()
{
MediaOperationStateItemSeq seq;
seq.push_back(mStateItem);
- mReplicationContext->getReplicator().tryOneWay()->setState(seq);
+ mReplicationContext->getReplicator().tryOneWay()->setState(AsteriskSCF::Operations::createContext(), seq);
}
catch (const Ice::Exception& ex)
{
@@ -94,7 +96,7 @@ void TranslatorOperation::removeState()
{
MediaOperationStateItemSeq seq;
seq.push_back(mStateItem);
- mReplicationContext->getReplicator().tryOneWay()->removeStateForItems(seq);
+ mReplicationContext->getReplicator().tryOneWay()->removeStateForItems(AsteriskSCF::Operations::createContext(), seq);
}
catch (const Ice::Exception& ex)
{
diff --git a/src/TranslatorOperationFactory.cpp b/src/TranslatorOperationFactory.cpp
index 82137fb..397d49e 100644
--- a/src/TranslatorOperationFactory.cpp
+++ b/src/TranslatorOperationFactory.cpp
@@ -1,7 +1,7 @@
/*
* Asterisk SCF -- An open-source communications framework.
*
- * Copyright (C) 2011, Digium, Inc.
+ * Copyright (C) 2011-2012, Digium, Inc.
*
* See http://www.asterisk.org for more information about
* the Asterisk SCF project. Please do not directly contact
@@ -26,13 +26,15 @@ namespace MediaOperationsCore
using namespace AsteriskSCF::Media::V1;
using namespace AsteriskSCF::System::Logging;
+using namespace AsteriskSCF::Operations;
TranslatorOperationFactory::TranslatorOperationFactory(
const Ice::ObjectAdapterPtr& adapter,
const Logger& logger,
const MediaOperationReplicationContextPtr& replicationContext,
const std::string& name)
- : MediaOperationFactoryImpl(adapter, logger, replicationContext, name)
+ : MediaOperationFactoryImpl(adapter, logger, replicationContext, name),
+ mOperationContextCache(Operations::OperationContextCache::create(DEFAULT_TTL_SECONDS))
{
mLocatorParams->category = MediaOperationDiscoveryCategory;
mLocatorParams->service = MediaOperationDiscoveryTranslatorService;
@@ -41,58 +43,82 @@ TranslatorOperationFactory::TranslatorOperationFactory(
TranslatorOperationFactory::~TranslatorOperationFactory() { }
MediaOperationPrx TranslatorOperationFactory::createMediaOperation(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
const StreamSourcePrx& source,
const StreamSinkPrx& sink,
const Ice::Current&)
{
- if (sink == 0 || source == 0)
+ std::pair<bool, CreateMediaOperationCookiePtr> cacheHit =
+ getContextSync<CreateMediaOperationCookiePtr>(mOperationContextCache, operationContext);
+
+ if (cacheHit.first)
{
- mLogger(Error) << "Cannot create translator because of a missing source or sink";
- throw UnsupportedMediaFormatException();
+ return cacheHit.second->getResult();
}
- FormatSeq sourceFormats = source->getFormats();
- FormatSeq sinkFormats = sink->getFormats();
+ try
+ {
+ if (sink == 0 || source == 0)
+ {
+ mLogger(Error) << "Cannot create translator because of a missing source or sink";
+ throw UnsupportedMediaFormatException();
+ }
+
+ FormatSeq sourceFormats = source->getFormats();
+ FormatSeq sinkFormats = sink->getFormats();
- FormatSeq::iterator supportedInput = std::find_first_of(sourceFormats.begin(),
+ FormatSeq::iterator supportedInput = std::find_first_of(sourceFormats.begin(),
sourceFormats.end(),
mTranslations.begin(),
mTranslations.end(),
TranslatorOperationFactory::formatsEqualMap);
- if (supportedInput == sourceFormats.end())
- {
- mLogger(Error) << "Cannot create translator because none of the stream source's formats are supported";
- throw UnsupportedMediaFormatException();
- }
+ if (supportedInput == sourceFormats.end())
+ {
+ mLogger(Error) << "Cannot create translator because none of the stream source's formats are supported";
+ throw UnsupportedMediaFormatException();
+ }
- mLogger(Debug) << "When creating media operation, the input is " << (*supportedInput)->name;
+ mLogger(Debug) << "When creating media operation, the input is " << (*supportedInput)->name;
- for (FormatSeq::iterator iter = mTranslations[(*supportedInput)->name].begin();
- iter != mTranslations[(*supportedInput)->name].end(); ++iter)
- {
- mLogger(Debug) << (*supportedInput)->name << " can be translated to " << (*iter)->name;
- }
+ for (FormatSeq::iterator iter = mTranslations[(*supportedInput)->name].begin();
+ iter != mTranslations[(*supportedInput)->name].end(); ++iter)
+ {
+ mLogger(Debug) << (*supportedInput)->name << " can be translated to " << (*iter)->name;
+ }
- FormatSeq::iterator supportedOutput = std::find_first_of(sinkFormats.begin(),
+ FormatSeq::iterator supportedOutput = std::find_first_of(sinkFormats.begin(),
sinkFormats.end(),
mTranslations[(*supportedInput)->name].begin(),
mTranslations[(*supportedInput)->name].end(),
TranslatorOperationFactory::formatsEqual);
- if (supportedOutput == sinkFormats.end())
- {
- mLogger(Error) << "Cannot create translator because none of the stream sink's formats are supported";
- throw UnsupportedMediaFormatException();
- }
+ if (supportedOutput == sinkFormats.end())
+ {
+ mLogger(Error) << "Cannot create translator because none of the stream sink's formats are supported";
+ throw UnsupportedMediaFormatException();
+ }
- mLogger(Debug) << "Creating a translator from " << (*supportedInput)->name
- << " to " << (*supportedOutput)->name;
+ mLogger(Debug) << "Creating a translator from " << (*supportedInput)->name
+ << " to " << (*supportedOutput)->name;
- return createMediaOperation(
+ MediaOperationPrx r = createMediaOperation(
*supportedOutput,
*supportedInput,
IceUtil::generateUUID());
+ cacheHit.second->setResult(r);
+ return r;
+ }
+ catch (const std::exception& e)
+ {
+ cacheHit.second->setException(e);
+ throw;
+ }
+ catch (...)
+ {
+ cacheHit.second->setException();
+ throw;
+ }
}
void TranslatorOperationFactory::addTranslation(
@@ -100,6 +126,7 @@ void TranslatorOperationFactory::addTranslation(
const FormatPtr& outFormat,
int cost)
{
+ boost::unique_lock<boost::shared_mutex> lock(mMutex);
mTranslations[inFormat->name].push_back(outFormat);
MediaOperationAttributes attrs;
diff --git a/src/TranslatorOperationFactory.h b/src/TranslatorOperationFactory.h
index ee2a279..e887f84 100644
--- a/src/TranslatorOperationFactory.h
+++ b/src/TranslatorOperationFactory.h
@@ -18,6 +18,9 @@
#include "MediaOperationFactoryImpl.h"
+#include <AsteriskSCF/Operations/OperationContextCache.h>
+#include <boost/thread.hpp>
+
namespace AsteriskSCF
{
@@ -50,6 +53,7 @@ public:
* the formats have been determined.
*/
AsteriskSCF::Media::V1::MediaOperationPrx createMediaOperation(
+ const AsteriskSCF::System::V1::OperationContextPtr&,
const AsteriskSCF::Media::V1::StreamSourcePrx& source,
const AsteriskSCF::Media::V1::StreamSinkPrx& sink,
const Ice::Current&);
@@ -98,6 +102,8 @@ private:
const AsteriskSCF::Media::V1::FormatPtr& lhs,
const std::pair<std::string, AsteriskSCF::Media::V1::FormatSeq>& rhs);
+ boost::shared_mutex mMutex;
+ Operations::OperationContextCachePtr mOperationContextCache;
TranslationMap mTranslations;
};
diff --git a/src/TranslatorSink.cpp b/src/TranslatorSink.cpp
index 81719b4..441ab8d 100644
--- a/src/TranslatorSink.cpp
+++ b/src/TranslatorSink.cpp
@@ -1,7 +1,7 @@
/*
* Asterisk SCF -- An open-source communications framework.
*
- * Copyright (C) 2011, Digium, Inc.
+ * Copyright (C) 2011-2012, Digium, Inc.
*
* See http://www.asterisk.org for more information about
* the Asterisk SCF project. Please do not directly contact
@@ -25,48 +25,62 @@ namespace MediaOperationsCore
{
using namespace AsteriskSCF::Media::V1;
+using namespace AsteriskSCF::Operations;
using namespace AsteriskSCF::System::Logging;
TranslatorSink::TranslatorSink(const Logger& logger,
const FormatPtr& supportedFormat)
: mLogger(logger),
- mId(IceUtil::generateUUID())
+ mId(IceUtil::generateUUID()),
+ mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS))
{
mSupportedFormats.push_back(supportedFormat);
}
TranslatorSink::~TranslatorSink()
{
- mLogger(Debug) << "TranslatorSink destructor called";
+ mLogger(Trace) << "TranslatorSink destructor called";
}
void TranslatorSink::write(const FrameSeq& frames, const Ice::Current&)
{
+ boost::shared_lock<boost::shared_mutex> lock(mMutex);
mTranslator->translateFrames(frames);
}
void TranslatorSink::setTranslator(const TranslatorPtr& translator)
{
+ boost::unique_lock<boost::shared_mutex> lock(mMutex);
mTranslator = translator;
}
-void TranslatorSink::setSource(const StreamSourcePrx& source, const Ice::Current&)
+void TranslatorSink::setSource(const AsteriskSCF::System::V1::OperationContextPtr& context,
+ const StreamSourcePrx& source, const Ice::Current&)
{
+ if (!mOperationContextCache->addOperationContext(context))
+ {
+ // retry detected
+ return;
+ }
+ boost::unique_lock<boost::shared_mutex> lock(mMutex);
mSource = source;
}
StreamSourcePrx TranslatorSink::getSource(const Ice::Current&)
{
+ boost::shared_lock<boost::shared_mutex> lock(mMutex);
return mSource;
}
FormatSeq TranslatorSink::getFormats(const Ice::Current&)
{
+ boost::shared_lock<boost::shared_mutex> lock(mMutex);
return mSupportedFormats;
}
std::string TranslatorSink::getId(const Ice::Current&)
{
+ // constant; no need to lock
return mId;
}
diff --git a/src/TranslatorSink.h b/src/TranslatorSink.h
index 2281695..bffc1ce 100644
--- a/src/TranslatorSink.h
+++ b/src/TranslatorSink.h
@@ -1,7 +1,7 @@
/*
* Asterisk SCF -- An open-source communications framework.
*
- * Copyright (C) 2011, Digium, Inc.
+ * Copyright (C) 2011-2012, Digium, Inc.
*
* See http://www.asterisk.org for more information about
* the Asterisk SCF project. Please do not directly contact
@@ -16,8 +16,11 @@
#pragma once
+#include <boost/thread.hpp>
+
#include <AsteriskSCF/Media/MediaIf.h>
#include <AsteriskSCF/Logger.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
#include "Translator.h"
@@ -39,7 +42,9 @@ public:
void setTranslator(const TranslatorPtr& translator);
- void setSource(const AsteriskSCF::Media::V1::StreamSourcePrx& source, const Ice::Current&);
+ void setSource(
+ const AsteriskSCF::System::V1::OperationContextPtr&,
+ const AsteriskSCF::Media::V1::StreamSourcePrx& source, const Ice::Current&);
AsteriskSCF::Media::V1::StreamSourcePrx getSource(const Ice::Current&);
@@ -48,10 +53,12 @@ public:
std::string getId(const Ice::Current&);
private:
+ boost::shared_mutex mMutex;
+ AsteriskSCF::Operations::OperationContextCachePtr mOperationContextCache;
AsteriskSCF::System::Logging::Logger mLogger;
AsteriskSCF::Media::V1::FormatSeq mSupportedFormats;
AsteriskSCF::Media::V1::StreamSourcePrx mSource;
- std::string mId;
+ const std::string mId;
TranslatorPtr mTranslator;
};
diff --git a/src/TranslatorSource.cpp b/src/TranslatorSource.cpp
index b24cc68..9d00b6f 100644
--- a/src/TranslatorSource.cpp
+++ b/src/TranslatorSource.cpp
@@ -13,7 +13,10 @@
* the GNU General Public License Version 2. See the LICENSE.txt file
* at the top of the source tree.
*/
-#include "IceUtil/UUID.h"
+
+#include <IceUtil/UUID.h>
+
+#include <AsteriskSCF/Operations/OperationMonitor.h>
#include "TranslatorSource.h"
@@ -24,54 +27,75 @@ namespace MediaOperationsCore
{
using namespace AsteriskSCF::Media::V1;
+using namespace AsteriskSCF::Operations;
using namespace AsteriskSCF::System::Logging;
TranslatorSource::TranslatorSource(const Logger& logger,
const FormatPtr& format)
- : mLogger(logger),
- mId(IceUtil::generateUUID())
+ : mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
+ mLogger(logger),
+ mId(IceUtil::generateUUID())
{
mSupportedFormats.push_back(format);
}
TranslatorSource::~TranslatorSource()
{
- mLogger(Debug) << "TranslatorSource destructor called";
+ mLogger(Trace) << "TranslatorSource destructor called";
}
-void TranslatorSource::addSink(const StreamSinkPrx& sink, const Ice::Current&)
+void TranslatorSource::addSink(const AsteriskSCF::System::V1::OperationContextPtr& context,
+ const StreamSinkPrx& sink, const Ice::Current&)
{
+ if (!mOperationContextCache->addOperationContext(context))
+ {
+ // retry detected
+ return;
+ }
+ boost::unique_lock<boost::shared_mutex> lock(mMutex);
if (std::find(mSinks.begin(), mSinks.end(), sink) == mSinks.end())
{
mSinks.push_back(sink);
}
}
-void TranslatorSource::removeSink(const StreamSinkPrx& sink, const Ice::Current&)
+void TranslatorSource::removeSink(const AsteriskSCF::System::V1::OperationContextPtr& context,
+ const StreamSinkPrx& sink, const Ice::Current&)
{
+ if (!mOperationContextCache->addOperationContext(context))
+ {
+ // retry detected
+ return;
+ }
+ boost::unique_lock<boost::shared_mutex> lock(mMutex);
mSinks.erase(std::remove(mSinks.begin(), mSinks.end(), sink), mSinks.end());
}
StreamSinkSeq TranslatorSource::getSinks(const Ice::Current&)
{
+ boost::shared_lock<boost::shared_mutex> lock(mMutex);
return mSinks;
}
FormatSeq TranslatorSource::getFormats(const Ice::Current&)
{
+ // mSupportedFormats never changes after construction; no lock needed
return mSupportedFormats;
}
std::string TranslatorSource::getId(const Ice::Current&)
{
+ // mId contant; no lock needed
return mId;
}
//XXX
//I interpreted this to essentially be a check to make sure that
//the format we actually are using matches what is expected.
-void TranslatorSource::requestFormat(const FormatPtr& format, const Ice::Current&)
+void TranslatorSource::requestFormat(const AsteriskSCF::System::V1::OperationContextPtr&,
+ const FormatPtr& format, const Ice::Current&)
{
+ // mSupportedFormats never changes after construction; no lock needed
FormatPtr supportedFormat = mSupportedFormats.front();
if (supportedFormat->name != format->name)
@@ -83,8 +107,16 @@ void TranslatorSource::requestFormat(const FormatPtr& format, const Ice::Current
void TranslatorSource::distributeToSinks(const FrameSeq& translatedFrames)
{
- for (StreamSinkSeq::iterator iter = mSinks.begin();
- iter != mSinks.end(); ++iter)
+ // it's unwise to invoke remote operations while holding a mutex.
+ // copy the sink list while holding the mutex, then use the copy.
+ StreamSinkSeq sinks;
+ {
+ boost::shared_lock<boost::shared_mutex> lock(mMutex);
+ sinks = mSinks;
+ }
+
+ for (StreamSinkSeq::iterator iter = sinks.begin();
+ iter != sinks.end(); ++iter)
{
try
{
diff --git a/src/TranslatorSource.h b/src/TranslatorSource.h
index 06bdbf5..e70db26 100644
--- a/src/TranslatorSource.h
+++ b/src/TranslatorSource.h
@@ -16,8 +16,11 @@
#pragma once
-#include <AsteriskSCF/Media/MediaIf.h>
+#include <boost/thread.hpp>
+
#include <AsteriskSCF/Logger.h>
+#include <AsteriskSCF/Media/MediaIf.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
namespace AsteriskSCF
{
@@ -33,25 +36,30 @@ public:
~TranslatorSource();
- void addSink(const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&);
-
- void removeSink(const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&);
-
+ void addSink(const AsteriskSCF::System::V1::OperationContextPtr&,
+ const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&);
+
+ void removeSink(const AsteriskSCF::System::V1::OperationContextPtr&,
+ const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&);
+
AsteriskSCF::Media::V1::StreamSinkSeq getSinks(const Ice::Current&);
AsteriskSCF::Media::V1::FormatSeq getFormats(const Ice::Current&);
std::string getId(const Ice::Current&);
- void requestFormat(const AsteriskSCF::Media::V1::FormatPtr& format, const Ice::Current&);
+ void requestFormat(const AsteriskSCF::System::V1::OperationContextPtr&,
+ const AsteriskSCF::Media::V1::FormatPtr& format, const Ice::Current&);
void distributeToSinks(const AsteriskSCF::Media::V1::FrameSeq& translatedFrames);
private:
+ boost::shared_mutex mMutex;
+ AsteriskSCF::Operations::OperationContextCachePtr mOperationContextCache;
AsteriskSCF::System::Logging::Logger mLogger;
AsteriskSCF::Media::V1::StreamSinkSeq mSinks;
AsteriskSCF::Media::V1::FormatSeq mSupportedFormats;
- std::string mId;
+ const std::string mId;
};
typedef IceUtil::Handle<TranslatorSource> TranslatorSourcePtr;
diff --git a/test/TestMediaOperations.cpp b/test/TestMediaOperations.cpp
index 617d5f3..de58e83 100644
--- a/test/TestMediaOperations.cpp
+++ b/test/TestMediaOperations.cpp
@@ -1,7 +1,7 @@
/*
* Asterisk SCF -- An open-source communications framework.
*
- * Copyright (C) 2011, Digium, Inc.
+ * Copyright (C) 2011-2012, Digium, Inc.
*
* See http://www.asterisk.org for more information about
* the Asterisk SCF project. Please do not directly contact
@@ -31,6 +31,7 @@
#include <AsteriskSCF/Media/MediaIf.h>
#include <AsteriskSCF/Media/MediaOperationIf.h>
#include <AsteriskSCF/Media/Formats/AudioFormatsIf.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
#include <boost/test/debug.hpp>
#include <boost/thread/mutex.hpp>
@@ -452,13 +453,15 @@ BOOST_FIXTURE_TEST_CASE(createMediaOperations, PerTestFixture)
//translators and ulaw to alaw translators even though we searched for the factory with
//an input of alaw and an output of ulaw.
MediaOperationPrx alaw2ulawTranslator =
- ulawAlawFactory->createMediaOperation(Testbed.alawSourceProxy, Testbed.ulawSinkProxy);
+ ulawAlawFactory->createMediaOperation(AsteriskSCF::Operations::createContext(),
+ Testbed.alawSourceProxy, Testbed.ulawSinkProxy);
MediaOperationPrx ulaw2alawTranslator =
- ulawAlawFactory->createMediaOperation(Testbed.ulawSourceProxy, Testbed.alawSinkProxy);
+ ulawAlawFactory->createMediaOperation(AsteriskSCF::Operations::createContext(),
+ Testbed.ulawSourceProxy, Testbed.alawSinkProxy);
- BOOST_CHECK(alaw2ulawTranslator != 0);
- BOOST_CHECK(ulaw2alawTranslator != 0);
+ BOOST_CHECK_NE(0, alaw2ulawTranslator);
+ BOOST_CHECK_NE(0, ulaw2alawTranslator);
alaw2ulawTranslator->destroy();
ulaw2alawTranslator->destroy();
@@ -471,6 +474,66 @@ BOOST_FIXTURE_TEST_CASE(createMediaOperations, PerTestFixture)
}
}
+BOOST_FIXTURE_TEST_CASE(testRetrySuccess, PerTestFixture)
+{
+ MediaOperationServiceLocatorParamsPtr ulawAlawParams = createLocatorParams(Testbed.alaw, Testbed.ulaw);
+
+ MediaOperationFactoryPrx ulawAlawFactory =
+ MediaOperationFactoryPrx::checkedCast(Testbed.locator->locate(ulawAlawParams));
+
+ BOOST_CHECK_NE(ulawAlawFactory, 0);
+
+ AsteriskSCF::System::V1::OperationContextPtr context = AsteriskSCF::Operations::createContext();
+
+ MediaOperationPrx expected =
+ ulawAlawFactory->createMediaOperation(context, Testbed.alawSourceProxy, Testbed.ulawSinkProxy);
+
+ MediaOperationPrx unexpected =
+ ulawAlawFactory->createMediaOperation(AsteriskSCF::Operations::createContext(),
+ Testbed.alawSourceProxy, Testbed.ulawSinkProxy);
+
+ // simulate a retry
+ MediaOperationPrx actual =
+ ulawAlawFactory->createMediaOperation(context, Testbed.alawSourceProxy, Testbed.ulawSinkProxy);
+
+ // This test is only meaningful if create operations return different proxies
+ BOOST_CHECK_NE(unexpected, actual);
+ BOOST_CHECK_EQUAL(expected, actual);
+}
+
+BOOST_FIXTURE_TEST_CASE(testRetryFailure, PerTestFixture)
+{
+ MediaOperationServiceLocatorParamsPtr ulawAlawParams = createLocatorParams(Testbed.alaw, Testbed.ulaw);
+
+ MediaOperationFactoryPrx ulawAlawFactory =
+ MediaOperationFactoryPrx::checkedCast(Testbed.locator->locate(ulawAlawParams));
+
+ BOOST_CHECK_NE(ulawAlawFactory, 0);
+
+ AsteriskSCF::System::V1::OperationContextPtr context = AsteriskSCF::Operations::createContext();
+
+ try
+ {
+ ulawAlawFactory->createMediaOperation(context, 0, 0);
+ BOOST_FAIL("Expected Ice::Exception on first invocation");
+ }
+ catch (const Ice::Exception&)
+ {
+ // expected
+ }
+
+ // simulate a retry
+ try
+ {
+ ulawAlawFactory->createMediaOperation(context, 0, 0);
+ BOOST_FAIL("Expected Ice::Exception on retry invocation");
+ }
+ catch (const Ice::Exception&)
+ {
+ // expected
+ }
+}
+
BOOST_FIXTURE_TEST_CASE(translateAlawToUlaw, PerTestFixture)
{
size_t numFramesToTranslate = 10;
@@ -480,16 +543,17 @@ BOOST_FIXTURE_TEST_CASE(translateAlawToUlaw, PerTestFixture)
MediaOperationFactoryPrx::checkedCast(Testbed.locator->locate(alaw2ulawParams));
MediaOperationPrx alaw2ulawTranslator =
- alaw2ulawFactory->createMediaOperation(Testbed.alawSourceProxy, Testbed.ulawSinkProxy);
+ alaw2ulawFactory->createMediaOperation(AsteriskSCF::Operations::createContext(),
+ Testbed.alawSourceProxy, Testbed.ulawSinkProxy);
StreamSinkPrx translateSink = alaw2ulawTranslator->getSink();
StreamSourcePrx translateSource = alaw2ulawTranslator->getSource();
- Testbed.alawSourceProxy->addSink(translateSink);
- Testbed.ulawSinkProxy->setSource(translateSource);
+ Testbed.alawSourceProxy->addSink(AsteriskSCF::Operations::createContext(), translateSink);
+ Testbed.ulawSinkProxy->setSource(AsteriskSCF::Operations::createContext(), translateSource);
- translateSource->addSink(Testbed.ulawSinkProxy);
- translateSink->setSource(Testbed.alawSourceProxy);
+ translateSource->addSink(AsteriskSCF::Operations::createContext(), Testbed.ulawSinkProxy);
+ translateSink->setSource(AsteriskSCF::Operations::createContext(), Testbed.alawSourceProxy);
//We'll feed a few of the sample frames in.
FrameSeq alawFrames(numFramesToTranslate, Testbed.sampleAlawFrame);
@@ -540,16 +604,17 @@ BOOST_FIXTURE_TEST_CASE(translateAlawToSlin8, PerTestFixture)
MediaOperationFactoryPrx::checkedCast(Testbed.locator->locate(alaw2slin8Params));
MediaOperationPrx alaw2slin8Translator =
- alaw2slin8Factory->createMediaOperation(Testbed.alawSourceProxy, Testbed.slin8SinkProxy);
+ alaw2slin8Factory->createMediaOperation(AsteriskSCF::Operations::createContext(),
+ Testbed.alawSourceProxy, Testbed.slin8SinkProxy);
StreamSinkPrx translateSink = alaw2slin8Translator->getSink();
StreamSourcePrx translateSource = alaw2slin8Translator->getSource();
- Testbed.alawSourceProxy->addSink(translateSink);
- Testbed.slin8SinkProxy->setSource(translateSource);
+ Testbed.alawSourceProxy->addSink(AsteriskSCF::Operations::createContext(), translateSink);
+ Testbed.slin8SinkProxy->setSource(AsteriskSCF::Operations::createContext(), translateSource);
- translateSource->addSink(Testbed.slin8SinkProxy);
- translateSink->setSource(Testbed.alawSourceProxy);
+ translateSource->addSink(AsteriskSCF::Operations::createContext(), Testbed.slin8SinkProxy);
+ translateSink->setSource(AsteriskSCF::Operations::createContext(), Testbed.alawSourceProxy);
//We'll feed a few of the sample frames in.
FrameSeq alawFrames(numFramesToTranslate, Testbed.sampleAlawFrame);
@@ -601,16 +666,17 @@ BOOST_FIXTURE_TEST_CASE(translateSlin8ToAlaw, PerTestFixture)
MediaOperationFactoryPrx::checkedCast(Testbed.locator->locate(slin82AlawParams));
MediaOperationPrx slin82AlawTranslator =
- slin82AlawFactory->createMediaOperation(Testbed.slin8SourceProxy, Testbed.alawSinkProxy);
+ slin82AlawFactory->createMediaOperation(AsteriskSCF::Operations::createContext(),
+ Testbed.slin8SourceProxy, Testbed.alawSinkProxy);
StreamSinkPrx translateSink = slin82AlawTranslator->getSink();
StreamSourcePrx translateSource = slin82AlawTranslator->getSource();
- Testbed.slin8SourceProxy->addSink(translateSink);
- Testbed.alawSinkProxy->setSource(translateSource);
+ Testbed.slin8SourceProxy->addSink(AsteriskSCF::Operations::createContext(), translateSink);
+ Testbed.alawSinkProxy->setSource(AsteriskSCF::Operations::createContext(), translateSource);
- translateSource->addSink(Testbed.alawSinkProxy);
- translateSink->setSource(Testbed.slin8SourceProxy);
+ translateSource->addSink(AsteriskSCF::Operations::createContext(), Testbed.alawSinkProxy);
+ translateSink->setSource(AsteriskSCF::Operations::createContext(), Testbed.slin8SourceProxy);
//We'll feed a few of the sample frames in.
FrameSeq slin8Frames(numFramesToTranslate, Testbed.sampleSlin8Frame);
@@ -662,16 +728,17 @@ BOOST_FIXTURE_TEST_CASE(translateUlawToAlaw, PerTestFixture)
MediaOperationFactoryPrx::checkedCast(Testbed.locator->locate(ulaw2alawParams));
MediaOperationPrx ulaw2alawTranslator =
- ulaw2alawFactory->createMediaOperation(Testbed.ulawSourceProxy, Testbed.alawSinkProxy);
+ ulaw2alawFactory->createMediaOperation(AsteriskSCF::Operations::createContext(),
+ Testbed.ulawSourceProxy, Testbed.alawSinkProxy);
StreamSinkPrx translateSink = ulaw2alawTranslator->getSink();
StreamSourcePrx translateSource = ulaw2alawTranslator->getSource();
- Testbed.ulawSourceProxy->addSink(translateSink);
- Testbed.alawSinkProxy->setSource(translateSource);
+ Testbed.ulawSourceProxy->addSink(AsteriskSCF::Operations::createContext(), translateSink);
+ Testbed.alawSinkProxy->setSource(AsteriskSCF::Operations::createContext(), translateSource);
- translateSource->addSink(Testbed.alawSinkProxy);
- translateSink->setSource(Testbed.ulawSourceProxy);
+ translateSource->addSink(AsteriskSCF::Operations::createContext(), Testbed.alawSinkProxy);
+ translateSink->setSource(AsteriskSCF::Operations::createContext(), Testbed.ulawSourceProxy);
//We'll feed a few of the sample frames in.
FrameSeq ulawFrames(numFramesToTranslate, Testbed.sampleUlawFrame);
@@ -723,16 +790,17 @@ BOOST_FIXTURE_TEST_CASE(translateUlawToSlin8, PerTestFixture)
MediaOperationFactoryPrx::checkedCast(Testbed.locator->locate(ulaw2slin8Params));
MediaOperationPrx ulaw2slin8Translator =
- ulaw2slin8Factory->createMediaOperation(Testbed.ulawSourceProxy, Testbed.slin8SinkProxy);
+ ulaw2slin8Factory->createMediaOperation(AsteriskSCF::Operations::createContext(),
+ Testbed.ulawSourceProxy, Testbed.slin8SinkProxy);
StreamSinkPrx translateSink = ulaw2slin8Translator->getSink();
StreamSourcePrx translateSource = ulaw2slin8Translator->getSource();
- Testbed.ulawSourceProxy->addSink(translateSink);
- Testbed.slin8SinkProxy->setSource(translateSource);
+ Testbed.ulawSourceProxy->addSink(AsteriskSCF::Operations::createContext(), translateSink);
+ Testbed.slin8SinkProxy->setSource(AsteriskSCF::Operations::createContext(), translateSource);
- translateSource->addSink(Testbed.slin8SinkProxy);
- translateSink->setSource(Testbed.ulawSourceProxy);
+ translateSource->addSink(AsteriskSCF::Operations::createContext(), Testbed.slin8SinkProxy);
+ translateSink->setSource(AsteriskSCF::Operations::createContext(), Testbed.ulawSourceProxy);
//We'll feed a few of the sample frames in.
FrameSeq ulawFrames(numFramesToTranslate, Testbed.sampleUlawFrame);
@@ -784,16 +852,17 @@ BOOST_FIXTURE_TEST_CASE(translateSlin8ToUlaw, PerTestFixture)
MediaOperationFactoryPrx::checkedCast(Testbed.locator->locate(slin82UlawParams));
MediaOperationPrx slin82UlawTranslator =
- slin82UlawFactory->createMediaOperation(Testbed.slin8SourceProxy, Testbed.ulawSinkProxy);
+ slin82UlawFactory->createMediaOperation(AsteriskSCF::Operations::createContext(),
+ Testbed.slin8SourceProxy, Testbed.ulawSinkProxy);
StreamSinkPrx translateSink = slin82UlawTranslator->getSink();
StreamSourcePrx translateSource = slin82UlawTranslator->getSource();
- Testbed.slin8SourceProxy->addSink(translateSink);
- Testbed.ulawSinkProxy->setSource(translateSource);
+ Testbed.slin8SourceProxy->addSink(AsteriskSCF::Operations::createContext(), translateSink);
+ Testbed.ulawSinkProxy->setSource(AsteriskSCF::Operations::createContext(), translateSource);
- translateSource->addSink(Testbed.ulawSinkProxy);
- translateSink->setSource(Testbed.slin8SourceProxy);
+ translateSource->addSink(AsteriskSCF::Operations::createContext(), Testbed.ulawSinkProxy);
+ translateSink->setSource(AsteriskSCF::Operations::createContext(), Testbed.slin8SourceProxy);
//We'll feed a few of the sample frames in.
FrameSeq slin8Frames(numFramesToTranslate, Testbed.sampleSlin8Frame);
@@ -850,16 +919,17 @@ BOOST_FIXTURE_TEST_CASE(resample8To16, PerTestFixture)
MediaOperationFactoryPrx::checkedCast(Testbed.locator->locate(eightToSixteenParams));
MediaOperationPrx eightToSixteenResampler =
- eightToSixteenFactory->createMediaOperation(Testbed.slin8SourceProxy, Testbed.slin16SinkProxy);
+ eightToSixteenFactory->createMediaOperation(AsteriskSCF::Operations::createContext(),
+ Testbed.slin8SourceProxy, Testbed.slin16SinkProxy);
StreamSinkPrx resampleSink = eightToSixteenResampler->getSink();
StreamSourcePrx resampleSource = eightToSixteenResampler->getSource();
- Testbed.slin8SourceProxy->addSink(resampleSink);
- Testbed.slin16SinkProxy->setSource(resampleSource);
+ Testbed.slin8SourceProxy->addSink(AsteriskSCF::Operations::createContext(), resampleSink);
+ Testbed.slin16SinkProxy->setSource(AsteriskSCF::Operations::createContext(), resampleSource);
- resampleSource->addSink(Testbed.slin16SinkProxy);
- resampleSink->setSource(Testbed.slin8SourceProxy);
+ resampleSource->addSink(AsteriskSCF::Operations::createContext(), Testbed.slin16SinkProxy);
+ resampleSink->setSource(AsteriskSCF::Operations::createContext(), Testbed.slin8SourceProxy);
FrameSeq slin8Frames(numFramesToTranslate, Testbed.sampleSlin8Frame);
@@ -909,16 +979,17 @@ BOOST_FIXTURE_TEST_CASE(resample16To8, PerTestFixture)
MediaOperationFactoryPrx::checkedCast(Testbed.locator->locate(sixteenToEightParams));
MediaOperationPrx sixteenToEightResampler =
- sixteenToEightFactory->createMediaOperation(Testbed.slin16SourceProxy, Testbed.slin8SinkProxy);
+ sixteenToEightFactory->createMediaOperation(AsteriskSCF::Operations::createContext(),
+ Testbed.slin16SourceProxy, Testbed.slin8SinkProxy);
StreamSinkPrx resampleSink = sixteenToEightResampler->getSink();
StreamSourcePrx resampleSource = sixteenToEightResampler->getSource();
- Testbed.slin16SourceProxy->addSink(resampleSink);
- Testbed.slin8SinkProxy->setSource(resampleSource);
+ Testbed.slin16SourceProxy->addSink(AsteriskSCF::Operations::createContext(), resampleSink);
+ Testbed.slin8SinkProxy->setSource(AsteriskSCF::Operations::createContext(), resampleSource);
- resampleSource->addSink(Testbed.slin8SinkProxy);
- resampleSink->setSource(Testbed.slin16SourceProxy);
+ resampleSource->addSink(AsteriskSCF::Operations::createContext(), Testbed.slin8SinkProxy);
+ resampleSink->setSource(AsteriskSCF::Operations::createContext(), Testbed.slin16SourceProxy);
FrameSeq slin16Frames(numFramesToTranslate, Testbed.sampleSlin16Frame);
@@ -968,16 +1039,17 @@ BOOST_FIXTURE_TEST_CASE(slin8toG722, PerTestFixture)
MediaOperationFactoryPrx::checkedCast(Testbed.locator->locate(slinToG722Params));
MediaOperationPrx slinToG722Translator =
- slinToG722Factory->createMediaOperation(Testbed.slin8SourceProxy, Testbed.g722SinkProxy);
+ slinToG722Factory->createMediaOperation(AsteriskSCF::Operations::createContext(),
+ Testbed.slin8SourceProxy, Testbed.g722SinkProxy);
StreamSinkPrx translatorSink = slinToG722Translator->getSink();
StreamSourcePrx translatorSource = slinToG722Translator->getSource();
- Testbed.slin8SourceProxy->addSink(translatorSink);
- Testbed.g722SinkProxy->setSource(translatorSource);
+ Testbed.slin8SourceProxy->addSink(AsteriskSCF::Operations::createContext(), translatorSink);
+ Testbed.g722SinkProxy->setSource(AsteriskSCF::Operations::createContext(), translatorSource);
- translatorSource->addSink(Testbed.g722SinkProxy);
- translatorSink->setSource(Testbed.slin8SourceProxy);
+ translatorSource->addSink(AsteriskSCF::Operations::createContext(), Testbed.g722SinkProxy);
+ translatorSink->setSource(AsteriskSCF::Operations::createContext(), Testbed.slin8SourceProxy);
FrameSeq slin8Frames(numFramesToTranslate, Testbed.sampleSlin8Frame);
@@ -1027,16 +1099,17 @@ BOOST_FIXTURE_TEST_CASE(slin16toG722, PerTestFixture)
MediaOperationFactoryPrx::checkedCast(Testbed.locator->locate(slinToG722Params));
MediaOperationPrx slinToG722Translator =
- slinToG722Factory->createMediaOperation(Testbed.slin16SourceProxy, Testbed.g722SinkProxy);
+ slinToG722Factory->createMediaOperation(AsteriskSCF::Operations::createContext(),
+ Testbed.slin16SourceProxy, Testbed.g722SinkProxy);
StreamSinkPrx translatorSink = slinToG722Translator->getSink();
StreamSourcePrx translatorSource = slinToG722Translator->getSource();
- Testbed.slin16SourceProxy->addSink(translatorSink);
- Testbed.g722SinkProxy->setSource(translatorSource);
+ Testbed.slin16SourceProxy->addSink(AsteriskSCF::Operations::createContext(), translatorSink);
+ Testbed.g722SinkProxy->setSource(AsteriskSCF::Operations::createContext(), translatorSource);
- translatorSource->addSink(Testbed.g722SinkProxy);
- translatorSink->setSource(Testbed.slin16SourceProxy);
+ translatorSource->addSink(AsteriskSCF::Operations::createContext(), Testbed.g722SinkProxy);
+ translatorSink->setSource(AsteriskSCF::Operations::createContext(), Testbed.slin16SourceProxy);
FrameSeq slin16Frames(numFramesToTranslate, Testbed.sampleSlin16Frame);
@@ -1086,16 +1159,17 @@ BOOST_FIXTURE_TEST_CASE(g722ToSlin8, PerTestFixture)
MediaOperationFactoryPrx::checkedCast(Testbed.locator->locate(g722ToSlinParams));
MediaOperationPrx g722ToSlinTranslator =
- g722ToSlinFactory->createMediaOperation(Testbed.g722SourceProxy, Testbed.slin8SinkProxy);
+ g722ToSlinFactory->createMediaOperation(AsteriskSCF::Operations::createContext(),
+ Testbed.g722SourceProxy, Testbed.slin8SinkProxy);
StreamSinkPrx translatorSink = g722ToSlinTranslator->getSink();
StreamSourcePrx translatorSource = g722ToSlinTranslator->getSource();
- Testbed.g722SourceProxy->addSink(translatorSink);
- Testbed.slin8SinkProxy->setSource(translatorSource);
+ Testbed.g722SourceProxy->addSink(AsteriskSCF::Operations::createContext(), translatorSink);
+ Testbed.slin8SinkProxy->setSource(AsteriskSCF::Operations::createContext(), translatorSource);
- translatorSource->addSink(Testbed.slin8SinkProxy);
- translatorSink->setSource(Testbed.g722SourceProxy);
+ translatorSource->addSink(AsteriskSCF::Operations::createContext(), Testbed.slin8SinkProxy);
+ translatorSink->setSource(AsteriskSCF::Operations::createContext(), Testbed.g722SourceProxy);
FrameSeq g722Frames(numFramesToTranslate, Testbed.sampleG722Frame);
@@ -1145,16 +1219,17 @@ BOOST_FIXTURE_TEST_CASE(g722ToSlin16, PerTestFixture)
MediaOperationFactoryPrx::checkedCast(Testbed.locator->locate(g722ToSlinParams));
MediaOperationPrx g722ToSlinTranslator =
- g722ToSlinFactory->createMediaOperation(Testbed.g722SourceProxy, Testbed.slin16SinkProxy);
+ g722ToSlinFactory->createMediaOperation(AsteriskSCF::Operations::createContext(),
+ Testbed.g722SourceProxy, Testbed.slin16SinkProxy);
StreamSinkPrx translatorSink = g722ToSlinTranslator->getSink();
StreamSourcePrx translatorSource = g722ToSlinTranslator->getSource();
- Testbed.g722SourceProxy->addSink(translatorSink);
- Testbed.slin16SinkProxy->setSource(translatorSource);
+ Testbed.g722SourceProxy->addSink(AsteriskSCF::Operations::createContext(), translatorSink);
+ Testbed.slin16SinkProxy->setSource(AsteriskSCF::Operations::createContext(), translatorSource);
- translatorSource->addSink(Testbed.slin16SinkProxy);
- translatorSink->setSource(Testbed.g722SourceProxy);
+ translatorSource->addSink(AsteriskSCF::Operations::createContext(), Testbed.slin16SinkProxy);
+ translatorSink->setSource(AsteriskSCF::Operations::createContext(), Testbed.g722SourceProxy);
FrameSeq g722Frames(numFramesToTranslate, Testbed.sampleG722Frame);
diff --git a/test/TestStreamSink.cpp b/test/TestStreamSink.cpp
index 1abf36c..888a15c 100644
--- a/test/TestStreamSink.cpp
+++ b/test/TestStreamSink.cpp
@@ -33,7 +33,7 @@ void TestStreamSink::write(const FrameSeq& frames, const Ice::Current&)
/**
* Implementation of the setSource method which just stores the source away for later retrieval.
*/
-void TestStreamSink::setSource(const StreamSourcePrx& source, const Ice::Current&)
+void TestStreamSink::setSource(const AsteriskSCF::System::V1::OperationContextPtr&, const StreamSourcePrx& source, const Ice::Current&)
{
mSource = source;
}
diff --git a/test/TestStreamSink.h b/test/TestStreamSink.h
index 4278950..65db80a 100644
--- a/test/TestStreamSink.h
+++ b/test/TestStreamSink.h
@@ -30,7 +30,7 @@ public:
/**
* Implementation of the setSource method which just stores the source away for later retrieval.
*/
- void setSource(const AsteriskSCF::Media::V1::StreamSourcePrx& source, const Ice::Current&);
+ void setSource(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::Media::V1::StreamSourcePrx& source, const Ice::Current&);
/**
* Implementation of the getSource method which just pulls the source back out.
diff --git a/test/TestStreamSource.cpp b/test/TestStreamSource.cpp
index 456b107..f824fe5 100644
--- a/test/TestStreamSource.cpp
+++ b/test/TestStreamSource.cpp
@@ -26,7 +26,7 @@ TestStreamSource::TestStreamSource(const FormatPtr& format)
/**
* Implementation of addSink that tacks a new sink onto our list
*/
-void TestStreamSource::addSink(const StreamSinkPrx& sink, const Ice::Current&)
+void TestStreamSource::addSink(const AsteriskSCF::System::V1::OperationContextPtr&, const StreamSinkPrx& sink, const Ice::Current&)
{
mSinks.push_back(sink);
}
@@ -34,7 +34,7 @@ void TestStreamSource::addSink(const StreamSinkPrx& sink, const Ice::Current&)
/**
* Implementation of removeSink that removes a sink from our list
*/
-void TestStreamSource::removeSink(const StreamSinkPrx& sink, const Ice::Current&)
+void TestStreamSource::removeSink(const AsteriskSCF::System::V1::OperationContextPtr&, const StreamSinkPrx& sink, const Ice::Current&)
{
mSinks.erase(std::remove(mSinks.begin(), mSinks.end(), sink), mSinks.end());
}
@@ -66,7 +66,8 @@ std::string TestStreamSource::getId(const Ice::Current&)
/**
* Does nothing. Don't call this you silly head!
*/
-void TestStreamSource::requestFormat(const FormatPtr&, const Ice::Current&)
+void TestStreamSource::requestFormat(const AsteriskSCF::System::V1::OperationContextPtr&,
+ const FormatPtr&, const Ice::Current&)
{
}
diff --git a/test/TestStreamSource.h b/test/TestStreamSource.h
index 782db7b..b7464ea 100644
--- a/test/TestStreamSource.h
+++ b/test/TestStreamSource.h
@@ -25,12 +25,12 @@ public:
/**
* Implementation of addSink that tacks a new sink onto our list
*/
- void addSink(const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&);
+ void addSink(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&);
/**
* Implementation of removeSink that removes a sink from our list
*/
- void removeSink(const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&);
+ void removeSink(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&);
/**
* Implementation of the getSinks that returns sinks that have been added.
@@ -50,7 +50,8 @@ public:
/**
* Does nothing. Don't call this you silly head!
*/
- void requestFormat(const AsteriskSCF::Media::V1::FormatPtr&, const Ice::Current&);
+ void requestFormat(const AsteriskSCF::System::V1::OperationContextPtr&,
+ const AsteriskSCF::Media::V1::FormatPtr&, const Ice::Current&);
/**
* Writes the given frame to all its sinks. Presumably, there will
-----------------------------------------------------------------------
--
asterisk-scf/release/media_operations_core.git
More information about the asterisk-scf-commits
mailing list