[asterisk-scf-commits] asterisk-scf/integration/sip.git branch "retry_deux" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Tue Feb 14 18:43:38 CST 2012
branch "retry_deux" has been updated
via bf499c004ddc0a11774ac51fc446a220d62d106f (commit)
from f7de9e3e3c9e447f89543e57e35a98857ff95e70 (commit)
Summary of changes:
src/PJSIPSessionModule.cpp | 63 +++++----
src/PJSIPSessionModuleConstruction.cpp | 3 +-
src/SIPConfiguration.cpp | 47 +++++--
src/SIPEndpoint.cpp | 4 +-
src/SIPEndpoint.h | 1 +
src/SIPSession.cpp | 244 +++++++++++++++++++++++---------
src/SIPSession.h | 15 ++-
src/SIPTelephonyEventSink.cpp | 32 ++++-
src/SIPTelephonyEventSink.h | 5 +
src/SIPTelephonyEventSource.cpp | 43 +++++-
src/SIPTelephonyEventSource.h | 6 +
src/SIPTransfer.cpp | 9 +-
src/SIPTransfer.h | 6 +-
13 files changed, 358 insertions(+), 120 deletions(-)
- Log -----------------------------------------------------------------
commit bf499c004ddc0a11774ac51fc446a220d62d106f
Author: Ken Hunt <ken.hunt at digium.com>
Date: Tue Feb 14 18:43:41 2012 -0600
Operation context handling.
diff --git a/src/PJSIPSessionModule.cpp b/src/PJSIPSessionModule.cpp
index 1cd5ead..d74e359 100644
--- a/src/PJSIPSessionModule.cpp
+++ b/src/PJSIPSessionModule.cpp
@@ -41,10 +41,12 @@
#include <AsteriskSCF/WorkQueue/WorkQueue.h>
#include <AsteriskSCF/WorkQueue/SuspendableWorkQueue.h>
#include <AsteriskSCF/Helpers/Retry.h>
+#include <AsteriskSCF/Helpers/OperationContext.h>
using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::SIP::ExtensionPoint::V1;
using namespace AsteriskSCF::System::Hook::V1;
+using namespace AsteriskSCF::System::V1;
namespace
{
@@ -387,7 +389,8 @@ public:
mDestination(destination),
mCallerID(callerID),
mRedirections(redirections),
- mRetryPolicy(5, 500) { }
+ mRetryPolicy(5, 500),
+ mOperationContext(AsteriskSCF::createContext()) { }
protected:
SuspendableWorkResult initial(const SuspendableWorkListenerPtr&)
@@ -474,12 +477,14 @@ protected:
else
{
// If this is not an attended transfer we can just route the session as normally
- mOperationId = ::IceUtil::generateUUID();
+ SuspendableWorkListenerPtr listener = 0;
// Update the Party Id information on the session.
mSession->setSelfAsCaller();
- SuspendableWorkListenerPtr listener = 0;
+ // Setup an operation context to support retries.
+ mOperationContext = AsteriskSCF::createContext();
+
SIPAMICallbackPtr amiCallback = new SIPAMICallback(listener, mSession, this, false, true);
SIPAMICallbackCookiePtr cookie = new SIPAMICallbackCookie(amiCallback);
invokeOperation(cookie);
@@ -508,7 +513,7 @@ protected:
void invokeOperation(const SIPAMICallbackCookiePtr& cookie)
{
Ice::CallbackPtr d = Ice::newCallback(cookie->getSIPAMICallback(), &SIPAMICallback::callback);
- mSessionRouter->begin_routeSession(mOperationId, mSession->getSessionProxy(), mDestination, 0, mCallerID, mRedirections, d, cookie);
+ mSessionRouter->begin_routeSession(mOperationContext, mSession->getSessionProxy(), mDestination, 0, mCallerID, mRedirections, d, cookie);
}
SuspendableWorkResult calledBack(const Ice::AsyncResultPtr& asyncResult)
@@ -518,7 +523,7 @@ protected:
{
router->end_routeSession(asyncResult);
}
- catch (const Ice::ConnectionLostException &cle)
+ catch (const Ice::ConnectionLostException& cle)
{
// Assume a failover is occurring for the routing service.
// This will block the WorkQueue's thread, but it's highly likely
@@ -572,7 +577,7 @@ private:
CallerPtr mCallerID;
RedirectionsPtr mRedirections;
RetryPolicy mRetryPolicy;
- std::string mOperationId;
+ OperationContextPtr mOperationContext;
};
bool PJSIPSessionModule::getPrivacy(pjsip_rx_data *rdata)
@@ -1439,6 +1444,7 @@ public:
protected:
SuspendableWorkResult initial(const SuspendableWorkListenerPtr&)
{
+ OperationContextPtr context = AsteriskSCF::createContext();
lg(Debug) << "Executing HandleInviteResponseOperation" << std::endl;
//Treat all 1XX messages we don't recognize the same as a 180
if (mRespCode > 100 && mRespCode < 200 && mRespCode != 183)
@@ -1462,7 +1468,7 @@ protected:
{
SIPAMICallbackPtr cb(new SIPAMICallback(0, mSession, this, false, true));
Ice::CallbackPtr d = Ice::newCallback(cb, &SIPAMICallback::callback);
- (*listener)->begin_indicated(mSession->getSessionProxy(), new RingingIndication(), mSession->getCookies(), d);
+ (*listener)->begin_indicated(context, mSession->getSessionProxy(), new RingingIndication(), mSession->getCookies(), d);
}
catch (const Ice::Exception &ex)
{
@@ -1486,7 +1492,7 @@ protected:
Ice::CallbackPtr d = Ice::newCallback(cb, &SIPAMICallback::callback);
ProgressingIndicationPtr progressing(new ProgressingIndication());
progressing->response = response;
- (*listener)->begin_indicated(mSession->getSessionProxy(), progressing, mSession->getCookies(), d);
+ (*listener)->begin_indicated(context, mSession->getSessionProxy(), progressing, mSession->getCookies(), d);
}
catch (const Ice::Exception &ex)
{
@@ -1502,13 +1508,14 @@ protected:
std::vector<AsteriskSCF::SessionCommunications::V1::SessionListenerPrx> listeners = mSession->getListeners();
std::vector<AsteriskSCF::SessionCommunications::V1::SessionListenerPrx>::const_iterator listener;
lg(Debug) << "Relating connected state to " << listeners.size() << " listeners";
+
for (listener = listeners.begin(); listener != listeners.end(); ++listener)
{
try
{
SIPAMICallbackPtr cb(new SIPAMICallback(0, mSession, this, false, true));
Ice::CallbackPtr d = Ice::newCallback(cb, &SIPAMICallback::callback);
- (*listener)->begin_indicated(mSession->getSessionProxy(), new ConnectedIndication(), mSession->getCookies(), d);
+ (*listener)->begin_indicated(context, mSession->getSessionProxy(), new ConnectedIndication(), mSession->getCookies(), d);
}
catch (const Ice::Exception &ex)
{
@@ -1518,7 +1525,7 @@ protected:
}
}
mSession->setSessionOwnerId(mConnected);
- mSession->getSessionControllerProxy()->updateConnectedLine(mConnected);
+ mSession->getSessionControllerProxy()->updateConnectedLine(context, mConnected);
return Complete;
}
@@ -1759,6 +1766,7 @@ protected:
lg(Debug) << "Relating stopped state to " << listeners.size() << " listeners";
AsteriskSCF::SessionCommunications::V1::StoppedIndicationPtr stopped(new AsteriskSCF::SessionCommunications::V1::StoppedIndication());
stopped->response = response;
+ OperationContextPtr context = AsteriskSCF::createContext();
for (std::vector<AsteriskSCF::SessionCommunications::V1::SessionListenerPrx>::iterator listener =
listeners.begin();
listener != listeners.end();
@@ -1766,7 +1774,7 @@ protected:
{
try
{
- (*listener)->indicated(session->getSessionProxy(), stopped, session->getCookies());
+ (*listener)->indicated(context, session->getSessionProxy(), stopped, session->getCookies());
}
catch (const Ice::Exception &ex)
{
@@ -2033,7 +2041,7 @@ public:
mRouter(router),
mSession(session),
mRetryPolicy(5, 500),
- mOperationId(::IceUtil::generateUUID()) { }
+ mOperationContext(AsteriskSCF::createContext()) { }
protected:
SuspendableWorkResult initial(const SuspendableWorkListenerPtr&)
@@ -2061,7 +2069,7 @@ protected:
{
Ice::CallbackPtr d = Ice::newCallback(cookie->getSIPAMICallback(), &SIPAMICallback::callback);
mRouter->begin_connectBridgedSessionsWithDestination(
- mOperationId,
+ mOperationContext,
mSession->getSessionProxy(),
mContact,
true,
@@ -2083,7 +2091,7 @@ protected:
{
router->end_connectBridgedSessionsWithDestination(asyncResult);
}
- catch (const Ice::ConnectionLostException &cle)
+ catch (const Ice::ConnectionLostException& cle)
{
// Assume a failover is occurring for the routing service.
// This will block the WorkQueue's thread, but it's highly likely
@@ -2120,7 +2128,7 @@ private:
AsteriskSCF::Discovery::SmartProxy<SessionRouterPrx> mRouter;
SIPSessionPtr mSession;
RetryPolicy mRetryPolicy;
- std::string mOperationId;
+ OperationContextPtr mOperationContext;
};
pjsip_redirect_op PJSIPSessionModule::invOnRedirected(pjsip_inv_session* inv, const pjsip_uri* uri,
@@ -2152,19 +2160,22 @@ class HandleSendReinviteResponse : public SIPQueueableOperation
{
public:
HandleSendReinviteResponse(pjsip_inv_session *inv, const int moduleId, pjsip_tx_data *tdata)
- : mInv(inv), mModuleId(moduleId), mResponse(tdata) { }
+ : mInv(inv),
+ mModuleId(moduleId),
+ mResponse(tdata),
+ mOperationContext(AsteriskSCF::createContext()) { }
protected:
SuspendableWorkResult initial(const SuspendableWorkListenerPtr&)
{
PJSIPSessionModInfo *session_mod_info = (PJSIPSessionModInfo*)mInv->mod_data[mModuleId];
- SIPSessionPtr session = session_mod_info->getSessionPtr();
- SessionControllerPrx controller = session->getSessionControllerProxy();
+ mSession = session_mod_info->getSessionPtr();
+ SessionControllerPrx controller = mSession->getSessionControllerProxy();
// If we have no session controller respond to the reinvite with the same SDP that we previously had
if (!controller)
{
- pjsip_inv_set_sdp_answer(mInv, session->modifySDP(StreamInformationDict()));
+ pjsip_inv_set_sdp_answer(mInv, mSession->modifySDP(StreamInformationDict()));
pjsip_inv_send_reinvite_response(mInv, mResponse);
return Complete;
}
@@ -2177,21 +2188,21 @@ protected:
}
// Call into the session for the serious work
- session->createSDPAnswer(offer_sdp, mStreamsAdded);
+ mSession->createSDPAnswer(offer_sdp, mStreamsAdded);
// If no streams were added we can respond to the offer with the SDP we had previously and move on
if (mStreamsAdded.empty())
{
- pjsip_inv_set_sdp_answer(mInv, session->modifySDP(StreamInformationDict()));
+ pjsip_inv_set_sdp_answer(mInv, mSession->modifySDP(StreamInformationDict()));
pjsip_inv_send_reinvite_response(mInv, mResponse);
return Complete;
}
// If any streams were added we need to call into the session controller to see what ones
// we can actually accept
- SIPAMICallbackPtr cb(new SIPAMICallback(0, session, this, false, true));
+ SIPAMICallbackPtr cb(new SIPAMICallback(0, mSession, this, false, true));
Ice::CallbackPtr d = Ice::newCallback(cb, &SIPAMICallback::callback);
- session->getSessionControllerProxy()->begin_addStreams(mStreamsAdded, d);
+ mSession->getSessionControllerProxy()->begin_addStreams(mOperationContext, mStreamsAdded, d);
return Complete;
}
@@ -2204,8 +2215,8 @@ protected:
// Remove any streams that were accepted by the session controller, we don't need to do anything
// more with them since our initial call to createSDPAnswer will have created the proper SDP
for (StreamInformationDict::iterator stream = accepted.begin();
- stream != accepted.end();
- ++stream)
+ stream != accepted.end();
+ ++stream)
{
mStreamsAdded.erase(stream->first);
}
@@ -2230,6 +2241,8 @@ private:
const int mModuleId;
pjsip_tx_data *mResponse;
StreamInformationDict mStreamsAdded;
+ OperationContextPtr mOperationContext;
+ SIPSessionPtr mSession;
};
void PJSIPSessionModule::invOnSendReinviteResponse(pjsip_inv_session* inv, pjsip_tx_data* tdata)
diff --git a/src/PJSIPSessionModuleConstruction.cpp b/src/PJSIPSessionModuleConstruction.cpp
index 002e09f..f57db1e 100644
--- a/src/PJSIPSessionModuleConstruction.cpp
+++ b/src/PJSIPSessionModuleConstruction.cpp
@@ -17,6 +17,7 @@
#include <AsteriskSCF/Logger.h>
#include <AsteriskSCF/WorkQueue/WorkQueue.h>
#include <AsteriskSCF/ThreadPool/ThreadPool.h>
+#include <AsteriskSCF/Helpers/OperationContext.h>
#include "PJSIPSessionModule.h"
#include "AuthManager.h"
@@ -149,7 +150,7 @@ PJSIPSessionModule::PJSIPSessionModule(pjsip_endpoint *endpt,
mSessionCreationExtensionPointService = serviceLocatorManagement->addService(mSessionCreationExtensionPointPrx, SessionCreationExtensionPointId);
// TBD... how to access the Component's service and instance ids.
- mSessionCreationExtensionPointService->addLocatorParams(new ServiceLocatorParams(SessionCreationHookLocatorCategory, "default", ""), "");
+ mSessionCreationExtensionPointService->addLocatorParams(AsteriskSCF::createContext(), new ServiceLocatorParams(SessionCreationHookLocatorCategory, "default", ""), "");
mPoolQueue = new AsteriskSCF::WorkQueue::WorkQueue();
mPoolListener = new PJSIPSessionModuleThreadPoolListener();
diff --git a/src/SIPConfiguration.cpp b/src/SIPConfiguration.cpp
index badd6b0..eb7486e 100644
--- a/src/SIPConfiguration.cpp
+++ b/src/SIPConfiguration.cpp
@@ -36,12 +36,15 @@
#include "TLSTransport.h"
#include "STUNTransportConfig.h"
#include <vector>
-#include <AsteriskSCF/Helpers/Network.h>
+#include <AsteriskSCF/Helpers/OperationContext.h>
+#include <AsteriskSCF/Helpers/OperationContextCache.h>
using namespace AsteriskSCF::System::Configuration::V1;
using namespace AsteriskSCF::Configuration::SIPSessionManager::V1;
using namespace AsteriskSCF::SessionCommunications::PartyIdentification::V1;
using namespace AsteriskSCF::Core::Routing::V1;
+using namespace AsteriskSCF::System::V1;
+using namespace AsteriskSCF::Helpers;
using namespace std;
//
@@ -1616,9 +1619,9 @@ public:
ConfigurationGroupSeq getConfiguration(const ConfigurationGroupSeq&, const Ice::Current&);
ConfigurationGroupSeq getConfigurationAll(const ConfigurationGroupSeq&, const Ice::Current&);
ConfigurationGroupSeq getConfigurationGroups(const Ice::Current&);
- void setConfiguration(const ConfigurationGroupSeq&, const Ice::Current&);
- void removeConfigurationItems(const ConfigurationGroupSeq&, const Ice::Current&);
- void removeConfigurationGroups(const ConfigurationGroupSeq&, const Ice::Current&);
+ void setConfiguration(const OperationContextPtr& operationContext, const ConfigurationGroupSeq&, const Ice::Current&);
+ void removeConfigurationItems(const OperationContextPtr& operationContext, const ConfigurationGroupSeq&, const Ice::Current&);
+ void removeConfigurationGroups(const OperationContextPtr& operationContext, const ConfigurationGroupSeq&, const Ice::Current&);
ConfigurationDataPtr getData()
{
@@ -1634,6 +1637,7 @@ private:
// itself.
//
ConfigurationDataPtr mData;
+ OperationContextCachePtr mOperationContextCache;
};
typedef IceUtil::Handle<ConfigurationServiceImpl> ConfigurationServiceImplPtr;
@@ -1641,7 +1645,8 @@ typedef IceUtil::Handle<ConfigurationServiceImpl> ConfigurationServiceImplPtr;
ConfigurationServiceImpl::ConfigurationServiceImpl(const PJSIPManagerPtr& manager,
const boost::shared_ptr<SIPEndpointFactory>& factory, const std::string& routingId,
const LocatorRegistrySmartPrx& registry) :
- mData(new ConfigurationData(manager, factory, routingId, registry))
+ mData(new ConfigurationData(manager, factory, routingId, registry)),
+ mOperationContextCache(new OperationContextCache(180))
{
}
@@ -1861,9 +1866,17 @@ static void runPostProcessing()
}
}
-void ConfigurationServiceImpl::setConfiguration(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq& groups,
- const Ice::Current&)
+void ConfigurationServiceImpl::setConfiguration(
+ const OperationContextPtr& operationContext,
+ const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq& groups,
+ const Ice::Current&)
{
+ // Is this a retry for an operation we're already processing?
+ if (!mOperationContextCache->addOperationContext(operationContext))
+ {
+ throw OperationCallCancelledException(operationContext->id, Duplicate);
+ }
+
class GroupsVisitor : public SIPConfigurationGroupVisitor
{
public:
@@ -1933,8 +1946,16 @@ void ConfigurationServiceImpl::setConfiguration(const AsteriskSCF::System::Confi
}
void ConfigurationServiceImpl::removeConfigurationItems(
- const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq& groups, const Ice::Current&)
+ const OperationContextPtr& operationContext,
+ const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq& groups,
+ const Ice::Current&)
{
+ // Is this a retry for an operation we're already processing?
+ if (!mOperationContextCache->addOperationContext(operationContext))
+ {
+ throw OperationCallCancelledException(operationContext->id, Duplicate);
+ }
+
class GroupsVisitor : public SIPConfigurationGroupVisitor
{
public:
@@ -2002,8 +2023,16 @@ void ConfigurationServiceImpl::removeConfigurationItems(
}
void ConfigurationServiceImpl::removeConfigurationGroups(
- const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq& groups, const Ice::Current&)
+ const OperationContextPtr& operationContext,
+ const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq& groups,
+const Ice::Current&)
{
+ // Is this a retry for an operation we're already processing?
+ if (!mOperationContextCache->addOperationContext(operationContext))
+ {
+ throw OperationCallCancelledException(operationContext->id, Duplicate);
+ }
+
class Visitor : public SIPConfigurationGroupVisitor
{
public:
diff --git a/src/SIPEndpoint.cpp b/src/SIPEndpoint.cpp
index 1081e9c..7253d87 100644
--- a/src/SIPEndpoint.cpp
+++ b/src/SIPEndpoint.cpp
@@ -31,6 +31,7 @@
#include "NATOptions.h"
using namespace std;
+using namespace AsteriskSCF::System::V1;
using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::Media::V1;
using namespace AsteriskSCF::Media::SDP::V1;
@@ -514,7 +515,8 @@ std::string SIPEndpoint::getId(const Ice::Current&)
}
AsteriskSCF::SessionCommunications::V1::SessionPrx SIPEndpoint::createSession(
- const std::string& destination,
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const string& destination,
const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
const AsteriskSCF::SessionCommunications::ExtensionPoints::V1::SessionCreationHookPrx& oneShotHook,
const Ice::Current&)
diff --git a/src/SIPEndpoint.h b/src/SIPEndpoint.h
index cbf447f..28196be 100644
--- a/src/SIPEndpoint.h
+++ b/src/SIPEndpoint.h
@@ -298,6 +298,7 @@ public:
*/
std::string getId(const Ice::Current&);
AsteriskSCF::SessionCommunications::V1::SessionPrx createSession(
+ const AsteriskSCF::System::V1::OperationContextPtr&,
const std::string&,
const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx&,
const AsteriskSCF::SessionCommunications::ExtensionPoints::V1::SessionCreationHookPrx&,
diff --git a/src/SIPSession.cpp b/src/SIPSession.cpp
index 00ad31d..693e9e4 100755
--- a/src/SIPSession.cpp
+++ b/src/SIPSession.cpp
@@ -46,9 +46,12 @@
#include <AsteriskSCF/Media/NetworkIf.h>
#include <AsteriskSCF/SessionCookies/SIPSessionManager/SIPSessionCookiesIf.h>
#include <AsteriskSCF/Collections/HandleSet.h>
+#include <AsteriskSCF/Helpers/OperationContextCache.h>
+#include <AsteriskSCF/Helpers/OperationContext.h>
#include "NATOptions.h"
#include "PJSIPSessionModule.h"
+
using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::System::NAT::V1;
using namespace AsteriskSCF::SessionCommunications::V1;
@@ -133,7 +136,7 @@ public:
const NATEndpointOptions& natOptions)
: mAdapter(adapter), mDialog(0), mInviteSession(0), mEndpoint(endpoint), mDestination(destination),
mManager(manager), mServiceLocator(serviceLocator), mReplicationContext(replicationContext),
- mNatOptions(natOptions), mSDP(0)
+ mNatOptions(natOptions), mSDP(0), mOperationContextCache(new OperationContextCache(180))
{
}
@@ -334,6 +337,8 @@ public:
AsteriskSCF::SessionCommunications::V1::TelephonyEventSourceSeq mExternalEventSources;
+ AsteriskSCF::Helpers::OperationContextCachePtr mOperationContextCache;
+
private:
static ReadOnlyCookieTypes mReadOnlyCookieTypes;
@@ -550,12 +555,21 @@ class ChangeStreamStatesOperation : public SuspendableWork
public:
ChangeStreamStatesOperation(
const AsteriskSCF::SessionCommunications::V1::AMD_SessionController_changeStreamStatesPtr& cb,
+ const OperationContextPtr& operationContext,
const AsteriskSCF::Media::V1::StreamStateDict& streams,
const boost::shared_ptr<SIPSessionPriv>& sessionPriv)
- : mCb(cb), mStreams(streams), mImplPriv(sessionPriv) { }
+ : mCb(cb), mOperationContext(operationContext), mStreams(streams), mImplPriv(sessionPriv) { }
SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
{
+ // Is this a retry for an operation we're already processing?
+ if (!mImplPriv->mOperationContextCache->addOperationContext(mOperationContext))
+ {
+ lg(Debug) << "Retry of previously processed changeStreamStates() operation detected and rejected.";
+ mCb->ice_exception(OperationCallCancelledException(mOperationContext->id, Duplicate));
+ return Complete;
+ }
+
lg(Debug) << "Executing a changeStreamStates Operation";
// This boolean is set to true if at least one stream is actually changed. This is to prevent
@@ -657,6 +671,7 @@ public:
private:
AsteriskSCF::SessionCommunications::V1::AMD_SessionController_changeStreamStatesPtr mCb;
+ OperationContextPtr mOperationContext;
AsteriskSCF::Media::V1::StreamStateDict mStreams;
boost::shared_ptr<SIPSessionPriv> mImplPriv;
};
@@ -666,13 +681,22 @@ class AddStreamsOperation : public SuspendableWork
public:
AddStreamsOperation(
const AsteriskSCF::SessionCommunications::V1::AMD_SessionController_addStreamsPtr& cb,
+ const OperationContextPtr& operationContext,
const AsteriskSCF::Media::V1::StreamInformationDict& streams,
const boost::shared_ptr<SIPSessionPriv>& sessionPriv,
const SIPSessionPtr& session)
- : mCb(cb), mStreams(streams), mImplPriv(sessionPriv), mSession(session) { }
+ : mCb(cb), mOperationContext(operationContext), mStreams(streams), mImplPriv(sessionPriv), mSession(session) { }
SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
{
+ // Is this a retry for an operation we're already processing?
+ if (!mImplPriv->mOperationContextCache->addOperationContext(mOperationContext))
+ {
+ lg(Debug) << "Retry of previously processed addStreams() operation detected and rejected.";
+ mCb->ice_exception(OperationCallCancelledException(mOperationContext->id, Duplicate));
+ return Complete;
+ }
+
lg(Debug) << "Executing an addStreams Operation";
// If there is an outstanding transaction then no streams can be added at this time
@@ -717,6 +741,7 @@ public:
private:
AsteriskSCF::SessionCommunications::V1::AMD_SessionController_addStreamsPtr mCb;
+ OperationContextPtr mOperationContext;
AsteriskSCF::Media::V1::StreamInformationDict mStreams;
boost::shared_ptr<SIPSessionPriv> mImplPriv;
SIPSessionPtr mSession;
@@ -727,13 +752,22 @@ class RemoveStreamsOperation : public SuspendableWork
public:
RemoveStreamsOperation(
const AsteriskSCF::SessionCommunications::V1::AMD_SessionController_removeStreamsPtr& cb,
+ const OperationContextPtr& operationContext,
const AsteriskSCF::Media::V1::StreamInformationDict& streams,
const boost::shared_ptr<SIPSessionPriv>& sessionPriv,
const SIPSessionPtr& session)
- : mCb(cb), mStreams(streams), mImplPriv(sessionPriv), mSession(session) { }
+ : mCb(cb), mOperationContext(operationContext), mStreams(streams), mImplPriv(sessionPriv), mSession(session) { }
SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
{
+ // Is this a retry for an operation we're already processing?
+ if (!mImplPriv->mOperationContextCache->addOperationContext(mOperationContext))
+ {
+ lg(Debug) << "Retry of previously processed removeStates() operation detected and rejected.";
+ mCb->ice_exception(OperationCallCancelledException(mOperationContext->id, Duplicate));
+ return Complete;
+ }
+
lg(Debug) << "Executing a removeStreams Operation";
pjmedia_sdp_session *sdp = mSession->modifySDP(mStreams);
@@ -764,6 +798,7 @@ public:
private:
AsteriskSCF::SessionCommunications::V1::AMD_SessionController_removeStreamsPtr mCb;
+ OperationContextPtr mOperationContext;
AsteriskSCF::Media::V1::StreamInformationDict mStreams;
boost::shared_ptr<SIPSessionPriv> mImplPriv;
SIPSessionPtr mSession;
@@ -772,13 +807,21 @@ private:
class SetCookiesOperation : public SuspendableWork
{
public:
- SetCookiesOperation(const AsteriskSCF::SessionCommunications::V1::SessionCookies& cookies,
+ SetCookiesOperation(const OperationContextPtr& operationContext,
+ const AsteriskSCF::SessionCommunications::V1::SessionCookies& cookies,
const boost::shared_ptr<SIPSessionPriv>& sessionPriv,
bool privileged = false)
- : mCookies(cookies), mImplPriv(sessionPriv), mPrivileged(privileged) { }
+ : mOperationContext(operationContext), mCookies(cookies), mImplPriv(sessionPriv), mPrivileged(privileged) { }
SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
{
+ // Is this a retry for an operation we're already processing?
+ if (!mImplPriv->mOperationContextCache->addOperationContext(mOperationContext))
+ {
+ lg(Debug) << "Retry of previously processed setCookies() operation detected and rejected.";
+ return Complete;
+ }
+
for (AsteriskSCF::SessionCommunications::V1::SessionCookies::const_iterator i = mCookies.begin();
i != mCookies.end();
++i)
@@ -800,6 +843,7 @@ public:
}
private:
+ OperationContextPtr mOperationContext;
AsteriskSCF::SessionCommunications::V1::SessionCookies mCookies;
boost::shared_ptr<SIPSessionPriv> mImplPriv;
bool mPrivileged;
@@ -815,51 +859,54 @@ public:
mImplPriv(implPriv), mSession(session) { }
void changeStreamStates_async(const AsteriskSCF::SessionCommunications::V1::AMD_SessionController_changeStreamStatesPtr& cb,
+ const OperationContextPtr& operationContext,
const AsteriskSCF::Media::V1::StreamStateDict& states, const Ice::Current&)
{
lg(Debug) << "Queueing changeStreamStates operation";
- mSession->enqueueSessionWork(new ChangeStreamStatesOperation(cb, states, mImplPriv));
+ mSession->enqueueSessionWork(new ChangeStreamStatesOperation(cb, operationContext, states, mImplPriv));
}
void addStreams_async(const AsteriskSCF::SessionCommunications::V1::AMD_SessionController_addStreamsPtr& cb,
+ const OperationContextPtr& operationContext,
const AsteriskSCF::Media::V1::StreamInformationDict& streams, const Ice::Current&)
{
lg(Debug) << "Queueing addStreams operation";
- mSession->enqueueSessionWork(new AddStreamsOperation(cb, streams, mImplPriv, mSession));
+ mSession->enqueueSessionWork(new AddStreamsOperation(cb, operationContext, streams, mImplPriv, mSession));
}
void removeStreams_async(const AsteriskSCF::SessionCommunications::V1::AMD_SessionController_removeStreamsPtr& cb,
+ const OperationContextPtr& operationContext,
const AsteriskSCF::Media::V1::StreamInformationDict& streams, const Ice::Current&)
{
lg(Debug) << "Queueing removeStreams operation";
- mSession->enqueueSessionWork(new RemoveStreamsOperation(cb, streams, mImplPriv, mSession));
+ mSession->enqueueSessionWork(new RemoveStreamsOperation(cb, operationContext, streams, mImplPriv, mSession));
}
/**
* This operation allows the externally connected component (typically the bridge)
* to update this session's ConnectedLine information.
*/
- void updateConnectedLine(const ConnectedLinePtr& connected, const Ice::Current&)
+ void updateConnectedLine(const OperationContextPtr& operationContext, const ConnectedLinePtr& connected, const Ice::Current&)
{
AsteriskSCF::SessionCommunications::V1::SessionCookies cookies;
cookies.push_back(connected);
- mSession->enqueueSessionWork(new SetCookiesOperation(cookies, mImplPriv, true));
+ mSession->enqueueSessionWork(new SetCookiesOperation(operationContext, cookies, mImplPriv, true));
}
- void updateCallerID(const CallerPtr& caller, const Ice::Current&)
+ void updateCallerID(const OperationContextPtr& operationContext, const CallerPtr& caller, const Ice::Current&)
{
AsteriskSCF::SessionCommunications::V1::SessionCookies cookies;
cookies.push_back(caller);
- mSession->enqueueSessionWork(new SetCookiesOperation(cookies, mImplPriv, true));
+ mSession->enqueueSessionWork(new SetCookiesOperation(operationContext, cookies, mImplPriv, true));
}
/**
* This operation provides notification that some other Session we are connected to
* has been redirected.
*/
- void updateRedirections(const RedirectionsPtr&, const ::Ice::Current&)
+ void updateRedirections(const OperationContextPtr& operationContext, const RedirectionsPtr&, const ::Ice::Current&)
{
// TBD.
}
@@ -944,12 +991,22 @@ class ConnectStreamsOperation : public SuspendableWork
public:
ConnectStreamsOperation(
const AsteriskSCF::Media::V1::AMD_DirectMediaConnection_connectStreamsPtr& cb,
+ const OperationContextPtr& operationContext,
const AsteriskSCF::Media::V1::DirectMediaConnectionDict& connections,
+ const boost::shared_ptr<SIPSessionPriv>& implPriv,
const SIPSessionPtr& session)
- : mCb(cb), mConnections(connections), mSession(session) { }
+ : mCb(cb), mOperationContext(operationContext), mConnections(connections), mImplPriv(implPriv), mSession(session) { }
SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
{
+ // Is this a retry for an operation we're already processing?
+ if (!mImplPriv->mOperationContextCache->addOperationContext(mOperationContext))
+ {
+ lg(Debug) << "Retry of previously processed connectStreams() operation detected and rejected.";
+ mCb->ice_exception(OperationCallCancelledException(mOperationContext->id, Duplicate));
+ return Complete;
+ }
+
if (mSession->getInviteSession()->invite_tsx)
{
mSession->enqueueSessionWork(this);
@@ -975,7 +1032,9 @@ public:
private:
AsteriskSCF::Media::V1::AMD_DirectMediaConnection_connectStreamsPtr mCb;
+ OperationContextPtr mOperationContext;
AsteriskSCF::Media::V1::DirectMediaConnectionDict mConnections;
+ boost::shared_ptr<SIPSessionPriv> mImplPriv;
const SIPSessionPtr mSession;
};
@@ -984,12 +1043,22 @@ class DisconnectStreamsOperation : public SuspendableWork
public:
DisconnectStreamsOperation(
const AsteriskSCF::Media::V1::AMD_DirectMediaConnection_disconnectStreamsPtr& cb,
+ const OperationContextPtr& operationContext,
const Ice::StringSeq& streams,
+ const boost::shared_ptr<SIPSessionPriv>& implPriv,
const SIPSessionPtr& session)
- : mCb(cb), mStreams(streams), mSession(session) { }
+ : mCb(cb), mOperationContext(operationContext), mStreams(streams), mImplPriv(implPriv), mSession(session) { }
SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
{
+ // Is this a retry for an operation we're already processing?
+ if (!mImplPriv->mOperationContextCache->addOperationContext(mOperationContext))
+ {
+ lg(Debug) << "Retry of previously processed disconnectStreams() operation detected and rejected.";
+ mCb->ice_exception(OperationCallCancelledException(mOperationContext->id, Duplicate));
+ return Complete;
+ }
+
pjmedia_sdp_session *sdp = mSession->modifySDP(mStreams);
pjsip_tx_data *packet = NULL;
@@ -1009,7 +1078,9 @@ public:
private:
AsteriskSCF::Media::V1::AMD_DirectMediaConnection_disconnectStreamsPtr mCb;
+ OperationContextPtr mOperationContext;
Ice::StringSeq mStreams;
+ boost::shared_ptr<SIPSessionPriv> mImplPriv;
const SIPSessionPtr mSession;
};
@@ -1019,8 +1090,8 @@ private:
class SIPDirectMediaConnection : public AsteriskSCF::Media::V1::DirectMediaConnection
{
public:
- SIPDirectMediaConnection(const SIPSessionPtr& session) :
- mSession(session) { }
+ SIPDirectMediaConnection(const boost::shared_ptr<SIPSessionPriv>& implPriv, const SIPSessionPtr& session) :
+ mImplPriv(implPriv), mSession(session) { }
void checkDirectConnections_async(const AsteriskSCF::Media::V1::AMD_DirectMediaConnection_checkDirectConnectionsPtr& cb,
const AsteriskSCF::Media::V1::DirectMediaConnectionDict& connections, const Ice::Current&)
@@ -1030,23 +1101,26 @@ public:
}
void connectStreams_async(const AsteriskSCF::Media::V1::AMD_DirectMediaConnection_connectStreamsPtr& cb,
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
const AsteriskSCF::Media::V1::DirectMediaConnectionDict& connections, const Ice::Current&)
{
lg(Debug) << "Queueing connectStreams operation";
- mSession->enqueueSessionWork(new ConnectStreamsOperation(cb, connections, mSession));
+ mSession->enqueueSessionWork(new ConnectStreamsOperation(cb, operationContext, connections, mImplPriv, mSession));
}
void disconnectStreams_async(const AsteriskSCF::Media::V1::AMD_DirectMediaConnection_disconnectStreamsPtr& cb,
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
const Ice::StringSeq& streams, const Ice::Current&)
{
lg(Debug) << "Queueing disconnectStreams operation";
- mSession->enqueueSessionWork(new DisconnectStreamsOperation(cb, streams, mSession));
+ mSession->enqueueSessionWork(new DisconnectStreamsOperation(cb, operationContext, streams, mImplPriv, mSession));
}
private:
/**
* A pointer to the communications session that created us.
*/
+ boost::shared_ptr<SIPSessionPriv> mImplPriv;
SIPSessionPtr mSession;
};
@@ -1355,7 +1429,7 @@ SIPSession::SIPSession(const Ice::ObjectAdapterPtr& adapter,
mImplPriv->mOurSessionControllerProxy =
AsteriskSCF::SessionCommunications::V1::SessionControllerPrx::uncheckedCast(adapter->addWithUUID(mImplPriv->mOurSessionController));
- DirectMediaConnectionPtr directMedia = new SIPDirectMediaConnection(this);
+ DirectMediaConnectionPtr directMedia = new SIPDirectMediaConnection(mImplPriv, this);
adapter->addFacet(directMedia, mImplPriv->mSessionProxy->ice_getIdentity(), directMediaConnectionFacet);
if (isUAC)
@@ -1398,7 +1472,7 @@ SIPSession::SIPSession(const Ice::ObjectAdapterPtr& adapter,
mImplPriv->mOurSessionControllerProxy =
AsteriskSCF::SessionCommunications::V1::SessionControllerPrx::uncheckedCast(adapter->add(mImplPriv->mOurSessionController, controllerid));
- DirectMediaConnectionPtr directMedia = new SIPDirectMediaConnection(this);
+ DirectMediaConnectionPtr directMedia = new SIPDirectMediaConnection(mImplPriv, this);
adapter->addFacet(directMedia, sessionid, directMediaConnectionFacet);
mImplPriv->mRTPSessions = mediasessions;
@@ -1466,13 +1540,22 @@ class IndicateOperation : public SuspendableWork
public:
IndicateOperation(
const AsteriskSCF::SessionCommunications::V1::AMD_Session_indicatePtr& cb,
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
const AsteriskSCF::SessionCommunications::V1::IndicationPtr& indication,
const boost::shared_ptr<SIPSessionPriv>& sessionPriv,
const SIPSessionPtr& session)
- : mCb(cb), mIndication(indication), mImplPriv(sessionPriv), mSession(session) { }
+ : mCb(cb), mOperationContext(operationContext), mIndication(indication), mImplPriv(sessionPriv), mSession(session) { }
SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
{
+ // Is this a retry for an operation we're already processing?
+ if (!mImplPriv->mOperationContextCache->addOperationContext(mOperationContext))
+ {
+ lg(Debug) << "Retry of previously processed indicate() operation detected and rejected.";
+ mCb->ice_exception(OperationCallCancelledException(mOperationContext->id, Duplicate));
+ return Complete;
+ }
+
lg(Debug) << "Executing indicate operation";
AsteriskSCF::SessionCommunications::V1::ConnectIndicationPtr Connect;
AsteriskSCF::SessionCommunications::V1::FlashIndicationPtr Flash;
@@ -1588,6 +1671,7 @@ private:
}
AsteriskSCF::SessionCommunications::V1::AMD_Session_indicatePtr mCb;
+ OperationContextPtr mOperationContext;
AsteriskSCF::SessionCommunications::V1::IndicationPtr mIndication;
boost::shared_ptr<SIPSessionPriv> mImplPriv;
SIPSessionPtr mSession;
@@ -1598,11 +1682,12 @@ private:
*/
void SIPSession::indicate_async(
const AsteriskSCF::SessionCommunications::V1::AMD_Session_indicatePtr& cb,
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
const AsteriskSCF::SessionCommunications::V1::IndicationPtr& indication,
const Ice::Current&)
{
lg(Debug) << "Queuing an indicate operation";
- enqueueSessionWork(new IndicateOperation(cb, indication, mImplPriv, this));
+ enqueueSessionWork(new IndicateOperation(cb, operationContext, indication, mImplPriv, this));
}
class GetEndpointOperation : public SuspendableWork
@@ -1815,43 +1900,55 @@ class SetAndGetSessionControllerOperation : public SuspendableWork
{
public:
SetAndGetSessionControllerOperation(const AsteriskSCF::SessionCommunications::V1::AMD_Session_setAndGetSessionControllerPtr& cb,
+ const OperationContextPtr& operationContext,
const AsteriskSCF::SessionCommunications::V1::SessionControllerPrx& controller,
const boost::shared_ptr<SIPSessionPriv>& sessionPriv)
- : mCb(cb), mController(controller), mImplPriv(sessionPriv) { }
+ : mCb(cb), mOperationContext(operationContext), mController(controller), mImplPriv(sessionPriv) { }
SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
+ {
+ // Is this a retry for an operation we're already processing?
+ if (!mImplPriv->mOperationContextCache->addOperationContext(mOperationContext))
{
- lg(Debug) << "Executing a SetAndGetSessionController operation";
-
- if (mImplPriv->mSessionController)
- {
- mCb->ice_exception(AsteriskSCF::SessionCommunications::V1::ControllerAlreadySet());
- return Complete;
- }
-
- mImplPriv->mSessionController = mController;
-
- // Update the party identification.
- SessionOwnerIdPtr test = new SessionOwnerId();
- AsteriskSCF::SessionCommunications::V1::SessionCookieDict::const_iterator search = mImplPriv->mSessionCookies.find(test->ice_id());
- if (search != mImplPriv->mSessionCookies.end())
- {
- // Set the ConnectedLine information on the other controller.
- SessionOwnerIdPtr owner = SessionOwnerIdPtr::dynamicCast(search->second);
- ConnectedLinePtr connectedLine = new ConnectedLine(owner->ids);
- mController->begin_updateConnectedLine(connectedLine);
- }
- else
- {
- lg(Info) << "Unable to set ConnectedLine party info. No Id configured for endpoint " << mImplPriv->mEndpoint->getName();
- }
+ lg(Debug) << "Retry of previously processed setAndGetSessionController() operation detected and rejected.";
+ // Since a return value is expected, don't throw the usual OperationCallCancelledException.
mCb->ice_response(mImplPriv->mOurSessionControllerProxy);
return Complete;
}
+ lg(Debug) << "Executing a SetAndGetSessionController operation";
+
+ if (mImplPriv->mSessionController)
+ {
+ mCb->ice_exception(AsteriskSCF::SessionCommunications::V1::ControllerAlreadySet());
+ return Complete;
+ }
+
+ mImplPriv->mSessionController = mController;
+
+ // Update the party identification.
+ SessionOwnerIdPtr test = new SessionOwnerId();
+ AsteriskSCF::SessionCommunications::V1::SessionCookieDict::const_iterator search = mImplPriv->mSessionCookies.find(test->ice_id());
+ if (search != mImplPriv->mSessionCookies.end())
+ {
+ // Set the ConnectedLine information on the other controller.
+ SessionOwnerIdPtr owner = SessionOwnerIdPtr::dynamicCast(search->second);
+ ConnectedLinePtr connectedLine = new ConnectedLine(owner->ids);
+ mController->begin_updateConnectedLine(mOperationContext, connectedLine);
+ }
+ else
+ {
+ lg(Info) << "Unable to set ConnectedLine party info. No Id configured for endpoint " << mImplPriv->mEndpoint->getName();
+ }
+
+ mCb->ice_response(mImplPriv->mOurSessionControllerProxy);
+ return Complete;
+ }
+
private:
AsteriskSCF::SessionCommunications::V1::AMD_Session_setAndGetSessionControllerPtr mCb;
+ OperationContextPtr mOperationContext;
AsteriskSCF::SessionCommunications::V1::SessionControllerPrx mController;
boost::shared_ptr<SIPSessionPriv> mImplPriv;
};
@@ -1861,11 +1958,12 @@ private:
*/
void SIPSession::setAndGetSessionController_async(
const AsteriskSCF::SessionCommunications::V1::AMD_Session_setAndGetSessionControllerPtr& cb,
+ const OperationContextPtr& operationContext,
const AsteriskSCF::SessionCommunications::V1::SessionControllerPrx& controller,
const Ice::Current&)
{
lg(Debug) << "queueing a setAndGetSessionController operation";
- enqueueSessionWork(new SetAndGetSessionControllerOperation(cb, controller, mImplPriv));
+ enqueueSessionWork(new SetAndGetSessionControllerOperation(cb, operationContext, controller, mImplPriv));
}
class RemoveSessionControllerOperation : public SuspendableWork
@@ -2347,10 +2445,10 @@ void SIPSession::stop(const AsteriskSCF::SessionCommunications::V1::ResponseCode
* An implementation of the setCookies method as defined in SessionCommunicationsIf.ice which sets cookies
* on the session. The task is enqueued for thread-safe access to the Session state information.
*/
-void SIPSession::setCookies(const AsteriskSCF::SessionCommunications::V1::SessionCookies& cookies, const Ice::Current&)
+void SIPSession::setCookies(const OperationContextPtr& operationContext, const AsteriskSCF::SessionCommunications::V1::SessionCookies& cookies, const Ice::Current&)
{
lg(Debug) << "queuing a setCookies operation";
- enqueueSessionWork(new SetCookiesOperation(cookies, mImplPriv, false));
+ enqueueSessionWork(new SetCookiesOperation(operationContext, cookies, mImplPriv, false));
}
/**
@@ -2535,12 +2633,20 @@ void SIPSession::setCookies(const AsteriskSCF::SessionCommunications::V1::Sessio
class RemoveCookiesOperation : public SuspendableWork
{
public:
- RemoveCookiesOperation(const AsteriskSCF::SessionCommunications::V1::SessionCookies& cookieTypes,
- const boost::shared_ptr<SIPSessionPriv>& sessionPriv)
- : mCookieTypes(cookieTypes), mImplPriv(sessionPriv) { }
+ RemoveCookiesOperation(const OperationContextPtr& operationContext,
+ const AsteriskSCF::SessionCommunications::V1::SessionCookies& cookieTypes,
+ const boost::shared_ptr<SIPSessionPriv>& sessionPriv)
+ : mOperationContext(operationContext), mCookieTypes(cookieTypes), mImplPriv(sessionPriv) { }
SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
{
+ // Is this a retry for an operation we're already processing?
+ if (!mImplPriv->mOperationContextCache->addOperationContext(mOperationContext))
+ {
+ lg(Debug) << "Retry of previously processed removeCookies() operation detected and rejected.";
+ return Complete;
+ }
+
for (AsteriskSCF::SessionCommunications::V1::SessionCookies::const_iterator i = mCookieTypes.begin();
i != mCookieTypes.end();
++i)
@@ -2562,6 +2668,7 @@ public:
}
private:
+ OperationContextPtr mOperationContext;
AsteriskSCF::SessionCommunications::V1::SessionCookies mCookieTypes;
boost::shared_ptr<SIPSessionPriv> mImplPriv;
};
@@ -2570,10 +2677,10 @@ private:
* An implementation of the removeCookies method as defined in SessionCommunications.ice which removes specific
* cookies from the session.
*/
-void SIPSession::removeCookies(const AsteriskSCF::SessionCommunications::V1::SessionCookies& cookieTypes, const Ice::Current&)
+void SIPSession::removeCookies(const OperationContextPtr& operationContext, const AsteriskSCF::SessionCommunications::V1::SessionCookies& cookieTypes, const Ice::Current&)
{
lg(Debug) << "queuing a removeCookies operation";
- enqueueSessionWork(new RemoveCookiesOperation(cookieTypes, mImplPriv));
+ enqueueSessionWork(new RemoveCookiesOperation(operationContext, cookieTypes, mImplPriv));
}
/**
@@ -3047,6 +3154,8 @@ pjmedia_sdp_session *SIPSession::createSDPOffer(const AsteriskSCF::Media::V1::St
mImplPriv->mSDP = createSDP();
}
+ OperationContextPtr operationContext(AsteriskSCF::createContext());
+
// Iterate through each requested stream
for (StreamInformationDict::const_iterator stream = requestedStreams.begin();
stream != requestedStreams.end();
@@ -3110,7 +3219,7 @@ pjmedia_sdp_session *SIPSession::createSDPOffer(const AsteriskSCF::Media::V1::St
options->handleTelephonyEvents = true;
}
- RTPSessionPrx session = factory->allocate(params, options, outputs);
+ RTPSessionPrx session = factory->allocate(operationContext, params, options, outputs);
// Double check to make sure they actually gave us a sesson back... they could have had a problem
if (session == 0)
@@ -3188,7 +3297,7 @@ pjmedia_sdp_session *SIPSession::createSDPOffer(const AsteriskSCF::Media::V1::St
addFormatstoSDP(stream->second->formats, media, payloads);
// Push the payload mapping to the RTP session so it'll correctly map things
- session->associatePayloads(payloads);
+ session->associatePayloads(operationContext, payloads);
}
else if ((t38 = T38UdptlFormatPtr::dynamicCast(formats.front())))
{
@@ -3216,7 +3325,7 @@ pjmedia_sdp_session *SIPSession::createSDPOffer(const AsteriskSCF::Media::V1::St
continue;
}
- UDPTLSessionPrx session = factory->allocate(params);
+ UDPTLSessionPrx session = factory->allocate(operationContext, params);
if (session == 0)
{
continue;
@@ -3324,6 +3433,7 @@ pjmedia_sdp_session *SIPSession::createSDPAnswer(const pjmedia_sdp_session* offe
{
mImplPriv->mSDP = createSDP();
}
+ OperationContextPtr operationContext(AsteriskSCF::createContext());
// Get the non-stream level connection information in case there is no connection level one
std::string destination(pj_strbuf(&offer->conn->addr), pj_strlen(&offer->conn->addr));
@@ -3600,7 +3710,7 @@ pjmedia_sdp_session *SIPSession::createSDPAnswer(const pjmedia_sdp_session* offe
options->handleTelephonyEvents = true;
}
- session = factory->allocate(params, options, outputs);
+ session = factory->allocate(operationContext, params, options, outputs);
// Double check to make sure they actually gave us a sesson back... they could have had a problem
if (session == 0)
@@ -3661,7 +3771,7 @@ pjmedia_sdp_session *SIPSession::createSDPAnswer(const pjmedia_sdp_session* offe
addFormatstoSDP(formats, media, payloads);
// Push the payload mapping to the RTP session so it'll correctly map things
- session->associatePayloads(payloads);
+ session->associatePayloads(operationContext, payloads);
}
// If the RTP session supports RTCP determine the connection details for it
@@ -3686,7 +3796,7 @@ pjmedia_sdp_session *SIPSession::createSDPAnswer(const pjmedia_sdp_session* offe
}
}
- rtcpSession->setRemoteDetails(rtcpConnection, rtcpPort);
+ rtcpSession->setRemoteDetails(operationContext, rtcpConnection, rtcpPort);
}
// Update connection information
@@ -3780,7 +3890,7 @@ pjmedia_sdp_session *SIPSession::createSDPAnswer(const pjmedia_sdp_session* offe
continue;
}
- UDPTLSessionPrx session = factory->allocate(params);
+ UDPTLSessionPrx session = factory->allocate(operationContext, params);
if (session == 0)
{
@@ -3890,11 +4000,11 @@ pjmedia_sdp_session *SIPSession::createSDPAnswer(const pjmedia_sdp_session* offe
{
if (!streamsRemoved.empty())
{
- mImplPriv->mSessionController->removeStreams(streamsRemoved);
+ mImplPriv->mSessionController->removeStreams(operationContext, streamsRemoved);
}
if (!streamsChanged.empty())
{
- mImplPriv->mSessionController->changeStreamStates(streamsChanged);
+ mImplPriv->mSessionController->changeStreamStates(operationContext, streamsChanged);
}
}
@@ -4193,7 +4303,7 @@ void SIPSession::startMedia(const pjmedia_sdp_session*, const pjmedia_sdp_sessio
key;
}
- srtpSession->start(suite, key, config.srtpConfig.enableAuthentication,
+ srtpSession->start(AsteriskSCF::createContext(), suite, key, config.srtpConfig.enableAuthentication,
config.srtpConfig.enableEncryption);
}
}
diff --git a/src/SIPSession.h b/src/SIPSession.h
index 69b850f..ebda37d 100644
--- a/src/SIPSession.h
+++ b/src/SIPSession.h
@@ -166,7 +166,9 @@ public:
void indicate_async(
const AsteriskSCF::SessionCommunications::V1::AMD_Session_indicatePtr& cb,
- const AsteriskSCF::SessionCommunications::V1::IndicationPtr&, const Ice::Current&);
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const AsteriskSCF::SessionCommunications::V1::IndicationPtr&,
+ const Ice::Current&);
void getEndpoint_async(
const AsteriskSCF::SessionCommunications::V1::AMD_Session_getEndpointPtr& cb,
@@ -205,7 +207,14 @@ public:
void removeListener(const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx&, const Ice::Current&);
void start(const Ice::Current&);
void stop(const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr&, const Ice::Current&);
- void setCookies(const AsteriskSCF::SessionCommunications::V1::SessionCookies&, const Ice::Current&);
+ void setCookies(
+ const AsteriskSCF::System::V1::OperationContextPtr&,
+ const AsteriskSCF::SessionCommunications::V1::SessionCookies&,
+ const Ice::Current&);
+ void removeCookies(
+ const AsteriskSCF::System::V1::OperationContextPtr&,
+ const AsteriskSCF::SessionCommunications::V1::SessionCookies&, const Ice::Current&);
+
/**
* Typically called from queued operations to set cookies.
* Also used during session creation when modifications hooks have changed
@@ -216,7 +225,6 @@ public:
* Used during replication
*/
void setCookies(const AsteriskSCF::SessionCommunications::V1::SessionCookieDict&);
- void removeCookies(const AsteriskSCF::SessionCommunications::V1::SessionCookies&, const Ice::Current&);
void getCookies_async(
const ::AsteriskSCF::SessionCommunications::V1::AMD_Session_getCookiesPtr&,
const AsteriskSCF::SessionCommunications::V1::SessionCookies&,
@@ -255,6 +263,7 @@ public:
void setAndGetSessionController_async(
const AsteriskSCF::SessionCommunications::V1::AMD_Session_setAndGetSessionControllerPtr&,
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
const AsteriskSCF::SessionCommunications::V1::SessionControllerPrx&,
const Ice::Current&);
diff --git a/src/SIPTelephonyEventSink.cpp b/src/SIPTelephonyEventSink.cpp
index 9dda45b..2b8fb02 100644
--- a/src/SIPTelephonyEventSink.cpp
+++ b/src/SIPTelephonyEventSink.cpp
@@ -23,19 +23,34 @@ namespace SIPSessionManager
{
using namespace AsteriskSCF::System::WorkQueue::V1;
+using namespace AsteriskSCF::Helpers;
+using namespace AsteriskSCF::System::V1;
class WriteTelephonyEvent : public SuspendableWork
{
public:
WriteTelephonyEvent(
const AsteriskSCF::SessionCommunications::V1::AMD_TelephonyEventSink_writePtr& cb,
+ const OperationContextPtr& operationContext,
const AsteriskSCF::SessionCommunications::V1::TelephonyEventPtr& event,
pjsip_inv_session *inv,
const SIPTelephonyEventSinkPtr& sink)
- : mCB(cb), mEvent(event), mInv(inv), mSink(sink) { }
+ : mCB(cb),
+ mOperationContext(operationContext),
+ mEvent(event),
+ mInv(inv),
+ mSink(sink)
+ {
+ }
SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
{
+ // Is this a retry for an operation we're already processing?
+ if (!mSink->getOperationContextCache()->addOperationContext(mOperationContext))
+ {
+ return Complete;
+ }
+
if (mSink->isDestroyed() == true)
{
mCB->ice_exception(Ice::ObjectNotExistException(__FILE__, __LINE__));
@@ -143,6 +158,7 @@ private:
}
AsteriskSCF::SessionCommunications::V1::AMD_TelephonyEventSink_writePtr mCB;
+ OperationContextPtr mOperationContext;
AsteriskSCF::SessionCommunications::V1::TelephonyEventPtr mEvent;
pjsip_inv_session *mInv;
SIPTelephonyEventSinkPtr mSink;
@@ -150,14 +166,19 @@ private:
SIPTelephonyEventSink::SIPTelephonyEventSink(const SessionWorkPtr& sessionWork, pjsip_inv_session *inv)
- : mSessionWork(sessionWork), mInv(inv) { }
+ : mSessionWork(sessionWork),
+ mInv(inv),
+ mOperationContextCache(new OperationContextCache(180))
+{
+}
void SIPTelephonyEventSink::write_async(
const AsteriskSCF::SessionCommunications::V1::AMD_TelephonyEventSink_writePtr& cb,
+ const OperationContextPtr& operationContext,
const AsteriskSCF::SessionCommunications::V1::TelephonyEventPtr& event,
const Ice::Current&)
{
- mSessionWork->enqueueWork(new WriteTelephonyEvent(cb, event, mInv, this));
+ mSessionWork->enqueueWork(new WriteTelephonyEvent(cb, operationContext, event, mInv, this));
}
class SetTelephonyEventSource : public SuspendableWork
@@ -201,6 +222,11 @@ void SIPTelephonyEventSink::setSource(const AsteriskSCF::SessionCommunications::
mSource = source;
}
+OperationContextCachePtr SIPTelephonyEventSink::getOperationContextCache()
+{
+ return mOperationContextCache;
+}
+
class GetTelephonyEventSource : public SuspendableWork
{
public:
diff --git a/src/SIPTelephonyEventSink.h b/src/SIPTelephonyEventSink.h
index 8b76299..ec237f0 100644
--- a/src/SIPTelephonyEventSink.h
+++ b/src/SIPTelephonyEventSink.h
@@ -17,6 +17,7 @@
#include "SIPSession.h"
#include <AsteriskSCF/WorkQueue/Dispatched.h>
+#include <AsteriskSCF/Helpers/OperationContextCache.h>
namespace AsteriskSCF
{
@@ -30,6 +31,7 @@ public:
SIPTelephonyEventSink(const SessionWorkPtr& sessionWork, pjsip_inv_session *inv);
void write_async(
const AsteriskSCF::SessionCommunications::V1::AMD_TelephonyEventSink_writePtr&,
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
const AsteriskSCF::SessionCommunications::V1::TelephonyEventPtr&,
const Ice::Current&);
void setSource_async(
@@ -46,6 +48,8 @@ public:
const AsteriskSCF::SessionCommunications::V1::AMD_TelephonyEventSink_getSourcePtr&,
const Ice::Current&);
+ AsteriskSCF::Helpers::OperationContextCachePtr getOperationContextCache();
+
/**
* Only called from within a queued operation.
*/
@@ -55,6 +59,7 @@ private:
SessionWorkPtr mSessionWork;
pjsip_inv_session *mInv;
AsteriskSCF::SessionCommunications::V1::TelephonyEventSourcePrx mSource;
+ AsteriskSCF::Helpers::OperationContextCachePtr mOperationContextCache;
};
typedef IceUtil::Handle<SIPTelephonyEventSink> SIPTelephonyEventSinkPtr;
diff --git a/src/SIPTelephonyEventSource.cpp b/src/SIPTelephonyEventSource.cpp
index 08dde41..268ffa7 100644
--- a/src/SIPTelephonyEventSource.cpp
+++ b/src/SIPTelephonyEventSource.cpp
@@ -17,6 +17,9 @@
#include "SIPTelephonyEventSource.h"
#include <AsteriskSCF/Helpers/ProxyHelper.h>
#include <AsteriskSCF/System/WorkQueue/WorkQueueIf.h>
+#include <AsteriskSCF/Helpers/OperationContext.h>
+
+using namespace AsteriskSCF::Helpers;
namespace AsteriskSCF
{
@@ -26,30 +29,47 @@ namespace SIPSessionManager
using namespace AsteriskSCF::System::WorkQueue::V1;
using namespace AsteriskSCF::SessionCommunications::V1;
+using namespace AsteriskSCF::System::V1;
void SIPTelephonyEventSource::distributeToSinks(const TelephonyEventPtr& event)
{
for (TelephonyEventSinkSeq::const_iterator iter = mSinks.begin();
iter != mSinks.end(); ++iter)
{
- (*iter)->write(event);
+ OperationContextPtr operationContext = AsteriskSCF::createContext();
+ (*iter)->write(operationContext, event);
}
}
SIPTelephonyEventSource::SIPTelephonyEventSource(const SessionWorkPtr& sessionWork)
- : mSessionWork(sessionWork) { }
+ : mSessionWork(sessionWork),
+ mOperationContextCache(new OperationContextCache(180))
+{
+}
+
+OperationContextCachePtr SIPTelephonyEventSource::getOperationContextCache()
+{
+ return mOperationContextCache;
+}
class AddSinks : public SuspendableWork
{
public:
AddSinks(
const AMD_TelephonyEventSource_addSinksPtr& cb,
+ const OperationContextPtr& operationContext,
const SIPTelephonyEventSourcePtr& source,
const TelephonyEventSinkSeq& sinks)
- : mCB(cb), mSource(source), mSinks(sinks) { }
+ : mCB(cb), mOperationContext(operationContext), mSource(source), mSinks(sinks) { }
SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
{
+ // Is this a retry for an operation we're already processing?
+ if (!mSource->getOperationContextCache()->addOperationContext(mOperationContext))
+ {
+ return Complete;
+ }
+
if (mSource->isDestroyed() == true)
{
mCB->ice_exception(Ice::ObjectNotExistException(__FILE__, __LINE__));
@@ -63,6 +83,7 @@ public:
}
private:
AMD_TelephonyEventSource_addSinksPtr mCB;
+ OperationContextPtr mOperationContext;
SIPTelephonyEventSourcePtr mSource;
TelephonyEventSinkSeq mSinks;
};
@@ -72,12 +93,19 @@ class RemoveSinks : public SuspendableWork
public:
RemoveSinks(
const AMD_TelephonyEventSource_removeSinksPtr& cb,
+ const OperationContextPtr& operationContext,
const SIPTelephonyEventSourcePtr& source,
const TelephonyEventSinkSeq& sinks)
- : mCB(cb), mSource(source), mSinks(sinks) { }
+ : mCB(cb), mOperationContext(operationContext), mSource(source), mSinks(sinks) { }
SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
{
+ // Is this a retry for an operation we're already processing?
+ if (!mSource->getOperationContextCache()->addOperationContext(mOperationContext))
+ {
+ return Complete;
+ }
+
if (mSource->isDestroyed() == true)
{
mCB->ice_exception(Ice::ObjectNotExistException(__FILE__, __LINE__));
@@ -91,24 +119,27 @@ public:
}
private:
AMD_TelephonyEventSource_removeSinksPtr mCB;
+ OperationContextPtr mOperationContext;
SIPTelephonyEventSourcePtr mSource;
TelephonyEventSinkSeq mSinks;
};
void SIPTelephonyEventSource::addSinks_async(
const AMD_TelephonyEventSource_addSinksPtr& cb,
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
const TelephonyEventSinkSeq& sinks,
const Ice::Current&)
{
- mSessionWork->enqueueWork(new AddSinks(cb, this, sinks));
+ mSessionWork->enqueueWork(new AddSinks(cb, operationContext, this, sinks));
}
void SIPTelephonyEventSource::removeSinks_async(
const AMD_TelephonyEventSource_removeSinksPtr& cb,
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
const TelephonyEventSinkSeq& sinks,
const Ice::Current&)
{
- mSessionWork->enqueueWork(new RemoveSinks(cb, this, sinks));
+ mSessionWork->enqueueWork(new RemoveSinks(cb, operationContext, this, sinks));
}
void SIPTelephonyEventSource::addSinks(const TelephonyEventSinkSeq& sinks)
diff --git a/src/SIPTelephonyEventSource.h b/src/SIPTelephonyEventSource.h
index 91f3612..8797768 100644
--- a/src/SIPTelephonyEventSource.h
+++ b/src/SIPTelephonyEventSource.h
@@ -18,6 +18,7 @@
#include "SIPSession.h"
#include <AsteriskSCF/WorkQueue/Dispatched.h>
+#include <AsteriskSCF/Helpers/OperationContextCache.h>
namespace AsteriskSCF
{
@@ -35,11 +36,13 @@ public:
void addSinks_async(
const AsteriskSCF::SessionCommunications::V1::AMD_TelephonyEventSource_addSinksPtr&,
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
const AsteriskSCF::SessionCommunications::V1::TelephonyEventSinkSeq& sinks,
const Ice::Current&);
void removeSinks_async(
const AsteriskSCF::SessionCommunications::V1::AMD_TelephonyEventSource_removeSinksPtr&,
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
const AsteriskSCF::SessionCommunications::V1::TelephonyEventSinkSeq& sinks,
const Ice::Current&);
@@ -53,6 +56,8 @@ public:
void addSinks(const AsteriskSCF::SessionCommunications::V1::TelephonyEventSinkSeq& sinks);
void removeSinks(const AsteriskSCF::SessionCommunications::V1::TelephonyEventSinkSeq& sinks);
+ AsteriskSCF::Helpers::OperationContextCachePtr getOperationContextCache();
+
/**
* Write an event to all the configured sinks.
*
@@ -62,6 +67,7 @@ public:
private:
SessionWorkPtr mSessionWork;
AsteriskSCF::SessionCommunications::V1::TelephonyEventSinkSeq mSinks;
+ AsteriskSCF::Helpers::OperationContextCachePtr mOperationContextCache;
};
typedef IceUtil::Handle<SIPTelephonyEventSource> SIPTelephonyEventSourcePtr;
diff --git a/src/SIPTransfer.cpp b/src/SIPTransfer.cpp
index 0f0b06b..b74e542 100644
--- a/src/SIPTransfer.cpp
+++ b/src/SIPTransfer.cpp
@@ -20,10 +20,12 @@
#include <boost/algorithm/string.hpp>
#include <IceUtil/UUID.h>
+#include <AsteriskSCF/Helpers/OperationContext.h>
#include <AsteriskSCF/Logger.h>
#include <AsteriskSCF/SessionCookies/SIPSessionManager/SIPSessionCookiesIf.h>
using namespace AsteriskSCF::System::Logging;
+using namespace AsteriskSCF::System::V1;
using namespace AsteriskSCF::SessionCommunications::PartyIdentification::V1;
namespace
@@ -113,6 +115,7 @@ TransferListener::TransferListener(
}
void TransferListener::indicated(
+ const OperationContextPtr& operationContext,
const SessionPrx& source,
const IndicationPtr& event,
const AsteriskSCF::SessionCommunications::V1::SessionCookies&,
@@ -418,7 +421,7 @@ HandleReferOperation::HandleReferOperation(
mModuleId(moduleId),
mReferCSeq(tsx->cseq),
mBlindTransfer(false),
- mOperationId(::IceUtil::generateUUID()),
+ mOperationContext(AsteriskSCF::createContext()),
mRetryPolicy(5, 500) {}
HandleReferOperation::~HandleReferOperation()
@@ -566,7 +569,7 @@ void HandleReferOperation::invokeBlindTransfer(const SIPAMICallbackCookiePtr& co
PJSIPSessionModInfo *session_mod_info = (PJSIPSessionModInfo*)mInv->mod_data[mModuleId];
SIPSessionPtr session = session_mod_info->getSessionPtr();
- mSessionRouter->begin_connectBridgedSessionsWithDestination(mOperationId, session->getSessionProxy(), mTarget, true, mHook->getProxy(), d);
+ mSessionRouter->begin_connectBridgedSessionsWithDestination(mOperationContext, session->getSessionProxy(), mTarget, true, mHook->getProxy(), d);
}
/**
@@ -577,7 +580,7 @@ void HandleReferOperation::invokeBlindTransfer(const SIPAMICallbackCookiePtr& co
void HandleReferOperation::invokeTransfer(const SIPAMICallbackCookiePtr& cookie)
{
Ice::CallbackPtr d = Ice::newCallback(cookie->getSIPAMICallback(), &SIPAMICallback::callback);
- mSessionRouter->begin_connectBridgedSessions(mOperationId, mSession->getSessionProxy(), mOtherSession->getSessionProxy(), true, d);
+ mSessionRouter->begin_connectBridgedSessions(mOperationContext, mSession->getSessionProxy(), mOtherSession->getSessionProxy(), true, d);
}
SuspendableWorkResult HandleReferOperation::calledBack(const Ice::AsyncResultPtr& asyncResult)
diff --git a/src/SIPTransfer.h b/src/SIPTransfer.h
index db8927c..c0e2447 100644
--- a/src/SIPTransfer.h
+++ b/src/SIPTransfer.h
@@ -20,6 +20,7 @@
#include <AsteriskSCF/Replication/SIPSessionManager/SIPStateReplicationIf.h>
#include <AsteriskSCF/System/Hook/HookIf.h>
#include <AsteriskSCF/Helpers/Retry.h>
+#include <AsteriskSCF/System/OperationsIf.h>
#include "PJSIPSessionModule.h"
@@ -58,6 +59,7 @@ public:
* call is progressing
*/
void indicated(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
const AsteriskSCF::SessionCommunications::V1::SessionPrx& source,
const AsteriskSCF::SessionCommunications::V1::IndicationPtr& event,
const AsteriskSCF::SessionCommunications::V1::SessionCookies&,
@@ -377,9 +379,9 @@ private:
TransferSessionCreationHookPtr mHook;
/**
- * Unique id used for the routing service operation.
+ * Operation context.
*/
- std::string mOperationId;
+ AsteriskSCF::System::V1::OperationContextPtr mOperationContext;
/**
* Retry helper.
-----------------------------------------------------------------------
--
asterisk-scf/integration/sip.git
More information about the asterisk-scf-commits
mailing list