[asterisk-scf-commits] asterisk-scf/release/sip.git branch "master" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Tue May 8 17:29:28 CDT 2012
branch "master" has been updated
via 18a36bffd8644a4b5fe7c2515c192a8123f5be0f (commit)
from a803c2b6dc16b72afdf4c9b62dfe53d14521d4cf (commit)
Summary of changes:
.../SIPSessionManager/SIPStateReplicationIf.ice | 24 +-
src/Component.cpp | 31 +-
src/PJSIPManager.cpp | 4 +-
src/PJSIPRegistrarModule.cpp | 91 ++-
src/PJSIPRegistrarModule.h | 6 +
src/PJSIPSessionModule.cpp | 136 ++-
src/PJSIPSessionModule.h | 30 +-
src/PJSIPSessionModuleConstruction.cpp | 14 +-
src/SIPClientRegistration.cpp | 10 +-
src/SIPConfiguration.cpp | 394 ++++---
src/SIPEndpoint.cpp | 266 +++-
src/SIPEndpoint.h | 19 +-
src/SIPRegistrarListener.cpp | 95 +-
src/SIPRegistrarListener.h | 8 +-
src/SIPSession.cpp | 1335 ++++++++++++++------
src/SIPSession.h | 83 +-
src/SIPStateReplicator.h | 12 +-
src/SIPStateReplicatorListener.cpp | 21 +-
src/SIPTelephonyEventSink.cpp | 83 +-
src/SIPTelephonyEventSink.h | 6 +
src/SIPTelephonyEventSource.cpp | 62 +-
src/SIPTelephonyEventSource.h | 6 +
src/SIPTransfer.cpp | 151 ++-
src/SIPTransfer.h | 41 +-
24 files changed, 2084 insertions(+), 844 deletions(-)
- Log -----------------------------------------------------------------
commit 18a36bffd8644a4b5fe7c2515c192a8123f5be0f
Author: Ken Hunt <ken.hunt at digium.com>
Date: Tue May 8 12:08:45 2012 -0500
Changes for new retry logic.
diff --git a/slice/AsteriskSCF/Replication/SIPSessionManager/SIPStateReplicationIf.ice b/slice/AsteriskSCF/Replication/SIPSessionManager/SIPStateReplicationIf.ice
index 5b29ff3..0bb964c 100644
--- a/slice/AsteriskSCF/Replication/SIPSessionManager/SIPStateReplicationIf.ice
+++ b/slice/AsteriskSCF/Replication/SIPSessionManager/SIPStateReplicationIf.ice
@@ -24,6 +24,7 @@
#include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.ice>
#include <AsteriskSCF/System/Component/ConfigurationIf.ice>
#include <AsteriskSCF/SIP/SIPRegistrarIf.ice>
+#include <AsteriskSCF/System/OperationsIf.ice>
module AsteriskSCF
{
@@ -50,18 +51,24 @@ module V1
interface SIPStateReplicatorListener
{
- void stateRemoved(Ice::StringSeq itemKeys);
- void stateRemovedForItems(SIPStateItemSeq items);
- void stateSet(SIPStateItemSeq items);
+ idempotent void stateRemoved(
+ AsteriskSCF::System::V1::OperationContext operationContext,
+ Ice::StringSeq itemKeys);
+ idempotent void stateRemovedForItems(
+ AsteriskSCF::System::V1::OperationContext operationContext,
+ SIPStateItemSeq items);
+ idempotent void stateSet(
+ AsteriskSCF::System::V1::OperationContext operationContext,
+ SIPStateItemSeq items);
};
interface SIPStateReplicator
{
- void addListener(SIPStateReplicatorListener *listener);
- void removeListener(SIPStateReplicatorListener *listener);
- void setState (SIPStateItemSeq items);
- void removeState(Ice::StringSeq items);
- void removeStateForItems(SIPStateItemSeq items);
+ idempotent void addListener(AsteriskSCF::System::V1::OperationContext operationContext, SIPStateReplicatorListener *listener);
+ idempotent void removeListener(AsteriskSCF::System::V1::OperationContext operationContext, SIPStateReplicatorListener *listener);
+ idempotent void setState (AsteriskSCF::System::V1::OperationContext operationContext, SIPStateItemSeq items);
+ idempotent void removeState(AsteriskSCF::System::V1::OperationContext operationContext, Ice::StringSeq items);
+ idempotent void removeStateForItems(AsteriskSCF::System::V1::OperationContext operationContext, SIPStateItemSeq items);
idempotent SIPStateItemSeq getState(Ice::StringSeq iteKeys);
idempotent SIPStateItemSeq getAllState();
};
@@ -151,6 +158,7 @@ module V1
Ice::Identity sessionObjectId;
Ice::Identity mediaSessionObjectId;
Ice::Identity sessionControllerObjectId;
+ AsteriskSCF::System::V1::OperationContext originalContext;
AsteriskSCF::Media::V1::StreamSourceSeq sources;
AsteriskSCF::Media::V1::StreamSinkSeq sinks;
AsteriskSCF::Media::V1::StreamInformationDict streams;
diff --git a/src/Component.cpp b/src/Component.cpp
index 3da1d4e..e353b2f 100644
--- a/src/Component.cpp
+++ b/src/Component.cpp
@@ -28,6 +28,7 @@
#include <AsteriskSCF/SessionCommunications/SessionCommunicationsIf.h>
#include <AsteriskSCF/Discovery/SmartProxy.h>
#include <AsteriskSCF/System/Component/ConfigurationIf.h>
+#include <AsteriskSCF/System/OperationsIf.h>
#include <AsteriskSCF/Logger/IceLogger.h>
#include <AsteriskSCF/Logger.h>
@@ -61,6 +62,7 @@ using namespace AsteriskSCF::SIP::Registration::V1;
using namespace AsteriskSCF::System::Configuration::V1;
using namespace AsteriskSCF::Replication;
using namespace AsteriskSCF::Discovery;
+using namespace AsteriskSCF::System::V1;
namespace
@@ -82,21 +84,24 @@ public:
}
void addAuthHook(
- const AuthHookPrx &hook,
- int priority,
- const RequestTypeSeq &requestTypes,
- const Ice::Current&)
+ const OperationContextPtr& operationContext,
+ const AuthHookPrx &hook,
+ int priority,
+ const RequestTypeSeq &requestTypes,
+ const Ice::Current&)
{
mPJSIPManager->addAuthHook(hook, priority, requestTypes);
}
- void removeAuthHook(const AuthHookPrx &hook, const Ice::Current&)
+ void removeAuthHook(const OperationContextPtr& operationContext, const AuthHookPrx &hook, const Ice::Current&)
{
+ // This method can be called any number of times, so no special retry logic required.
mPJSIPManager->removeAuthHook(hook);
}
- void clearAuthHooks(const Ice::Current&)
+ void clearAuthHooks(const OperationContextPtr& operationContext, const Ice::Current&)
{
+ // This method can be called any number of times, so no special retry logic required.
mPJSIPManager->clearAuthHooks();
}
private:
@@ -289,9 +294,9 @@ void Component::locateStateReplicator()
// Since we're not in standalone mode, we'll get our configuration updates routed via the
// replicator service.
- ConfigurationReplicatorPrx configurationReplicator = ConfigurationReplicatorPrx::checkedCast(
- sipReplicationContext->getReplicator().initialize(), ReplicatorFacet);
- configurationReplicator->registerConfigurationService(mConfigurationServiceProxy);
+ ConfigurationReplicatorPrx configurationReplicator = ConfigurationReplicatorPrx::checkedCast(
+ sipReplicationContext->getReplicator().initialize(), ReplicatorFacet);
+ configurationReplicator->registerConfigurationService(AsteriskSCF::Operations::createContext(), mConfigurationServiceProxy);
}
catch (...)
{
@@ -320,7 +325,7 @@ void Component::listenToStateReplicators()
// Are we in standby mode?
if (sipReplicationContext->getState() == STANDBY_IN_REPLICA_GROUP)
{
- sipReplicationContext->getReplicator()->addListener(mReplicatorListenerProxy);
+ sipReplicationContext->getReplicator()->addListener(Operations::createContext(), mReplicatorListenerProxy);
mListeningToReplicator = true;
}
}
@@ -349,7 +354,7 @@ void Component::stopListeningToStateReplicators()
try
{
- sipReplicationContext->getReplicator()->removeListener(mReplicatorListenerProxy);
+ sipReplicationContext->getReplicator()->removeListener(Operations::createContext(), mReplicatorListenerProxy);
mListeningToReplicator = false;
}
catch (const Ice::Exception& e)
@@ -459,7 +464,7 @@ void Component::registerWithRoutingService()
EndpointLocatorPrx locator = EndpointLocatorPrx::uncheckedCast(
getServiceAdapter()->createDirectProxy(getCommunicator()->stringToIdentity(EndpointLocatorObjectId)));
- mRoutingServiceLocatorRegistry->addEndpointLocator(mRoutingId, destinations, locator);
+ mRoutingServiceLocatorRegistry->addEndpointLocator(AsteriskSCF::Operations::createContext(), mRoutingId, destinations, locator);
}
catch(const std::exception& e)
{
@@ -474,7 +479,7 @@ void Component::unregisterFromRoutingService()
{
try
{
- mRoutingServiceLocatorRegistry->removeEndpointLocator(mRoutingId);
+ mRoutingServiceLocatorRegistry->removeEndpointLocator(AsteriskSCF::Operations::createContext(), mRoutingId);
}
catch(const std::exception& e)
{
diff --git a/src/PJSIPManager.cpp b/src/PJSIPManager.cpp
index d706c29..11a917a 100644
--- a/src/PJSIPManager.cpp
+++ b/src/PJSIPManager.cpp
@@ -189,7 +189,7 @@ void PJSIPManager::addTransport(const string& id, const TransportPtr& transport)
{
if (i->second->isDestroyed())
{
- mTransports.erase(i);
+ mTransports.erase(i++);
}
else
{
@@ -207,7 +207,7 @@ TransportPtr PJSIPManager::getTransport(const string& id)
{
if (i->second->isDestroyed())
{
- mTransports.erase(i);
+ mTransports.erase(i++);
}
else
{
diff --git a/src/PJSIPRegistrarModule.cpp b/src/PJSIPRegistrarModule.cpp
index 590c584..d2df307 100644
--- a/src/PJSIPRegistrarModule.cpp
+++ b/src/PJSIPRegistrarModule.cpp
@@ -20,6 +20,7 @@
#include <AsteriskSCF/WorkQueue/WorkQueue.h>
#include <AsteriskSCF/WorkQueue/DefaultQueueListener.h>
#include <AsteriskSCF/PJLIB/ThreadHook.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
#include "PJSIPRegistrarModule.h"
#include "PJSIPManager.h"
@@ -31,6 +32,8 @@ using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::System::WorkQueue::V1;
using namespace AsteriskSCF::System::Hook::V1;
using namespace AsteriskSCF::SIP::ExtensionPoint::V1;
+using namespace AsteriskSCF::System::V1;
+using namespace AsteriskSCF::Operations;
namespace
{
@@ -73,7 +76,7 @@ public:
iter != listeners.end(); ++iter)
{
lg(Debug) << "Alerting listener " << (*iter)->ice_getIdentity().name << " about removal of binding due to expiration.";
- (*iter)->contactsRemoved(removedBindingUpdateSeq);
+ (*iter)->contactsRemoved(Operations::createContext(), removedBindingUpdateSeq);
}
BindingWrapperSeq emptySeq;
@@ -147,7 +150,8 @@ void BindingWrapper::updateBinding(const std::string& callID, int cSeq, int expi
queue->enqueueWork(new UpdateBinding(this, callID, cSeq, expiration));
}
-RegistrarI::RegistrarI(const RegistrarListenerPrx& defaultListener)
+RegistrarI::RegistrarI(const RegistrarListenerPrx& defaultListener) :
+ mOperationContextCache(OperationContextCache::create(120))
{
lg(Debug) << "In RegistrarI constructor, should be adding a listener...";
mQueue = new AsteriskSCF::WorkQueue::WorkQueue();
@@ -173,33 +177,55 @@ class AddListener : public Work
public:
AddListener(
const AMD_Registrar_addListenerPtr& cb,
+ const OperationContextPtr& operationContext,
const RegistrarListenerPrx& listener,
const RegistrarIPtr& registrar)
- : mCB(cb), mListener(listener), mRegistrar(registrar) { }
+ : mCB(cb), mOperationContext(operationContext), mListener(listener), mRegistrar(registrar) { }
void execute()
{
- std::vector<AsteriskSCF::SIP::Registration::V1::RegistrarListenerPrx>& listeners(mRegistrar->getListeners());
+ AMDContextResultData<ContactDict, AMD_Registrar_addListenerPtr>::ptr_type operationCookie =
+ getContext<AMDContextResultData<ContactDict, AMD_Registrar_addListenerPtr> >(
+ mRegistrar->getOperationContextCache(),
+ mOperationContext,
+ mCB);
+ if (!operationCookie)
+ {
+ lg(Debug) << "Retry of addListener() detected for operation " << mOperationContext->id;
+ return;
+ }
- if (std::find(listeners.begin(), listeners.end(), mListener) == listeners.end())
+ try
{
- lg(Debug) << "Adding Listener " << mListener->ice_getIdentity().name << " to registrar";
- listeners.push_back(mListener);
+ std::vector<AsteriskSCF::SIP::Registration::V1::RegistrarListenerPrx>& listeners(mRegistrar->getListeners());
+
+ if (std::find(listeners.begin(), listeners.end(), mListener) == listeners.end())
+ {
+ lg(Debug) << "Adding Listener " << mListener->ice_getIdentity().name << " to registrar";
+ listeners.push_back(mListener);
+ }
+ operationCookie->setResult(mRegistrar->createContactDict(mRegistrar->getBindings()));
+ }
+ catch (const std::exception& e)
+ {
+ operationCookie->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+ assert(false);
}
- mCB->ice_response(mRegistrar->createContactDict(mRegistrar->getBindings()));
}
private:
AMD_Registrar_addListenerPtr mCB;
+ OperationContextPtr mOperationContext;
RegistrarListenerPrx mListener;
RegistrarIPtr mRegistrar;
};
void RegistrarI::addListener_async(
const AMD_Registrar_addListenerPtr& cb,
+ const OperationContextPtr& operationContext,
const RegistrarListenerPrx& listener,
const Ice::Current&)
{
- mQueue->enqueueWork(new AddListener(cb, listener, this));
+ mQueue->enqueueWork(new AddListener(cb, operationContext, listener, this));
}
class RemoveListener : public Work
@@ -207,20 +233,41 @@ class RemoveListener : public Work
public:
RemoveListener(
const AMD_Registrar_removeListenerPtr& cb,
+ const OperationContextPtr& operationContext,
const RegistrarListenerPrx& listener,
const RegistrarIPtr& registrar)
- : mCB(cb), mListener(listener), mRegistrar(registrar) { }
+ : mCB(cb), mOperationContext(operationContext), mListener(listener), mRegistrar(registrar) { }
void execute()
{
- std::vector<AsteriskSCF::SIP::Registration::V1::RegistrarListenerPrx>& listeners(mRegistrar->getListeners());
+ AMDContextData<AMD_Registrar_removeListenerPtr>::ptr_type operationCookie =
+ getContext<AMDContextData<AMD_Registrar_removeListenerPtr> >(
+ mRegistrar->getOperationContextCache(),
+ mOperationContext,
+ mCB);
+ if (!operationCookie)
+ {
+ lg(Debug) << "Retry of removeListener() detected for operation " << mOperationContext->id;
+ return;
+ }
- listeners.erase(std::remove(listeners.begin(), listeners.end(), mListener));
- mCB->ice_response();
+ try
+ {
+ std::vector<AsteriskSCF::SIP::Registration::V1::RegistrarListenerPrx>& listeners(mRegistrar->getListeners());
+
+ listeners.erase(std::remove(listeners.begin(), listeners.end(), mListener));
+ operationCookie->setCompleted();
+ }
+ catch (const std::exception& e)
+ {
+ operationCookie->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+ assert(false);
+ }
}
private:
AMD_Registrar_removeListenerPtr mCB;
+ OperationContextPtr mOperationContext;
RegistrarListenerPrx mListener;
RegistrarIPtr mRegistrar;
@@ -228,10 +275,11 @@ private:
void RegistrarI::removeListener_async(
const AMD_Registrar_removeListenerPtr& cb,
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
const RegistrarListenerPrx& listener,
const Ice::Current&)
{
- mQueue->enqueueWork(new RemoveListener(cb, listener, this));
+ mQueue->enqueueWork(new RemoveListener(cb, operationContext, listener, this));
}
class GetAllBindings : public Work
@@ -389,6 +437,11 @@ QueuePtr RegistrarI::getQueue()
return mQueue;
}
+OperationContextCachePtr RegistrarI::getOperationContextCache()
+{
+ return mOperationContextCache;
+}
+
class ReplicateState : public Work
{
public:
@@ -417,7 +470,7 @@ public:
SIPRegistrarStateItemPtr newItem(new SIPRegistrarStateItem(mAOR, mAOR, newDict, true));
SIPStateItemSeq items;
items.push_back(newItem);
- mReplicationContext->getReplicator().tryOneWay()->setState(items);
+ mReplicationContext->getReplicator().tryOneWay()->setState(Operations::createContext(), items);
}
if (!mExistingBindings.empty())
@@ -426,7 +479,7 @@ public:
SIPRegistrarStateItemPtr existingItem(new SIPRegistrarStateItem(mAOR, mAOR, existingDict, false));
SIPStateItemSeq items;
items.push_back(existingItem);
- mReplicationContext->getReplicator().tryOneWay()->setState(items);
+ mReplicationContext->getReplicator().tryOneWay()->setState(Operations::createContext(), items);
}
if (!mRemovedBindings.empty())
@@ -435,7 +488,7 @@ public:
SIPRegistrarStateItemPtr removedItem(new SIPRegistrarStateItem(mAOR, mAOR, removedDict, false));
SIPStateItemSeq items;
items.push_back(removedItem);
- mReplicationContext->getReplicator().tryOneWay()->removeStateForItems(items);
+ mReplicationContext->getReplicator().tryOneWay()->removeStateForItems(Operations::createContext(), items);
}
}
catch (const Ice::TwowayOnlyException&)
@@ -516,8 +569,8 @@ public:
{
//Listeners are only concerned with new and removed bindings. There's no need to tell them of existing ones.
lg(Debug) << "Alerting listener " << (*iter)->ice_getIdentity().name << " about changes to bindings";
- (*iter)->contactsAdded(newBindingUpdateSeq);
- (*iter)->contactsRemoved(removedBindingUpdateSeq);
+ (*iter)->contactsAdded(Operations::createContext(), newBindingUpdateSeq);
+ (*iter)->contactsRemoved(Operations::createContext(), removedBindingUpdateSeq);
}
}
private:
diff --git a/src/PJSIPRegistrarModule.h b/src/PJSIPRegistrarModule.h
index a1094a1..ea0211b 100644
--- a/src/PJSIPRegistrarModule.h
+++ b/src/PJSIPRegistrarModule.h
@@ -19,6 +19,7 @@
#include <AsteriskSCF/Discovery/SmartProxy.h>
#include <AsteriskSCF/SIP/SIPRegistrarIf.h>
#include <AsteriskSCF/System/WorkQueue/WorkQueueIf.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
#include "SIPStateReplicator.h"
#include "PJSIPModule.h"
@@ -44,11 +45,13 @@ public:
void addListener_async(
const AsteriskSCF::SIP::Registration::V1::AMD_Registrar_addListenerPtr& cb,
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
const AsteriskSCF::SIP::Registration::V1::RegistrarListenerPrx& listener,
const Ice::Current&);
void removeListener_async(
const AsteriskSCF::SIP::Registration::V1::AMD_Registrar_removeListenerPtr& cb,
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
const AsteriskSCF::SIP::Registration::V1::RegistrarListenerPrx& listener,
const Ice::Current&);
@@ -86,11 +89,14 @@ public:
AsteriskSCF::System::WorkQueue::V1::QueuePtr getQueue();
+ AsteriskSCF::Operations::OperationContextCachePtr getOperationContextCache();
+
private:
BindingWrapperDict mBindings;
AsteriskSCF::SIP::Registration::V1::ContactDict mContacts;
std::vector<AsteriskSCF::SIP::Registration::V1::RegistrarListenerPrx> mListeners;
AsteriskSCF::System::WorkQueue::V1::QueuePtr mQueue;
+ AsteriskSCF::Operations::OperationContextCachePtr mOperationContextCache;
};
typedef IceUtil::Handle<RegistrarI> RegistrarIPtr;
diff --git a/src/PJSIPSessionModule.cpp b/src/PJSIPSessionModule.cpp
index 12a0f08..f8de853 100644
--- a/src/PJSIPSessionModule.cpp
+++ b/src/PJSIPSessionModule.cpp
@@ -40,10 +40,16 @@
#include <AsteriskSCF/Logger.h>
#include <AsteriskSCF/WorkQueue/WorkQueue.h>
#include <AsteriskSCF/WorkQueue/SuspendableWorkQueue.h>
+#include <AsteriskSCF/Helpers/Retry.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::SIP::ExtensionPoint::V1;
using namespace AsteriskSCF::System::Hook::V1;
+using namespace AsteriskSCF::System::V1;
+using namespace AsteriskSCF::Operations;
namespace
{
@@ -109,6 +115,7 @@ void PJSIPSessionModInfo::updateSessionState(pjsip_inv_session *inv_session)
mSessionState->sessionObjectId = mSession->getSessionProxy()->ice_getIdentity();
mSessionState->mediaSessionObjectId = mSession->getMediaSessionProxy()->ice_getIdentity();
mSessionState->sessionControllerObjectId = mSession->getOurSessionControllerProxy()->ice_getIdentity();
+ mSessionState->originalContext = mSession->getOperationContext();
mSessionState->sources = mSession->getMediaSources();
mSessionState->sinks = mSession->getMediaSinks();
mSessionState->rtpMediaSessions = mSession->getRTPMediaSessions();
@@ -191,20 +198,57 @@ SessionWorkPtr PJSIPSessionModInfo::getSessionWork()
return mSessionWork;
}
-void SIPSessionCreationExtensionPoint::addSessionCreationHook(const SessionCreationHookPrx& hook, const Ice::Current&)
+SIPSessionCreationExtensionPoint::SIPSessionCreationExtensionPoint() :
+ mOperationContextCache(OperationContextCache::create(120))
{
+}
+
+void SIPSessionCreationExtensionPoint::addSessionCreationHook(
+ const OperationContextPtr& operationContext,
+ const SessionCreationHookPrx& hook, const Ice::Current&)
+{
+ ContextDataPtr contextData;
+ if (!(contextData = Operations::checkAndThrow(mOperationContextCache, operationContext)))
+ {
+ lg(Debug) << "SIPSessionCreationExtensionPoint::addSessionCreationHook() detected retry for operation " << operationContext->id;
+ return;
+ }
+
boost::unique_lock<boost::shared_mutex> lock(mLock);
mHooks.push_back(hook);
+ contextData->setCompleted();
}
-void SIPSessionCreationExtensionPoint::removeSessionCreationHook(const SessionCreationHookPrx& hook, const Ice::Current&)
+
+void SIPSessionCreationExtensionPoint::removeSessionCreationHook(
+ const OperationContextPtr& operationContext,
+ const SessionCreationHookPrx& hook, const Ice::Current&)
{
+ ContextDataPtr contextData;
+ if (!(contextData = Operations::checkAndThrow(mOperationContextCache, operationContext)))
+ {
+ lg(Debug) << "SIPSessionCreationExtensionPoint::removeSessionCreationHook() detected retry for operation " << operationContext->id;
+ return;
+ }
+
boost::unique_lock<boost::shared_mutex> lock(mLock);
mHooks.erase(std::find(mHooks.begin(), mHooks.end(), hook));
+
+ contextData->setCompleted();
}
-void SIPSessionCreationExtensionPoint::clearSessionCreationHooks(const Ice::Current&)
+void SIPSessionCreationExtensionPoint::clearSessionCreationHooks(
+ const OperationContextPtr& operationContext, const Ice::Current&)
{
+ ContextDataPtr contextData;
+ if (!(contextData = Operations::checkAndThrow(mOperationContextCache, operationContext)))
+ {
+ lg(Debug) << "SIPSessionCreationExtensionPoint::clearSessionCreationHooks() detected retry for operation " << operationContext->id;
+ return;
+ }
+
boost::unique_lock<boost::shared_mutex> lock(mLock);
mHooks.clear();
+
+ contextData->setCompleted();
}
SessionCreationHookSeq SIPSessionCreationExtensionPoint::getHooks()
@@ -309,12 +353,12 @@ void PJSIPSessionModule::replicateState(PJSIPDialogModInfo *dlgInfo, PJSIPTransa
{
if (setItems.size() != 0)
{
- mReplicationContext->getReplicator().tryOneWay()->setState(setItems);
+ mReplicationContext->getReplicator().tryOneWay()->setState(Operations::createContext(), setItems);
}
if (removeItems.size() != 0)
{
- mReplicationContext->getReplicator().tryOneWay()->removeState(removeItems);
+ mReplicationContext->getReplicator().tryOneWay()->removeState(Operations::createContext(), removeItems);
}
}
}
@@ -484,12 +528,12 @@ protected:
if (!mInv->neg || fail(pjmedia_sdp_neg_get_neg_remote(mInv->neg, &remote_sdp)))
{
// No SDP was present in the INVITE so we need to create an offer
- sdp = mSession->createSDPOffer(StreamInformationDict(), streams);
+ sdp = mSession->createSDPOffer(Operations::createContext(), StreamInformationDict(), streams);
}
else
{
// SDP was present in the INVITE so we need to create an answer using their offer
- sdp = mSession->createSDPAnswer(remote_sdp, streams);
+ sdp = mSession->createSDPAnswer(Operations::createContext(), remote_sdp, streams);
}
if (!sdp)
@@ -532,15 +576,14 @@ protected:
else
{
// If this is not an attended transfer we can just route the session as normally
- std::string operationId = ::IceUtil::generateUUID();
+ SuspendableWorkListenerPtr listener = 0;
// Update the Party Id information on the session.
mSession->setSelfAsCaller();
- SuspendableWorkListenerPtr listener = 0;
SIPAMICallbackPtr cb(new SIPAMICallback(listener, mSession, this, false, true));
Ice::CallbackPtr d = Ice::newCallback(cb, &SIPAMICallback::callback);
- sessionRouter->begin_routeSession(operationId, mSession->getSessionProxy(), mDestination, 0, mCallerID, mRedirections, d);
+ sessionRouter->begin_routeSession(Operations::createContext(), mSession->getSessionProxy(), mDestination, 0, mCallerID, mRedirections, d);
}
}
catch (const Ice::CommunicatorDestroyedException &)
@@ -567,29 +610,27 @@ protected:
}
catch (const DestinationNotFoundException &)
{
- if (success(pjsip_inv_end_session(mInv, 404, NULL, &mTdata)))
- {
- pjsip_inv_send_msg(mInv, mTdata);
- }
- else
- {
- lg(Warning) << "Unable to create 404 response for INVITE";
- }
+ endSession(404);
}
catch (...)
{
- if (success(pjsip_inv_end_session(mInv, 500, NULL, &mTdata)))
- {
- pjsip_inv_send_msg(mInv, mTdata);
- }
- else
- {
- lg(Warning) << "Unable to create 500 response for INVITE";
- }
+ endSession(500);
}
return Complete;
}
+ void endSession(int statusCode)
+ {
+ if (success(pjsip_inv_end_session(mInv, statusCode, NULL, &mTdata)))
+ {
+ pjsip_inv_send_msg(mInv, mTdata);
+ }
+ else
+ {
+ lg(Warning) << "Unable to create " << statusCode << " response for INVITE";
+ }
+ }
+
private:
PJSIPSessionModulePtr mSessionModule;
SIPEndpointPtr mCaller;
@@ -1521,7 +1562,7 @@ protected:
{
SIPAMICallbackPtr cb(new SIPAMICallback(0, mSession, this, false, true));
Ice::CallbackPtr d = Ice::newCallback(cb, &SIPAMICallback::callback);
- (*listener)->begin_indicated(mSession->getSessionProxy(), new RingingIndication(), mSession->getCookies(), d);
+ (*listener)->begin_indicated(Operations::createContext(), mSession->getSessionProxy(), new RingingIndication(), mSession->getCookies(), d);
}
catch (const Ice::Exception &ex)
{
@@ -1545,7 +1586,7 @@ protected:
Ice::CallbackPtr d = Ice::newCallback(cb, &SIPAMICallback::callback);
ProgressingIndicationPtr progressing(new ProgressingIndication());
progressing->response = response;
- (*listener)->begin_indicated(mSession->getSessionProxy(), progressing, mSession->getCookies(), d);
+ (*listener)->begin_indicated(Operations::createContext(), mSession->getSessionProxy(), progressing, mSession->getCookies(), d);
}
catch (const Ice::Exception &ex)
{
@@ -1561,13 +1602,14 @@ protected:
std::vector<AsteriskSCF::SessionCommunications::V1::SessionListenerPrx> listeners = mSession->getListeners();
std::vector<AsteriskSCF::SessionCommunications::V1::SessionListenerPrx>::const_iterator listener;
lg(Debug) << "Relating connected state to " << listeners.size() << " listeners";
+
for (listener = listeners.begin(); listener != listeners.end(); ++listener)
{
try
{
SIPAMICallbackPtr cb(new SIPAMICallback(0, mSession, this, false, true));
Ice::CallbackPtr d = Ice::newCallback(cb, &SIPAMICallback::callback);
- (*listener)->begin_indicated(mSession->getSessionProxy(), new ConnectedIndication(), mSession->getCookies(), d);
+ (*listener)->begin_indicated(Operations::createContext(), mSession->getSessionProxy(), new ConnectedIndication(), mSession->getCookies(), d);
}
catch (const Ice::Exception &ex)
{
@@ -1577,7 +1619,7 @@ protected:
}
}
mSession->setSessionOwnerId(mConnected);
- mSession->getSessionControllerProxy()->updateConnectedLine(mConnected);
+ mSession->getSessionControllerProxy()->updateConnectedLine(Operations::createContext(), mConnected);
return Complete;
}
@@ -1825,7 +1867,7 @@ protected:
{
try
{
- (*listener)->indicated(session->getSessionProxy(), stopped, session->getCookies());
+ (*listener)->indicated(Operations::createContext(), session->getSessionProxy(), stopped, session->getCookies());
}
catch (const Ice::Exception &ex)
{
@@ -2063,7 +2105,7 @@ protected:
SIPSessionPtr session = session_mod_info->getSessionPtr();
StreamInformationDict added;
- session->createSDPAnswer(remote_sdp, added);
+ session->createSDPAnswer(Operations::createContext(), remote_sdp, added);
return Complete;
}
@@ -2087,7 +2129,10 @@ public:
const std::string& contact,
const AsteriskSCF::Discovery::SmartProxy<SessionRouterPrx>& router,
const SIPSessionPtr& session)
- : mInv(inv), mContact(contact), mRouter(router), mSession(session) { }
+ : mInv(inv),
+ mContact(contact),
+ mRouter(router),
+ mSession(session) { }
protected:
SuspendableWorkResult initial(const SuspendableWorkListenerPtr&)
@@ -2098,7 +2143,7 @@ protected:
SIPAMICallbackPtr cb(new SIPAMICallback(listener, mSession, this, false, true));
Ice::CallbackPtr d = Ice::newCallback(cb, &SIPAMICallback::callback);
mRouter->begin_connectBridgedSessionsWithDestination(
- IceUtil::generateUUID(),
+ Operations::createContext(),
mSession->getSessionProxy(),
mContact,
true,
@@ -2170,19 +2215,21 @@ class HandleSendReinviteResponse : public SIPQueueableOperation
{
public:
HandleSendReinviteResponse(pjsip_inv_session *inv, const int moduleId, pjsip_tx_data *tdata)
- : mInv(inv), mModuleId(moduleId), mResponse(tdata) { }
+ : mInv(inv),
+ mModuleId(moduleId),
+ mResponse(tdata) { }
protected:
SuspendableWorkResult initial(const SuspendableWorkListenerPtr&)
{
PJSIPSessionModInfo *session_mod_info = (PJSIPSessionModInfo*)mInv->mod_data[mModuleId];
- SIPSessionPtr session = session_mod_info->getSessionPtr();
- SessionControllerPrx controller = session->getSessionControllerProxy();
+ mSession = session_mod_info->getSessionPtr();
+ SessionControllerPrx controller = mSession->getSessionControllerProxy();
// If we have no session controller respond to the reinvite with the same SDP that we previously had
if (!controller)
{
- pjsip_inv_set_sdp_answer(mInv, session->modifySDP(StreamInformationDict()));
+ pjsip_inv_set_sdp_answer(mInv, mSession->modifySDP(StreamInformationDict()));
pjsip_inv_send_reinvite_response(mInv, mResponse);
return Complete;
}
@@ -2195,21 +2242,21 @@ protected:
}
// Call into the session for the serious work
- session->createSDPAnswer(offer_sdp, mStreamsAdded);
+ mSession->createSDPAnswer(Operations::createContext(), offer_sdp, mStreamsAdded);
// If no streams were added we can respond to the offer with the SDP we had previously and move on
if (mStreamsAdded.empty())
{
- pjsip_inv_set_sdp_answer(mInv, session->modifySDP(StreamInformationDict()));
+ pjsip_inv_set_sdp_answer(mInv, mSession->modifySDP(StreamInformationDict()));
pjsip_inv_send_reinvite_response(mInv, mResponse);
return Complete;
}
// If any streams were added we need to call into the session controller to see what ones
// we can actually accept
- SIPAMICallbackPtr cb(new SIPAMICallback(0, session, this, false, true));
+ SIPAMICallbackPtr cb(new SIPAMICallback(0, mSession, this, false, true));
Ice::CallbackPtr d = Ice::newCallback(cb, &SIPAMICallback::callback);
- session->getSessionControllerProxy()->begin_addStreams(mStreamsAdded, d);
+ mSession->getSessionControllerProxy()->begin_addStreams(Operations::createContext(), mStreamsAdded, d);
return Complete;
}
@@ -2222,8 +2269,8 @@ protected:
// Remove any streams that were accepted by the session controller, we don't need to do anything
// more with them since our initial call to createSDPAnswer will have created the proper SDP
for (StreamInformationDict::iterator stream = accepted.begin();
- stream != accepted.end();
- ++stream)
+ stream != accepted.end();
+ ++stream)
{
mStreamsAdded.erase(stream->first);
}
@@ -2248,6 +2295,7 @@ private:
const int mModuleId;
pjsip_tx_data *mResponse;
StreamInformationDict mStreamsAdded;
+ SIPSessionPtr mSession;
};
void PJSIPSessionModule::invOnSendReinviteResponse(pjsip_inv_session* inv, pjsip_tx_data* tdata)
diff --git a/src/PJSIPSessionModule.h b/src/PJSIPSessionModule.h
index f9874ed..941704b 100644
--- a/src/PJSIPSessionModule.h
+++ b/src/PJSIPSessionModule.h
@@ -27,6 +27,7 @@
#include <AsteriskSCF/System/ThreadPool/ThreadPoolIf.h>
#include <AsteriskSCF/System/WorkQueue/WorkQueueIf.h>
#include <AsteriskSCF/PJLIB/ThreadHook.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
#include "SIPEndpointFactory.h"
#include "SIPReplicationContext.h"
@@ -84,14 +85,25 @@ typedef IceUtil::Handle<PJSIPSessionModuleThreadPoolListener> PJSIPSessionModule
class SIPSessionCreationExtensionPoint : public AsteriskSCF::SessionCommunications::ExtensionPoints::V1::SessionCreationExtensionPoint
{
public:
- void addSessionCreationHook(const AsteriskSCF::SessionCommunications::ExtensionPoints::V1::SessionCreationHookPrx& hook, const Ice::Current&);
- void removeSessionCreationHook(const AsteriskSCF::SessionCommunications::ExtensionPoints::V1::SessionCreationHookPrx& hook, const Ice::Current&);
- void clearSessionCreationHooks(const Ice::Current&);
+ SIPSessionCreationExtensionPoint();
+
+ void addSessionCreationHook(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const AsteriskSCF::SessionCommunications::ExtensionPoints::V1::SessionCreationHookPrx& hook,
+ const Ice::Current&);
+ void removeSessionCreationHook(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const AsteriskSCF::SessionCommunications::ExtensionPoints::V1::SessionCreationHookPrx& hook,
+ const Ice::Current&);
+ void clearSessionCreationHooks(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const Ice::Current&);
AsteriskSCF::SessionCommunications::ExtensionPoints::V1::SessionCreationHookSeq getHooks();
private:
AsteriskSCF::SessionCommunications::ExtensionPoints::V1::SessionCreationHookSeq mHooks;
boost::shared_mutex mLock;
+ AsteriskSCF::Operations::OperationContextCachePtr mOperationContextCache;
};
typedef IceUtil::Handle<SIPSessionCreationExtensionPoint> SIPSessionCreationExtensionPointPtr;
@@ -385,5 +397,17 @@ private:
typedef IceUtil::Handle<SIPAMICallback> SIPAMICallbackPtr;
+class SIPAMICallbackCookie : public Ice::LocalObject
+{
+public:
+ SIPAMICallbackCookie(const SIPAMICallbackPtr& c) : mCallbackPtr(c) {}
+ SIPAMICallbackPtr getSIPAMICallback() { return mCallbackPtr; }
+
+private:
+ SIPAMICallbackPtr mCallbackPtr;
+};
+
+typedef IceUtil::Handle<SIPAMICallbackCookie> SIPAMICallbackCookiePtr;
+
}; //end namespace SIPSessionManager
}; //end namespace AsteriskSCF
diff --git a/src/PJSIPSessionModuleConstruction.cpp b/src/PJSIPSessionModuleConstruction.cpp
index 4fbfe8d..fa38aab 100644
--- a/src/PJSIPSessionModuleConstruction.cpp
+++ b/src/PJSIPSessionModuleConstruction.cpp
@@ -17,6 +17,7 @@
#include <AsteriskSCF/Logger.h>
#include <AsteriskSCF/WorkQueue/WorkQueue.h>
#include <AsteriskSCF/ThreadPool/ThreadPool.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
#include "PJSIPSessionModule.h"
#include "AuthManager.h"
@@ -157,9 +158,16 @@ PJSIPSessionModule::PJSIPSessionModule(pjsip_endpoint *endpt,
mSessionCreationExtensionPointPrx =
SessionCreationExtensionPointPrx::uncheckedCast(mAdapter->addWithUUID(mSessionCreationExtensionPoint));
- mSessionCreationExtensionPointService = serviceLocatorManagement->addService(mSessionCreationExtensionPointPrx, SessionCreationExtensionPointId);
+ mSessionCreationExtensionPointService = serviceLocatorManagement->addService(
+ Operations::createContext(),
+ mSessionCreationExtensionPointPrx,
+ SessionCreationExtensionPointId);
+
// TBD... how to access the Component's service and instance ids.
- mSessionCreationExtensionPointService->addLocatorParams(new ServiceLocatorParams(SessionCreationHookLocatorCategory, "default", ""), "");
+ mSessionCreationExtensionPointService->addLocatorParams(
+ AsteriskSCF::Operations::createContext(),
+ new ServiceLocatorParams(SessionCreationHookLocatorCategory, "default", ""),
+ "");
mPoolQueue = new AsteriskSCF::WorkQueue::WorkQueue();
mPoolListener = new PJSIPSessionModuleThreadPoolListener();
@@ -204,7 +212,7 @@ PJSIPSessionModule::~PJSIPSessionModule()
try
{
mPoolQueue->shutdown();
- mSessionCreationExtensionPointService->unregister();
+ mSessionCreationExtensionPointService->unregister(Operations::createContext());
mAdapter->remove(mSessionCreationExtensionPointPrx->ice_getIdentity());
}
catch (const AsteriskSCF::System::WorkQueue::V1::ShuttingDown&)
diff --git a/src/SIPClientRegistration.cpp b/src/SIPClientRegistration.cpp
index 91cc147..10a86e7 100644
--- a/src/SIPClientRegistration.cpp
+++ b/src/SIPClientRegistration.cpp
@@ -16,6 +16,7 @@
#include "SIPClientRegistration.h"
#include "PJUtil.h"
+#include <AsteriskSCF/Operations/OperationContext.h>
#include <boost/numeric/conversion/cast.hpp>
using namespace AsteriskSCF::System::Logging;
@@ -29,6 +30,7 @@ using namespace AsteriskSCF::SIPSessionManager;
using namespace AsteriskSCF::System::Hook::V1;
using namespace AsteriskSCF::Configuration::SIPSessionManager::V1;
using namespace AsteriskSCF::System::Component::V1;
+using namespace AsteriskSCF::System::V1;
void regCallback(struct pjsip_regc_cbparam *param)
{
@@ -109,12 +111,12 @@ public:
ClientRegistrationReplicaListener(const SIPRegistrationClientPtr& client)
: mClient(client) { }
- void activated(const ReplicaPrx&, const Ice::Current&)
+ void activated(const OperationContextPtr&, const ReplicaPrx&, const Ice::Current&)
{
mClient->sendRegister();
}
- void onStandby(const ReplicaPrx&, const Ice::Current&)
+ void onStandby(const OperationContextPtr&, const ReplicaPrx&, const Ice::Current&)
{
}
void heartbeat(const ReplicaPrx&, bool, const Ice::Current&)
@@ -150,7 +152,7 @@ void SIPRegistrationClient::activate()
{
ReplicaListenerPtr listener(new ClientRegistrationReplicaListener(this));
mReplicaListenerProxy = ReplicaListenerPrx::uncheckedCast(mBackplaneAdapter->addWithUUID(listener));
- mReplica->addListener(mReplicaListenerProxy);
+ mReplica->addListener(Operations::createContext(), mReplicaListenerProxy);
}
void SIPRegistrationClient::createPJSIPRegistration(
@@ -201,7 +203,7 @@ void SIPRegistrationClient::createPJSIPRegistration(
void SIPRegistrationClient::destroy()
{
- mReplica->removeListener(mReplicaListenerProxy);
+ mReplica->removeListener(Operations::createContext(), mReplicaListenerProxy);
mBackplaneAdapter->remove(mReplicaListenerProxy->ice_getIdentity());
destroyPJSIPRegistration();
}
diff --git a/src/SIPConfiguration.cpp b/src/SIPConfiguration.cpp
index 7f133a0..1017b03 100644
--- a/src/SIPConfiguration.cpp
+++ b/src/SIPConfiguration.cpp
@@ -37,13 +37,17 @@
#include "TLSTransport.h"
#include "STUNTransportConfig.h"
#include <vector>
-#include <AsteriskSCF/Helpers/Network.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::System::Configuration::V1;
using namespace AsteriskSCF::Configuration::SIPSessionManager::V1;
using namespace AsteriskSCF::SessionCommunications::PartyIdentification::V1;
using namespace AsteriskSCF::Core::Routing::V1;
+using namespace AsteriskSCF::System::V1;
+using namespace AsteriskSCF::Operations;
using namespace std;
namespace
@@ -716,7 +720,7 @@ public:
}
if (updateSystem)
{
- mRegistry->setEndpointLocatorDestinationIds(mRoutingId, destinations);
+ mRegistry->setEndpointLocatorDestinationIds(Operations::createContext(), mRoutingId, destinations);
}
}
@@ -1700,9 +1704,9 @@ public:
ConfigurationGroupSeq getConfiguration(const ConfigurationGroupSeq&, const Ice::Current&);
ConfigurationGroupSeq getConfigurationAll(const ConfigurationGroupSeq&, const Ice::Current&);
ConfigurationGroupSeq getConfigurationGroups(const Ice::Current&);
- void setConfiguration(const ConfigurationGroupSeq&, const Ice::Current&);
- void removeConfigurationItems(const ConfigurationGroupSeq&, const Ice::Current&);
- void removeConfigurationGroups(const ConfigurationGroupSeq&, const Ice::Current&);
+ void setConfiguration(const OperationContextPtr& operationContext, const ConfigurationGroupSeq&, const Ice::Current&);
+ void removeConfigurationItems(const OperationContextPtr& operationContext, const ConfigurationGroupSeq&, const Ice::Current&);
+ void removeConfigurationGroups(const OperationContextPtr& operationContext, const ConfigurationGroupSeq&, const Ice::Current&);
ConfigurationDataPtr getData()
{
@@ -1718,6 +1722,7 @@ private:
// itself.
//
ConfigurationDataPtr mData;
+ OperationContextCachePtr mOperationContextCache;
};
typedef IceUtil::Handle<ConfigurationServiceImpl> ConfigurationServiceImplPtr;
@@ -1725,7 +1730,8 @@ typedef IceUtil::Handle<ConfigurationServiceImpl> ConfigurationServiceImplPtr;
ConfigurationServiceImpl::ConfigurationServiceImpl(const PJSIPManagerPtr& manager,
const boost::shared_ptr<SIPEndpointFactory>& factory, const std::string& routingId,
const LocatorRegistrySmartPrx& registry) :
- mData(new ConfigurationData(manager, factory, routingId, registry))
+ mData(new ConfigurationData(manager, factory, routingId, registry)),
+ mOperationContextCache(OperationContextCache::create(180))
{
}
@@ -1945,212 +1951,284 @@ static void runPostProcessing()
}
}
-void ConfigurationServiceImpl::setConfiguration(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq& groups,
- const Ice::Current&)
+void ConfigurationServiceImpl::setConfiguration(
+ const OperationContextPtr& operationContext,
+ const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq& groups,
+ const Ice::Current&)
{
+ ContextDataPtr contextData;
+ if (!(contextData = Operations::checkAndThrow(mOperationContextCache, operationContext)))
+ {
+ lg(Debug) << "ConfigurationService::setConfiguration() detected retry for operation " << operationContext->id;
+ return;
+ }
+
lg(Debug) << "Configuration: setting configuration data.";
- class GroupsVisitor : public SIPConfigurationGroupVisitor
+ try
{
- public:
- GroupsVisitor(const ConfigurationServiceImplPtr& impl) :
- mImpl(impl)
- {
- };
+ class GroupsVisitor : public SIPConfigurationGroupVisitor
+ {
+ public:
+ GroupsVisitor(const ConfigurationServiceImplPtr& impl) :
+ mImpl(impl)
+ {
+ };
- private:
+ private:
- void visitSIPGeneralGroup(const SIPGeneralGroupPtr& group)
- {
- genericSet<SIPGeneralGroupPtr>(mImpl->getData(), group);
- }
+ void visitSIPGeneralGroup(const SIPGeneralGroupPtr& group)
+ {
+ genericSet<SIPGeneralGroupPtr>(mImpl->getData(), group);
+ }
- void visitSIPDomainGroup(const SIPDomainGroupPtr& group)
- {
- genericSet<SIPDomainGroupPtr>(mImpl->getData(), group);
- };
+ void visitSIPDomainGroup(const SIPDomainGroupPtr& group)
+ {
+ genericSet<SIPDomainGroupPtr>(mImpl->getData(), group);
+ };
- void visitSIPUDPTransportGroup(const SIPUDPTransportGroupPtr& group)
- {
- genericSet<SIPUDPTransportGroupPtr>(mImpl->getData(), group);
- };
+ void visitSIPUDPTransportGroup(const SIPUDPTransportGroupPtr& group)
+ {
+ genericSet<SIPUDPTransportGroupPtr>(mImpl->getData(), group);
+ };
- void visitSIPTCPTransportGroup(const SIPTCPTransportGroupPtr& group)
- {
- genericSet<SIPTCPTransportGroupPtr>(mImpl->getData(), group);
- };
+ void visitSIPTCPTransportGroup(const SIPTCPTransportGroupPtr& group)
+ {
+ genericSet<SIPTCPTransportGroupPtr>(mImpl->getData(), group);
+ };
- void visitSIPTLSTransportGroup(const SIPTLSTransportGroupPtr& group)
- {
- genericSet<SIPTLSTransportGroupPtr>(mImpl->getData(), group);
- };
+ void visitSIPTLSTransportGroup(const SIPTLSTransportGroupPtr& group)
+ {
+ genericSet<SIPTLSTransportGroupPtr>(mImpl->getData(), group);
+ };
- void visitSIPSTUNTransportGroup(const SIPSTUNTransportGroupPtr& group)
- {
- genericSet<SIPSTUNTransportGroupPtr>(mImpl->getData(), group);
- }
+ void visitSIPSTUNTransportGroup(const SIPSTUNTransportGroupPtr& group)
+ {
+ genericSet<SIPSTUNTransportGroupPtr>(mImpl->getData(), group);
+ }
- void visitSIPEndpointGroup(const SIPEndpointGroupPtr& group)
- {
- genericSet<SIPEndpointGroupPtr>(mImpl->getData(), group);
- };
+ void visitSIPEndpointGroup(const SIPEndpointGroupPtr& group)
+ {
+ genericSet<SIPEndpointGroupPtr>(mImpl->getData(), group);
+ };
- void visitSIPRegistrationGroup(const SIPRegistrationGroupPtr& group)
- {
- genericSet<SIPRegistrationGroupPtr>(mImpl->getData(), group);
- }
- void visitIdentityGroup(const IdentityGroupPtr& group)
- {
- genericSet<IdentityGroupPtr>(mImpl->getData(), group);
- }
+ void visitSIPRegistrationGroup(const SIPRegistrationGroupPtr& group)
+ {
+ genericSet<SIPRegistrationGroupPtr>(mImpl->getData(), group);
+ }
+ void visitIdentityGroup(const IdentityGroupPtr& group)
+ {
+ genericSet<IdentityGroupPtr>(mImpl->getData(), group);
+ }
- ConfigurationServiceImplPtr mImpl;
- };
+ ConfigurationServiceImplPtr mImpl;
+ };
- SIPConfigurationGroupVisitorPtr v = new GroupsVisitor(this);
+ SIPConfigurationGroupVisitorPtr v = new GroupsVisitor(this);
- postProcesses.clear();
- for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
+ postProcesses.clear();
+ for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
+ {
+ (*group)->visit(v);
+ }
+
+ runPostProcessing();
+ contextData->setCompleted();
+ }
+ catch (const Ice::Exception& ex)
{
- (*group)->visit(v);
+ contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(ex)));
+ throw;
+ }
+ catch (const std::exception& e)
+ {
+ contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+ assert(false);
+ throw;
}
-
- runPostProcessing();
}
void ConfigurationServiceImpl::removeConfigurationItems(
- const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq& groups, const Ice::Current&)
+ const OperationContextPtr& operationContext,
+ const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq& groups,
+ const Ice::Current&)
{
- class GroupsVisitor : public SIPConfigurationGroupVisitor
+ ContextDataPtr contextData;
+ if (!(contextData = Operations::checkAndThrow(mOperationContextCache, operationContext)))
{
- public:
- GroupsVisitor(const ConfigurationServiceImplPtr& impl) :
- mImpl(impl)
- {
- };
+ lg(Debug) << "ConfigurationService::removeConfigurationItems() detected retry for operation " << operationContext->id;
+ return;
+ }
+
+ try
+ {
+ class GroupsVisitor : public SIPConfigurationGroupVisitor
+ {
+ public:
+ GroupsVisitor(const ConfigurationServiceImplPtr& impl) :
+ mImpl(impl)
+ {
+ };
- void visitSIPGeneralGroup(const SIPGeneralGroupPtr& group)
- {
- mImpl->getData()->removeFromGroup(group);
- };
+ void visitSIPGeneralGroup(const SIPGeneralGroupPtr& group)
+ {
+ mImpl->getData()->removeFromGroup(group);
+ };
- void visitSIPDomainGroup(const SIPDomainGroupPtr& group)
- {
- mImpl->getData()->removeFromGroup(group);
- };
+ void visitSIPDomainGroup(const SIPDomainGroupPtr& group)
+ {
+ mImpl->getData()->removeFromGroup(group);
+ };
- void visitSIPUDPTransportGroup(const SIPUDPTransportGroupPtr& group)
- {
- mImpl->getData()->removeFromGroup(group);
- };
+ void visitSIPUDPTransportGroup(const SIPUDPTransportGroupPtr& group)
+ {
+ mImpl->getData()->removeFromGroup(group);
+ };
- void visitSIPTCPTransportGroup(const SIPTCPTransportGroupPtr& group)
- {
- mImpl->getData()->removeFromGroup(group);
- };
+ void visitSIPTCPTransportGroup(const SIPTCPTransportGroupPtr& group)
+ {
+ mImpl->getData()->removeFromGroup(group);
+ };
- void visitSIPTLSTransportGroup(const SIPTLSTransportGroupPtr& group)
- {
- mImpl->getData()->removeFromGroup(group);
- };
+ void visitSIPTLSTransportGroup(const SIPTLSTransportGroupPtr& group)
+ {
+ mImpl->getData()->removeFromGroup(group);
+ };
- void visitSIPSTUNTransportGroup(const SIPSTUNTransportGroupPtr& group)
- {
- mImpl->getData()->removeFromGroup(group);
- }
+ void visitSIPSTUNTransportGroup(const SIPSTUNTransportGroupPtr& group)
+ {
+ mImpl->getData()->removeFromGroup(group);
+ }
- void visitSIPEndpointGroup(const SIPEndpointGroupPtr& group)
- {
- mImpl->getData()->removeFromGroup(group);
- };
+ void visitSIPEndpointGroup(const SIPEndpointGroupPtr& group)
+ {
+ mImpl->getData()->removeFromGroup(group);
+ };
- void visitSIPRegistrationGroup(const SIPRegistrationGroupPtr& group)
- {
- mImpl->getData()->removeFromGroup(group);
- }
- void visitIdentityGroup(const IdentityGroupPtr& group)
- {
- mImpl->getData()->removeFromGroup(group);
- }
+ void visitSIPRegistrationGroup(const SIPRegistrationGroupPtr& group)
+ {
+ mImpl->getData()->removeFromGroup(group);
+ }
+ void visitIdentityGroup(const IdentityGroupPtr& group)
+ {
+ mImpl->getData()->removeFromGroup(group);
+ }
- private:
- ConfigurationServiceImplPtr mImpl;
- };
+ private:
+ ConfigurationServiceImplPtr mImpl;
+ };
- SIPConfigurationGroupVisitorPtr v = new GroupsVisitor(this);
+ SIPConfigurationGroupVisitorPtr v = new GroupsVisitor(this);
- postProcesses.clear();
- for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
+ postProcesses.clear();
+ for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
+ {
+ (*group)->visit(v);
+ }
+ runPostProcessing();
+ contextData->setCompleted();
+ }
+ catch (const Ice::Exception& ex)
{
- (*group)->visit(v);
+ contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(ex)));
+ throw;
+ }
+ catch (const std::exception& e)
+ {
+ contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+ assert(false);
+ throw;
}
- runPostProcessing();
}
void ConfigurationServiceImpl::removeConfigurationGroups(
- const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq& groups, const Ice::Current&)
+ const OperationContextPtr& operationContext,
+ const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq& groups,
+ const Ice::Current&)
{
- class Visitor : public SIPConfigurationGroupVisitor
+ ContextDataPtr contextData;
+ if (!(contextData = Operations::checkAndThrow(mOperationContextCache, operationContext)))
{
- public:
- Visitor(const ConfigurationServiceImplPtr& impl) : mImpl(impl) { };
+ lg(Debug) << "ConfigurationService::removeConfigurationGroups() detected retry for operation " << operationContext->id;
+ return;
+ }
+
+ try
+ {
+ class Visitor : public SIPConfigurationGroupVisitor
+ {
+ public:
+ Visitor(const ConfigurationServiceImplPtr& impl) : mImpl(impl) { };
- private:
- void visitSIPGeneralGroup(const SIPGeneralGroupPtr& group)
- {
- mImpl->getData()->remove(group);
- };
+ private:
+ void visitSIPGeneralGroup(const SIPGeneralGroupPtr& group)
+ {
+ mImpl->getData()->remove(group);
+ };
- void visitSIPDomainGroup(const SIPDomainGroupPtr& group)
- {
- mImpl->getData()->remove(group);
- };
+ void visitSIPDomainGroup(const SIPDomainGroupPtr& group)
+ {
+ mImpl->getData()->remove(group);
+ };
- void visitSIPUDPTransportGroup(const SIPUDPTransportGroupPtr& group)
- {
- mImpl->getData()->remove(group);
- };
+ void visitSIPUDPTransportGroup(const SIPUDPTransportGroupPtr& group)
+ {
+ mImpl->getData()->remove(group);
+ };
- void visitSIPTCPTransportGroup(const SIPTCPTransportGroupPtr& group)
- {
- mImpl->getData()->remove(group);
- };
+ void visitSIPTCPTransportGroup(const SIPTCPTransportGroupPtr& group)
+ {
+ mImpl->getData()->remove(group);
+ };
- void visitSIPTLSTransportGroup(const SIPTLSTransportGroupPtr& group)
- {
- mImpl->getData()->remove(group);
- };
+ void visitSIPTLSTransportGroup(const SIPTLSTransportGroupPtr& group)
+ {
+ mImpl->getData()->remove(group);
+ };
- void visitSIPSTUNTransportGroup(const SIPSTUNTransportGroupPtr& group)
- {
- mImpl->getData()->remove(group);
- }
+ void visitSIPSTUNTransportGroup(const SIPSTUNTransportGroupPtr& group)
+ {
+ mImpl->getData()->remove(group);
+ }
- void visitSIPEndpointGroup(const SIPEndpointGroupPtr& group)
- {
- mImpl->getData()->remove(group);
- };
+ void visitSIPEndpointGroup(const SIPEndpointGroupPtr& group)
+ {
+ mImpl->getData()->remove(group);
+ };
- void visitSIPRegistrationGroup(const SIPRegistrationGroupPtr& group)
- {
- mImpl->getData()->remove(group);
- }
- void visitIdentityGroup(const IdentityGroupPtr& group)
- {
- mImpl->getData()->remove(group);
- }
+ void visitSIPRegistrationGroup(const SIPRegistrationGroupPtr& group)
+ {
+ mImpl->getData()->remove(group);
+ }
+ void visitIdentityGroup(const IdentityGroupPtr& group)
+ {
+ mImpl->getData()->remove(group);
+ }
- ConfigurationServiceImplPtr mImpl;
- };
+ ConfigurationServiceImplPtr mImpl;
+ };
- SIPConfigurationGroupVisitorPtr v = new Visitor(this);
+ SIPConfigurationGroupVisitorPtr v = new Visitor(this);
- postProcesses.clear();
- for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
+ postProcesses.clear();
+ for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
+ {
+ (*group)->visit(v);
+ }
+ runPostProcessing();
+ contextData->setCompleted();
+ }
+ catch (const Ice::Exception& ex)
{
- (*group)->visit(v);
+ contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(ex)));
+ throw;
+ }
+ catch (const std::exception& e)
+ {
+ contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+ assert(false);
+ throw;
}
- runPostProcessing();
}
};
};
diff --git a/src/SIPEndpoint.cpp b/src/SIPEndpoint.cpp
index e630e39..a26d5a8 100644
--- a/src/SIPEndpoint.cpp
+++ b/src/SIPEndpoint.cpp
@@ -27,10 +27,14 @@
#include <AsteriskSCF/Collections/HandleSet.h>
#include <AsteriskSCF/Media/MediaIf.h>
#include <AsteriskSCF/Media/SDP/MediaSDPIf.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
#include "NATOptions.h"
using namespace std;
+using namespace AsteriskSCF::System::V1;
using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::Media::V1;
using namespace AsteriskSCF::Media::SDP::V1;
@@ -40,6 +44,7 @@ using namespace AsteriskSCF::Replication::SIPSessionManager::V1;
using namespace AsteriskSCF::Discovery;
using namespace AsteriskSCF::Configuration::SIPSessionManager::V1;
using namespace AsteriskSCF::System::Component::V1;
+using namespace AsteriskSCF::Operations;
namespace
{
@@ -211,7 +216,8 @@ public:
mReplica(replica),
mAuthManager(authManager),
mDefaultListeners(new AsteriskSCF::Collections::ProxySet<SessionListenerPrx>(adapter, lg, "Default Session Listeners")),
- mDefaultSessionCookies(new AsteriskSCF::Collections::HandleSet<AsteriskSCF::SessionCommunications::V1::SessionCookiePtr>(lg, "Default Cookies"))
+ mDefaultSessionCookies(new AsteriskSCF::Collections::HandleSet<AsteriskSCF::SessionCommunications::V1::SessionCookiePtr>(lg, "Default Cookies")),
+ mOperationContextCache(OperationContextCache::create(180))
{
};
@@ -271,6 +277,7 @@ public:
std::map<std::string, SIPRegistrationClientPtr> mClientRegistrations;
AsteriskSCF::Collections::HandleSet<AsteriskSCF::SessionCommunications::V1::SessionCookiePtr>::SetPtr mDefaultSessionCookies;
+ AsteriskSCF::Operations::OperationContextCachePtr mOperationContextCache;
};
SIPEndpoint::SIPEndpoint(const Ice::ObjectAdapterPtr& adapter,
@@ -528,61 +535,91 @@ std::string SIPEndpoint::getId(const Ice::Current&)
return mImplPriv->mEndpointProxy->ice_getIdentity().name;
}
+/**
+ * This version of this overloaded operation handles a remote invocation for an active component.
+ */
AsteriskSCF::SessionCommunications::V1::SessionPrx SIPEndpoint::createSession(
- const std::string& destination,
+ const OperationContextPtr& operationContext,
+ const string& destination,
const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
const AsteriskSCF::SessionCommunications::ExtensionPoints::V1::SessionCreationHookPrx& oneShotHook,
const Ice::Current&)
{
+ std::pair<bool, ContextResultData<AsteriskSCF::SessionCommunications::V1::SessionPrx>::ptr_type> cacheResult =
+ getContextSync<ContextResultData<AsteriskSCF::SessionCommunications::V1::SessionPrx>::ptr_type>(
+ mImplPriv->mOperationContextCache, operationContext);
+ if (cacheResult.first)
+ {
+ lg(Debug) << BOOST_CURRENT_FUNCTION << " detected retry for operation " << operationContext->id;
+ return cacheResult.second->getResult();
+ }
+
std::cout << "Got call over Ice to create a session for endpoint " << mImplPriv->mName << std::endl;
if (mImplPriv->mConfig.sessionConfig.callDirection != BOTH &&
mImplPriv->mConfig.sessionConfig.callDirection != INBOUND)
{
// TODO: We should have an exception here
+ cacheResult.second->setResult(0);
return 0;
}
- // Combine the default listeners and the argument listener.
- vector<SessionListenerPrx> listeners = mImplPriv->mDefaultListeners->getAll();
- if (listener != 0)
+ try
{
- listeners.push_back(listener);
- }
-
- AsteriskSCF::SessionCommunications::V1::SessionCookies defaultCookies = mImplPriv->mDefaultSessionCookies->getAll();
-
- SIPSessionPtr session = SIPSession::create(
- mImplPriv->mAdapter,
- this,
- destination,
- listeners,
- defaultCookies,
- mImplPriv->mManager,
- mImplPriv->mServiceLocator,
- mImplPriv->mReplicationContext,
- oneShotHook,
- mImplPriv->mConfig.sessionConfig.rtpOverIPv6,
- true,
- mImplPriv->mConfig,
- NATEndpointOptions(mImplPriv->mConfig.sessionConfig.rtpOverICE,
- mImplPriv->mConfig.sessionConfig.rtpICEIncludeTURN,
- mImplPriv->mConfig.transportConfig.enableNAT,
- mImplPriv->mConfig.sessionConfig.udptlOverICE,
- mImplPriv->mConfig.sessionConfig.udptlWithTURN));
-
- mImplPriv->mSessions.push_back(session);
+ // Combine the default listeners and the argument listener.
+ vector<SessionListenerPrx> listeners = mImplPriv->mDefaultListeners->getAll();
+ if (listener != 0)
+ {
+ listeners.push_back(listener);
+ }
- std::cout << "And now we're returing a session proxy..." << std::endl;
- return session->getSessionProxy();
+ AsteriskSCF::SessionCommunications::V1::SessionCookies defaultCookies = mImplPriv->mDefaultSessionCookies->getAll();
+
+ SIPSessionPtr session = SIPSession::create(
+ operationContext,
+ mImplPriv->mAdapter,
+ this,
+ destination,
+ listeners,
+ defaultCookies,
+ mImplPriv->mManager,
+ mImplPriv->mServiceLocator,
+ mImplPriv->mReplicationContext,
+ oneShotHook,
+ mImplPriv->mConfig.sessionConfig.rtpOverIPv6,
+ true,
+ mImplPriv->mConfig,
+ NATEndpointOptions(mImplPriv->mConfig.sessionConfig.rtpOverICE,
+ mImplPriv->mConfig.sessionConfig.rtpICEIncludeTURN,
+ mImplPriv->mConfig.transportConfig.enableNAT,
+ mImplPriv->mConfig.sessionConfig.udptlOverICE,
+ mImplPriv->mConfig.sessionConfig.udptlWithTURN));
+
+ mImplPriv->mSessions.push_back(session);
+
+ std::cout << "And now we're returing a session proxy..." << std::endl;
+
+ cacheResult.second->setResult(session->getSessionProxy());
+ return session->getSessionProxy();
+ }
+ catch (const std::exception& e)
+ {
+ cacheResult.second->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+ assert(false);
+ throw;
+ }
}
+/**
+ * This version of this overloaded method is called for sessions originating at this endpoint.
+ */
AsteriskSCF::SIPSessionManager::SIPSessionPtr SIPEndpoint::createSession(const std::string& destination)
{
vector<SessionListenerPrx> defaultListeners = mImplPriv->mDefaultListeners->getAll();
AsteriskSCF::SessionCommunications::V1::SessionCookies defaultCookies = mImplPriv->mDefaultSessionCookies->getAll();
SIPSessionPtr session = SIPSession::create(
+ AsteriskSCF::Operations::createContext(),
mImplPriv->mAdapter,
this,
destination,
@@ -616,13 +653,15 @@ SIPSessionPtr SIPEndpoint::createSession
const Ice::Identity& sessionid,
const Ice::Identity& controllerid,
const Ice::Identity& mediaid,
+ const OperationContextPtr& originalContext,
const AsteriskSCF::Replication::SIPSessionManager::V1::RTPMediaSessionDict& mediasessions,
const AsteriskSCF::Replication::SIPSessionManager::V1::UDPTLMediaSessionSeq& udptlMediaSessions,
const AsteriskSCF::Media::V1::StreamSourceSeq& sources,
const AsteriskSCF::Media::V1::StreamSinkSeq& sinks)
{
SIPSessionPtr session = SIPSession::create
- (mImplPriv->mAdapter,
+ (originalContext,
+ mImplPriv->mAdapter,
this,
destination,
sessionid,
@@ -688,44 +727,95 @@ AsteriskSCF::SessionCommunications::V1::SessionEndpointPrx SIPEndpoint::getEndpo
return mImplPriv->mEndpointProxy;
}
-void SIPEndpoint::addDefaultSessionListener(const SessionListenerPrx& listener, const Ice::Current&)
+void SIPEndpoint::addDefaultSessionListener(
+ const OperationContextPtr& operationContext,
+ const SessionListenerPrx& listener,
+ const Ice::Current&)
{
- mImplPriv->mDefaultListeners->add(listener);
-
- if (mImplPriv->mReplicationContext->isReplicating() == false)
+ ContextDataPtr contextData;
+ if (!(contextData = Operations::checkAndThrow(mImplPriv->mOperationContextCache, operationContext)))
{
+ lg(Debug) << BOOST_CURRENT_FUNCTION << " detected retry for operation " << operationContext->id;
return;
}
- // Replicate this change.
- SIPStateItemSeq items;
- items.push_back(new DefaultSessionListenerItem
- (mImplPriv->replicaKeyName(listener),
- "",
- mImplPriv->mName,
- listener));
+ try
+ {
+ mImplPriv->mDefaultListeners->add(listener);
+
+ if (mImplPriv->mReplicationContext->isReplicating())
+ {
+ // Replicate this change.
+ SIPStateItemSeq items;
+ items.push_back(new DefaultSessionListenerItem
+ (mImplPriv->replicaKeyName(listener),
+ "",
+ mImplPriv->mName,
+ listener));
+
+ mImplPriv->mReplicationContext->getReplicator().tryOneWay()->setState(Operations::createContext(), items);
+ }
+ contextData->setCompleted();
+ }
+ catch (const Ice::Exception& ex)
+ {
+ contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(ex)));
+ throw;
+ }
+ catch (const std::exception& e)
+ {
+ contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+ assert(false);
+ throw;
+ }
- mImplPriv->mReplicationContext->getReplicator().tryOneWay()->setState(items);
}
-void SIPEndpoint::removeDefaultSessionListener(const SessionListenerPrx& listener, const Ice::Current&)
+void SIPEndpoint::removeDefaultSessionListener(
+ const OperationContextPtr& operationContext,
+ const SessionListenerPrx& listener,
+ const Ice::Current&)
{
- mImplPriv->mDefaultListeners->remove(listener);
-
- if (mImplPriv->mReplicationContext->isReplicating() == false)
+ ContextDataPtr contextData;
+ if (!(contextData = Operations::checkAndThrow(mImplPriv->mOperationContextCache, operationContext)))
{
+ lg(Debug) << BOOST_CURRENT_FUNCTION << " detected retry for operation " << operationContext->id;
return;
}
- // Replicate this change.
- SIPStateItemSeq items;
- items.push_back(new DefaultSessionListenerItem
- (mImplPriv->replicaKeyName(listener),
- "",
- mImplPriv->mName,
- listener));
+ try
+ {
+ mImplPriv->mDefaultListeners->remove(listener);
+
+ if (mImplPriv->mReplicationContext->isReplicating() == false)
+ {
+ contextData->setCompleted();
+ return;
+ }
+
+ // Replicate this change.
+ SIPStateItemSeq items;
+ items.push_back(new DefaultSessionListenerItem
+ (mImplPriv->replicaKeyName(listener),
+ "",
+ mImplPriv->mName,
+ listener));
+
+ mImplPriv->mReplicationContext->getReplicator().tryOneWay()->removeStateForItems(Operations::createContext(), items);
+ contextData->setCompleted();
+ }
+ catch (const Ice::Exception& ex)
+ {
+ contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(ex)));
+ throw;
+ }
+ catch (const std::exception& e)
+ {
+ contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+ assert(false);
+ throw;
+ }
- mImplPriv->mReplicationContext->getReplicator().tryOneWay()->removeStateForItems(items);
}
/**
@@ -786,12 +876,37 @@ void SIPEndpoint::addDefaultSessionCookies(const AsteriskSCF::SessionCommunicati
(*i)));
}
- mImplPriv->mReplicationContext->getReplicator().tryOneWay()->setState(items);
+ mImplPriv->mReplicationContext->getReplicator().tryOneWay()->setState(Operations::createContext(), items);
}
-void SIPEndpoint::addDefaultSessionCookies(const AsteriskSCF::SessionCommunications::V1::SessionCookies& cookies, const Ice::Current&)
+void SIPEndpoint::addDefaultSessionCookies(
+ const OperationContextPtr& operationContext,
+ const AsteriskSCF::SessionCommunications::V1::SessionCookies& cookies,
+ const Ice::Current&)
{
- addDefaultSessionCookies(cookies, false);
+ ContextDataPtr contextData;
+ if (!(contextData = Operations::checkAndThrow(mImplPriv->mOperationContextCache, operationContext)))
+ {
+ lg(Debug) << BOOST_CURRENT_FUNCTION << " detected retry for operation " << operationContext->id;
+ return;
+ }
+
+ try
+ {
+ addDefaultSessionCookies(cookies, false);
+ contextData->setCompleted();
+ }
+ catch (const Ice::Exception& ex)
+ {
+ contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(ex)));
+ throw;
+ }
+ catch (const std::exception& e)
+ {
+ contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+ assert(false);
+ throw;
+ }
}
/**
@@ -830,12 +945,37 @@ void SIPEndpoint::removeDefaultSessionCookies(const AsteriskSCF::SessionCommunic
(*i)));
}
- mImplPriv->mReplicationContext->getReplicator().tryOneWay()->removeStateForItems(items);
+ mImplPriv->mReplicationContext->getReplicator().tryOneWay()->removeStateForItems(Operations::createContext(), items);
}
-void SIPEndpoint::removeDefaultSessionCookies(const AsteriskSCF::SessionCommunications::V1::SessionCookies& cookies, const Ice::Current&)
+void SIPEndpoint::removeDefaultSessionCookies(
+ const OperationContextPtr& operationContext,
+ const AsteriskSCF::SessionCommunications::V1::SessionCookies& cookies,
+ const Ice::Current&)
{
- removeDefaultSessionCookies(cookies, false);
+ ContextDataPtr contextData;
+ if (!(contextData = Operations::checkAndThrow(mImplPriv->mOperationContextCache, operationContext)))
+ {
+ lg(Debug) << BOOST_CURRENT_FUNCTION << " detected retry for operation " << operationContext->id;
+ return;
+ }
+
+ try
+ {
+ removeDefaultSessionCookies(cookies, false);
+ contextData->setCompleted();
+ }
+ catch (const Ice::Exception& ex)
+ {
+ contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(ex)));
+ throw;
+ }
+ catch (const std::exception& e)
+ {
+ contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+ assert(false);
+ throw;
+ }
}
void SIPEndpoint::addDefaultSessionCookie(const AsteriskSCF::SessionCommunications::V1::SessionCookiePtr& cookie)
diff --git a/src/SIPEndpoint.h b/src/SIPEndpoint.h
index c5fb88d..8ebc217 100644
--- a/src/SIPEndpoint.h
+++ b/src/SIPEndpoint.h
@@ -316,21 +316,26 @@ public:
*/
std::string getId(const Ice::Current&);
AsteriskSCF::SessionCommunications::V1::SessionPrx createSession(
+ const AsteriskSCF::System::V1::OperationContextPtr&,
const std::string&,
const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx&,
const AsteriskSCF::SessionCommunications::ExtensionPoints::V1::SessionCreationHookPrx&,
const Ice::Current&);
AsteriskSCF::SessionCommunications::V1::SessionSeq getSessions(const Ice::Current&);
void addDefaultSessionListener(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
const Ice::Current& = Ice::Current());
void removeDefaultSessionListener(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
const Ice::Current& = Ice::Current());
void addDefaultSessionCookies(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
const AsteriskSCF::SessionCommunications::V1::SessionCookies& cookies,
const Ice::Current& = Ice::Current());
void removeDefaultSessionCookies(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
const AsteriskSCF::SessionCommunications::V1::SessionCookies& cookies,
const Ice::Current& = Ice::Current());
@@ -352,10 +357,16 @@ public:
// dependency insanity
//
AsteriskSCF::SIPSessionManager::SIPSessionPtr createSession(const std::string&);
- AsteriskSCF::SIPSessionManager::SIPSessionPtr createSession(const std::string&, const Ice::Identity&, const Ice::Identity&,
- const Ice::Identity&, const AsteriskSCF::Replication::SIPSessionManager::V1::RTPMediaSessionDict&,
- const AsteriskSCF::Replication::SIPSessionManager::V1::UDPTLMediaSessionSeq&,
- const AsteriskSCF::Media::V1::StreamSourceSeq&, const AsteriskSCF::Media::V1::StreamSinkSeq&);
+ AsteriskSCF::SIPSessionManager::SIPSessionPtr createSession(
+ const std::string&,
+ const Ice::Identity&,
+ const Ice::Identity&,
+ const Ice::Identity&,
+ const AsteriskSCF::System::V1::OperationContextPtr& originalContext,
+ const AsteriskSCF::Replication::SIPSessionManager::V1::RTPMediaSessionDict&,
+ const AsteriskSCF::Replication::SIPSessionManager::V1::UDPTLMediaSessionSeq&,
+ const AsteriskSCF::Media::V1::StreamSourceSeq&,
+ const AsteriskSCF::Media::V1::StreamSinkSeq&);
void removeSession(const AsteriskSCF::SessionCommunications::V1::SessionPtr&);
diff --git a/src/SIPRegistrarListener.cpp b/src/SIPRegistrarListener.cpp
index ded34f2..6fd2084 100644
--- a/src/SIPRegistrarListener.cpp
+++ b/src/SIPRegistrarListener.cpp
@@ -14,10 +14,13 @@
* at the top of the source tree.
*/
#include <AsteriskSCF/Logger.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
#include "SIPRegistrarListener.h"
using namespace AsteriskSCF::System::Logging;
+using namespace AsteriskSCF::System::V1;
+using namespace AsteriskSCF::Operations;
namespace
{
@@ -32,7 +35,7 @@ namespace SIPSessionManager
using namespace AsteriskSCF::SIP::Registration::V1;
SIPDefaultRegistrarListener::SIPDefaultRegistrarListener(const boost::shared_ptr<SIPEndpointFactory>& endpointFactory)
- : mEndpointFactory(endpointFactory)
+ : mOperationContextCache(OperationContextCache::create(120)), mEndpointFactory(endpointFactory)
{
pj_caching_pool_init(&mCachingPool, NULL, 2048);
}
@@ -73,43 +76,83 @@ void SIPDefaultRegistrarListener::updateEndpoints(const SIPEndpointSeq& endpoint
}
}
-void SIPDefaultRegistrarListener::contactsAdded(const BindingUpdateSeq& contacts, const Ice::Current&)
+void SIPDefaultRegistrarListener::contactsAdded(
+ const OperationContextPtr& operationContext,
+ const BindingUpdateSeq& contacts,
+ const Ice::Current&)
{
- pj_pool_t *pool = pj_pool_create(&mCachingPool.factory, "DefaultRegistrarListener", 256, 256, NULL);
- for (BindingUpdateSeq::const_iterator iter = contacts.begin(); iter != contacts.end(); ++iter)
+ ContextDataPtr contextData;
+ if (!(contextData = Operations::checkAndThrow(mOperationContextCache, operationContext)))
{
- SIPEndpointSeq endpoints = getEndpoints(pool, iter->aor);
- for (Ice::StringSeq::const_iterator contactIter = iter->contacts.begin();
- contactIter != iter->contacts.end(); ++contactIter)
+ lg(Debug) << "SIPDefaultRegistrarListener::contactsAdded() detected retry for operation " << operationContext->id;
+ return;
+ }
+
+ try
+ {
+ pj_pool_t *pool = pj_pool_create(&mCachingPool.factory, "DefaultRegistrarListener", 256, 256, NULL);
+ for (BindingUpdateSeq::const_iterator iter = contacts.begin(); iter != contacts.end(); ++iter)
{
- updateEndpoints(endpoints, pool, *contactIter);
+ SIPEndpointSeq endpoints = getEndpoints(pool, iter->aor);
+ for (Ice::StringSeq::const_iterator contactIter = iter->contacts.begin();
+ contactIter != iter->contacts.end(); ++contactIter)
+ {
+ updateEndpoints(endpoints, pool, *contactIter);
+ }
}
+ pj_pool_release(pool);
+ contextData->setCompleted();
+ }
+ catch (const std::exception& e)
+ {
+ contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+ assert(false);
+ throw;
}
- pj_pool_release(pool);
}
-void SIPDefaultRegistrarListener::contactsRemoved(const BindingUpdateSeq& contacts, const Ice::Current&)
+void SIPDefaultRegistrarListener::contactsRemoved(
+ const OperationContextPtr& operationContext,
+ const BindingUpdateSeq& contacts,
+ const Ice::Current&)
{
- pj_pool_t *pool = pj_pool_create(&mCachingPool.factory, "DefaultRegistrarListener", 256, 256, NULL);
- for (BindingUpdateSeq::const_iterator iter = contacts.begin(); iter != contacts.end(); ++iter)
+ ContextDataPtr contextData;
+ if (!(contextData = Operations::checkAndThrow(mOperationContextCache, operationContext)))
{
- if (iter->contacts.empty())
- {
- //If an aor has no contacts being removed, bail now.
- continue;
- }
- SIPEndpointSeq endpoints = getEndpoints(pool, iter->aor);
- for (SIPEndpointSeq::iterator endpointIter = endpoints.begin();
- endpointIter != endpoints.end(); ++endpointIter)
+ lg(Debug) << "SIPDefaultRegistrarListener::contactsRemoved() detected retry for operation " << operationContext->id;
+ return;
+ }
+
+ try
+ {
+ pj_pool_t *pool = pj_pool_create(&mCachingPool.factory, "DefaultRegistrarListener", 256, 256, NULL);
+ for (BindingUpdateSeq::const_iterator iter = contacts.begin(); iter != contacts.end(); ++iter)
{
- lg(Debug) << "Removing contacts from AoR " << iter->aor;
- //Setting the endpoint's target address to be empty is the method we currently use
- //to make the endpoint inaccessible. Once endpoints can support multiple addresses,
- //it'll be a bit less hacky since we can just remove a specific contact from the list.
- (*endpointIter)->setTargetAddress("", 0);
+ if (iter->contacts.empty())
... 3431 lines suppressed ...
--
asterisk-scf/release/sip.git
More information about the asterisk-scf-commits
mailing list