[asterisk-scf-commits] asterisk-scf/release/bridging.git branch "master" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Tue May 8 17:26:56 CDT 2012
branch "master" has been updated
via 55fbdc3b30aed0195d034082502f7798b0351059 (commit)
from 75c04280f9f1c35bf3927dc83350870ddeee2395 (commit)
Summary of changes:
config/test_bridging.conf | 9 +-
.../BridgeService/BridgeReplicatorIf.ice | 37 +-
src/BridgeCreationExtensionPointImpl.cpp | 114 +-
src/BridgeCreationExtensionPointImpl.h | 3 +-
src/BridgeImpl.cpp | 1627 +++++++++++++-------
src/BridgeImpl.h | 34 +-
src/BridgeListenerMgr.cpp | 33 +-
src/BridgeListenerMgr.h | 11 +
src/BridgeManagerImpl.cpp | 890 ++++++++----
src/BridgeManagerImpl.h | 3 +-
src/BridgeManagerListenerMgr.cpp | 23 +-
src/BridgeManagerListenerMgr.h | 5 +
src/BridgePartyIdExtensionPoint.cpp | 271 ++++-
src/BridgePartyIdExtensionPoint.h | 31 +-
src/BridgeReplicatorStateListenerI.cpp | 53 +-
src/BridgeReplicatorStateListenerI.h | 5 +-
src/BridgeServiceConfig.h | 44 +-
src/CMakeLists.txt | 1 +
src/Component.cpp | 5 +-
src/ComponentStateReplicator.cpp | 6 +-
src/MediaMixer.cpp | 19 +-
src/MediaSplicer.cpp | 138 ++-
src/MediaSplicer.h | 9 +-
...atorStateListenerI.h => ReplicatorSmartProxy.h} | 30 +-
src/ServiceUtil.h | 9 +-
src/SessionCollection.cpp | 44 +-
src/SessionCollection.h | 25 +-
src/SessionListener.cpp | 281 ++--
src/SessionListener.h | 50 +-
src/SessionOperations.cpp | 157 ++-
src/SessionOperations.h | 114 ++-
src/SessionWrapper.cpp | 163 ++-
src/SessionWrapper.h | 57 +-
test/BridgeListenerI.cpp | 15 +-
test/BridgeListenerI.h | 14 +-
test/BridgeManagerListenerI.cpp | 12 +-
test/BridgeManagerListenerI.h | 9 +-
test/CMakeLists.txt | 36 +-
test/ReplicationUnitTests.cpp | 191 +++
test/TestBridging.cpp | 182 ++-
test/UnitTests.cpp | 69 +-
41 files changed, 3416 insertions(+), 1413 deletions(-)
copy src/{BridgeReplicatorStateListenerI.h => ReplicatorSmartProxy.h} (55%)
create mode 100644 test/ReplicationUnitTests.cpp
- Log -----------------------------------------------------------------
commit 55fbdc3b30aed0195d034082502f7798b0351059
Author: Ken Hunt <ken.hunt at digium.com>
Date: Tue May 8 09:56:33 2012 -0500
Retry logic changes. Also includes use of base State Replicaotor class.
diff --git a/config/test_bridging.conf b/config/test_bridging.conf
index 2854d81..09079b8 100644
--- a/config/test_bridging.conf
+++ b/config/test_bridging.conf
@@ -20,7 +20,11 @@ IceBox.InheritProperties=1
# It's important to specify a reasonable size for the client thread pool for services
# that use AMI/AMD
#
-Ice.ThreadPool.Client.Size=4
+Ice.ThreadPool.Client.Size=10
+Ice.ThreadPool.Server.Size=10
+
+# For unit test we don't replicate the Service Locator
+ServiceDiscovery.Standalone = true
ServiceDiscovery.IceStorm.TopicManager.Endpoints=default -p 4421
ServiceDiscovery.IceStorm.TopicManager.ThreadPool.Size=4
@@ -41,6 +45,7 @@ LocatorServiceManagement.Proxy=LocatorServiceManagement:tcp -h 127.0.0.1 -p 4422
LoggerAdapter.Endpoints=default
Logger.ServiceAdapter.Endpoints=default
+Logger.Standalone = true
AsteriskSCF.LoggingService.Endpoints=default
AsteriskSCF.LoggingService.ThreadPool.Size=4
AsteriskSCF.LoggingClient.Endpoints=default
@@ -81,12 +86,10 @@ TestBridge.ServiceAdapter.Endpoints=default -h 127.0.0.1 -p 57000
TestBridge.ServiceAdapter.ThreadPool.Size=2
TestBridge.BackplaneAdapter.Endpoints=default -h 127.0.0.1 -p 57001
TestBridge.BackplaneAdapter.ThreadPool.Size=2
-TestBridge.Standby=false
TestBridge.LogLevel=1
TestBridge2.InstanceName=TestBridge2
TestBridge2.BridgeManagerObjectId=TestBridgeManager2
-TestBridge2.Standby=true
TestBridge2.ServiceAdapter.Endpoints=default -h 127.0.0.1 -p 57010
TestBridge2.ServiceAdapter.ThreadPool.Size=4
TestBridge2.BackplaneAdapter.Endpoints=default -h 127.0.0.1 -p 57011
diff --git a/slice/AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice b/slice/AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice
index 3dbe763..8fc042d 100644
--- a/slice/AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice
+++ b/slice/AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice
@@ -20,6 +20,7 @@
#include <AsteriskSCF/SessionCommunications/SessionCommunicationsExtensionPointsIf.ice>
#include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.ice>
#include <AsteriskSCF/Media/MediaIf.ice>
+#include <AsteriskSCF/System/OperationsIf.ice>
module AsteriskSCF
{
@@ -75,6 +76,15 @@ enum MediaOperationReplicationPolicy
Reconstruct /* Media operations are reconstructed on the replica */
};
+
+/**
+ * Items to support replication of contexts. The strategy here is to simply
+ * replicate a context along with a coded value to indicate what operation
+ * the context was for. The coded value is a simple integral constant for
+ * the time being. This is a little bit of a heavy-handed and brute force,
+ * but after trying other approaches, I prefer this one.
+ */
+
/**
*
* A bridge establishes connections between media objects associated with sessions.
@@ -158,6 +168,7 @@ sequence<BridgedSession> BridgedSessionSeq;
**/
enum ServiceState
{
+ Starting,
Running,
Paused,
ShuttingDown,
@@ -170,10 +181,13 @@ sequence<AsteriskSCF::SessionCommunications::V1::BridgeManagerListener*> BridgeM
*/
class PartyIdHooks
{
- AsteriskSCF::SessionCommunications::ExtensionPoints::V1::ReceivedConnectedLinePartyIdHookSeq receivedConnectedLineHooks;
- AsteriskSCF::SessionCommunications::ExtensionPoints::V1::ForwardingConnectedLinePartyIdHookSeq forwardingConnectedLineHooks;
+ AsteriskSCF::SessionCommunications::ExtensionPoints::V1::ReceivedConnectedLinePartyIdHookSeq
+ receivedConnectedLineHooks;
+ AsteriskSCF::SessionCommunications::ExtensionPoints::V1::ForwardingConnectedLinePartyIdHookSeq
+ forwardingConnectedLineHooks;
AsteriskSCF::SessionCommunications::ExtensionPoints::V1::ForwardingCallerPartyIdHookSeq forwardingCallerHooks;
- AsteriskSCF::SessionCommunications::ExtensionPoints::V1::ForwardingRedirectionsPartyIdHookSeq forwardingRedirectionsHooks;
+ AsteriskSCF::SessionCommunications::ExtensionPoints::V1::ForwardingRedirectionsPartyIdHookSeq
+ forwardingRedirectionsHooks;
};
/**
@@ -219,6 +233,8 @@ class BridgeStateItem extends ReplicatedStateItem
* its own snapshot of applicable hooks.
*/
PartyIdHooks partyIdHookSet;
+
+ AsteriskSCF::System::V1::OperationContext originatingContext;
};
class BridgeListenerStateItem extends ReplicatedStateItem
@@ -227,6 +243,7 @@ class BridgeListenerStateItem extends ReplicatedStateItem
AsteriskSCF::SessionCommunications::V1::BridgeListener* listener;
};
+
/**
*
* The BridgeReplicatorListener interface must be implemented by components
@@ -236,8 +253,8 @@ class BridgeListenerStateItem extends ReplicatedStateItem
**/
interface ReplicatorListener
{
- void stateRemoved(Ice::StringSeq itemKeys);
- void stateSet(ReplicatedStateItemSeq items);
+ idempotent void stateRemoved(AsteriskSCF::System::V1::OperationContext operationContext, Ice::StringSeq itemKeys);
+ idempotent void stateSet(AsteriskSCF::System::V1::OperationContext operationContext, ReplicatedStateItemSeq items);
};
/**
@@ -248,11 +265,13 @@ interface ReplicatorListener
**/
interface Replicator
{
- void addListener(ReplicatorListener* listener);
- void removeListener(ReplicatorListener* listener);
+ idempotent void addListener(AsteriskSCF::System::V1::OperationContext operationContext,
+ ReplicatorListener* listener);
+ idempotent void removeListener(AsteriskSCF::System::V1::OperationContext operationContext,
+ ReplicatorListener* listener);
- void setState(ReplicatedStateItemSeq items);
- void removeState(Ice::StringSeq items);
+ idempotent void setState(AsteriskSCF::System::V1::OperationContext operationContext, ReplicatedStateItemSeq items);
+ idempotent void removeState(AsteriskSCF::System::V1::OperationContext operationContext, Ice::StringSeq items);
idempotent ReplicatedStateItemSeq getState(Ice::StringSeq itemKeys);
idempotent ReplicatedStateItemSeq getAllState();
diff --git a/src/BridgeCreationExtensionPointImpl.cpp b/src/BridgeCreationExtensionPointImpl.cpp
index 172321f..105d6fa 100755
--- a/src/BridgeCreationExtensionPointImpl.cpp
+++ b/src/BridgeCreationExtensionPointImpl.cpp
@@ -17,12 +17,16 @@
#include "BridgeCreationExtensionPointImpl.h"
#include <boost/thread/shared_mutex.hpp>
#include "BridgeServiceConfig.h"
+#include <AsteriskSCF/Operations/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
using namespace AsteriskSCF::BridgeService;
using namespace AsteriskSCF::SessionCommunications::V1;
using namespace AsteriskSCF::SessionCommunications::ExtensionPoints::V1;
using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::System::Hook::V1;
+using namespace AsteriskSCF::Operations;
namespace
{
@@ -30,46 +34,97 @@ namespace
{
public:
BridgeExtPtImpl(const Logger& logger) :
- mLogger(logger)
+ mLogger(logger),
+ mContextCache(AsteriskSCF::Operations::OperationContextCache::create(60)) /* XXX make configurable */
{
}
- void addHook(const BridgeCreationHookPrx& newHook, const Ice::Current& current)
+ void addHook(const AsteriskSCF::System::V1::OperationContextPtr& context, const
+ BridgeCreationHookPrx& newHook, const Ice::Current& current)
{
- UniqueLock lock(mLock);
- BridgeCreationHookSeq::iterator i =
- find_if(mHooks.begin(), mHooks.end(), IdentityComparePredicate<BridgeCreationHookPrx>(newHook));
- if (i != mHooks.end())
- {
- mLogger(Trace) << "refreshing creation hook " << newHook << " on "
- << objectIdFromCurrent(current);
- //
- // Refresh the proxy if it is already in the vector.
- //
- *i = newHook;
- }
- else
+ ContextDataPtr data(checkAndThrow(mContextCache, context));
+ if (data)
{
- mLogger(Trace) << "adding creation hook " << newHook << " on "
- << objectIdFromCurrent(current);
- mHooks.push_back(newHook);
+ try
+ {
+ UniqueLock lock(mLock);
+ BridgeCreationHookSeq::iterator i =
+ find_if(mHooks.begin(), mHooks.end(), IdentityComparePredicate<BridgeCreationHookPrx>(newHook));
+ if (i != mHooks.end())
+ {
+ mLogger(Trace) << "refreshing creation hook " << newHook << " on "
+ << objectIdFromCurrent(current);
+ //
+ // Refresh the proxy if it is already in the vector.
+ //
+ *i = newHook;
+ }
+ else
+ {
+ mLogger(Trace) << "adding creation hook " << newHook << " on "
+ << objectIdFromCurrent(current);
+ mHooks.push_back(newHook);
+ }
+ data->getMonitor()->setCompleted();
+ }
+ catch (const Ice::Exception& ex)
+ {
+ data->setException(ExceptionWrapper::create(ex));
+ throw;
+ }
+ catch (const std::exception& ex)
+ {
+ data->setException(ExceptionWrapper::create(ex));
+ throw;
+ }
+ catch (...)
+ {
+ data->setException(ExceptionWrapper::create("Unknown unexpected exception"));
+ throw;
+ }
}
}
- void removeHook(const BridgeCreationHookPrx& hookToRemove, const Ice::Current& current)
+ void removeHook(const AsteriskSCF::System::V1::OperationContextPtr& context,
+ const BridgeCreationHookPrx& hookToRemove, const Ice::Current& current)
{
- UniqueLock lock(mLock);
- mLogger(Trace) << "removing creation hook " << hookToRemove << " on "
- << objectIdFromCurrent(current);
- mHooks.erase(remove_if(mHooks.begin(), mHooks.end(),
- IdentityComparePredicate<BridgeCreationHookPrx>(hookToRemove)), mHooks.end());
+ ContextDataPtr data(checkAndThrow(mContextCache, context));
+ if (data)
+ {
+ try
+ {
+ UniqueLock lock(mLock);
+ mLogger(Trace) << "removing creation hook " << hookToRemove << " on "
+ << objectIdFromCurrent(current);
+ mHooks.erase(remove_if(mHooks.begin(), mHooks.end(),
+ IdentityComparePredicate<BridgeCreationHookPrx>(hookToRemove)), mHooks.end());
+ }
+ catch (const Ice::Exception& ex)
+ {
+ data->setException(ExceptionWrapper::create(ex));
+ throw;
+ }
+ catch (const std::exception& ex)
+ {
+ data->setException(ExceptionWrapper::create(ex));
+ throw;
+ }
+ catch (...)
+ {
+ data->setException(ExceptionWrapper::create("Unknown unexpected exception"));
+ throw;
+ }
+ }
}
- void clearHooks(const Ice::Current& current)
+ void clearHooks(const AsteriskSCF::System::V1::OperationContextPtr& context, const Ice::Current& current)
{
- UniqueLock lock(mLock);
- mLogger(Trace) << "clearing hooks from " << objectIdFromCurrent(current);
- mHooks.clear();
+ if (mContextCache->addOperationContext(context))
+ {
+ UniqueLock lock(mLock);
+ mLogger(Trace) << "clearing hooks from " << objectIdFromCurrent(current);
+ mHooks.clear();
+ }
}
BridgeCreationHookDataPtr runHooks(const BridgeCreationHookDataPtr& originalData)
@@ -83,7 +138,7 @@ namespace
try
{
BridgeCreationHookDataPtr resultData;
- HookResult result = (*i)->execute(tokenData, resultData);
+ HookResult result = (*i)->execute(AsteriskSCF::Operations::createContext(), tokenData, resultData);
if (result.status == AsteriskSCF::System::Hook::V1::Succeeded)
{
tokenData = resultData;
@@ -110,6 +165,7 @@ namespace
Logger mLogger;
BridgeCreationHookSeq mHooks;
+ AsteriskSCF::Operations::OperationContextCachePtr mContextCache;
void removeHooks(const BridgeCreationHookSeq& hooks)
{
diff --git a/src/BridgeCreationExtensionPointImpl.h b/src/BridgeCreationExtensionPointImpl.h
index 5cc6c6a..b3b2cfc 100755
--- a/src/BridgeCreationExtensionPointImpl.h
+++ b/src/BridgeCreationExtensionPointImpl.h
@@ -38,7 +38,8 @@ namespace BridgeService
/**
* Base class for internal methods.
*/
- class BridgeCreationExtensionPointImpl : public AsteriskSCF::SessionCommunications::ExtensionPoints::V1::BridgeCreationExtensionPoint
+ class BridgeCreationExtensionPointImpl :
+ public AsteriskSCF::SessionCommunications::ExtensionPoints::V1::BridgeCreationExtensionPoint
{
public:
diff --git a/src/BridgeImpl.cpp b/src/BridgeImpl.cpp
index 688e61f..ae578b4 100755
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@ -19,6 +19,7 @@
#include <AsteriskSCF/System/Component/ComponentServiceIf.h>
#include <AsteriskSCF/SessionCommunications/SessionCommunicationsIf.h>
#include <Ice/Ice.h>
+#include <Ice/IncomingAsync.h>
#include <boost/thread/locks.hpp>
#include <memory>
#include <algorithm>
@@ -29,6 +30,9 @@
#include "SessionWrapper.h"
#include "SessionOperations.h"
#include "SessionListener.h"
+#include <AsteriskSCF/Operations/OperationContextCache.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::SessionCommunications::V1;
@@ -36,28 +40,92 @@ using namespace AsteriskSCF::SessionCommunications::PartyIdentification::V1;
using namespace AsteriskSCF::SessionCommunications::ExtensionPoints::V1;
using namespace AsteriskSCF::BridgeService;
using namespace AsteriskSCF::Replication::BridgeService::V1;
+using namespace AsteriskSCF::Operations;
using namespace AsteriskSCF;
using namespace std;
/**
*
- * NOTE: Once asynchronous invocations began to be added to the mix, several shortcomings of both the initial
- * implementation of the bridge and its replication became apparent. In the previous implementation, the bridge
- * operations were very "serial" when it came to adding and removing sessions. When converting these operations to
- * AMD/AMI, the serial approach completely falls apart. Especially when replication is thrown into the mix. For example,
- * in a typical AMD/AMI scenario, the setBridge() and media allocation operations would occur as AMI requests. The
- * apparent outcome will be that the participants will "appear" on the call in the order in which the operations
- * complete (i.e. randomly). Indeed it is possible that some will ultimately fail and some sessions might actually be
- * removed from the bridge before they are able to complete. Reconciling the completely asynchronous nature to an
- * implementation is just too awful. As asynchronous support in bridging is the driving factor behind these changes, the
- * refactoring for replication could have occurred in that development branch. However, replication is a separate task
- * and it is desirable to have a *relevant* review of it independently of asynchronous support.
+ * NOTE: Once asynchronous invocations began to be added to the mix, several
+ * shortcomings of both the initial implementation of the bridge and its
+ * replication became apparent. In the previous implementation, the bridge
+ * operations were very "serial" when it came to adding and removing sessions.
+ * When converting these operations to AMD/AMI, the serial approach completely
+ * falls apart. Especially when replication is thrown into the mix. For
+ * example, in a typical AMD/AMI scenario, the setBridge() and media allocation
+ * operations would occur as AMI requests. The apparent outcome will be that
+ * the participants will "appear" on the call in the order in which the
+ * operations complete (i.e. randomly). Indeed it is possible that some will
+ * ultimately fail and some sessions might actually be removed from the bridge
+ * before they are able to complete. Reconciling the completely asynchronous
+ * nature to an implementation is just too awful. As asynchronous support in
+ * bridging is the driving factor behind these changes, the refactoring for
+ * replication could have occurred in that development branch. However,
+ * replication is a separate task and it is desirable to have a *relevant*
+ * review of it independently of asynchronous support.
*
*/
namespace
{
+class SessionsTracker : public IceUtil::Shared
+{
+public:
+ SessionsTracker() :
+ mResponses(0)
+ {
+ }
+
+ void add(const SessionPrx& s)
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ mSessions.push_back(s);
+ ++mResponses;
+ }
+
+ SessionSeq getSessions()
+ {
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+ return mSessions;
+ }
+
+ void addException(const SessionPrx& session, const Ice::Exception& ex)
+ {
+ addExceptionMessage(session, ex.what());
+ }
+
+ void addExceptionMessage(const SessionPrx& session, const string& msg)
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ SessionError error;
+ error.failedSession = session;
+ error.message = msg;
+ mExceptions.push_back(error);
+ ++mResponses;
+ }
+
+ SessionErrorSeq getExceptions()
+ {
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+ return mExceptions;
+ }
+
+ size_t responseCount()
+ {
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+ return mResponses;
+ }
+
+private:
+ boost::shared_mutex mLock;
+ SessionSeq mSessions;
+ SessionErrorSeq mExceptions;
+ size_t mResponses;
+};
+typedef IceUtil::Handle<SessionsTracker> SessionsTrackerPtr;
+
+
/**
*
* BridgeImpl is a reference implmentation of the AsteriskSCF::Bridging::V1::Bridge
@@ -80,25 +148,33 @@ public:
// AsteriskSCF::SessionCommunications::Bridging::Bridge Interface
//
void addSessions_async(const AMD_Bridge_addSessionsPtr& callback,
- const SessionWithSessionInfoSeq& sessionInfos,
- const Ice::Current&);
- void removeSessions_async(const AMD_Bridge_removeSessionsPtr& callback, const SessionSeq& sessions,
- const Ice::Current&);
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const SessionWithSessionInfoSeq& sessionInfos,
+ const Ice::Current&);
+ void removeSessions_async(const AMD_Bridge_removeSessionsPtr& callback,
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const SessionSeq& sessions,
+ const Ice::Current&);
SessionSeq listSessions(const Ice::Current&);
- void shutdown(const Ice::Current& current);
- void destroy(const Ice::Current& current);
+ void shutdown(const AsteriskSCF::System::V1::OperationContextPtr& context, const Ice::Current& current);
+ void destroy(const AsteriskSCF::System::V1::OperationContextPtr& context, const Ice::Current& current);
- void addListener(const BridgeListenerPrx& listener, const Ice::Current& current);
- void removeListener(const BridgeListenerPrx& listener, const Ice::Current& current);
+ void addListener(const AsteriskSCF::System::V1::OperationContextPtr&,
+ const BridgeListenerPrx& listener, const Ice::Current& current);
+ void removeListener(const AsteriskSCF::System::V1::OperationContextPtr&,
+ const BridgeListenerPrx& listener, const Ice::Current& current);
- void replaceSession_async(const AMD_Bridge_replaceSessionPtr& callbac,
- const SessionPrx& sessionToReplace,
- const SessionWithSessionInfoSeq& newSessionInfos,
- const Ice::Current& current);
+ void replaceSession_async(const AMD_Bridge_replaceSessionPtr& callback,
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const SessionPrx& sessionToReplace,
+ const SessionWithSessionInfoSeq& newSessionInfos,
+ const Ice::Current& current);
- void setCookies(const BridgeCookies& cookies, const Ice::Current&);
- void removeCookies(const BridgeCookies& cookies, const Ice::Current&);
+ void setCookies(const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const BridgeCookies& cookies, const Ice::Current&);
+ void removeCookies(const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const BridgeCookies& cookies, const Ice::Current&);
BridgeCookies getCookies(const BridgeCookies& cookies, const Ice::Current&);
//
@@ -112,7 +188,7 @@ public:
bool destroyed();
void destroyImpl();
void shutdownImpl(const Ice::Current& current);
- void activate(const BridgePrx& proxy, const std::string& id);
+ void finishSetup(const BridgePrx& proxy, const std::string& id);
void updateState(const BridgeStateItemPtr& state);
void addListener(const BridgeListenerStateItemPtr& update);
@@ -124,15 +200,21 @@ public:
void forceUpdate();
- void getAddSessionsTasks(QueuedTasks& tasks, const SessionPrx& source, const SessionSeq& sessions, const CallerPtr& callerID, const RedirectionsPtr& redirects);
+ void getAddSessionsTasks(QueuedTasks& tasks,
+ const AsteriskSCF::System::V1::OperationContextPtr& context,
+ const SessionPrx& source, const SessionSeq& sessions,
+ const CallerPtr& callerID, const RedirectionsPtr& redirects);
- PartyIdHooksPtr getPartyIdHooks()
+ PartyIdHooksPtr getPartyIdHooks() const
{
return mState->partyIdHookSet;
}
BridgeCookies getCookies()
{
+ //
+ // TODO: use transform?
+ //
BridgeCookies result;
for (BridgeCookieDict::const_iterator i = mState->cookies.begin(); i != mState->cookies.end(); ++i)
{
@@ -141,8 +223,12 @@ public:
return result;
}
- void updateConnectedLine(const SessionWrapperPtr& sourceSession, const ConnectedLinePtr& connectedLine);
- void updateRedirections(const SessionWrapperPtr& sourceSession, const RedirectionsPtr& redirections);
+ void updateConnectedLine(const SessionWrapperPtr& sourceSession,
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const ConnectedLinePtr& connectedLine);
+ void updateRedirections(const SessionWrapperPtr& sourceSession,
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const RedirectionsPtr& redirections);
private:
@@ -185,6 +271,8 @@ private:
//
Logger mLogger;
+ AsteriskSCF::Operations::OperationContextCachePtr mOperationContextCache;
+
void statePreCheck();
BridgeStateItemPtr createUpdate();
void pushUpdate(const BridgeStateItemPtr& update);
@@ -199,9 +287,9 @@ private:
typedef IceUtil::Handle<BridgeImpl> BridgeImplPtr;
//
-// simply checks for nulls at this point.
+// Simply checks for nulls at this point.
//
-static void checkSessions(const SessionSeq& sessions)
+void checkSessions(const SessionSeq& sessions)
{
Ice::LongSeq invalidIndexes;
Ice::Long index = 0;
@@ -218,63 +306,6 @@ static void checkSessions(const SessionSeq& sessions)
}
}
-class SessionsTracker : public IceUtil::Shared
-{
-public:
- SessionsTracker() :
- mResponses(0)
- {
- }
-
- void add(const SessionPrx& s)
- {
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- mSessions.push_back(s);
- ++mResponses;
- }
-
- SessionSeq getSessions()
- {
- boost::shared_lock<boost::shared_mutex> lock(mLock);
- return mSessions;
- }
-
- void addException(const SessionPrx& session, const Ice::Exception& ex)
- {
- addExceptionMessage(session, ex.what());
- }
-
- void addExceptionMessage(const SessionPrx& session, const string& msg)
- {
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- SessionError error;
- error.failedSession = session;
- error.message = msg;
- mExceptions.push_back(error);
- ++mResponses;
- }
-
- SessionErrorSeq getExceptions()
- {
- boost::shared_lock<boost::shared_mutex> lock(mLock);
- return mExceptions;
- }
-
- size_t responseCount()
- {
- boost::shared_lock<boost::shared_mutex> lock(mLock);
- return mResponses;
- }
-
-private:
- boost::shared_mutex mLock;
- SessionSeq mSessions;
- SessionErrorSeq mExceptions;
- size_t mResponses;
-};
-typedef IceUtil::Handle<SessionsTracker> SessionsTrackerPtr;
-
-
/**
* Forwards the redirection records for the specified session
* to every other session in the bridge. Applies the
@@ -289,11 +320,14 @@ typedef IceUtil::Handle<SessionsTracker> SessionsTrackerPtr;
class ForwardRedirectionsUpdatedTask : public QueuedTask
{
public:
- ForwardRedirectionsUpdatedTask(const BridgeImplPtr& bridge,
- const SessionPrx& sourceSession,
- const RedirectionsPtr& redirections,
- const Logger& logger) :
+ ForwardRedirectionsUpdatedTask(
+ const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+ const BridgeImplPtr& bridge,
+ const SessionPrx& sourceSession,
+ const RedirectionsPtr& redirections,
+ const Logger& logger) :
QueuedTask("ForwardRedirectionsUpdatedTask"),
+ mRootContext(rootContext),
mBridge(bridge),
mSourceSession(sourceSession),
mRedirections(redirections),
@@ -312,17 +346,25 @@ protected:
}
try
{
-
+ //
// Forward the ConnectedLine to each bridged session.
+ //
SessionSeq sessions = mBridge->getSessions()->getSessionSeq();
for(SessionSeq::iterator i = sessions.begin();
i != sessions.end(); ++i)
{
+ //
+ // TODO: This looks like a SessionOperation.
+ //
if (!(*i) || (*i)->ice_getIdentity() == mSourceSession->ice_getIdentity())
{
continue;
}
+ //
+ // TODO: Why doesn't this loop simply use the sequence of SessionWrappers you
+ // can retrieve from the SessionCollection?
+ //
SessionWrapperPtr destSessionWrapper = mBridge->getSessions()->getSession(*i);
if (destSessionWrapper)
{
@@ -359,8 +401,12 @@ protected:
{
try
{
- // Apply this hook.
- AsteriskSCF::System::Hook::V1::HookResult hookResult = (*i)->modifyForwardingRedirections(mSourceSession,
+ // Apply this hook.
+ //
+ // TODO: Are hooks stateful? Do I have to care about consistent contexts?
+ //
+ AsteriskSCF::System::Hook::V1::HookResult hookResult = (*i)->modifyForwardingRedirections(
+ AsteriskSCF::Operations::createContext(mRootContext), mSourceSession,
destinationSession->getSession(),
currentRedirections, destSpecificRedirections);
@@ -379,12 +425,16 @@ protected:
SessionControllerPrx sessionController = destinationSession->getSessionController();
if (sessionController)
{
+
// Forward the info via the SessionController for this session.
- sessionController->updateRedirections(currentRedirections);
+ sessionController->updateRedirections(
+ calculateOperationContext(mRootContext, identityToString(sessionController->ice_getIdentity())),
+ currentRedirections);
}
}
private:
+ AsteriskSCF::System::V1::OperationContextPtr mRootContext;
BridgeImplPtr mBridge;
SessionPrx mSourceSession;
RedirectionsPtr mRedirections;
@@ -406,10 +456,13 @@ private:
class ForwardConnectedLineTask : public QueuedTask
{
public:
- ForwardConnectedLineTask(const BridgeImplPtr& bridge,
- const SessionPrx& sourceSession,
- const Logger& logger) :
+ ForwardConnectedLineTask(
+ const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+ const BridgeImplPtr& bridge,
+ const SessionPrx& sourceSession,
+ const Logger& logger) :
QueuedTask("ForwardConnectedLineTask"),
+ mRootContext(rootContext),
mBridge(bridge),
mSourceSession(sourceSession),
mLogger(logger)
@@ -437,7 +490,10 @@ protected:
return true;
}
+ //
// Forward the ConnectedLine to each bridged session.
+ // TODO: Implement in terms of an operation to be used with the visitor.
+ //
SessionSeq sessions = mBridge->getSessions()->getSessionSeq();
for(SessionSeq::iterator i = sessions.begin();
i != sessions.end(); ++i)
@@ -485,7 +541,9 @@ protected:
try
{
// Apply a hook
- AsteriskSCF::System::Hook::V1::HookResult hookResult = (*i)->modifyForwardingConnectedLine(mSourceSession,
+ AsteriskSCF::System::Hook::V1::HookResult hookResult = (*i)->modifyForwardingConnectedLine(
+ AsteriskSCF::Operations::createContext(mRootContext),
+ mSourceSession,
destinationSession->getSession(),
currentConnectedLine, destSpecificConnectedLine);
@@ -505,11 +563,13 @@ protected:
if (sessionController)
{
// Forward the info via the SessionController for this session.
- sessionController->updateConnectedLine(currentConnectedLine);
+ sessionController->updateConnectedLine(calculateOperationContext(mRootContext, identityToString(sessionController->ice_getIdentity())),
+ currentConnectedLine);
}
}
private:
+ AsteriskSCF::System::V1::OperationContextPtr mRootContext;
BridgeImplPtr mBridge;
SessionPrx mSourceSession;
Logger mLogger;
@@ -518,11 +578,14 @@ private:
class ForwardCallerIDTask : public QueuedTask
{
public:
- ForwardCallerIDTask(const BridgeImplPtr& bridge,
- const SessionPrx& source,
- const CallerPtr& callerID,
- const Logger& logger) :
+ ForwardCallerIDTask(
+ const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+ const BridgeImplPtr& bridge,
+ const SessionPrx& source,
+ const CallerPtr& callerID,
+ const Logger& logger) :
QueuedTask("ForwardConnectedLineTask"),
+ mRootContext(rootContext),
mBridge(bridge),
mSource(source),
mCallerID(callerID),
@@ -591,7 +654,9 @@ protected:
try
{
// Apply a hook
- AsteriskSCF::System::Hook::V1::HookResult hookResult = (*i)->modifyForwardingCaller(mSource,
+ AsteriskSCF::System::Hook::V1::HookResult hookResult = (*i)->modifyForwardingCaller(
+ AsteriskSCF::Operations::createContext(mRootContext),
+ mSource,
destinationSession->getSession(),
currentCallerID, destSpecificCallerID);
@@ -612,11 +677,14 @@ protected:
SessionControllerPrx sessionController = destinationSession->getSessionController();
if (sessionController)
{
- sessionController->updateCallerID(currentCallerID);
+ sessionController->updateCallerID(
+ calculateOperationContext(mRootContext, identityToString(sessionController->ice_getIdentity())),
+ currentCallerID);
}
}
private:
+ AsteriskSCF::System::V1::OperationContextPtr mRootContext;
BridgeImplPtr mBridge;
SessionPrx mSource;
CallerPtr mCallerID;
@@ -630,11 +698,14 @@ private:
class UpdateConnectedLineTask : public QueuedTask
{
public:
- UpdateConnectedLineTask(const BridgeImplPtr& bridge,
- const SessionPrx& sourceSession,
- const ConnectedLinePtr& connectedLine,
- const Logger& logger) :
+ UpdateConnectedLineTask(
+ const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+ const BridgeImplPtr& bridge,
+ const SessionPrx& sourceSession,
+ const ConnectedLinePtr& connectedLine,
+ const Logger& logger) :
QueuedTask("UpdateConnectedLineTask"),
+ mRootContext(rootContext),
mBridge(bridge),
mSourceSession(sourceSession),
mConnectedLine(connectedLine),
@@ -658,20 +729,39 @@ protected:
if (!wrapper)
{
+ //
+ // TODO: Returning true early allows subsequent tasks to
+ // run. Might be a dubious practice!
+ //
mLogger(Debug) << "Unable to find matching session for, returning early with true.";
+ return true;
}
PartyIdHooksPtr partyIdHooks = mBridge->getPartyIdHooks();
if (partyIdHooks)
{
// Allow the ReceivedConnectedLinePartyId hooks to alter the ConnectedLine record.
- for(vector<ReceivedConnectedLinePartyIdHookPrx>::const_iterator i = partyIdHooks->receivedConnectedLineHooks.begin();
+ for(vector<ReceivedConnectedLinePartyIdHookPrx>::const_iterator i =
+ partyIdHooks->receivedConnectedLineHooks.begin();
i != partyIdHooks->receivedConnectedLineHooks.end(); ++i)
{
try
{
- // Apply this hook.
- AsteriskSCF::System::Hook::V1::HookResult hookResult = (*i)->modifyReceivedConnectedLine(mSourceSession,
+ //
+ // It is quite possible that this can never
+ // happen, but it is prudent to check all the
+ // same.
+ //
+ if ((*i) == 0)
+ {
+ continue;
+ }
+
+ //
+ // Apply this hook.
+ //
+ AsteriskSCF::System::Hook::V1::HookResult hookResult = (*i)->modifyReceivedConnectedLine(
+ AsteriskSCF::Operations::createContext(mRootContext), mSourceSession,
currentConnectedLine, updatedConnectedLine);
if (hookResult.status == AsteriskSCF::System::Hook::V1::Succeeded)
@@ -679,6 +769,11 @@ protected:
currentConnectedLine = updatedConnectedLine;
}
}
+ //
+ // catching and ignoring an Ice exception here might be
+ // true to the hook calls, but other exceptions really
+ // should be considered bugs.
+ //
catch (const std::exception& e)
{
mLogger(Debug) << FUNLOG << " : " << e.what();
@@ -686,13 +781,18 @@ protected:
}
}
- // Cache this value.
+ //
+ // Cache this value. TODO: can currentConnectedLine be 0 and
+ // if so, does it make sense to set it. The way
+ // setConnectedLine is written, it won't be set again.
+ //
wrapper->setConnectedLine(currentConnectedLine);
return true;
}
private:
+ AsteriskSCF::System::V1::OperationContextPtr mRootContext;
BridgeImplPtr mBridge;
SessionPrx mSourceSession;
ConnectedLinePtr mConnectedLine;
@@ -702,9 +802,12 @@ private:
class RemoveSessionsNotify : public QueuedTask
{
public:
- RemoveSessionsNotify(const BridgeListenerMgrPtr& bridgeListeners,
- const SessionsTrackerPtr& tracker, const BridgeCookies& cookies) :
+ RemoveSessionsNotify(
+ const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+ const BridgeListenerMgrPtr& bridgeListeners,
+ const SessionsTrackerPtr& tracker, const BridgeCookies& cookies) :
QueuedTask("RemoveSessionsNotify"),
+ mRootContext(rootContext),
mBridgeListeners(bridgeListeners),
mTracker(tracker),
mCookies(cookies)
@@ -717,12 +820,13 @@ protected:
SessionSeq sessions = mTracker->getSessions();
if (!sessions.empty())
{
- mBridgeListeners->sessionsRemoved(sessions, mCookies);
+ mBridgeListeners->sessionsRemoved(mRootContext, sessions, mCookies);
}
return true;
}
private:
+ AsteriskSCF::System::V1::OperationContextPtr mRootContext;
BridgeListenerMgrPtr mBridgeListeners;
SessionsTrackerPtr mTracker;
BridgeCookies mCookies;
@@ -731,9 +835,11 @@ private:
class SetBridgeTask : public QueuedTask
{
public:
- SetBridgeTask(const SessionCollectionPtr& sessionCollection, const BridgePrx& bridge,
+ SetBridgeTask(const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+ const SessionCollectionPtr& sessionCollection, const BridgePrx& bridge,
const SessionListenerPrx& listener, const SessionSeq& sessions, const SessionsTrackerPtr& tracker):
QueuedTask("SetBridgeTask"),
+ mRootContext(rootContext),
mSessionManager(sessionCollection),
mBridge(bridge),
mSessionListener(listener),
@@ -758,9 +864,9 @@ protected:
continue;
}
tasksDispatched = true;
- (*i)->begin_setBridge(mBridge, mSessionListener,
- newCallback_Session_setBridge(this, &SetBridgeTask::set,
- &SetBridgeTask::failed), session);
+ (*i)->begin_setBridge(calculateOperationContext(mRootContext, identityToString(mBridge->ice_getIdentity())), mBridge,
+ mSessionListener, newCallback_Session_setBridge(this, &SetBridgeTask::set,
+ &SetBridgeTask::failed), session);
}
return !tasksDispatched;
}
@@ -773,7 +879,7 @@ protected:
//
// setupMedia is an AMI backed implementation, so should not block here.
//
- session->setupMedia();
+ session->setupMedia(mRootContext);
}
if (mTracker->responseCount() == mSessions.size())
{
@@ -797,7 +903,7 @@ protected:
// We want to make sure that session is laying around. The session collection will
// take care of cleaning it up as long as it is marked as destroyed.
//
- session->destroy();
+ session->destroy(mRootContext);
}
catch(...)
{
@@ -817,6 +923,7 @@ protected:
}
private:
+ AsteriskSCF::System::V1::OperationContextPtr mRootContext;
SessionCollectionPtr mSessionManager;
BridgePrx mBridge;
SessionListenerPrx mSessionListener;
@@ -824,41 +931,123 @@ private:
SessionsTrackerPtr mTracker;
};
+typedef ContextResultData<AsteriskSCF::Media::V1::StreamInformationDict> StreamInformationContextData;
+typedef AMDContextResultData<
+ AsteriskSCF::Media::V1::StreamInformationDict,
+ AsteriskSCF::SessionCommunications::V1::AMD_SessionController_addStreamsPtr> AddStreamsContextData;
+
+typedef AMDContextData<
+ AsteriskSCF::SessionCommunications::V1::AMD_SessionController_removeStreamsPtr> RemoveStreamsContextData;
+
class BridgeSessionController : public SessionController
{
public:
- BridgeSessionController(const BridgeImplPtr& bridge,
- const SessionWrapperPtr& self,
- const Logger& logger) : mBridge(bridge), mSelf(self), mLogger(logger) { }
+ BridgeSessionController(const BridgeImplPtr& bridge, const SessionWrapperPtr& self,
+ const Logger& logger) :
+ mBridge(bridge),
+ mSelf(self),
+ mLogger(logger),
+ mOperationCache(AsteriskSCF::Operations::OperationContextCache::create(60))
+ {
+ }
- void changeStreamStates_async(const AsteriskSCF::SessionCommunications::V1::AMD_SessionController_changeStreamStatesPtr& cb,
- const AsteriskSCF::Media::V1::StreamStateDict&, const Ice::Current&)
+ void changeStreamStates_async(
+ const AsteriskSCF::SessionCommunications::V1::AMD_SessionController_changeStreamStatesPtr& cb,
+ const AsteriskSCF::System::V1::OperationContextPtr&,
+ const AsteriskSCF::Media::V1::StreamStateDict&, const Ice::Current&)
{
// We do not care about stream state changes at this point in time
cb->ice_response();
}
void addStreams_async(const AsteriskSCF::SessionCommunications::V1::AMD_SessionController_addStreamsPtr& cb,
- const AsteriskSCF::Media::V1::StreamInformationDict& streams, const Ice::Current&)
+ const AsteriskSCF::System::V1::OperationContextPtr& context,
+ const AsteriskSCF::Media::V1::StreamInformationDict& streams, const Ice::Current&)
{
- AddStreamsOperationPtr op = new AddStreamsOperation(cb, mSelf, streams, mLogger);
- mBridge->getSessions()->visitSessions(*op);
+ try
+ {
+ AddStreamsContextData::ptr_type data(getContext<AddStreamsContextData>(mOperationCache,
+ context, cb));
+ if (data)
+ {
+ try
+ {
+ AddStreamsOperationPtr op = new AddStreamsOperation(context, data->getProxy(), mSelf, streams, mLogger);
+ mBridge->getSessions()->visitSessions(*op);
+ }
+ catch (const std::exception& ex)
+ {
+ data->getProxy()->ice_exception(ex);
+ }
+ catch (...)
+ {
+ assert("If we got here, really bad things have happened!" == 0);
+ data->getProxy()->ice_exception();
+ }
+ }
+ }
+ catch (const std::exception& ex)
+ {
+ mLogger(Error) << ex.what() << " exception occurred when trying to get operation context data.";
+ cb->ice_exception(ex);
+ }
+ catch (...)
+ {
+ assert("If we got here, really bad things have happened!" == 0);
+ mLogger(Error) << "Unknown exception occurred when trying to get operation context data.";
+ cb->ice_exception();
+ }
}
void removeStreams_async(const AsteriskSCF::SessionCommunications::V1::AMD_SessionController_removeStreamsPtr& cb,
- const AsteriskSCF::Media::V1::StreamInformationDict& streams, const Ice::Current&)
+ const AsteriskSCF::System::V1::OperationContextPtr& context,
+ const AsteriskSCF::Media::V1::StreamInformationDict& streams, const Ice::Current&)
{
- RemoveStreamsOperation op(mSelf, streams);
- mBridge->getSessions()->visitSessions(op);
- cb->ice_response();
+ try
+ {
+ RemoveStreamsContextData::ptr_type data(
+ getContext<RemoveStreamsContextData>(mOperationCache, context, cb));
+ if (data)
+ {
+ RemoveStreamsOperation op(context, mSelf, streams);
+ mBridge->getSessions()->visitSessions(op);
+ data->getProxy()->ice_response();
+ }
+ }
+ catch (const std::exception& ex)
+ {
+ mLogger(Error) << ex.what() << " exception occurred when trying to get operation context data.";
+ cb->ice_exception(ex);
+ }
+ catch (...)
+ {
+ assert("If we got here, really bad things have happened!" == 0);
+ mLogger(Error) << "Unknown exception occurred when trying to get operation context data.";
+ cb->ice_exception();
+ }
}
- void updateConnectedLine(const ConnectedLinePtr& connectedLine, const Ice::Current&)
+ void updateConnectedLine(const AsteriskSCF::System::V1::OperationContextPtr& context,
+ const ConnectedLinePtr& connectedLine, const Ice::Current&)
{
- mBridge->updateConnectedLine(mSelf, connectedLine);
+ ContextDataPtr data(checkAndThrow(mOperationCache, context));
+ if (data)
+ {
+ try
+ {
+ mBridge->updateConnectedLine(mSelf, context, connectedLine);
+ data->getMonitor()->setCompleted();
+ }
+ catch (...)
+ {
+ assert("If we got here, really bad things have happened!" == 0);
+ data->setException(ExceptionWrapper::create("Unexpected unknown exception"));
+ throw;
+ }
+ }
}
- void updateCallerID(const CallerPtr&, const Ice::Current&)
+ void updateCallerID(const AsteriskSCF::System::V1::OperationContextPtr&, const CallerPtr&, const Ice::Current&)
{
//This shouldn't really ever be called because the caller information doesn't
//get updated except for once, and that's at bridge creation time. If information
@@ -867,26 +1056,54 @@ public:
return;
}
- void updateRedirections(const RedirectionsPtr& redirections, const ::Ice::Current&)
+ void updateRedirections(const AsteriskSCF::System::V1::OperationContextPtr& context,
+ const RedirectionsPtr& redirections, const ::Ice::Current&)
{
- mBridge->updateRedirections(mSelf, redirections);
+ ContextDataPtr data(checkAndThrow(mOperationCache, context));
+ if (data)
+ {
+ try
+ {
+ mBridge->updateRedirections(mSelf, context, redirections);
+ data->getMonitor()->setCompleted();
+ }
+ catch (const Ice::ObjectNotExistException&)
+ {
+ assert("We should not be ignoring or passing along ONEs" == 0);
+ }
+ catch (const std::exception& ex)
+ {
+ data->setException(ExceptionWrapper::create(ex));
+ throw;
+ }
+ catch (...)
+ {
+ assert("If we got here, really bad things have happened!" == 0);
+ data->setException(ExceptionWrapper::create("Unexpected unknown exception"));
+ throw;
+ }
+ }
}
private:
BridgeImplPtr mBridge;
SessionWrapperPtr mSelf;
Logger mLogger;
+ AsteriskSCF::Operations::OperationContextCachePtr mOperationCache;
};
class SetAndGetSessionControllerTask : public QueuedTask
{
public:
- SetAndGetSessionControllerTask(const Ice::ObjectAdapterPtr& adapter,
- const SessionCollectionPtr& sessionCollection,
- const SessionSeq& sessions,
- const BridgeImplPtr& bridge,
- const Logger& logger):
+ SetAndGetSessionControllerTask(
+ const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+ const Ice::ObjectAdapterPtr& adapter,
+ const SessionCollectionPtr& sessionCollection,
+ const SessionSeq& sessions,
+ const BridgeImplPtr& bridge,
+ const Logger& logger):
QueuedTask("SetAndGetSessionControllerTask"),
+ mRootContext(rootContext),
mAdapter(adapter),
mSessionManager(sessionCollection),
mSessions(sessions),
@@ -913,12 +1130,16 @@ protected:
}
tasksDispatched = true;
SessionControllerPtr controller = new BridgeSessionController(mBridge, session, mLogger);
- std::string identity = (*i)->ice_getIdentity().name;
- identity += ".bridgecontroller";
- SessionControllerPrx controllerPrx =
- SessionControllerPrx::uncheckedCast(mAdapter->add(controller, mAdapter->getCommunicator()->stringToIdentity(identity)));
- (*i)->begin_setAndGetSessionController(controllerPrx, newCallback_Session_setAndGetSessionController(this,
- &SetAndGetSessionControllerTask::get, &SetAndGetSessionControllerTask::failed), session);
+ std::string identity = (*i)->ice_getIdentity().name;
+ identity += ".bridgecontroller";
+ SessionControllerPrx controllerPrx =
+ SessionControllerPrx::uncheckedCast(mAdapter->add(controller, mAdapter->getCommunicator()->stringToIdentity(identity)));
+
+ (*i)->begin_setAndGetSessionController(
+ calculateOperationContext(mRootContext, identityToString((*i)->ice_getIdentity())),
+ controllerPrx,
+ newCallback_Session_setAndGetSessionController(this, &SetAndGetSessionControllerTask::get,
+ &SetAndGetSessionControllerTask::failed), session);
}
return !tasksDispatched;
}
@@ -949,7 +1170,7 @@ protected:
// We want to make sure that session is laying around. The session collection will
// take care of cleaning it up as long as it is marked as destroyed.
//
- session->destroy();
+ session->destroy(mRootContext);
}
catch(...)
{
@@ -969,6 +1190,7 @@ protected:
}
private:
+ AsteriskSCF::System::V1::OperationContextPtr mRootContext;
Ice::ObjectAdapterPtr mAdapter;
SessionCollectionPtr mSessionManager;
SessionSeq mSessions;
@@ -980,10 +1202,13 @@ private:
class RemoveSessionControllerTask : public QueuedTask
{
public:
- RemoveSessionControllerTask(const Ice::ObjectAdapterPtr& adapter,
+ RemoveSessionControllerTask(
+ const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+ const Ice::ObjectAdapterPtr& adapter,
const SessionsTrackerPtr& sessions,
const Logger& logger):
QueuedTask("RemoveSessionControllerTask"),
+ mRootContext(rootContext),
mAdapter(adapter),
mSessions(sessions),
mLogger(logger)
@@ -1011,13 +1236,14 @@ protected:
// Remove the session controller from the session itself
SessionControllerPrx controller = SessionControllerPrx::uncheckedCast(mAdapter->createProxy(
- mAdapter->getCommunicator()->stringToIdentity(identity)));
- (*i)->removeSessionController(controller);
+ mAdapter->getCommunicator()->stringToIdentity(identity)));
+ (*i)->removeSessionController(calculateOperationContext(mRootContext, identity), controller);
}
return true;
}
private:
+ AsteriskSCF::System::V1::OperationContextPtr mRootContext;
Ice::ObjectAdapterPtr mAdapter;
SessionsTrackerPtr mSessions;
Logger mLogger;
@@ -1026,51 +1252,60 @@ private:
class UnplugMedia : public QueuedTask
{
public:
- UnplugMedia(const BridgeImplPtr& bridge) :
+ UnplugMedia(const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+ const BridgeImplPtr& bridge) :
QueuedTask("UnplugMedia"),
+ mRootContext(rootContext),
mBridge(bridge)
- {
- }
+ {
+ }
protected:
bool executeImpl()
{
- UnplugMediaOperation op;
+ UnplugMediaOperation op(mRootContext);
mBridge->getSessions()->visitSessions(op);
return true;
}
private:
+ AsteriskSCF::System::V1::OperationContextPtr mRootContext;
BridgeImplPtr mBridge;
};
class SetupMedia : public QueuedTask
{
public:
- SetupMedia(const BridgeImplPtr& bridge) :
+ SetupMedia(const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+ const BridgeImplPtr& bridge) :
QueuedTask("SetupMedia"),
+ mRootContext(rootContext),
mBridge(bridge)
- {
- }
-
+ {
+ }
+
protected:
bool executeImpl()
{
- SetupMediaOperation op;
+ SetupMediaOperation op(mRootContext);
mBridge->getSessions()->visitSessions(op);
return true;
}
private:
+ AsteriskSCF::System::V1::OperationContextPtr mRootContext;
BridgeImplPtr mBridge;
};
class AddToListeners : public QueuedTask
{
public:
- AddToListeners(const BridgeListenerMgrPtr& listeners, const SessionsTrackerPtr& tracker,
+ AddToListeners(
+ const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+ const BridgeListenerMgrPtr& listeners, const SessionsTrackerPtr& tracker,
const BridgeCookies& cookies) :
QueuedTask("AddToListeners"),
+ mRootContext(rootContext),
mListeners(listeners),
mTracker(tracker),
mCookies(cookies)
@@ -1080,11 +1315,12 @@ public:
protected:
bool executeImpl()
{
- mListeners->sessionsAdded(mTracker->getSessions(), mCookies);
+ mListeners->sessionsAdded(mRootContext, mTracker->getSessions(), mCookies);
return true;
}
private:
+ AsteriskSCF::System::V1::OperationContextPtr mRootContext;
BridgeListenerMgrPtr mListeners;
SessionsTrackerPtr mTracker;
BridgeCookies mCookies;
@@ -1105,8 +1341,21 @@ protected:
{
if (mBridge->getSessions()->size() < 2 && mPrx)
{
- mPrx->begin_shutdown(newCallback_Bridge_shutdown(this, &CheckShutdown::done,
- &CheckShutdown::failed));
+ try
+ {
+ mPrx->begin_shutdown(AsteriskSCF::Operations::createContext(),
+ newCallback_Bridge_shutdown(this, &CheckShutdown::done,
+ &CheckShutdown::failed));
+ }
+ catch (const Ice::Exception&)
+ {
+ //
+ // This is okay.
+ //
+ }
+ //
+ // Everything else is not.
+ //
}
//
// We don't care about the result really. The CheckShutdown instance will hang
@@ -1135,47 +1384,14 @@ private:
BridgePrx mPrx;
};
-template <class T>
-class GenericAMDCallback : public QueuedTask
-{
-public:
- GenericAMDCallback(const T& cb, const SessionsTrackerPtr& tracker) :
- QueuedTask("GenericAMDCallback"),
- mCallback(cb),
- mTracker(tracker)
- {
- }
-protected:
-
- bool executeImpl()
- {
- mCallback->ice_response();
- return true;
- }
-
- void failImpl()
- {
- SessionErrorSeq errors(mTracker->getExceptions());
- if (!errors.empty())
- {
- mCallback->ice_exception(BridgeSessionOperationFailed(errors));
- }
- else
- {
- mCallback->ice_exception();
- }
- }
-
-private:
- T mCallback;
- SessionsTrackerPtr mTracker;
-};
class UpdateTask : public QueuedTask
{
public:
- UpdateTask(const BridgeImplPtr& bridge) :
+ UpdateTask(const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+ const BridgeImplPtr& bridge) :
QueuedTask("UpdateTask"),
+ mRootContext(rootContext),
mBridge(bridge)
{
}
@@ -1187,16 +1403,20 @@ protected:
return true;
}
private:
+ AsteriskSCF::System::V1::OperationContextPtr mRootContext;
BridgeImplPtr mBridge;
};
class ConnectTelephonyEventsTask: public QueuedTask
{
public:
- ConnectTelephonyEventsTask(const BridgeImplPtr& bridge,
- const SessionSeq& newSessions,
- const Logger& logger)
- : QueuedTask("ConnectTelephonyEventsTask"),
+ ConnectTelephonyEventsTask(
+ const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+ const BridgeImplPtr& bridge,
+ const SessionSeq& newSessions,
+ const Logger& logger)
+ : QueuedTask("ConnectTelephonyEventsTask"),
+ mRootContext(rootContext),
mBridge(bridge),
mNewSessions(newSessions),
mLogger(logger)
@@ -1227,7 +1447,7 @@ protected:
continue;
}
- ConnectTelephonyOperation op(telephonySession, ignoreList, mLogger);
+ ConnectTelephonyOperation op(mRootContext, telephonySession, ignoreList, mLogger);
mBridge->getSessions()->visitSessions(op);
// Since this session is now connected to all others in the bridge (including
@@ -1238,6 +1458,7 @@ protected:
}
private:
+ AsteriskSCF::System::V1::OperationContextPtr mRootContext;
BridgeImplPtr mBridge;
SessionSeq mNewSessions;
Logger mLogger;
@@ -1246,10 +1467,13 @@ private:
class DisconnectTelephonyEventsTask: public QueuedTask
{
public:
- DisconnectTelephonyEventsTask(const BridgeImplPtr& bridge,
- const SessionSeq& disconnectingSessions,
- const Logger& logger)
- : QueuedTask("DisconnectTelephonyEventsTask"),
+ DisconnectTelephonyEventsTask(
+ const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+ const BridgeImplPtr& bridge,
+ const SessionSeq& disconnectingSessions,
+ const Logger& logger)
+ : QueuedTask("DisconnectTelephonyEventsTask"),
+ mRootContext(rootContext),
mBridge(bridge),
mDisconnectingSessions(disconnectingSessions),
mLogger(logger)
@@ -1276,14 +1500,16 @@ protected:
continue;
}
- DisconnectTelephonyOperation op(telephonySession, mLogger);
+ DisconnectTelephonyOperation op(mRootContext, telephonySession, mLogger);
mBridge->getSessions()->visitSessions(op);
}
/**
- * In ConnectTelephonyEventsTask, we could assume all the sessions were in the bridge's session collection
- * and just use a visitor. But when a session is removed, AMI operations are involved. We use an
- * extra pass over the set of sessions being removed, and disconnect each one from the other, to be safe.
+ * In ConnectTelephonyEventsTask, we could assume all the sessions were
+ * in the bridge's session collection and just use a visitor. But when a
+ * session is removed, AMI operations are involved. We use an extra pass
+ * over the set of sessions being removed, and disconnect each one from
+ * the other, to be safe.
*/
disconnectMembers();
@@ -1348,24 +1574,62 @@ protected:
for(TelephonyEventSourceSeq::iterator i=fromSources.begin();
i != fromSources.end(); ++i)
{
- (*i)->removeSinks(sinksToRemove);
+ (*i)->removeSinks(calculateOperationContext(mRootContext, identityToString((*i)->ice_getIdentity())),
+ sinksToRemove);
}
}
private:
+ AsteriskSCF::System::V1::OperationContextPtr mRootContext;
BridgeImplPtr mBridge;
SessionSeq mDisconnectingSessions;
Logger mLogger;
};
-} // End of anonymous namespace
-
-BridgeImpl::BridgeImpl(const string& name, const Ice::ObjectAdapterPtr& adapter,
- const vector<BridgeListenerPrx>& listeners,
- const BridgeListenerMgrPtr& listenerMgr,
- const ReplicatorSmartPrx& replicator,
- const BridgeStateItemPtr& state,
+template <class T>
+class GenericAMDCallback : public QueuedTask
+{
+public:
+ GenericAMDCallback(const T& cb, const SessionsTrackerPtr& tracker) :
+ QueuedTask("GenericAMDCallback"),
+ mCallback(cb),
+ mTracker(tracker)
+ {
+ }
+protected:
+
+ bool executeImpl()
+ {
+ mCallback->ice_response();
+ return true;
+ }
+
+ void failImpl()
+ {
+ SessionErrorSeq errors(mTracker->getExceptions());
+ if (!errors.empty())
+ {
+ mCallback->ice_exception(BridgeSessionOperationFailed(errors));
+ }
+ else
+ {
+ mCallback->ice_exception();
+ }
+ }
+
+private:
+ T mCallback;
+ SessionsTrackerPtr mTracker;
+};
+
+} // End of anonymous namespace
+
+BridgeImpl::BridgeImpl(const string& name, const Ice::ObjectAdapterPtr& adapter,
+ const vector<BridgeListenerPrx>& listeners,
+ const BridgeListenerMgrPtr& listenerMgr,
+ const ReplicatorSmartPrx& replicator,
+ const BridgeStateItemPtr& state,
const Logger& logger) :
mActivated(false),
mState(state),
@@ -1375,7 +1639,8 @@ BridgeImpl::BridgeImpl(const string& name, const Ice::ObjectAdapterPtr& adapter,
mListeners(listenerMgr),
mReplicator(replicator),
mSessionListener(createSessionListener(mSessions, logger)),
- mLogger(logger)
+ mLogger(logger),
+ mOperationContextCache(AsteriskSCF::Operations::OperationContextCache::create(60))
{
mLogger(Trace) << FUNLOG << ": creating a new Bridge with " << listeners.size() << " default listeners";
for (vector<BridgeListenerPrx>::const_iterator i = listeners.begin();
@@ -1397,20 +1662,24 @@ BridgeImpl::~BridgeImpl()
/**
* Process an updated ConnectedLine record from a session.
*/
-void BridgeImpl::updateConnectedLine(const SessionWrapperPtr& sourceSession, const ConnectedLinePtr& connectedLine)
+void BridgeImpl::updateConnectedLine(const SessionWrapperPtr& sourceSession,
+ const AsteriskSCF::System::V1::OperationContextPtr& context, const ConnectedLinePtr& connectedLine)
{
- try
+ //
+ // TODO: Is it correct to simply ignore any exceptions that occur here?
+ //
+ try
{
QueuedTasks tasks;
// Updates the cached ConnectedLine party id information for the given session.
// - Applies receive hooks on the received ConnectedLine info before caching.
- tasks.push_back(new UpdateConnectedLineTask(this, sourceSession->getSession(), connectedLine, mLogger));
+ tasks.push_back(new UpdateConnectedLineTask(context, this, sourceSession->getSession(), connectedLine, mLogger));
// Forwards the ConnectedLine information to the other sessions in the bridge.
// - Applies forwarding hooks to the cached ConnectedLine info before forwarding.
- tasks.push_back(new ForwardConnectedLineTask(this, sourceSession->getSession(), mLogger));
+ tasks.push_back(new ForwardConnectedLineTask(context, this, sourceSession->getSession(), mLogger));
ExecutorPtr runner(new Executor(tasks, mLogger));
runner->start();
@@ -1424,7 +1693,8 @@ void BridgeImpl::updateConnectedLine(const SessionWrapperPtr& sourceSession, con
/**
* Process an updated Redirections record from a session.
*/
-void BridgeImpl::updateRedirections(const SessionWrapperPtr& sourceSession, const RedirectionsPtr& redirections)
+void BridgeImpl::updateRedirections(const SessionWrapperPtr& sourceSession,
+ const AsteriskSCF::System::V1::OperationContextPtr& context, const RedirectionsPtr& redirections)
{
try
{
@@ -1432,7 +1702,8 @@ void BridgeImpl::updateRedirections(const SessionWrapperPtr& sourceSession, cons
// Forwards the Redirections information to the other sessions in the bridge.
// Applies forwarding hooks to the Redirecrting info before forwarding.
- tasks.push_back(new ForwardRedirectionsUpdatedTask(this, sourceSession->getSession(), redirections, mLogger));
+ tasks.push_back(new ForwardRedirectionsUpdatedTask(context, this, sourceSession->getSession(),
+ redirections, mLogger));
ExecutorPtr runner(new Executor(tasks, mLogger));
runner->start();
}
@@ -1442,134 +1713,163 @@ void BridgeImpl::updateRedirections(const SessionWrapperPtr& sourceSession, cons
}
}
-static SessionPrx extractSession(const SessionWithSessionInfo& swsi)
+SessionPrx extractSession(const SessionWithSessionInfo& swsi)
{
return swsi.sessionProxy;
}
+typedef AMDContextData<AMD_Bridge_addSessionsPtr> AddSessionsContextData;
+
void BridgeImpl::addSessions_async(const AMD_Bridge_addSessionsPtr& callback,
- const SessionWithSessionInfoSeq& sessionInfos,
- const Ice::Current&)
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const SessionWithSessionInfoSeq& sessionInfos,
+ const Ice::Current&)
{
- try
+ AddSessionsContextData::ptr_type data(
+ getContext<AddSessionsContextData>(
+ mOperationContextCache, operationContext, callback));
+
+ if (data)
{
- mSessions->reap();
- SessionSeq sessions;
- sessions.resize(sessionInfos.size());
- std::transform(sessionInfos.begin(), sessionInfos.end(), sessions.begin(), extractSession);
- if (sessions.empty())
+ try
{
- if (callback)
+ mSessions->reap();
+ SessionSeq sessions;
+ sessions.resize(sessionInfos.size());
+ std::transform(sessionInfos.begin(), sessionInfos.end(), sessions.begin(), extractSession);
+ if (sessions.empty())
{
- callback->ice_response();
+ if (callback)
+ {
+ data->getProxy()->ice_response();
+ }
+ return;
+ }
+ checkSessions(sessions);
+ statePreCheck();
+ mLogger(Trace) << FUNLOG << ": adding " << sessions.size() << " sessions";
+ SessionsTrackerPtr tracker(new SessionsTracker);
+
+ QueuedTasks tasks;
+ tasks.push_back(new UnplugMedia(operationContext, this));
+ tasks.push_back(new SetBridgeTask(operationContext, mSessions, mPrx, mSessionListenerPrx, sessions, tracker));
+ tasks.push_back(new AddToListeners(operationContext, mListeners, tracker, getCookies()));
+ tasks.push_back(new SetAndGetSessionControllerTask(operationContext,
+ mObjAdapter, mSessions, sessions, this, mLogger));
+
+ //
+ // The new sessions being added to the bridge need to have their connected line set and
+ // forwarded to the existing sessions.
+ //
+ // TODO: This very much like the SessionOperations which are
+ // applied with the "visit" operation.
+ //
+ for (SessionWithSessionInfoSeq::const_iterator infoIter = sessionInfos.begin();
+ infoIter != sessionInfos.end(); ++infoIter)
+ {
+ if (infoIter->info && infoIter->info->sessionOwner)
+ {
+ tasks.push_back(new UpdateConnectedLineTask(operationContext, this, infoIter->sessionProxy,
+ new ConnectedLine(infoIter->info->sessionOwner->ids), mLogger));
+ tasks.push_back(new ForwardConnectedLineTask(operationContext, this, infoIter->sessionProxy, mLogger));
+ }
}
- return;
- }
- checkSessions(sessions);
- statePreCheck();
- mLogger(Trace) << FUNLOG << ": adding " << sessions.size() << " sessions";
-
- SessionsTrackerPtr tracker(new SessionsTracker);
- QueuedTasks tasks;
- tasks.push_back(new UnplugMedia(this));
- tasks.push_back(new SetBridgeTask(mSessions, mPrx, mSessionListenerPrx, sessions, tracker));
- tasks.push_back(new AddToListeners(mListeners, tracker, getCookies()));
- tasks.push_back(new SetAndGetSessionControllerTask(mObjAdapter, mSessions, sessions, this, mLogger));
- //The new sessions being added to the bridge need to have their connected line set and
- //forwarded to the existing sessions.
- for (SessionWithSessionInfoSeq::const_iterator infoIter = sessionInfos.begin(); infoIter != sessionInfos.end(); ++infoIter)
- {
- if (infoIter->info && infoIter->info->sessionOwner)
+ //The existing sessions need to have their stored connected line forwarded to the new
+ //sessions
+ SessionSeq existingSessions = getSessions()->getSessionSeq();
+ for (SessionSeq::const_iterator sessionIter = existingSessions.begin();
+ sessionIter != existingSessions.end(); ++sessionIter)
{
- tasks.push_back(new UpdateConnectedLineTask(this, infoIter->sessionProxy, new ConnectedLine(infoIter->info->sessionOwner->ids), mLogger));
- tasks.push_back(new ForwardConnectedLineTask(this, infoIter->sessionProxy, mLogger));
+ tasks.push_back(new ForwardConnectedLineTask(operationContext, this, *sessionIter, mLogger));
}
+
+ tasks.push_back(new GenericAMDCallback<AMD_Bridge_addSessionsPtr>(data->getProxy(), tracker));
+ tasks.push_back(new SetupMedia(operationContext, this));
+ tasks.push_back(new ConnectTelephonyEventsTask(operationContext, this, sessions, mLogger));
+ tasks.push_back(new UpdateTask(operationContext, this));
+ ExecutorPtr executor(new Executor(tasks, mLogger));
+ executor->start();
+ //
+ // When the operations have all completed, that last task will take care of handling
+ // the callback. It's all left withing the try/catch in the event something happens during
+ // task startup.
+ //
}
-
- //The existing sessions need to have their stored connected line forwarded to the new
- //sessions
- SessionSeq existingSessions = getSessions()->getSessionSeq();
- for (SessionSeq::const_iterator sessionIter = existingSessions.begin(); sessionIter != existingSessions.end(); ++sessionIter)
+ catch (const std::exception& ex)
{
- tasks.push_back(new ForwardConnectedLineTask(this, *sessionIter, mLogger));
+ data->getProxy()->ice_exception(ex);
+ }
+ catch (...)
+ {
+ assert("If we got here, really bad things have happened!" == 0);
+ data->getProxy()->ice_exception();
}
-
- tasks.push_back(new GenericAMDCallback<AMD_Bridge_addSessionsPtr>(callback, tracker));
- tasks.push_back(new SetupMedia(this));
- tasks.push_back(new ConnectTelephonyEventsTask(this, sessions, mLogger));
- tasks.push_back(new UpdateTask(this));
- ExecutorPtr executor(new Executor(tasks, mLogger));
- executor->start();
- //
- // When the operations have all completed, that last task will take care of handling
- // the callback. It's all left withing the try/catch in the event something happens during
- // task startup.
- //
- }
- catch (const std::exception& ex)
- {
- callback->ice_exception(ex);
- }
- catch (...)
- {
- callback->ice_exception();
}
}
-void BridgeImpl::removeSessions_async(const AMD_Bridge_removeSessionsPtr& callback, const SessionSeq& sessions,
- const Ice::Current&)
+typedef AMDContextData<AMD_Bridge_removeSessionsPtr> RemoveSessionsContextData;
+
+void BridgeImpl::removeSessions_async(const AMD_Bridge_removeSessionsPtr& callback,
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext, const SessionSeq& sessions,
+ const Ice::Current&)
{
- try
+ RemoveSessionsContextData::ptr_type data(
+ getContext<RemoveSessionsContextData>(mOperationContextCache, operationContext, callback));
+ if (data)
{
- if (sessions.empty())
+ try
{
- callback->ice_response();
- return;
- }
- checkSessions(sessions);
- statePreCheck();
- mSessions->reap();
+ if (sessions.empty())
+ {
+ data->getProxy()->ice_response();
+ return;
+ }
+ checkSessions(sessions);
+ statePreCheck();
+ mSessions->reap();
- //
- // The shutdown of individual sessions are implemented as series of AMI requests. Once initiated,
- // we allow them to proceed asynchronously and do not concern ourselves with the result.
- // The logic of shutdown should remove them either because the operations succeeded or
- // *couldn't* be accomplished because of some terminal condition. At any rate, waiting around for them
- // is pointless.
- //
- SessionsTrackerPtr removed(new SessionsTracker);
- for (SessionSeq::const_iterator i = sessions.begin(); i != sessions.end(); ++i)
- {
- SessionWrapperPtr session = mSessions->getSession(*i);
- if (session)
+ //
+ // The shutdown of individual sessions are implemented as series of AMI requests. Once initiated,
+ // we allow them to proceed asynchronously and do not concern ourselves with the result.
+ // The logic of shutdown should remove them either because the operations succeeded or
+ // *couldn't* be accomplished because of some terminal condition. At any rate, waiting around for them
+ // is pointless.
+ //
+ SessionsTrackerPtr removed(new SessionsTracker);
+ for (SessionSeq::const_iterator i = sessions.begin(); i != sessions.end(); ++i)
{
- removed->add(session->getSession());
- mSessions->removeSession(session->getBridgedSession());
+ SessionWrapperPtr session = mSessions->getSession(*i);
+ if (session)
+ {
+ removed->add(session->getSession());
+ mSessions->removeSession(operationContext, session->getBridgedSession());
+ }
}
- }
- QueuedTasks tasks;
+ QueuedTasks tasks;
- BridgeCookies cookies;
+ BridgeCookies cookies;
+ {
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+ cookies = getCookies();
+ }
+ tasks.push_back(new RemoveSessionsNotify(operationContext, mListeners, removed, cookies));
+ tasks.push_back(new RemoveSessionControllerTask(operationContext, mObjAdapter, removed, mLogger));
+ tasks.push_back(new GenericAMDCallback<AMD_Bridge_removeSessionsPtr>(data->getProxy(), removed));
+ tasks.push_back(new DisconnectTelephonyEventsTask(operationContext, this, sessions, mLogger));
+ tasks.push_back(new CheckShutdown(this, mPrx));
+ ExecutorPtr runner(new Executor(tasks, mLogger));
+ runner->start();
+ }
+ catch (const std::exception& ex)
{
- boost::shared_lock<boost::shared_mutex> lock(mLock);
- cookies = getCookies();
+ data->getProxy()->ice_exception(ex);
+ }
+ catch (...)
+ {
+ assert("If we got here, really bad things have happened!" == 0);
+ data->getProxy()->ice_exception();
}
- tasks.push_back(new RemoveSessionsNotify(mListeners, removed, cookies));
- tasks.push_back(new RemoveSessionControllerTask(mObjAdapter, removed, mLogger));
- tasks.push_back(new GenericAMDCallback<AMD_Bridge_removeSessionsPtr>(callback, removed));
- tasks.push_back(new DisconnectTelephonyEventsTask(this, sessions, mLogger));
- tasks.push_back(new CheckShutdown(this, mPrx));
- ExecutorPtr runner(new Executor(tasks, mLogger));
- runner->start();
- }
- catch (const std::exception& ex)
- {
- callback->ice_exception(ex);
- }
- catch (...)
- {
- callback->ice_exception();
}
}
@@ -1580,83 +1880,112 @@ SessionSeq BridgeImpl::listSessions(const Ice::Current& current)
return mSessions->getSessionSeq();
}
-void BridgeImpl::shutdown(const Ice::Current& current)
+void BridgeImpl::shutdown(const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const Ice::Current& current)
{
- mSessions->reap();
- //
- // In an effort to allow some consistency with replicas, the shutdown operation is broken into
- // two parts. In an asynchronous version, this would probably be a couple of queued tasks.
- //
+ ContextDataPtr data(checkAndThrow(mOperationContextCache, operationContext));
+ if (data)
+ {
+ try
+ {
+ mSessions->reap();
+ //
+ // In an effort to allow some consistency with replicas, the
+ // shutdown operation is broken into two parts. In an asynchronous
+ // version, this would probably be a couple of queued tasks.
+ //
- //
- // When shutting down, the bridge makes a copy of its current state and unlocks, proceeding with
- // no other internal locks.
- //
- mLogger(Trace) << FUNLOG << ":" << objectIdFromCurrent(current);
+ //
+ // When shutting down, the bridge makes a copy of its current
+ // state and unlocks, proceeding with no other internal locks.
+ //
+ mLogger(Trace) << FUNLOG << ":" << objectIdFromCurrent(current);
- BridgeStateItemPtr update;
- {
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- if (mState->runningState == ShuttingDown)
+ BridgeStateItemPtr update;
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ if (mState->runningState == ShuttingDown)
+ {
+ mLogger(Trace) << FUNLOG << ": called when shutting down." ;
+ return;
+ }
+ if (mState->runningState == Destroyed)
+ {
+ mLogger(Trace) << FUNLOG << ": called when destroyed." ;
+ throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+ }
+ mState->runningState = ShuttingDown;
+
+ mListeners->stopping(getCookies());
+ update = createUpdate();
+ }
... 6197 lines suppressed ...
--
asterisk-scf/release/bridging.git
More information about the asterisk-scf-commits
mailing list