[asterisk-scf-commits] asterisk-scf/integration/sip.git branch "retry_deux" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Tue Apr 10 23:10:57 CDT 2012
branch "retry_deux" has been updated
via 4c37ced142d56129a54a1e3dc49cd378fcad8244 (commit)
from 71eeab06a117530626725bab0c43c5835ffe600c (commit)
Summary of changes:
src/Component.cpp | 2 +
src/PJSIPRegistrarModule.cpp | 76 +++-
src/PJSIPRegistrarModule.h | 4 +
src/PJSIPSessionModule.cpp | 154 +++-----
src/PJSIPSessionModule.h | 4 +
src/SIPConfiguration.cpp | 366 +++++++++-------
src/SIPEndpoint.cpp | 222 +++++++---
src/SIPRegistrarListener.cpp | 84 +++-
src/SIPRegistrarListener.h | 2 +
src/SIPSession.cpp | 885 ++++++++++++++++++++++++++-------------
src/SIPTelephonyEventSink.cpp | 49 ++-
src/SIPTelephonyEventSource.cpp | 28 +-
src/SIPTransfer.cpp | 48 ++-
src/SIPTransfer.h | 5 +
14 files changed, 1242 insertions(+), 687 deletions(-)
- Log -----------------------------------------------------------------
commit 4c37ced142d56129a54a1e3dc49cd378fcad8244
Author: Ken Hunt <ken.hunt at digium.com>
Date: Tue Apr 10 23:10:17 2012 -0500
Uses the OperationMonitor helpers.
diff --git a/src/Component.cpp b/src/Component.cpp
index 5a3d3ed..e353b2f 100644
--- a/src/Component.cpp
+++ b/src/Component.cpp
@@ -95,11 +95,13 @@ public:
void removeAuthHook(const OperationContextPtr& operationContext, const AuthHookPrx &hook, const Ice::Current&)
{
+ // This method can be called any number of times, so no special retry logic required.
mPJSIPManager->removeAuthHook(hook);
}
void clearAuthHooks(const OperationContextPtr& operationContext, const Ice::Current&)
{
+ // This method can be called any number of times, so no special retry logic required.
mPJSIPManager->clearAuthHooks();
}
private:
diff --git a/src/PJSIPRegistrarModule.cpp b/src/PJSIPRegistrarModule.cpp
index dc8c175..d2df307 100644
--- a/src/PJSIPRegistrarModule.cpp
+++ b/src/PJSIPRegistrarModule.cpp
@@ -20,6 +20,7 @@
#include <AsteriskSCF/WorkQueue/WorkQueue.h>
#include <AsteriskSCF/WorkQueue/DefaultQueueListener.h>
#include <AsteriskSCF/PJLIB/ThreadHook.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
#include "PJSIPRegistrarModule.h"
#include "PJSIPManager.h"
@@ -32,6 +33,7 @@ using namespace AsteriskSCF::System::WorkQueue::V1;
using namespace AsteriskSCF::System::Hook::V1;
using namespace AsteriskSCF::SIP::ExtensionPoint::V1;
using namespace AsteriskSCF::System::V1;
+using namespace AsteriskSCF::Operations;
namespace
{
@@ -148,7 +150,8 @@ void BindingWrapper::updateBinding(const std::string& callID, int cSeq, int expi
queue->enqueueWork(new UpdateBinding(this, callID, cSeq, expiration));
}
-RegistrarI::RegistrarI(const RegistrarListenerPrx& defaultListener)
+RegistrarI::RegistrarI(const RegistrarListenerPrx& defaultListener) :
+ mOperationContextCache(OperationContextCache::create(120))
{
lg(Debug) << "In RegistrarI constructor, should be adding a listener...";
mQueue = new AsteriskSCF::WorkQueue::WorkQueue();
@@ -174,23 +177,44 @@ class AddListener : public Work
public:
AddListener(
const AMD_Registrar_addListenerPtr& cb,
+ const OperationContextPtr& operationContext,
const RegistrarListenerPrx& listener,
const RegistrarIPtr& registrar)
- : mCB(cb), mListener(listener), mRegistrar(registrar) { }
+ : mCB(cb), mOperationContext(operationContext), mListener(listener), mRegistrar(registrar) { }
void execute()
{
- std::vector<AsteriskSCF::SIP::Registration::V1::RegistrarListenerPrx>& listeners(mRegistrar->getListeners());
+ AMDContextResultData<ContactDict, AMD_Registrar_addListenerPtr>::ptr_type operationCookie =
+ getContext<AMDContextResultData<ContactDict, AMD_Registrar_addListenerPtr> >(
+ mRegistrar->getOperationContextCache(),
+ mOperationContext,
+ mCB);
+ if (!operationCookie)
+ {
+ lg(Debug) << "Retry of addListener() detected for operation " << mOperationContext->id;
+ return;
+ }
- if (std::find(listeners.begin(), listeners.end(), mListener) == listeners.end())
+ try
{
- lg(Debug) << "Adding Listener " << mListener->ice_getIdentity().name << " to registrar";
- listeners.push_back(mListener);
+ std::vector<AsteriskSCF::SIP::Registration::V1::RegistrarListenerPrx>& listeners(mRegistrar->getListeners());
+
+ if (std::find(listeners.begin(), listeners.end(), mListener) == listeners.end())
+ {
+ lg(Debug) << "Adding Listener " << mListener->ice_getIdentity().name << " to registrar";
+ listeners.push_back(mListener);
+ }
+ operationCookie->setResult(mRegistrar->createContactDict(mRegistrar->getBindings()));
+ }
+ catch (const std::exception& e)
+ {
+ operationCookie->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+ assert(false);
}
- mCB->ice_response(mRegistrar->createContactDict(mRegistrar->getBindings()));
}
private:
AMD_Registrar_addListenerPtr mCB;
+ OperationContextPtr mOperationContext;
RegistrarListenerPrx mListener;
RegistrarIPtr mRegistrar;
};
@@ -201,7 +225,7 @@ void RegistrarI::addListener_async(
const RegistrarListenerPrx& listener,
const Ice::Current&)
{
- mQueue->enqueueWork(new AddListener(cb, listener, this));
+ mQueue->enqueueWork(new AddListener(cb, operationContext, listener, this));
}
class RemoveListener : public Work
@@ -209,20 +233,41 @@ class RemoveListener : public Work
public:
RemoveListener(
const AMD_Registrar_removeListenerPtr& cb,
+ const OperationContextPtr& operationContext,
const RegistrarListenerPrx& listener,
const RegistrarIPtr& registrar)
- : mCB(cb), mListener(listener), mRegistrar(registrar) { }
+ : mCB(cb), mOperationContext(operationContext), mListener(listener), mRegistrar(registrar) { }
void execute()
{
- std::vector<AsteriskSCF::SIP::Registration::V1::RegistrarListenerPrx>& listeners(mRegistrar->getListeners());
+ AMDContextData<AMD_Registrar_removeListenerPtr>::ptr_type operationCookie =
+ getContext<AMDContextData<AMD_Registrar_removeListenerPtr> >(
+ mRegistrar->getOperationContextCache(),
+ mOperationContext,
+ mCB);
+ if (!operationCookie)
+ {
+ lg(Debug) << "Retry of removeListener() detected for operation " << mOperationContext->id;
+ return;
+ }
- listeners.erase(std::remove(listeners.begin(), listeners.end(), mListener));
- mCB->ice_response();
+ try
+ {
+ std::vector<AsteriskSCF::SIP::Registration::V1::RegistrarListenerPrx>& listeners(mRegistrar->getListeners());
+
+ listeners.erase(std::remove(listeners.begin(), listeners.end(), mListener));
+ operationCookie->setCompleted();
+ }
+ catch (const std::exception& e)
+ {
+ operationCookie->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+ assert(false);
+ }
}
private:
AMD_Registrar_removeListenerPtr mCB;
+ OperationContextPtr mOperationContext;
RegistrarListenerPrx mListener;
RegistrarIPtr mRegistrar;
@@ -234,7 +279,7 @@ void RegistrarI::removeListener_async(
const RegistrarListenerPrx& listener,
const Ice::Current&)
{
- mQueue->enqueueWork(new RemoveListener(cb, listener, this));
+ mQueue->enqueueWork(new RemoveListener(cb, operationContext, listener, this));
}
class GetAllBindings : public Work
@@ -392,6 +437,11 @@ QueuePtr RegistrarI::getQueue()
return mQueue;
}
+OperationContextCachePtr RegistrarI::getOperationContextCache()
+{
+ return mOperationContextCache;
+}
+
class ReplicateState : public Work
{
public:
diff --git a/src/PJSIPRegistrarModule.h b/src/PJSIPRegistrarModule.h
index bfa9033..ea0211b 100644
--- a/src/PJSIPRegistrarModule.h
+++ b/src/PJSIPRegistrarModule.h
@@ -19,6 +19,7 @@
#include <AsteriskSCF/Discovery/SmartProxy.h>
#include <AsteriskSCF/SIP/SIPRegistrarIf.h>
#include <AsteriskSCF/System/WorkQueue/WorkQueueIf.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
#include "SIPStateReplicator.h"
#include "PJSIPModule.h"
@@ -88,11 +89,14 @@ public:
AsteriskSCF::System::WorkQueue::V1::QueuePtr getQueue();
+ AsteriskSCF::Operations::OperationContextCachePtr getOperationContextCache();
+
private:
BindingWrapperDict mBindings;
AsteriskSCF::SIP::Registration::V1::ContactDict mContacts;
std::vector<AsteriskSCF::SIP::Registration::V1::RegistrarListenerPrx> mListeners;
AsteriskSCF::System::WorkQueue::V1::QueuePtr mQueue;
+ AsteriskSCF::Operations::OperationContextCachePtr mOperationContextCache;
};
typedef IceUtil::Handle<RegistrarI> RegistrarIPtr;
diff --git a/src/PJSIPSessionModule.cpp b/src/PJSIPSessionModule.cpp
index a244e7f..9fb3d35 100644
--- a/src/PJSIPSessionModule.cpp
+++ b/src/PJSIPSessionModule.cpp
@@ -42,11 +42,14 @@
#include <AsteriskSCF/WorkQueue/SuspendableWorkQueue.h>
#include <AsteriskSCF/Helpers/Retry.h>
#include <AsteriskSCF/Operations/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::SIP::ExtensionPoint::V1;
using namespace AsteriskSCF::System::Hook::V1;
using namespace AsteriskSCF::System::V1;
+using namespace AsteriskSCF::Operations;
namespace
{
@@ -195,25 +198,57 @@ SessionWorkPtr PJSIPSessionModInfo::getSessionWork()
return mSessionWork;
}
+SIPSessionCreationExtensionPoint::SIPSessionCreationExtensionPoint() :
+ mOperationContextCache(OperationContextCache::create(120))
+{
+}
+
void SIPSessionCreationExtensionPoint::addSessionCreationHook(
const OperationContextPtr& operationContext,
const SessionCreationHookPrx& hook, const Ice::Current&)
{
+ ContextDataPtr contextData;
+ if (!(contextData = Operations::checkAndThrow(mOperationContextCache, operationContext)))
+ {
+ lg(Debug) << "SIPSessionCreationExtensionPoint::addSessionCreationHook() detected retry for operation " << operationContext->id;
+ return;
+ }
+
boost::unique_lock<boost::shared_mutex> lock(mLock);
mHooks.push_back(hook);
+ contextData->setCompleted();
}
+
void SIPSessionCreationExtensionPoint::removeSessionCreationHook(
const OperationContextPtr& operationContext,
const SessionCreationHookPrx& hook, const Ice::Current&)
{
+ ContextDataPtr contextData;
+ if (!(contextData = Operations::checkAndThrow(mOperationContextCache, operationContext)))
+ {
+ lg(Debug) << "SIPSessionCreationExtensionPoint::removeSessionCreationHook() detected retry for operation " << operationContext->id;
+ return;
+ }
+
boost::unique_lock<boost::shared_mutex> lock(mLock);
mHooks.erase(std::find(mHooks.begin(), mHooks.end(), hook));
+
+ contextData->setCompleted();
}
void SIPSessionCreationExtensionPoint::clearSessionCreationHooks(
const OperationContextPtr& operationContext, const Ice::Current&)
{
+ ContextDataPtr contextData;
+ if (!(contextData = Operations::checkAndThrow(mOperationContextCache, operationContext)))
+ {
+ lg(Debug) << "SIPSessionCreationExtensionPoint::clearSessionCreationHooks() detected retry for operation " << operationContext->id;
+ return;
+ }
+
boost::unique_lock<boost::shared_mutex> lock(mLock);
mHooks.clear();
+
+ contextData->setCompleted();
}
SessionCreationHookSeq SIPSessionCreationExtensionPoint::getHooks()
@@ -394,9 +429,7 @@ public:
mReplacedDialog(replacedDialog),
mDestination(destination),
mCallerID(callerID),
- mRedirections(redirections),
- mRetryPolicy(5, 500),
- mOperationContext(AsteriskSCF::Operations::createContext()) { }
+ mRedirections(redirections) { }
protected:
SuspendableWorkResult initial(const SuspendableWorkListenerPtr&)
@@ -548,12 +581,9 @@ protected:
// Update the Party Id information on the session.
mSession->setSelfAsCaller();
- // Setup an operation context to support retries.
- mOperationContext = AsteriskSCF::Operations::createContext();
-
- SIPAMICallbackPtr amiCallback = new SIPAMICallback(listener, mSession, this, false, true);
- SIPAMICallbackCookiePtr cookie = new SIPAMICallbackCookie(amiCallback);
- invokeOperation(cookie);
+ SIPAMICallbackPtr cb(new SIPAMICallback(listener, mSession, this, false, true));
+ Ice::CallbackPtr d = Ice::newCallback(cb, &SIPAMICallback::callback);
+ sessionRouter->begin_routeSession(Operations::createContext(), mSession->getSessionProxy(), mDestination, 0, mCallerID, mRedirections, d);
}
}
catch (const Ice::CommunicatorDestroyedException &)
@@ -571,17 +601,6 @@ protected:
return Complete;
}
- /**
- * Invoke the operation.
- * @param cookie The cookie contains the SIPAMICallback object. Passed into the AMI operation so
- * that retry operations have access to the callback object.
- */
- void invokeOperation(const SIPAMICallbackCookiePtr& cookie)
- {
- Ice::CallbackPtr d = Ice::newCallback(cookie->getSIPAMICallback(), &SIPAMICallback::callback);
- mSessionRouter->begin_routeSession(mOperationContext, mSession->getSessionProxy(), mDestination, 0, mCallerID, mRedirections, d, cookie);
- }
-
SuspendableWorkResult calledBack(const Ice::AsyncResultPtr& asyncResult)
{
SessionRouterPrx router = SessionRouterPrx::uncheckedCast(asyncResult->getProxy());
@@ -589,25 +608,6 @@ protected:
{
router->end_routeSession(asyncResult);
}
- catch (const Ice::ConnectionLostException& cle)
- {
- // Assume a failover is occurring for the routing service.
- // This will block the WorkQueue's thread, but it's highly likely
- // that the failover effects most of the other enqueued operations
- // anyway.
- if(mRetryPolicy.retry())
- {
- lg(Warning) << "SessionCreationOperation: Retrying routeSession operation.";
-
- // Retry the operation.
- invokeOperation(SIPAMICallbackCookiePtr::dynamicCast(asyncResult->getCookie()));
- }
- else
- {
- lg(Error) << "SessionCreationOperation: ConnectionLostException routing session failed " << mRetryPolicy.getMaxRetries() << " retries." ;
- endSession(500);
- }
- }
catch (const DestinationNotFoundException &)
{
endSession(404);
@@ -642,8 +642,6 @@ private:
SIPSessionPtr mSession;
CallerPtr mCallerID;
RedirectionsPtr mRedirections;
- RetryPolicy mRetryPolicy;
- OperationContextPtr mOperationContext;
};
bool PJSIPSessionModule::getPrivacy(pjsip_rx_data *rdata)
@@ -1541,7 +1539,6 @@ public:
protected:
SuspendableWorkResult initial(const SuspendableWorkListenerPtr&)
{
- OperationContextPtr context = AsteriskSCF::Operations::createContext();
lg(Debug) << "Executing HandleInviteResponseOperation" << std::endl;
//Treat all 1XX messages we don't recognize the same as a 180
if (mRespCode > 100 && mRespCode < 200 && mRespCode != 183)
@@ -1565,7 +1562,7 @@ protected:
{
SIPAMICallbackPtr cb(new SIPAMICallback(0, mSession, this, false, true));
Ice::CallbackPtr d = Ice::newCallback(cb, &SIPAMICallback::callback);
- (*listener)->begin_indicated(context, mSession->getSessionProxy(), new RingingIndication(), mSession->getCookies(), d);
+ (*listener)->begin_indicated(Operations::createContext(), mSession->getSessionProxy(), new RingingIndication(), mSession->getCookies(), d);
}
catch (const Ice::Exception &ex)
{
@@ -1589,7 +1586,7 @@ protected:
Ice::CallbackPtr d = Ice::newCallback(cb, &SIPAMICallback::callback);
ProgressingIndicationPtr progressing(new ProgressingIndication());
progressing->response = response;
- (*listener)->begin_indicated(context, mSession->getSessionProxy(), progressing, mSession->getCookies(), d);
+ (*listener)->begin_indicated(Operations::createContext(), mSession->getSessionProxy(), progressing, mSession->getCookies(), d);
}
catch (const Ice::Exception &ex)
{
@@ -1612,7 +1609,7 @@ protected:
{
SIPAMICallbackPtr cb(new SIPAMICallback(0, mSession, this, false, true));
Ice::CallbackPtr d = Ice::newCallback(cb, &SIPAMICallback::callback);
- (*listener)->begin_indicated(context, mSession->getSessionProxy(), new ConnectedIndication(), mSession->getCookies(), d);
+ (*listener)->begin_indicated(Operations::createContext(), mSession->getSessionProxy(), new ConnectedIndication(), mSession->getCookies(), d);
}
catch (const Ice::Exception &ex)
{
@@ -1622,7 +1619,7 @@ protected:
}
}
mSession->setSessionOwnerId(mConnected);
- mSession->getSessionControllerProxy()->updateConnectedLine(context, mConnected);
+ mSession->getSessionControllerProxy()->updateConnectedLine(Operations::createContext(), mConnected);
return Complete;
}
@@ -1863,7 +1860,6 @@ protected:
lg(Debug) << "Relating stopped state to " << listeners.size() << " listeners";
AsteriskSCF::SessionCommunications::V1::StoppedIndicationPtr stopped(new AsteriskSCF::SessionCommunications::V1::StoppedIndication());
stopped->response = response;
- OperationContextPtr context = AsteriskSCF::Operations::createContext();
for (std::vector<AsteriskSCF::SessionCommunications::V1::SessionListenerPrx>::iterator listener =
listeners.begin();
listener != listeners.end();
@@ -1871,7 +1867,7 @@ protected:
{
try
{
- (*listener)->indicated(context, session->getSessionProxy(), stopped, session->getCookies());
+ (*listener)->indicated(Operations::createContext(), session->getSessionProxy(), stopped, session->getCookies());
}
catch (const Ice::Exception &ex)
{
@@ -2136,9 +2132,7 @@ public:
: mInv(inv),
mContact(contact),
mRouter(router),
- mSession(session),
- mRetryPolicy(5, 500),
- mOperationContext(AsteriskSCF::Operations::createContext()) { }
+ mSession(session) { }
protected:
SuspendableWorkResult initial(const SuspendableWorkListenerPtr&)
@@ -2147,8 +2141,14 @@ protected:
{
SuspendableWorkListenerPtr listener = 0;
SIPAMICallbackPtr cb(new SIPAMICallback(listener, mSession, this, false, true));
- SIPAMICallbackCookiePtr cookie = new SIPAMICallbackCookie(cb);
- invokeOperation(cookie);
+ Ice::CallbackPtr d = Ice::newCallback(cb, &SIPAMICallback::callback);
+ mRouter->begin_connectBridgedSessionsWithDestination(
+ Operations::createContext(),
+ mSession->getSessionProxy(),
+ mContact,
+ true,
+ 0,
+ d);
}
catch (const Ice::CommunicatorDestroyedException&)
{
@@ -2157,24 +2157,6 @@ protected:
return Complete;
}
- /**
- * Invoke the operation.
- * @param cookie The cookie contains the SIPAMICallback object. Passed into the AMI operation so
- * that retry operations have access to the callback object.
- */
- void invokeOperation(const SIPAMICallbackCookiePtr& cookie)
- {
- Ice::CallbackPtr d = Ice::newCallback(cookie->getSIPAMICallback(), &SIPAMICallback::callback);
- mRouter->begin_connectBridgedSessionsWithDestination(
- mOperationContext,
- mSession->getSessionProxy(),
- mContact,
- true,
- 0,
- d,
- cookie);
- }
-
SuspendableWorkResult calledBack(const Ice::AsyncResultPtr& asyncResult)
{
SessionRouterPrx router =
@@ -2188,28 +2170,6 @@ protected:
{
router->end_connectBridgedSessionsWithDestination(asyncResult);
}
- catch (const Ice::ConnectionLostException& cle)
- {
- // Assume a failover is occurring for the routing service.
- // This will block the WorkQueue's thread, but it's highly likely
- // that the failover effects most of the other enqueued operations
- // anyway.
- if(mRetryPolicy.retry())
- {
- lg(Warning) << "HandleRedirection: Retrying connectBridgedSessionsWithDestination operation.";
-
- // Retry the operation.
- invokeOperation(SIPAMICallbackCookiePtr::dynamicCast(asyncResult->getCookie()));
- }
- else
- {
- lg(Error) << "HandleRedirection: connectBridgedSessionsWithDestination failed " << mRetryPolicy.getMaxRetries() << " retries." ;
-
- op = PJSIP_REDIRECT_REJECT;
- pjsip_inv_process_redirect(mInv, op, NULL);
- }
- return Complete;
- }
catch (const std::exception&)
{
op = PJSIP_REDIRECT_REJECT;
@@ -2224,8 +2184,6 @@ private:
const std::string mContact;
AsteriskSCF::Discovery::SmartProxy<SessionRouterPrx> mRouter;
SIPSessionPtr mSession;
- RetryPolicy mRetryPolicy;
- OperationContextPtr mOperationContext;
};
pjsip_redirect_op PJSIPSessionModule::invOnRedirected(pjsip_inv_session* inv, const pjsip_uri* uri,
@@ -2259,8 +2217,7 @@ public:
HandleSendReinviteResponse(pjsip_inv_session *inv, const int moduleId, pjsip_tx_data *tdata)
: mInv(inv),
mModuleId(moduleId),
- mResponse(tdata),
- mOperationContext(AsteriskSCF::Operations::createContext()) { }
+ mResponse(tdata) { }
protected:
SuspendableWorkResult initial(const SuspendableWorkListenerPtr&)
@@ -2299,7 +2256,7 @@ protected:
// we can actually accept
SIPAMICallbackPtr cb(new SIPAMICallback(0, mSession, this, false, true));
Ice::CallbackPtr d = Ice::newCallback(cb, &SIPAMICallback::callback);
- mSession->getSessionControllerProxy()->begin_addStreams(mOperationContext, mStreamsAdded, d);
+ mSession->getSessionControllerProxy()->begin_addStreams(Operations::createContext(), mStreamsAdded, d);
return Complete;
}
@@ -2338,7 +2295,6 @@ private:
const int mModuleId;
pjsip_tx_data *mResponse;
StreamInformationDict mStreamsAdded;
- OperationContextPtr mOperationContext;
SIPSessionPtr mSession;
};
diff --git a/src/PJSIPSessionModule.h b/src/PJSIPSessionModule.h
index 3dc5e5a..941704b 100644
--- a/src/PJSIPSessionModule.h
+++ b/src/PJSIPSessionModule.h
@@ -27,6 +27,7 @@
#include <AsteriskSCF/System/ThreadPool/ThreadPoolIf.h>
#include <AsteriskSCF/System/WorkQueue/WorkQueueIf.h>
#include <AsteriskSCF/PJLIB/ThreadHook.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
#include "SIPEndpointFactory.h"
#include "SIPReplicationContext.h"
@@ -84,6 +85,8 @@ typedef IceUtil::Handle<PJSIPSessionModuleThreadPoolListener> PJSIPSessionModule
class SIPSessionCreationExtensionPoint : public AsteriskSCF::SessionCommunications::ExtensionPoints::V1::SessionCreationExtensionPoint
{
public:
+ SIPSessionCreationExtensionPoint();
+
void addSessionCreationHook(
const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
const AsteriskSCF::SessionCommunications::ExtensionPoints::V1::SessionCreationHookPrx& hook,
@@ -100,6 +103,7 @@ public:
private:
AsteriskSCF::SessionCommunications::ExtensionPoints::V1::SessionCreationHookSeq mHooks;
boost::shared_mutex mLock;
+ AsteriskSCF::Operations::OperationContextCachePtr mOperationContextCache;
};
typedef IceUtil::Handle<SIPSessionCreationExtensionPoint> SIPSessionCreationExtensionPointPtr;
diff --git a/src/SIPConfiguration.cpp b/src/SIPConfiguration.cpp
index 31ebf66..1017b03 100644
--- a/src/SIPConfiguration.cpp
+++ b/src/SIPConfiguration.cpp
@@ -39,6 +39,7 @@
#include <vector>
#include <AsteriskSCF/Operations/OperationContext.h>
#include <AsteriskSCF/Operations/OperationContextCache.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::System::Configuration::V1;
@@ -1955,81 +1956,96 @@ void ConfigurationServiceImpl::setConfiguration(
const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq& groups,
const Ice::Current&)
{
- lg(Debug) << "Configuration: setting configuration data.";
-
- // Is this a retry for an operation we're already processing?
- if (!mOperationContextCache->addOperationContext(operationContext))
+ ContextDataPtr contextData;
+ if (!(contextData = Operations::checkAndThrow(mOperationContextCache, operationContext)))
{
- // It's not quite this simple. But for now, ...
+ lg(Debug) << "ConfigurationService::setConfiguration() detected retry for operation " << operationContext->id;
return;
}
- class GroupsVisitor : public SIPConfigurationGroupVisitor
+ lg(Debug) << "Configuration: setting configuration data.";
+
+ try
{
- public:
- GroupsVisitor(const ConfigurationServiceImplPtr& impl) :
- mImpl(impl)
- {
- };
+ class GroupsVisitor : public SIPConfigurationGroupVisitor
+ {
+ public:
+ GroupsVisitor(const ConfigurationServiceImplPtr& impl) :
+ mImpl(impl)
+ {
+ };
- private:
+ private:
- void visitSIPGeneralGroup(const SIPGeneralGroupPtr& group)
- {
- genericSet<SIPGeneralGroupPtr>(mImpl->getData(), group);
- }
+ void visitSIPGeneralGroup(const SIPGeneralGroupPtr& group)
+ {
+ genericSet<SIPGeneralGroupPtr>(mImpl->getData(), group);
+ }
- void visitSIPDomainGroup(const SIPDomainGroupPtr& group)
- {
- genericSet<SIPDomainGroupPtr>(mImpl->getData(), group);
- };
+ void visitSIPDomainGroup(const SIPDomainGroupPtr& group)
+ {
+ genericSet<SIPDomainGroupPtr>(mImpl->getData(), group);
+ };
- void visitSIPUDPTransportGroup(const SIPUDPTransportGroupPtr& group)
- {
- genericSet<SIPUDPTransportGroupPtr>(mImpl->getData(), group);
- };
+ void visitSIPUDPTransportGroup(const SIPUDPTransportGroupPtr& group)
+ {
+ genericSet<SIPUDPTransportGroupPtr>(mImpl->getData(), group);
+ };
- void visitSIPTCPTransportGroup(const SIPTCPTransportGroupPtr& group)
- {
- genericSet<SIPTCPTransportGroupPtr>(mImpl->getData(), group);
- };
+ void visitSIPTCPTransportGroup(const SIPTCPTransportGroupPtr& group)
+ {
+ genericSet<SIPTCPTransportGroupPtr>(mImpl->getData(), group);
+ };
- void visitSIPTLSTransportGroup(const SIPTLSTransportGroupPtr& group)
- {
- genericSet<SIPTLSTransportGroupPtr>(mImpl->getData(), group);
- };
+ void visitSIPTLSTransportGroup(const SIPTLSTransportGroupPtr& group)
+ {
+ genericSet<SIPTLSTransportGroupPtr>(mImpl->getData(), group);
+ };
- void visitSIPSTUNTransportGroup(const SIPSTUNTransportGroupPtr& group)
- {
- genericSet<SIPSTUNTransportGroupPtr>(mImpl->getData(), group);
- }
+ void visitSIPSTUNTransportGroup(const SIPSTUNTransportGroupPtr& group)
+ {
+ genericSet<SIPSTUNTransportGroupPtr>(mImpl->getData(), group);
+ }
- void visitSIPEndpointGroup(const SIPEndpointGroupPtr& group)
- {
- genericSet<SIPEndpointGroupPtr>(mImpl->getData(), group);
- };
+ void visitSIPEndpointGroup(const SIPEndpointGroupPtr& group)
+ {
+ genericSet<SIPEndpointGroupPtr>(mImpl->getData(), group);
+ };
- void visitSIPRegistrationGroup(const SIPRegistrationGroupPtr& group)
- {
- genericSet<SIPRegistrationGroupPtr>(mImpl->getData(), group);
- }
- void visitIdentityGroup(const IdentityGroupPtr& group)
- {
- genericSet<IdentityGroupPtr>(mImpl->getData(), group);
- }
+ void visitSIPRegistrationGroup(const SIPRegistrationGroupPtr& group)
+ {
+ genericSet<SIPRegistrationGroupPtr>(mImpl->getData(), group);
+ }
+ void visitIdentityGroup(const IdentityGroupPtr& group)
+ {
+ genericSet<IdentityGroupPtr>(mImpl->getData(), group);
+ }
- ConfigurationServiceImplPtr mImpl;
- };
+ ConfigurationServiceImplPtr mImpl;
+ };
- SIPConfigurationGroupVisitorPtr v = new GroupsVisitor(this);
+ SIPConfigurationGroupVisitorPtr v = new GroupsVisitor(this);
- postProcesses.clear();
- for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
+ postProcesses.clear();
+ for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
+ {
+ (*group)->visit(v);
+ }
+
+ runPostProcessing();
+ contextData->setCompleted();
+ }
+ catch (const Ice::Exception& ex)
{
- (*group)->visit(v);
+ contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(ex)));
+ throw;
+ }
+ catch (const std::exception& e)
+ {
+ contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+ assert(false);
+ throw;
}
-
- runPostProcessing();
}
void ConfigurationServiceImpl::removeConfigurationItems(
@@ -2037,150 +2053,182 @@ void ConfigurationServiceImpl::removeConfigurationItems(
const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq& groups,
const Ice::Current&)
{
- // Is this a retry for an operation we're already processing?
- if (!mOperationContextCache->addOperationContext(operationContext))
+ ContextDataPtr contextData;
+ if (!(contextData = Operations::checkAndThrow(mOperationContextCache, operationContext)))
{
- return;
+ lg(Debug) << "ConfigurationService::removeConfigurationItems() detected retry for operation " << operationContext->id;
+ return;
}
- class GroupsVisitor : public SIPConfigurationGroupVisitor
+ try
{
- public:
- GroupsVisitor(const ConfigurationServiceImplPtr& impl) :
- mImpl(impl)
- {
- };
+ class GroupsVisitor : public SIPConfigurationGroupVisitor
+ {
+ public:
+ GroupsVisitor(const ConfigurationServiceImplPtr& impl) :
+ mImpl(impl)
+ {
+ };
- void visitSIPGeneralGroup(const SIPGeneralGroupPtr& group)
- {
- mImpl->getData()->removeFromGroup(group);
- };
+ void visitSIPGeneralGroup(const SIPGeneralGroupPtr& group)
+ {
+ mImpl->getData()->removeFromGroup(group);
+ };
- void visitSIPDomainGroup(const SIPDomainGroupPtr& group)
- {
- mImpl->getData()->removeFromGroup(group);
- };
+ void visitSIPDomainGroup(const SIPDomainGroupPtr& group)
+ {
+ mImpl->getData()->removeFromGroup(group);
+ };
- void visitSIPUDPTransportGroup(const SIPUDPTransportGroupPtr& group)
- {
- mImpl->getData()->removeFromGroup(group);
- };
+ void visitSIPUDPTransportGroup(const SIPUDPTransportGroupPtr& group)
+ {
+ mImpl->getData()->removeFromGroup(group);
+ };
- void visitSIPTCPTransportGroup(const SIPTCPTransportGroupPtr& group)
- {
- mImpl->getData()->removeFromGroup(group);
- };
+ void visitSIPTCPTransportGroup(const SIPTCPTransportGroupPtr& group)
+ {
+ mImpl->getData()->removeFromGroup(group);
+ };
- void visitSIPTLSTransportGroup(const SIPTLSTransportGroupPtr& group)
- {
- mImpl->getData()->removeFromGroup(group);
- };
+ void visitSIPTLSTransportGroup(const SIPTLSTransportGroupPtr& group)
+ {
+ mImpl->getData()->removeFromGroup(group);
+ };
- void visitSIPSTUNTransportGroup(const SIPSTUNTransportGroupPtr& group)
- {
- mImpl->getData()->removeFromGroup(group);
- }
+ void visitSIPSTUNTransportGroup(const SIPSTUNTransportGroupPtr& group)
+ {
+ mImpl->getData()->removeFromGroup(group);
+ }
- void visitSIPEndpointGroup(const SIPEndpointGroupPtr& group)
- {
- mImpl->getData()->removeFromGroup(group);
- };
+ void visitSIPEndpointGroup(const SIPEndpointGroupPtr& group)
+ {
+ mImpl->getData()->removeFromGroup(group);
+ };
- void visitSIPRegistrationGroup(const SIPRegistrationGroupPtr& group)
- {
- mImpl->getData()->removeFromGroup(group);
- }
- void visitIdentityGroup(const IdentityGroupPtr& group)
- {
- mImpl->getData()->removeFromGroup(group);
- }
+ void visitSIPRegistrationGroup(const SIPRegistrationGroupPtr& group)
+ {
+ mImpl->getData()->removeFromGroup(group);
+ }
+ void visitIdentityGroup(const IdentityGroupPtr& group)
+ {
+ mImpl->getData()->removeFromGroup(group);
+ }
- private:
- ConfigurationServiceImplPtr mImpl;
- };
+ private:
+ ConfigurationServiceImplPtr mImpl;
+ };
- SIPConfigurationGroupVisitorPtr v = new GroupsVisitor(this);
+ SIPConfigurationGroupVisitorPtr v = new GroupsVisitor(this);
- postProcesses.clear();
- for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
+ postProcesses.clear();
+ for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
+ {
+ (*group)->visit(v);
+ }
+ runPostProcessing();
+ contextData->setCompleted();
+ }
+ catch (const Ice::Exception& ex)
{
- (*group)->visit(v);
+ contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(ex)));
+ throw;
+ }
+ catch (const std::exception& e)
+ {
+ contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+ assert(false);
+ throw;
}
- runPostProcessing();
}
void ConfigurationServiceImpl::removeConfigurationGroups(
const OperationContextPtr& operationContext,
const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq& groups,
-const Ice::Current&)
+ const Ice::Current&)
{
- // Is this a retry for an operation we're already processing?
- if (!mOperationContextCache->addOperationContext(operationContext))
+ ContextDataPtr contextData;
+ if (!(contextData = Operations::checkAndThrow(mOperationContextCache, operationContext)))
{
+ lg(Debug) << "ConfigurationService::removeConfigurationGroups() detected retry for operation " << operationContext->id;
return;
}
- class Visitor : public SIPConfigurationGroupVisitor
+ try
{
- public:
- Visitor(const ConfigurationServiceImplPtr& impl) : mImpl(impl) { };
+ class Visitor : public SIPConfigurationGroupVisitor
+ {
+ public:
+ Visitor(const ConfigurationServiceImplPtr& impl) : mImpl(impl) { };
- private:
- void visitSIPGeneralGroup(const SIPGeneralGroupPtr& group)
- {
- mImpl->getData()->remove(group);
- };
+ private:
+ void visitSIPGeneralGroup(const SIPGeneralGroupPtr& group)
+ {
+ mImpl->getData()->remove(group);
+ };
- void visitSIPDomainGroup(const SIPDomainGroupPtr& group)
- {
- mImpl->getData()->remove(group);
- };
+ void visitSIPDomainGroup(const SIPDomainGroupPtr& group)
+ {
+ mImpl->getData()->remove(group);
+ };
- void visitSIPUDPTransportGroup(const SIPUDPTransportGroupPtr& group)
- {
- mImpl->getData()->remove(group);
- };
+ void visitSIPUDPTransportGroup(const SIPUDPTransportGroupPtr& group)
+ {
+ mImpl->getData()->remove(group);
+ };
- void visitSIPTCPTransportGroup(const SIPTCPTransportGroupPtr& group)
- {
- mImpl->getData()->remove(group);
- };
+ void visitSIPTCPTransportGroup(const SIPTCPTransportGroupPtr& group)
+ {
+ mImpl->getData()->remove(group);
+ };
- void visitSIPTLSTransportGroup(const SIPTLSTransportGroupPtr& group)
- {
- mImpl->getData()->remove(group);
- };
+ void visitSIPTLSTransportGroup(const SIPTLSTransportGroupPtr& group)
+ {
+ mImpl->getData()->remove(group);
+ };
- void visitSIPSTUNTransportGroup(const SIPSTUNTransportGroupPtr& group)
- {
- mImpl->getData()->remove(group);
- }
+ void visitSIPSTUNTransportGroup(const SIPSTUNTransportGroupPtr& group)
+ {
+ mImpl->getData()->remove(group);
+ }
- void visitSIPEndpointGroup(const SIPEndpointGroupPtr& group)
- {
- mImpl->getData()->remove(group);
- };
+ void visitSIPEndpointGroup(const SIPEndpointGroupPtr& group)
+ {
+ mImpl->getData()->remove(group);
+ };
- void visitSIPRegistrationGroup(const SIPRegistrationGroupPtr& group)
- {
- mImpl->getData()->remove(group);
- }
- void visitIdentityGroup(const IdentityGroupPtr& group)
- {
- mImpl->getData()->remove(group);
- }
+ void visitSIPRegistrationGroup(const SIPRegistrationGroupPtr& group)
+ {
+ mImpl->getData()->remove(group);
+ }
+ void visitIdentityGroup(const IdentityGroupPtr& group)
+ {
+ mImpl->getData()->remove(group);
+ }
- ConfigurationServiceImplPtr mImpl;
- };
+ ConfigurationServiceImplPtr mImpl;
+ };
- SIPConfigurationGroupVisitorPtr v = new Visitor(this);
+ SIPConfigurationGroupVisitorPtr v = new Visitor(this);
- postProcesses.clear();
- for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
+ postProcesses.clear();
+ for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
+ {
+ (*group)->visit(v);
+ }
+ runPostProcessing();
+ contextData->setCompleted();
+ }
+ catch (const Ice::Exception& ex)
{
- (*group)->visit(v);
+ contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(ex)));
+ throw;
+ }
+ catch (const std::exception& e)
+ {
+ contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+ assert(false);
+ throw;
}
- runPostProcessing();
}
};
};
diff --git a/src/SIPEndpoint.cpp b/src/SIPEndpoint.cpp
index 65ac5f4..a26d5a8 100644
--- a/src/SIPEndpoint.cpp
+++ b/src/SIPEndpoint.cpp
@@ -29,6 +29,7 @@
#include <AsteriskSCF/Media/SDP/MediaSDPIf.h>
#include <AsteriskSCF/Operations/OperationContextCache.h>
#include <AsteriskSCF/Operations/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
#include "NATOptions.h"
@@ -535,57 +536,78 @@ std::string SIPEndpoint::getId(const Ice::Current&)
}
/**
- * This version of this overloaded operation handles a slice invocation.
+ * This version of this overloaded operation handles a remote invocation for an active component.
*/
AsteriskSCF::SessionCommunications::V1::SessionPrx SIPEndpoint::createSession(
- const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const OperationContextPtr& operationContext,
const string& destination,
const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
const AsteriskSCF::SessionCommunications::ExtensionPoints::V1::SessionCreationHookPrx& oneShotHook,
const Ice::Current&)
{
+ std::pair<bool, ContextResultData<AsteriskSCF::SessionCommunications::V1::SessionPrx>::ptr_type> cacheResult =
+ getContextSync<ContextResultData<AsteriskSCF::SessionCommunications::V1::SessionPrx>::ptr_type>(
+ mImplPriv->mOperationContextCache, operationContext);
+ if (cacheResult.first)
+ {
+ lg(Debug) << BOOST_CURRENT_FUNCTION << " detected retry for operation " << operationContext->id;
+ return cacheResult.second->getResult();
+ }
+
std::cout << "Got call over Ice to create a session for endpoint " << mImplPriv->mName << std::endl;
if (mImplPriv->mConfig.sessionConfig.callDirection != BOTH &&
mImplPriv->mConfig.sessionConfig.callDirection != INBOUND)
{
// TODO: We should have an exception here
+ cacheResult.second->setResult(0);
return 0;
}
- // Combine the default listeners and the argument listener.
- vector<SessionListenerPrx> listeners = mImplPriv->mDefaultListeners->getAll();
- if (listener != 0)
+ try
{
- listeners.push_back(listener);
- }
-
- AsteriskSCF::SessionCommunications::V1::SessionCookies defaultCookies = mImplPriv->mDefaultSessionCookies->getAll();
-
- SIPSessionPtr session = SIPSession::create(
- operationContext,
- mImplPriv->mAdapter,
- this,
- destination,
- listeners,
- defaultCookies,
- mImplPriv->mManager,
- mImplPriv->mServiceLocator,
- mImplPriv->mReplicationContext,
- oneShotHook,
- mImplPriv->mConfig.sessionConfig.rtpOverIPv6,
- true,
- mImplPriv->mConfig,
- NATEndpointOptions(mImplPriv->mConfig.sessionConfig.rtpOverICE,
- mImplPriv->mConfig.sessionConfig.rtpICEIncludeTURN,
- mImplPriv->mConfig.transportConfig.enableNAT,
- mImplPriv->mConfig.sessionConfig.udptlOverICE,
- mImplPriv->mConfig.sessionConfig.udptlWithTURN));
-
- mImplPriv->mSessions.push_back(session);
+ // Combine the default listeners and the argument listener.
+ vector<SessionListenerPrx> listeners = mImplPriv->mDefaultListeners->getAll();
+ if (listener != 0)
+ {
+ listeners.push_back(listener);
+ }
- std::cout << "And now we're returing a session proxy..." << std::endl;
- return session->getSessionProxy();
+ AsteriskSCF::SessionCommunications::V1::SessionCookies defaultCookies = mImplPriv->mDefaultSessionCookies->getAll();
+
+ SIPSessionPtr session = SIPSession::create(
+ operationContext,
+ mImplPriv->mAdapter,
+ this,
+ destination,
+ listeners,
+ defaultCookies,
+ mImplPriv->mManager,
+ mImplPriv->mServiceLocator,
+ mImplPriv->mReplicationContext,
+ oneShotHook,
+ mImplPriv->mConfig.sessionConfig.rtpOverIPv6,
+ true,
+ mImplPriv->mConfig,
+ NATEndpointOptions(mImplPriv->mConfig.sessionConfig.rtpOverICE,
+ mImplPriv->mConfig.sessionConfig.rtpICEIncludeTURN,
+ mImplPriv->mConfig.transportConfig.enableNAT,
+ mImplPriv->mConfig.sessionConfig.udptlOverICE,
+ mImplPriv->mConfig.sessionConfig.udptlWithTURN));
+
+ mImplPriv->mSessions.push_back(session);
+
+ std::cout << "And now we're returing a session proxy..." << std::endl;
+
+ cacheResult.second->setResult(session->getSessionProxy());
+ return session->getSessionProxy();
+ }
+ catch (const std::exception& e)
+ {
+ cacheResult.second->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+ assert(false);
+ throw;
+ }
}
/**
@@ -710,28 +732,43 @@ void SIPEndpoint::addDefaultSessionListener(
const SessionListenerPrx& listener,
const Ice::Current&)
{
- if (!mImplPriv->mOperationContextCache->addOperationContext(operationContext))
+ ContextDataPtr contextData;
+ if (!(contextData = Operations::checkAndThrow(mImplPriv->mOperationContextCache, operationContext)))
{
- lg(Debug) << BOOST_CURRENT_FUNCTION << " detected and rejected duplicate operation. Id = " << operationContext->id << " transaction = " << operationContext->transactionId;
+ lg(Debug) << BOOST_CURRENT_FUNCTION << " detected retry for operation " << operationContext->id;
return;
}
- mImplPriv->mDefaultListeners->add(listener);
+ try
+ {
+ mImplPriv->mDefaultListeners->add(listener);
- if (mImplPriv->mReplicationContext->isReplicating() == false)
+ if (mImplPriv->mReplicationContext->isReplicating())
+ {
+ // Replicate this change.
+ SIPStateItemSeq items;
+ items.push_back(new DefaultSessionListenerItem
+ (mImplPriv->replicaKeyName(listener),
+ "",
+ mImplPriv->mName,
+ listener));
+
+ mImplPriv->mReplicationContext->getReplicator().tryOneWay()->setState(Operations::createContext(), items);
+ }
+ contextData->setCompleted();
+ }
+ catch (const Ice::Exception& ex)
{
- return;
+ contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(ex)));
+ throw;
+ }
+ catch (const std::exception& e)
+ {
+ contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+ assert(false);
+ throw;
}
- // Replicate this change.
- SIPStateItemSeq items;
- items.push_back(new DefaultSessionListenerItem
- (mImplPriv->replicaKeyName(listener),
- "",
- mImplPriv->mName,
- listener));
-
- mImplPriv->mReplicationContext->getReplicator().tryOneWay()->setState(Operations::createContext(), items);
}
void SIPEndpoint::removeDefaultSessionListener(
@@ -739,28 +776,46 @@ void SIPEndpoint::removeDefaultSessionListener(
const SessionListenerPrx& listener,
const Ice::Current&)
{
- if (!mImplPriv->mOperationContextCache->addOperationContext(operationContext))
+ ContextDataPtr contextData;
+ if (!(contextData = Operations::checkAndThrow(mImplPriv->mOperationContextCache, operationContext)))
{
- lg(Debug) << BOOST_CURRENT_FUNCTION << " detected and rejected duplicate operation. Id = " << operationContext->id << " transaction = " << operationContext->transactionId;
+ lg(Debug) << BOOST_CURRENT_FUNCTION << " detected retry for operation " << operationContext->id;
return;
}
- mImplPriv->mDefaultListeners->remove(listener);
+ try
+ {
+ mImplPriv->mDefaultListeners->remove(listener);
- if (mImplPriv->mReplicationContext->isReplicating() == false)
+ if (mImplPriv->mReplicationContext->isReplicating() == false)
+ {
+ contextData->setCompleted();
+ return;
+ }
+
+ // Replicate this change.
+ SIPStateItemSeq items;
+ items.push_back(new DefaultSessionListenerItem
+ (mImplPriv->replicaKeyName(listener),
+ "",
+ mImplPriv->mName,
+ listener));
+
+ mImplPriv->mReplicationContext->getReplicator().tryOneWay()->removeStateForItems(Operations::createContext(), items);
+ contextData->setCompleted();
+ }
+ catch (const Ice::Exception& ex)
{
- return;
+ contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(ex)));
+ throw;
+ }
+ catch (const std::exception& e)
+ {
+ contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+ assert(false);
+ throw;
}
- // Replicate this change.
- SIPStateItemSeq items;
- items.push_back(new DefaultSessionListenerItem
- (mImplPriv->replicaKeyName(listener),
- "",
- mImplPriv->mName,
- listener));
-
- mImplPriv->mReplicationContext->getReplicator().tryOneWay()->removeStateForItems(Operations::createContext(), items);
}
/**
@@ -829,13 +884,29 @@ void SIPEndpoint::addDefaultSessionCookies(
const AsteriskSCF::SessionCommunications::V1::SessionCookies& cookies,
const Ice::Current&)
{
- if (!mImplPriv->mOperationContextCache->addOperationContext(operationContext))
+ ContextDataPtr contextData;
+ if (!(contextData = Operations::checkAndThrow(mImplPriv->mOperationContextCache, operationContext)))
{
- lg(Debug) << BOOST_CURRENT_FUNCTION << " detected and rejected duplicate operation. Id = " << operationContext->id << " transaction = " << operationContext->transactionId;
+ lg(Debug) << BOOST_CURRENT_FUNCTION << " detected retry for operation " << operationContext->id;
return;
}
- addDefaultSessionCookies(cookies, false);
+ try
+ {
+ addDefaultSessionCookies(cookies, false);
+ contextData->setCompleted();
+ }
+ catch (const Ice::Exception& ex)
+ {
+ contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(ex)));
+ throw;
+ }
+ catch (const std::exception& e)
+ {
+ contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+ assert(false);
+ throw;
+ }
}
/**
@@ -882,14 +953,29 @@ void SIPEndpoint::removeDefaultSessionCookies(
const AsteriskSCF::SessionCommunications::V1::SessionCookies& cookies,
const Ice::Current&)
{
-
- if (!mImplPriv->mOperationContextCache->addOperationContext(operationContext))
+ ContextDataPtr contextData;
+ if (!(contextData = Operations::checkAndThrow(mImplPriv->mOperationContextCache, operationContext)))
{
- lg(Debug) << BOOST_CURRENT_FUNCTION << " detected and rejected duplicate operation. Id = " << operationContext->id << " transaction = " << operationContext->transactionId;
+ lg(Debug) << BOOST_CURRENT_FUNCTION << " detected retry for operation " << operationContext->id;
return;
}
- removeDefaultSessionCookies(cookies, false);
+ try
+ {
+ removeDefaultSessionCookies(cookies, false);
+ contextData->setCompleted();
+ }
+ catch (const Ice::Exception& ex)
+ {
+ contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(ex)));
+ throw;
+ }
+ catch (const std::exception& e)
+ {
+ contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+ assert(false);
+ throw;
+ }
}
void SIPEndpoint::addDefaultSessionCookie(const AsteriskSCF::SessionCommunications::V1::SessionCookiePtr& cookie)
diff --git a/src/SIPRegistrarListener.cpp b/src/SIPRegistrarListener.cpp
index 93c2ca5..6fd2084 100644
--- a/src/SIPRegistrarListener.cpp
+++ b/src/SIPRegistrarListener.cpp
@@ -14,11 +14,13 @@
* at the top of the source tree.
*/
#include <AsteriskSCF/Logger.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
#include "SIPRegistrarListener.h"
using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::System::V1;
+using namespace AsteriskSCF::Operations;
namespace
{
@@ -33,7 +35,7 @@ namespace SIPSessionManager
using namespace AsteriskSCF::SIP::Registration::V1;
SIPDefaultRegistrarListener::SIPDefaultRegistrarListener(const boost::shared_ptr<SIPEndpointFactory>& endpointFactory)
- : mEndpointFactory(endpointFactory)
+ : mOperationContextCache(OperationContextCache::create(120)), mEndpointFactory(endpointFactory)
{
pj_caching_pool_init(&mCachingPool, NULL, 2048);
}
@@ -79,17 +81,34 @@ void SIPDefaultRegistrarListener::contactsAdded(
const BindingUpdateSeq& contacts,
const Ice::Current&)
{
- pj_pool_t *pool = pj_pool_create(&mCachingPool.factory, "DefaultRegistrarListener", 256, 256, NULL);
- for (BindingUpdateSeq::const_iterator iter = contacts.begin(); iter != contacts.end(); ++iter)
+ ContextDataPtr contextData;
+ if (!(contextData = Operations::checkAndThrow(mOperationContextCache, operationContext)))
{
- SIPEndpointSeq endpoints = getEndpoints(pool, iter->aor);
- for (Ice::StringSeq::const_iterator contactIter = iter->contacts.begin();
- contactIter != iter->contacts.end(); ++contactIter)
+ lg(Debug) << "SIPDefaultRegistrarListener::contactsAdded() detected retry for operation " << operationContext->id;
+ return;
+ }
+
+ try
+ {
+ pj_pool_t *pool = pj_pool_create(&mCachingPool.factory, "DefaultRegistrarListener", 256, 256, NULL);
+ for (BindingUpdateSeq::const_iterator iter = contacts.begin(); iter != contacts.end(); ++iter)
{
- updateEndpoints(endpoints, pool, *contactIter);
+ SIPEndpointSeq endpoints = getEndpoints(pool, iter->aor);
+ for (Ice::StringSeq::const_iterator contactIter = iter->contacts.begin();
+ contactIter != iter->contacts.end(); ++contactIter)
+ {
+ updateEndpoints(endpoints, pool, *contactIter);
+ }
}
+ pj_pool_release(pool);
+ contextData->setCompleted();
+ }
+ catch (const std::exception& e)
+ {
+ contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+ assert(false);
+ throw;
}
- pj_pool_release(pool);
}
void SIPDefaultRegistrarListener::contactsRemoved(
@@ -97,26 +116,43 @@ void SIPDefaultRegistrarListener::contactsRemoved(
const BindingUpdateSeq& contacts,
const Ice::Current&)
{
- pj_pool_t *pool = pj_pool_create(&mCachingPool.factory, "DefaultRegistrarListener", 256, 256, NULL);
- for (BindingUpdateSeq::const_iterator iter = contacts.begin(); iter != contacts.end(); ++iter)
+ ContextDataPtr contextData;
+ if (!(contextData = Operations::checkAndThrow(mOperationContextCache, operationContext)))
{
- if (iter->contacts.empty())
- {
- //If an aor has no contacts being removed, bail now.
- continue;
- }
- SIPEndpointSeq endpoints = getEndpoints(pool, iter->aor);
- for (SIPEndpointSeq::iterator endpointIter = endpoints.begin();
- endpointIter != endpoints.end(); ++endpointIter)
+ lg(Debug) << "SIPDefaultRegistrarListener::contactsRemoved() detected retry for operation " << operationContext->id;
+ return;
+ }
+
+ try
+ {
+ pj_pool_t *pool = pj_pool_create(&mCachingPool.factory, "DefaultRegistrarListener", 256, 256, NULL);
+ for (BindingUpdateSeq::const_iterator iter = contacts.begin(); iter != contacts.end(); ++iter)
{
- lg(Debug) << "Removing contacts from AoR " << iter->aor;
- //Setting the endpoint's target address to be empty is the method we currently use
- //to make the endpoint inaccessible. Once endpoints can support multiple addresses,
- //it'll be a bit less hacky since we can just remove a specific contact from the list.
- (*endpointIter)->setTargetAddress("", 0);
+ if (iter->contacts.empty())
+ {
+ //If an aor has no contacts being removed, bail now.
+ continue;
+ }
+ SIPEndpointSeq endpoints = getEndpoints(pool, iter->aor);
+ for (SIPEndpointSeq::iterator endpointIter = endpoints.begin();
+ endpointIter != endpoints.end(); ++endpointIter)
+ {
+ lg(Debug) << "Removing contacts from AoR " << iter->aor;
+ //Setting the endpoint's target address to be empty is the method we currently use
+ //to make the endpoint inaccessible. Once endpoints can support multiple addresses,
+ //it'll be a bit less hacky since we can just remove a specific contact from the list.
+ (*endpointIter)->setTargetAddress("", 0);
+ }
}
+ pj_pool_release(pool);
+ contextData->setCompleted();
+ }
+ catch (const std::exception& e)
+ {
+ contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+ assert(false);
+ throw;
}
- pj_pool_release(pool);
}
} // namespace SIPSessionManager
diff --git a/src/SIPRegistrarListener.h b/src/SIPRegistrarListener.h
index bfe5879..1e6e017 100644
--- a/src/SIPRegistrarListener.h
+++ b/src/SIPRegistrarListener.h
@@ -19,6 +19,7 @@
#include <pjsip.h>
#include <pjlib.h>
#include <AsteriskSCF/SIP/SIPRegistrarIf.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
#include "SIPEndpointFactory.h"
@@ -38,6 +39,7 @@ public:
void contactsRemoved(const AsteriskSCF::System::V1::OperationContextPtr&,
const AsteriskSCF::SIP::Registration::V1::BindingUpdateSeq&, const Ice::Current&);
private:
+ AsteriskSCF::Operations::OperationContextCachePtr mOperationContextCache;
SIPEndpointSeq getEndpoints(pj_pool_t *pool, const std::string& aor);
void updateEndpoints(const SIPEndpointSeq& endpoints, pj_pool_t *pool, const std::string& contact);
boost::shared_ptr<SIPEndpointFactory> mEndpointFactory;
diff --git a/src/SIPSession.cpp b/src/SIPSession.cpp
index 982a390..c1215fb 100755
--- a/src/SIPSession.cpp
+++ b/src/SIPSession.cpp
@@ -48,6 +48,7 @@
#include <AsteriskSCF/Collections/HandleSet.h>
#include <AsteriskSCF/Operations/OperationContextCache.h>
#include <AsteriskSCF/Operations/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
#include "NATOptions.h"
#include "PJSIPSessionModule.h"
@@ -426,34 +427,40 @@ public:
* @param sessionPriv The session for whom the information is requested from.
*/
SetMediaSessionCookiesOperation(
- const OperationContextPtr& sourceContext,
+ const OperationContextPtr& operationContext,
const AsteriskSCF::Media::V1::SessionCookies& cookies,
const SIPSessionPtr& session)
- : mSourceContext(sourceContext), mCookies(cookies), mSession(session)
+ : mOperationContext(operationContext), mCookies(cookies), mSession(session)
{
}
SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
{
+ // Note: Retry logic is handled by the operation that enqueues this Work.
+
lg(Debug) << "Executing a SetMediaSessionCookiesOperation operation";
AsteriskSCF::Media::V1::SessionCookies results;
// Set the cookies on all of the Session's RTP Media Sessions.
RTPMediaSessionDict mediaSessions = mSession->getRTPMediaSessions();
+ int count=0;
+ string modifier;
for(RTPMediaSessionDict::iterator i = mediaSessions.begin();
- i != mediaSessions.end(); ++i)
+ i != mediaSessions.end(); ++count, ++i)
{
- i->second->setCookies(Operations::createContext(mSourceContext), mCookies);
+ modifier = "setCookies." + boost::lexical_cast<std::string>(count);
+ i->second->setCookies(Operations::calculateOperationContext(mOperationContext, modifier), mCookies);
}
return Complete;
}
private:
- OperationContextPtr mSourceContext;
+ OperationContextPtr mOperationContext;
AsteriskSCF::Media::V1::SessionCookies mCookies;
SIPSessionPtr mSession;
+ SIPSessionPrivPtr mSessionPriv;
};
/**
@@ -468,34 +475,42 @@ public:
* @param sessionPriv The session for whom the information is requested from.
*/
RemoveMediaSessionCookiesOperation(
- const OperationContextPtr& sourceContext,
+ const OperationContextPtr& operationContext,
const AsteriskSCF::Media::V1::SessionCookies& cookieTypes,
const SIPSessionPtr& session)
- : mSourceContext(sourceContext), mCookieTypes(cookieTypes), mSession(session)
+ : mOperationContext(operationContext),
+ mCookieTypes(cookieTypes),
+ mSession(session)
{
}
SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
{
+ // Note: Retry logic is handled by the operation that enqueues this Work. Each media session's context is derived from mOperationContext plus a per-index modifier, as in SetMediaSessionCookiesOperation.
+
lg(Debug) << "Executing a RemoveMediaSessionCookiesOperation operation";
AsteriskSCF::Media::V1::SessionCookies results;
// Set the cookies on all of the Session's RTP Media Sessions.
RTPMediaSessionDict mediaSessions = mSession->getRTPMediaSessions();
+ int count=0;
+ string modifier;
for(RTPMediaSessionDict::iterator i = mediaSessions.begin();
- i != mediaSessions.end(); ++i)
+ i != mediaSessions.end(); ++count, ++i)
{
- i->second->removeCookies(Operations::createContext(mSourceContext), mCookieTypes);
+ modifier = "removeCookies." + boost::lexical_cast<std::string>(count);
+ i->second->removeCookies(Operations::calculateOperationContext(mOperationContext, modifier), mCookieTypes);
}
return Complete;
}
private:
- OperationContextPtr mSourceContext;
+ OperationContextPtr mOperationContext;
AsteriskSCF::Media::V1::SessionCookies mCookieTypes;
SIPSessionPtr mSession;
+ SIPSessionPrivPtr mSessionPriv;
};
/**
@@ -504,9 +519,11 @@ private:
class SIPMediaSession : public Media::V1::Session
{
public:
- SIPMediaSession(const SIPSessionPtr& session)
+ SIPMediaSession(const SIPSessionPtr& session,
+ const boost::shared_ptr<SIPSessionPriv>& sessionPriv)
: mId(IceUtil::generateUUID()),
- mSession(session)
+ mSession(session),
+ mOperationContextCache(OperationContextCache::create(120))
{
}
@@ -530,7 +547,17 @@ public:
const AsteriskSCF::Media::V1::SessionCookies& cookies,
const Ice::Current&)
{
+ ContextDataPtr operationCookie;
+ if (!(operationCookie = Operations::checkAndThrow(mOperationContextCache, operationContext)))
+ {
+ lg(Debug) << BOOST_CURRENT_FUNCTION << " detected retry for operation " << operationContext->id;
+ return;
+ }
+
mSession->enqueueSessionWork(new SetMediaSessionCookiesOperation(operationContext, cookies, mSession));
+
+ // Enqueuing the operation is "complete" enough for this case.
+ operationCookie->setCompleted();
}
void getCookies_async(
@@ -546,7 +573,17 @@ public:
const AsteriskSCF::Media::V1::SessionCookies& cookies,
const Ice::Current&)
{
+ ContextDataPtr operationCookie;
+ if (!(operationCookie = Operations::checkAndThrow(mOperationContextCache, operationContext)))
+ {
+ lg(Debug) << BOOST_CURRENT_FUNCTION << " detected retry for operation " << operationContext->id;
+ return;
+ }
+
mSession->enqueueSessionWork(new RemoveMediaSessionCookiesOperation(operationContext, cookies, mSession));
+
+ // Enqueuing the operation is "complete" enough for this case.
+ operationCookie->setCompleted();
}
private:
@@ -559,6 +596,8 @@ private:
* A pointer to the communications session that created us.
*/
SIPSessionPtr mSession;
+
+ OperationContextCachePtr mOperationContextCache;
};
/**
@@ -582,109 +621,120 @@ public:
SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
{
- // Is this a retry for an operation we're already processing?
- if (!mImplPriv->mOperationContextCache->addOperationContext(mOperationContext))
+ AMDContextData<AMD_SessionController_changeStreamStatesPtr>::ptr_type operationCookie =
+ getContext<AMDContextData<AMD_SessionController_changeStreamStatesPtr> >(
+ mImplPriv->mOperationContextCache,
+ mOperationContext,
+ mCb);
+ if (!operationCookie)
{
- lg(Debug) << "Retry of previously processed changeStreamStates() operation detected and rejected.";
+ lg(Debug) << "changeStreamStates() detected retry for operation " << mOperationContext->id;
return Complete;
}
lg(Debug) << "Executing a changeStreamStates Operation";
- // This boolean is set to true if at least one stream is actually changed. This is to prevent
- // needless reinvites.
- bool changed = false;
-
- // We iterate through each stream making sure we have one that matches it
- for (AsteriskSCF::Media::V1::StreamStateDict::const_iterator stream = mStreams.begin();
- stream != mStreams.end();
- ++stream)
+ try
{
- AsteriskSCF::Media::V1::StreamInformationDict::iterator ourStream = mImplPriv->mStreams.find(stream->first);
-
- // If we don't have a stream stored locally then they gave us a stream that we really know nothing about it
- if (ourStream == mImplPriv->mStreams.end())
+ // This boolean is set to true if at least one stream is actually changed. This is to prevent
+ // needless reinvites.
+ bool changed = false;
+
+ // We iterate through each stream making sure we have one that matches it
+ for (AsteriskSCF::Media::V1::StreamStateDict::const_iterator stream = mStreams.begin();
+ stream != mStreams.end();
+ ++stream)
{
- continue;
- }
+ AsteriskSCF::Media::V1::StreamInformationDict::iterator ourStream = mImplPriv->mStreams.find(stream->first);
+
+ // If we don't have a stream stored locally then they gave us a stream that we really know nothing about
+ if (ourStream == mImplPriv->mStreams.end())
+ {
+ continue;
+ }
- // If this doesn't actually alter the state of the stream do nothing, since it would just be silly
- if (ourStream->second->state == stream->second)
- {
- continue;
- }
+ // If this doesn't actually alter the state of the stream do nothing, since it would just be silly
+ if (ourStream->second->state == stream->second)
+ {
+ continue;
+ }
- // The implementation of how we store streams and SDP are linked together, if we can find a stream
- // in our dictionary of streams than it also exists in the SDP
- pjmedia_sdp_media *media = mImplPriv->mSDP->media[boost::lexical_cast<int>(stream->first)];
+ // The implementation of how we store streams and SDP are linked together; if we can find a stream
+ // in our dictionary of streams then it also exists in the SDP
+ pjmedia_sdp_media *media = mImplPriv->mSDP->media[boost::lexical_cast<int>(stream->first)];
- // Depending on the current state go ahead and remove the current attribute
- if (ourStream->second->state == SendAndReceive)
- {
- pjmedia_sdp_media_remove_all_attr(media, "sendrecv");
- }
- else if (ourStream->second->state == SendOnly)
- {
- pjmedia_sdp_media_remove_all_attr(media, "sendonly");
- }
- else if (ourStream->second->state == ReceiveOnly)
- {
- pjmedia_sdp_media_remove_all_attr(media, "recvonly");
- }
- else if (ourStream->second->state == Inactive)
- {
- pjmedia_sdp_media_remove_all_attr(media, "inactive");
- }
+ // Depending on the current state go ahead and remove the current attribute
+ if (ourStream->second->state == SendAndReceive)
+ {
+ pjmedia_sdp_media_remove_all_attr(media, "sendrecv");
+ }
+ else if (ourStream->second->state == SendOnly)
+ {
+ pjmedia_sdp_media_remove_all_attr(media, "sendonly");
+ }
+ else if (ourStream->second->state == ReceiveOnly)
+ {
+ pjmedia_sdp_media_remove_all_attr(media, "recvonly");
+ }
+ else if (ourStream->second->state == Inactive)
+ {
+ pjmedia_sdp_media_remove_all_attr(media, "inactive");
+ }
- // Now that the old attribute is removed we can go ahead and update our state
- ourStream->second->state = stream->second;
+ // Now that the old attribute is removed we can go ahead and update our state
+ ourStream->second->state = stream->second;
- // Now we can go ahead and add in the corret attribute
- pjmedia_sdp_attr *attr = NULL;
+ // Now we can go ahead and add in the correct attribute
+ pjmedia_sdp_attr *attr = NULL;
- if (ourStream->second->state == SendAndReceive)
- {
- attr = pjmedia_sdp_attr_create(mImplPriv->mDialog->pool, "sendrecv", NULL);
- }
- else if (ourStream->second->state == SendOnly)
- {
- attr = pjmedia_sdp_attr_create(mImplPriv->mDialog->pool, "sendonly", NULL);
- }
- else if (ourStream->second->state == ReceiveOnly)
- {
- attr = pjmedia_sdp_attr_create(mImplPriv->mDialog->pool, "recvonly", NULL);
- }
- else if (ourStream->second->state == Inactive)
- {
- attr = pjmedia_sdp_attr_create(mImplPriv->mDialog->pool, "inactive", NULL);
+ if (ourStream->second->state == SendAndReceive)
+ {
+ attr = pjmedia_sdp_attr_create(mImplPriv->mDialog->pool, "sendrecv", NULL);
+ }
+ else if (ourStream->second->state == SendOnly)
+ {
+ attr = pjmedia_sdp_attr_create(mImplPriv->mDialog->pool, "sendonly", NULL);
+ }
+ else if (ourStream->second->state == ReceiveOnly)
+ {
+ attr = pjmedia_sdp_attr_create(mImplPriv->mDialog->pool, "recvonly", NULL);
+ }
+ else if (ourStream->second->state == Inactive)
+ {
+ attr = pjmedia_sdp_attr_create(mImplPriv->mDialog->pool, "inactive", NULL);
+ }
+
+ if (attr)
+ {
+ pjmedia_sdp_media_add_attr(media, attr);
+ changed = true;
+ }
}
- if (attr)
+ // If any streams were actually updated trigger a reinvite if need be
+ if (changed == true)
{
- pjmedia_sdp_media_add_attr(media, attr);
- changed = true;
+ // TODO: Add code here to determine if we really do need to send the reinvite, if we haven't actually answered yet
+ // what we really want to do is just update our answer SDP
+ pjsip_tx_data *packet = NULL;
+
+ if (success(pjsip_inv_reinvite(mImplPriv->mInviteSession, NULL, mImplPriv->mSDP, &packet)))
+ {
+ pjsip_inv_send_msg(mImplPriv->mInviteSession, packet);
+ }
+ else
+ {
+ lg(Warning) << "Unable to create reinvite";
+ }
}
- }
- // If any streams were actually updated trigger a reinvite if need be
- if (changed == true)
+ operationCookie->setCompleted();
+ }
+ catch (const std::exception& e)
{
- // TODO: Add code here to determine if we really do need to send the reinvite, if we haven't actually answered yet
- // what we really want to do is just update our answer SDP
- pjsip_tx_data *packet = NULL;
-
- if (success(pjsip_inv_reinvite(mImplPriv->mInviteSession, NULL, mImplPriv->mSDP, &packet)))
- {
- pjsip_inv_send_msg(mImplPriv->mInviteSession, packet);
- }
- else
- {
- lg(Warning) << "Unable to create reinvite";
- }
+ operationCookie->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+ assert(false);
}
-
- mCb->ice_response();
-
return Complete;
}
@@ -708,52 +758,63 @@ public:
SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
{
- // Is this a retry for an operation we're already processing?
- if (!mImplPriv->mOperationContextCache->addOperationContext(mOperationContext))
+ AMDContextResultData<StreamInformationDict, AMD_SessionController_addStreamsPtr>::ptr_type operationCookie =
+ getContext<AMDContextResultData<StreamInformationDict, AMD_SessionController_addStreamsPtr> >(
+ mImplPriv->mOperationContextCache,
+ mOperationContext,
+ mCb);
+ if (!operationCookie)
{
- lg(Debug) << "Retry of previously processed addStreams() operation detected and rejected.";
+ lg(Debug) << "addStreams() detected retry for operation " << mOperationContext->id;
return Complete;
}
- lg(Debug) << "Executing an addStreams Operation";
-
- // If there is an outstanding transaction then no streams can be added at this time
- if (mImplPriv->mInviteSession->invite_tsx)
+ try
{
- mCb->ice_response(StreamInformationDict());
- return Complete;
- }
+ lg(Debug) << "Executing an addStreams Operation";
- // Create an offer adding in the requested streams
- StreamInformationDict added;
- pjmedia_sdp_session *sdp = mSession->createSDPOffer(mStreams, added);
+ // If there is an outstanding transaction then no streams can be added at this time
+ if (mImplPriv->mInviteSession->invite_tsx)
+ {
+ mCb->ice_response(StreamInformationDict());
+ return Complete;
+ }
- // If no streams were actually added respond back appropriately
- if (added.empty())
- {
- mCb->ice_response(StreamInformationDict());
- return Complete;
- }
+ // Create an offer adding in the requested streams
+ StreamInformationDict added;
+ pjmedia_sdp_session *sdp = mSession->createSDPOffer(mStreams, added);
- // Store callback information so when the remote party responds with which streams were accepted we can
- // communicate it to the controller
- mImplPriv->mAddStreamsCb = mCb;
+ // If no streams were actually added respond back appropriately
+ if (added.empty())
+ {
+ mCb->ice_response(StreamInformationDict());
+ return Complete;
+ }
+
+ // Store callback information so when the remote party responds with which streams were accepted we can
+ // communicate it to the controller
+ mImplPriv->mAddStreamsCb = mCb;
- // Okay, create and send the reinvite!
- pjsip_tx_data *packet = NULL;
+ // Okay, create and send the reinvite!
+ pjsip_tx_data *packet = NULL;
- if (success(pjsip_inv_reinvite(mImplPriv->mInviteSession, NULL, sdp, &packet)))
- {
- pjsip_inv_send_msg(mImplPriv->mInviteSession, packet);
+ if (success(pjsip_inv_reinvite(mImplPriv->mInviteSession, NULL, sdp, &packet)))
+ {
+ pjsip_inv_send_msg(mImplPriv->mInviteSession, packet);
+ }
+ else
+ {
+ // If we couldn't create the reinvite no streams were added
+ lg(Warning) << "Unable to create reinvite when adding streams";
+ }
+
+ operationCookie->setResult(StreamInformationDict());
}
- else
+ catch (const std::exception& e)
{
- // If we couldn't create the reinvite no streams were added
- lg(Warning) << "Unable to create reinvite when adding streams";
+ operationCookie->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+ assert(false);
}
-
- mCb->ice_response(StreamInformationDict());
-
return Complete;
}
@@ -778,38 +839,50 @@ public:
SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
{
- // Is this a retry for an operation we're already processing?
- if (!mImplPriv->mOperationContextCache->addOperationContext(mOperationContext))
+ AMDContextData<AMD_SessionController_removeStreamsPtr>::ptr_type operationCookie =
+ getContext<AMDContextData<AMD_SessionController_removeStreamsPtr> >(
+ mImplPriv->mOperationContextCache,
+ mOperationContext,
+ mCb);
+ if (!operationCookie)
{
- lg(Debug) << "Retry of previously processed removeStates() operation detected and rejected.";
+ lg(Debug) << "removeStreamStates() detected retry for operation " << mOperationContext->id;
return Complete;
}
- lg(Debug) << "Executing a removeStreams Operation";
-
- pjmedia_sdp_session *sdp = mSession->modifySDP(mStreams);
-
- // If there is an outstanding transaction just set this as the answer SDP, otherwise trigger a reinvite
- if (!mImplPriv->mInviteSession->invite_tsx)
+ try
{
- pjsip_tx_data *packet = NULL;
+ lg(Debug) << "Executing a removeStreams Operation";
+
+ pjmedia_sdp_session *sdp = mSession->modifySDP(mStreams);
... 1264 lines suppressed ...
--
asterisk-scf/integration/sip.git
More information about the asterisk-scf-commits
mailing list