[asterisk-scf-commits] asterisk-scf/release/bridging.git branch "master" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Tue Apr 19 07:15:53 CDT 2011
branch "master" has been updated
via bae047c389685f743ff52036ee82429913bfe775 (commit)
from bf4ba3621565c7ed6b02c35dea2a66255db51dcb (commit)
Summary of changes:
CMakeLists.txt | 5 +-
README.txt | 2 +-
config/test_bridging.conf.in | 160 ++++-
src/BridgeImpl.cpp | 1387 ++++++++++++++++---------------
src/BridgeImpl.h | 266 +++----
src/BridgeListenerMgr.cpp | 19 +-
src/BridgeListenerMgr.h | 2 +-
src/BridgeManagerImpl.cpp | 470 ++++++++---
src/BridgeManagerImpl.h | 93 +--
src/BridgeManagerListenerMgr.cpp | 19 +-
src/BridgeManagerListenerMgr.h | 2 +-
src/BridgeReplicatorIf.ice | 245 ++++++
src/BridgeReplicatorService.cpp | 197 +++++
src/BridgeReplicatorStateListenerI.cpp | 290 +++++++
src/BridgeReplicatorStateListenerI.h | 42 +
src/BridgeServiceConfig.h | 35 +-
src/BridgeServiceImpl.h | 22 -
src/CMakeLists.txt | 36 +-
src/DebugUtil.h | 115 +++
src/ListenerManager.h | 34 +-
src/MediaSplicer.cpp | 957 ++++++++++++++++++-----
src/MediaSplicer.h | 59 ++-
src/Service.cpp | 318 +++++---
src/ServiceUtil.h | 208 +++++
src/SessionCollection.cpp | 211 +++++
src/SessionCollection.h | 138 ++++
src/SessionListener.cpp | 157 ++++
src/SessionListener.h | 38 +
src/SessionOperations.cpp | 85 ++
src/SessionOperations.h | 206 +++++
src/SessionWrapper.cpp | 567 +++++++++++++
src/SessionWrapper.h | 162 ++++
src/Tasks.h | 471 +++++++++++
test/BridgeListenerI.cpp | 2 +-
test/BridgeListenerI.h | 2 +-
test/BridgeManagerListenerI.cpp | 2 +-
test/BridgeManagerListenerI.h | 2 +-
test/CMakeLists.txt | 60 +-
test/SessionListenerI.cpp | 2 +-
test/SessionListenerI.h | 2 +-
test/TestBridging.cpp | 325 ++++++--
test/TestCommandDriver.cpp | 2 +-
test/TestCommandDriver.h | 2 +-
test/UnitTests.cpp | 390 +++++++++
44 files changed, 6316 insertions(+), 1493 deletions(-)
create mode 100644 src/BridgeReplicatorIf.ice
create mode 100644 src/BridgeReplicatorService.cpp
create mode 100644 src/BridgeReplicatorStateListenerI.cpp
create mode 100644 src/BridgeReplicatorStateListenerI.h
delete mode 100644 src/BridgeServiceImpl.h
create mode 100644 src/DebugUtil.h
create mode 100644 src/ServiceUtil.h
create mode 100644 src/SessionCollection.cpp
create mode 100644 src/SessionCollection.h
create mode 100644 src/SessionListener.cpp
create mode 100644 src/SessionListener.h
create mode 100644 src/SessionOperations.cpp
create mode 100644 src/SessionOperations.h
create mode 100644 src/SessionWrapper.cpp
create mode 100644 src/SessionWrapper.h
create mode 100644 src/Tasks.h
create mode 100644 test/UnitTests.cpp
- Log -----------------------------------------------------------------
commit bae047c389685f743ff52036ee82429913bfe775
Author: Brent Eagles <beagles at digium.com>
Date: Tue Apr 19 09:32:21 2011 -0230
Merging async-bridging and bridge-replication branches to master.
Change summary:
- Added active->standby state replication of bridge related state.
- Added a replication service component for the above.
- Added some component test suite modifications for testing replication.
- The bridge service now uses AMI and AMD for key, potentially long running
methods.
- The bridge service implementation employs a preliminary version of
"workqueues". This will need to be converted to use the common Asterisk
SCF workqueue facility when it is completed.
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 498a40a..9979a32 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -17,7 +17,7 @@ if(NOT integrated_build STREQUAL "true")
include(cmake/AsteriskSCF.cmake)
# This project is C++ based and requires a minimum of 3.4
- asterisk_scf_project("bridging service" 3.4 CXX)
+ asterisk_scf_project(BridgingService 3.4 CXX)
# Take care of slice definitions
add_subdirectory(slice)
@@ -25,9 +25,6 @@ if(NOT integrated_build STREQUAL "true")
# logger is integrated into our build
set(integrated_build true)
add_subdirectory(logger)
- set(integrated_build false)
-else()
- set(bridging_bindir ${CMAKE_CURRENT_BINARY_DIR} PARENT_SCOPE)
endif()
add_subdirectory(src)
diff --git a/README.txt b/README.txt
index 15d1447..2d1f1b6 100644
--- a/README.txt
+++ b/README.txt
@@ -1,7 +1,7 @@
===============================================================================
=== Asterisk SCF - Bridging ===
=== ===
-=== Copyright (C) 2010, Digium, Inc. ===
+=== Copyright (C) 2010-2011, Digium, Inc. ===
===============================================================================
-------------------------------------------------------------------------------
diff --git a/config/test_bridging.conf.in b/config/test_bridging.conf.in
index c9e92e0..37d7b8c 100644
--- a/config/test_bridging.conf.in
+++ b/config/test_bridging.conf.in
@@ -1,37 +1,163 @@
+################################################################################
+# Configuration file for use with the bridging component's test driver.
+#
+#
+# General configuration
+#
+# We turn off all collocation optimization by default to avoid issues with any services
+# that might have trouble due to AMI/AMD usage.
+#
+Ice.Default.CollocationOptimized=0
+
+#
+# We use a single file for configuration.
+#
+IceBox.InheritProperties=1
+
+#
+# It's important to specify a reasonable size for the client thread pool for services
+# that use AMI/AMD
+#
+Ice.ThreadPool.Client.Size=4
+
+
+################################################################################
+# The following configuration is required for the Service Locator component
+# because it loads a collocated instance of IceStorm.
+#
+#
+# We register services with names to help identify them within an IceBox instance.
+# It also helps with identifying properties in a configuration file as each
+# property that is specific to this service will be prefixed with
+# AsteriskSCFIceStorm.
+#
AsteriskSCFIceStorm.InstanceName=AsteriskSCFIceStorm
-AsteriskSCFIceStorm.TopicManager.Endpoints=default -p 55555
-AsteriskSCFIceStorm.Publish.Endpoints=default -p 55556
+
+
+#
+# Configure the IceStorm TopicManager's object adapter.
+#
+AsteriskSCFIceStorm.TopicManager.Endpoints=default -p 10000
+AsteriskSCFIceStorm.TopicManager.ThreadPool.Size=4
+
+#
+# Configure the IceStorm publisher object adapter.
+#
+AsteriskSCFIceStorm.Publish.Endpoints=tcp -p 10001:udp -p 10001
+AsteriskSCFIceStorm.Publish.ThreadPool.Size=4
+
+#
+# This IceStorm instance will not need to persist subscriber/publisher
+# information across process lifetimes.
+#
AsteriskSCFIceStorm.Transient=1
+#
+# Control TopicManager tracing.
+#
+# 0 = no tracing
+# 1 = trace topic creation, subscription, unsubscription
+# 2 = like 1, but with more detailed subscription information
+#
+AsteriskSCFIceStorm.Trace.TopicManager=0
+
+#
+# Flush interval in case any subscribers have subscribed
+# using a batched oneway proxy. (Default is currently 1000ms)
+#
+AsteriskSCFIceStorm.Flush.Timeout=2000
+
+#
+# Finally the proxy that can be used to access this IceStorm instance.
+#
+TopicManager.Proxy=AsteriskSCFIceStorm/TopicManager:default -p 10000
+
+#
+# Logger service configuration. Proxies to the logger service
+# are obtained through the service locator so specifying a port
+# number is not required.
+#
LoggerAdapter.Endpoints=default
AsteriskSCF.LoggingService.Endpoints=default
+AsteriskSCF.LoggingService.ThreadPool.Size=4
AsteriskSCF.LoggingClient.Endpoints=default
-AsteriskSCF.TestChannelService.Endpoints=default -p 55560
-
-LocatorService.Proxy=LocatorService:default -p 55558
+AsteriskSCF.LoggingClient.ThreadPool.Size=4
-ServiceLocatorManagementAdapter.Endpoints=tcp -p 55557
-ServiceLocatorAdapter.Endpoints=tcp -p 55558
-ServiceLocatorManagementProxy=LocatorServiceManagement:tcp -p 55557
+#
+# Service Locator configuration. Usually the service locator
+# and its collocated IceStorm instance are the only services
+# that need to have their ports specified. The service locator
+# instantiates two adapters:
+# * The ServiceLocatorAdapter which hosts the servants that implement
+# the lookup queries for clients wishing to obtain proxies to services.
+# * The ServiceLocatorManagerAdapter which hosts the servants for the
+# objects that implement the registration management for services
+#
+ServiceLocatorAdapter.Endpoints=tcp -p 4411
+ServiceLocatorAdapter.ThreadPool.Size=4
+ServiceLocatorManagementAdapter.Endpoints=tcp -p 4422
+ServiceLocatorManagementAdapter.ThreadPool.Size=4
-TopicManager.Proxy=AsteriskSCFIceStorm/TopicManager:default -p 55555
+#
+# The proxies that clients use to access the Service Locator facilities.
+#
+LocatorService.Proxy=LocatorService:default -p 4411
+ServiceLocatorManagementProxy=LocatorServiceManagement:tcp -p 4422
-IceBox.InheritProperties=1
+#
+# The IceBox entries for loading the services.
+#
IceBox.Service.Logger=${logger_bindir}/server/src at logging-service:createLoggingService
+IceBox.Service.Replicator=../src at BridgeReplicator:create
IceBox.Service.TestBridge=../src at bridgeservice:create
+IceBox.Service.TestBridge2=../src/@bridgeservice:create
IceBox.Service.TestServiceLocator=${service_locator_bindir}/src at service_locator:create
IceBox.Service.TestChannel=${test_channel_bindir}/src at test_channel:create
-IceBox.Service.TestDriver=../test at bridging_unit_test:create
+IceBox.Service.TestDriver=../test at bridge_component_test:create
+
+#
+# The bridging service uses the test channel's Endpoint
+# to create test sessions and register them with the bridge.
+# NOTE: this will be changed to be accessible through
+# service discovery in a later branch.
+#
+TestChannel.InstanceName=BridgeTest
+#
+# Configuration for the test bridge instances. There are two: one master
+# (TestBridge) and one standby (TestBridge2).
+# NOTE: These will be changed to be accessible through
+# service discovery in a later branch.
+#
TestBridge.InstanceName=TestBridge
-TestBridge.BridgeService.Endpoints=default -p 55561
-TestBridge.Proxy=BridgeManager:default -p 55561
-TestChannel.Proxy=TestChannel:default -p 55560
-TestUtilAdapter.Endpoints=default -p 55562
-Commands.Proxy=TestChannel.Locator.Commands:default -p 55560
+TestBridge.ManagerId=TestBridgeManager
+
+TestBridge2.InstanceName=TestBridge2
+TestBridge2.ManagerId=TestBridgeManager2
+TestBridge2.StateReplicatorListener=yes
+
+#
+# Configuration for the bridge state replicator.
+# NOTE: This will be changed to only use service discovery in a later
+# branch (i.e. it will not be necessary to configure the object adapter
+# or specify the proxy in the future)
+#
+Replicator.InstanceName=Replicator
-IceBox.LoadOrder=TestServiceLocator Logger TestBridge TestChannel TestDriver
+#
+# Some IceBox configuration.
+#
+#
+# Multiple services are loaded in the single IceBox instance. It is necessary to have
+# them start in a specific order so dependencies are available when they are needed.
+#
+IceBox.LoadOrder=TestServiceLocator Logger Replicator TestBridge TestBridge2 TestChannel TestDriver
+#
+# Configuring a manager endpoint for the IceBox service allows services to be managed and
+# monitored.
+#
IceBox.ServiceManager.Endpoints=default -p 56000
+IceBox.ServiceManager.ThreadPool.Size=4
IceBoxMgr.Proxy=IceBox/ServiceManager:default -p 56000
diff --git a/src/BridgeImpl.cpp b/src/BridgeImpl.cpp
index b799ca0..890d0b3 100755
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@ -1,7 +1,7 @@
/*
* Asterisk SCF -- An open-source communications framework.
*
- * Copyright (C) 2010, Digium, Inc.
+ * Copyright (C) 2010-2011, Digium, Inc.
*
* See http://www.asterisk.org for more information about
* the Asterisk SCF project. Please do not directly contact
@@ -14,443 +14,506 @@
* at the top of the source tree.
*/
#include "BridgeImpl.h"
+
#include <AsteriskSCF/logger.h>
#include <AsteriskSCF/System/Component/ComponentServiceIf.h>
#include <AsteriskSCF/SessionCommunications/SessionCommunicationsIf.h>
#include <Ice/Ice.h>
+#include <boost/thread/locks.hpp>
#include <memory>
#include <algorithm>
-#include <boost/thread/locks.hpp>
+#include <set>
+#include "ServiceUtil.h"
+#include "DebugUtil.h"
+
+#include "SessionWrapper.h"
+#include "SessionOperations.h"
+#include "SessionListener.h"
using namespace AsteriskSCF::System::Logging;
+using namespace AsteriskSCF::SessionCommunications::V1;
+using namespace AsteriskSCF::BridgeService;
+using namespace AsteriskSCF::Bridge::V1;
+using namespace AsteriskSCF;
+using namespace std;
/**
*
- * NOTE: Code must be reviewed/refactored for exception safety/consistency.
- * Operations involving establishing media connections may or may not be
- * catastrophic. An example of a non-catastrophic situation might be a media
- * allocation operation that might immediately fail for transient reasons but
- * can be initialized in the background in a relatively timely fashion (of
- * course this would depend on the context).
+ * NOTE: Once asynchronous invocations began to be added to the mix, several shortcomings of both the initial
+ * implementation of the bridge and its replication became apparent. In the previous implementation, the bridge
+ * operations were very "serial" when it came to adding and removing sessions. When converting these operations to
+ * AMD/AMI, the serial approach completely falls apart. Especially when replication is thrown into the mix. For example,
+ * in a typical AMD/AMI scenario, the setBridge() and media allocation operations would occur as AMI requests. The
+ * apparent outcome will be that the participants will "appear" on the call in the order in which the operations
+ * complete (i.e. randomly). Indeed it is possible that some will ultimately fail and some sessions might actually be
+ * removed from the bridge before they are able to complete. Reconciling the completely asynchronous nature to an
+ * implementation is just too awful. As asynchronous support in bridging is the driving factor behind these changes, the
+ * refactoring for replication could have occurred in that development branch. However, replication is a separate task
+ * and it is desirable to have a *relevant* review of it independently of asynchronous support.
*
*/
namespace
{
-Logger lg = getLoggerFactory().getLogger("AsteriskSCF.BridgeService");
-class RetryPolicy
+/**
+ *
+ * BridgeImpl is a reference implementation of the AsteriskSCF::Bridging::V1::Bridge
+ * interface.
+ *
+ **/
+class BridgeImpl : virtual public BridgeServant
{
public:
- RetryPolicy(size_t maxRetries, size_t intervalInMilliseconds) :
- mMaxRetries(maxRetries),
- mRetryInterval(intervalInMilliseconds),
- mCounter(0)
- {
- }
+ BridgeImpl(const string& name, const Ice::ObjectAdapterPtr& objAdapter,
+ const vector<BridgeListenerPrx>& listeners,
+ const BridgeListenerMgrPtr& listenerMgr,
+ const ReplicatorSmartPrx& replicator,
+ const BridgeStateItemPtr& state,
+ const Logger& logger);
- bool canRetry()
- {
- return mCounter < mMaxRetries;
- }
+ ~BridgeImpl();
- bool retry()
- {
- lg(Debug) << "Retrying for the " << mCounter + 1 << " time.";
- IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(mRetryInterval));
- ++mCounter;
- return canRetry();
- }
+ //
+ // AsteriskSCF::SessionCommunications::Bridging::Bridge Interface
+ //
+ void addSessions_async(const AMD_Bridge_addSessionsPtr& callback, const SessionSeq& sessions, const Ice::Current&);
+ void removeSessions(const SessionSeq& sessions, const Ice::Current& current);
+ void removeSessions_async(const AMD_Bridge_removeSessionsPtr& callback, const SessionSeq& sessions,
+ const Ice::Current&);
+
+ SessionSeq listSessions(const Ice::Current&);
+ void shutdown(const Ice::Current& current);
+ void destroy(const Ice::Current& current);
+
+ void addListener(const BridgeListenerPrx& listener, const Ice::Current& current);
+ void removeListener(const BridgeListenerPrx& listener, const Ice::Current& current);
+
+ void replaceSession_async(const AMD_Bridge_replaceSessionPtr& callbac, const SessionPrx& sessionToReplace,
+ const SessionSeq& newSessions, const Ice::Current& current);
+
+ //
+ // BridgeServant methods
+ //
+ bool destroyed();
+ void destroyImpl();
+ void shutdownImpl(const Ice::Current& current);
+ void activate(const BridgePrx& proxy);
+
+ void updateState(const BridgeStateItemPtr& state);
+ void addListener(const BridgeListenerStateItemPtr& update);
+ void removeListener(const BridgeListenerStateItemPtr& update);
+
+ void activate();
+ string id();
+ SessionCollectionPtr sessions();
+
+ void forceUpdate();
+
+ void getAddSessionsTasks(QueuedTasks& tasks, const SessionSeq& sessions);
private:
- size_t mMaxRetries;
- size_t mRetryInterval;
- size_t mCounter;
+
+ boost::shared_mutex mLock;
+
+ //
+ // True if not a standby object.
+ //
+ bool mActivated;
+
+ //
+ // The replicated state.
+ //
+ BridgeStateItemPtr mState;
+
+ //
+ // Helper class for dealing with the sessions.
+ //
+ SessionCollectionPtr mSessions;
+
+ const string mName;
+ Ice::ObjectAdapterPtr mObjAdapter;
+
+ BridgeListenerMgrPtr mListeners;
+ ReplicatorSmartPrx mReplicator;
+
+ //
+ // The bridge's callback implementation for the sessions that get added to it.
+ //
+ SessionListenerPtr mSessionListener;
+ SessionListenerPrx mSessionListenerPrx;
+
+ //
+ // A proxy to this bridge. Used when publishing events.
+ //
+ BridgePrx mPrx;
+
+ //
+ // The logger object.
+ //
+ Logger mLogger;
+
+ void statePreCheck();
+ BridgeStateItemPtr createUpdate();
+ void pushUpdate(const BridgeStateItemPtr& update);
+ void pushUpdates(const ReplicatedStateItemSeq& updates);
+ BridgeListenerStateItemPtr createFirstListenerUpdate(const BridgeListenerPrx& listener);
+
+ bool replicate()
+ {
+ return (mActivated && mReplicator != 0);
+ }
};
+typedef IceUtil::Handle<BridgeImpl> BridgeImplPtr;
-void checkSessions(const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions)
+//
+// simply checks for nulls at this point.
+//
+static void checkSessions(const SessionSeq& sessions)
{
Ice::LongSeq invalidIndexes;
Ice::Long index = 0;
- for(AsteriskSCF::SessionCommunications::V1::SessionSeq::const_iterator i = sessions.begin();
- i != sessions.end(); ++i, ++index)
+ for (SessionSeq::const_iterator i = sessions.begin(); i != sessions.end(); ++i, ++index)
{
- if(*i == 0)
+ if (*i == 0)
{
invalidIndexes.push_back(index);
}
}
- if(invalidIndexes.size() > 0)
+ if (invalidIndexes.size() > 0)
{
- throw AsteriskSCF::SessionCommunications::V1::InvalidSessions(invalidIndexes);
+ throw InvalidSessions(invalidIndexes);
}
}
-}
-
-//
-// Compiled in constants.
-// TODO: Replace with configuration!
-//
-
-static const std::string TopicPrefix("AsteriskSCF.Bridge.");
-//
-// TODO:
-// Operations that are performed on all bridge sessions might be better done as AMI requests.
-//
-namespace AsteriskSCF
-{
-namespace BridgeService
-{
-//
-// Functor to support using for_each on shutdown.
-//
-class ShutdownImpl : public std::unary_function<BridgeImpl::BridgeSessionPtr, void>
+class SessionsTracker : public IceUtil::Shared
{
public:
- ShutdownImpl(const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
- const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& response) :
- mListener(listener),
- mResponse(response)
+ SessionsTracker() :
+ mResponses(0)
{
}
-
- void operator()(const BridgeImpl::BridgeSessionPtr& b)
+
+ void add(const SessionPrx& s)
{
- try
- {
- b->getSession()->removeBridge(mListener);
- b->disconnect();
- b->getSession()->stop(mResponse);
- }
- catch(const Ice::ObjectNotExistException& ex)
- {
- lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
- mNonExistent.push_back(b);
- }
- catch(const Ice::Exception& ex)
- {
- lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
- }
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ mSessions.push_back(s);
+ ++mResponses;
+ }
+
+ SessionSeq getSessions()
+ {
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+ return mSessions;
}
- const std::vector<BridgeImpl::BridgeSessionPtr>& nonExistentObjects()
+ void addException(const SessionPrx& session, const Ice::Exception& ex)
{
- return mNonExistent;
+ addExceptionMessage(session, ex.what());
}
-private:
- AsteriskSCF::SessionCommunications::V1::SessionListenerPrx mListener;
- AsteriskSCF::SessionCommunications::V1::ResponseCodePtr mResponse;
- std::vector<BridgeImpl::BridgeSessionPtr> mNonExistent;
-};
-class ProgressingImpl : public std::unary_function<BridgeImpl::BridgeSessionPtr, void>
-{
-public:
- ProgressingImpl(const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& response) :
- mResponse(response)
+ void addExceptionMessage(const SessionPrx& session, const string& msg)
{
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ SessionError error;
+ error.failedSession = session;
+ error.message = msg;
+ mExceptions.push_back(error);
+ ++mResponses;
}
- void operator()(const BridgeImpl::BridgeSessionPtr& b)
+ SessionErrorSeq getExceptions()
{
- try
- {
- b->getSession()->progress(mResponse);
- }
- catch(const Ice::ObjectNotExistException& ex)
- {
- mNonExistent.push_back(b);
- lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
- }
- catch(const Ice::Exception& ex)
- {
- lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
- }
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+ return mExceptions;
}
- const std::vector<BridgeImpl::BridgeSessionPtr>& nonExistentObjects()
+ size_t responseCount()
{
- return mNonExistent;
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+ return mResponses;
}
private:
- AsteriskSCF::SessionCommunications::V1::ResponseCodePtr mResponse;
- std::vector<BridgeImpl::BridgeSessionPtr> mNonExistent;
+ boost::shared_mutex mLock;
+ SessionSeq mSessions;
+ SessionErrorSeq mExceptions;
+ size_t mResponses;
};
+typedef IceUtil::Handle<SessionsTracker> SessionsTrackerPtr;
-class RingImpl : public std::unary_function<BridgeImpl::BridgeSessionPtr, void>
+class RemoveSessionsNotify : public QueuedTask
{
public:
- RingImpl(const AsteriskSCF::SessionCommunications::V1::SessionPrx& exclude) :
- mExclude(exclude)
+ RemoveSessionsNotify(const BridgeListenerMgrPtr& bridgeListeners,
+ const SessionsTrackerPtr& tracker) :
+ QueuedTask("RemoveSessionsNotify"),
+ mBridgeListeners(bridgeListeners),
+ mTracker(tracker)
{
}
-
- void operator()(const BridgeImpl::BridgeSessionPtr& b)
+
+protected:
+ bool executeImpl()
{
- if(b->getSession() != mExclude)
+ SessionSeq sessions = mTracker->getSessions();
+ if (!sessions.empty())
{
- try
- {
- b->ring();
- }
- catch(const Ice::ObjectNotExistException& ex)
- {
- mNonExistent.push_back(b);
- lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
- }
- catch(const Ice::Exception& ex)
- {
- lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
- }
+ mBridgeListeners->sessionsRemoved(sessions);
}
+ return true;
}
-
- const std::vector<BridgeImpl::BridgeSessionPtr>& nonExistentObjects()
- {
- return mNonExistent;
- }
-
+
private:
- AsteriskSCF::SessionCommunications::V1::SessionPrx mExclude;
- std::vector<BridgeImpl::BridgeSessionPtr> mNonExistent;
+ BridgeListenerMgrPtr mBridgeListeners;
+ SessionsTrackerPtr mTracker;
};
-class FlashImpl : public std::unary_function<BridgeImpl::BridgeSessionPtr, void>
+class SetBridgeTask : public QueuedTask
{
public:
- void operator()(const BridgeImpl::BridgeSessionPtr& b)
+ SetBridgeTask(const SessionCollectionPtr& sessionCollection, const BridgePrx& bridge,
+ const SessionListenerPrx& listener, const SessionSeq& sessions, const SessionsTrackerPtr& tracker):
+ QueuedTask("SetBridgeTask"),
+ mSessionManager(sessionCollection),
+ mBridge(bridge),
+ mSessionListener(listener),
+ mSessions(sessions),
+ mTracker(tracker)
{
- try
- {
- b->getSession()->flash();
- }
- catch(const Ice::ObjectNotExistException& ex)
- {
- mNonExistent.push_back(b);
- lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
- }
- catch(const Ice::Exception& ex)
- {
- lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
- }
}
- const std::vector<BridgeImpl::BridgeSessionPtr>& nonExistentObjects()
+protected:
+ bool executeImpl()
{
- return mNonExistent;
+ bool tasksDispatched = false;
+ for (SessionSeq::iterator i = mSessions.begin(); i != mSessions.end(); ++i)
+ {
+ SessionWrapperPtr session = mSessionManager->addSession(*i);
+ if (session == 0)
+ {
+ //
+ // This shouldn't happen!
+ //
+ mTracker->addExceptionMessage(*i, "session already added");
+ continue;
+ }
+ tasksDispatched = true;
+ (*i)->begin_setBridge(mBridge, mSessionListener,
+ newCallback_Session_setBridge(this, &SetBridgeTask::set,
+ &SetBridgeTask::failed), session);
+ }
+ return !tasksDispatched;
}
-private:
- std::vector<BridgeImpl::BridgeSessionPtr> mNonExistent;
-};
-class HoldImpl : public std::unary_function<BridgeImpl::BridgeSessionPtr, void>
-{
-public:
- void operator()(const BridgeImpl::BridgeSessionPtr& b)
+ void set(const SessionInfoPtr& info, const SessionWrapperPtr& session)
{
- try
+ mTracker->add(session->getSession());
+ if (info->currentState != "ready")
{
- b->getSession()->hold();
- }
- catch(const Ice::ObjectNotExistException& ex)
- {
- mNonExistent.push_back(b);
- lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
+ //
+ // setupMedia is an AMI backed implementation, so should not block here.
+ //
+ session->setupMedia();
}
- catch(const Ice::Exception& ex)
+ if (mTracker->responseCount() == mSessions.size())
{
- lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
+ mListener->succeeded();
}
}
- const std::vector<BridgeImpl::BridgeSessionPtr>& nonExistentObjects()
- {
- return mNonExistent;
- }
-private:
- std::vector<BridgeImpl::BridgeSessionPtr> mNonExistent;
-};
-
-class UnholdImpl : public std::unary_function<BridgeImpl::BridgeSessionPtr, void>
-{
-public:
- void operator()(const BridgeImpl::BridgeSessionPtr& b)
+ void failed(const Ice::Exception& ex, const SessionWrapperPtr& session)
{
+ //
+ // TODO:
+ // * Log exception.
+ // * Rollback
+ // * Interpret exception and decide whether or not to fail the entire operations.
+ // Currently the semantics allow some operations to fail.
+ //
+ mTracker->addException(session->getSession(), ex);
try
{
- b->getSession()->unhold();
+ //
+ // We want to make sure that session isn't left lying around. The session collection will
+ // take care of cleaning it up as long as it is marked as destroyed.
+ //
+ session->destroy();
}
- catch(const Ice::ObjectNotExistException& ex)
+ catch(...)
{
- mNonExistent.push_back(b);
- lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
}
- catch(const Ice::Exception& ex)
+ if (mTracker->responseCount() == mSessions.size())
{
- lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
- }
- }
-
- const std::vector<BridgeImpl::BridgeSessionPtr>& nonExistentObjects()
- {
- return mNonExistent;
- }
-private:
- std::vector<BridgeImpl::BridgeSessionPtr> mNonExistent;
-};
-
-class ConnectImpl : public std::unary_function<BridgeImpl::BridgeSessionPtr, void>
-{
-public:
- ConnectImpl(const AsteriskSCF::SessionCommunications::V1::SessionPrx& exclude) :
- mExclude(exclude)
- {
- }
-
- void operator()(BridgeImpl::BridgeSessionPtr& b)
- {
- if(b->getSession() != mExclude)
- {
- try
- {
- b->connect();
- }
- catch(const Ice::ObjectNotExistException& ex)
+ SessionErrorSeq exceptions = mTracker->getExceptions();
+ if (exceptions.size() == mSessions.size())
{
- mNonExistent.push_back(b);
- lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
+ mListener->failed();
}
- catch(const Ice::Exception& ex)
+ else
{
- lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
+ mListener->succeeded();
}
}
}
-
- const std::vector<BridgeImpl::BridgeSessionPtr>& nonExistentObjects()
- {
- return mNonExistent;
- }
-
+
private:
- AsteriskSCF::SessionCommunications::V1::SessionPrx mExclude;
- std::vector<BridgeImpl::BridgeSessionPtr> mNonExistent;
+ SessionCollectionPtr mSessionManager;
+ BridgePrx mBridge;
+ SessionListenerPrx mSessionListener;
+ SessionSeq mSessions;
+ SessionsTrackerPtr mTracker;
};
-class FindImpl : public std::unary_function<BridgeImpl::BridgeSessionPtr, bool>
+class AddToListeners : public QueuedTask
{
public:
- FindImpl(const AsteriskSCF::SessionCommunications::V1::SessionPrx& prx) :
- mPrx(prx)
+ AddToListeners(const BridgeListenerMgrPtr& listeners, const SessionsTrackerPtr& tracker) :
+ QueuedTask("AddToListeners"),
+ mListeners(listeners),
+ mTracker(tracker)
{
}
-
- bool operator()(const BridgeImpl::BridgeSessionPtr& b)
+
+protected:
+ bool executeImpl()
{
- return b->getSession() == mPrx;
+ mListeners->sessionsAdded(mTracker->getSessions());
+ return true;
}
+
private:
- AsteriskSCF::SessionCommunications::V1::SessionPrx mPrx;
+ BridgeListenerMgrPtr mListeners;
+ SessionsTrackerPtr mTracker;
};
-
-//
-// For events that require modification to the bridge, we use helper methods on the bridge itself.
-// For events result in distribution to the bridge sessions, we copy the current sessions and
-// run the calls from the listener itself.
-//
-class SessionListener : public AsteriskSCF::SessionCommunications::V1::SessionListener
+class CheckShutdown : public QueuedTask
{
public:
- SessionListener(const BridgeImplPtr& b) :
- mBridge(b)
+ CheckShutdown(const BridgeImplPtr& bridge, const BridgePrx& proxy) :
+ QueuedTask("CheckShutdown"),
+ mBridge(bridge),
+ mPrx(proxy)
{
}
- void connected(const AsteriskSCF::SessionCommunications::V1::SessionPrx& source, const Ice::Current&)
+protected:
+ bool executeImpl()
{
- try
- {
- mBridge->sessionConnected(source);
- }
- catch(const Ice::Exception& ex)
+ if (mBridge->sessions()->size() < 2 && mPrx)
{
- lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
- throw;
+ mPrx->begin_shutdown(newCallback_Bridge_shutdown(this, &CheckShutdown::done,
+ &CheckShutdown::failed));
}
- std::vector<BridgeImpl::BridgeSessionPtr> sessions(mBridge->currentSessions());
- std::for_each(sessions.begin(), sessions.end(), ConnectImpl(source));
+ //
+ // We don't care about the result really. The CheckShutdown instance will hang
+ // around because of the AMI request so the completion of the request will not
+ // have an issue.
+ //
+ return true;
}
- void flashed(const AsteriskSCF::SessionCommunications::V1::SessionPrx&, const Ice::Current&)
+ void done()
{
+ //
+ // We don't care about the ending.
+ //
}
- void held(const AsteriskSCF::SessionCommunications::V1::SessionPrx&, const Ice::Current&)
+ void failed(const Ice::Exception&)
{
+ //
+ // Operationally, we don't really care but we should probably log
+ //
}
+
+private:
+ BridgeImplPtr mBridge;
+ BridgePrx mPrx;
+};
- void progressing(const AsteriskSCF::SessionCommunications::V1::SessionPrx&,
- const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr&, const Ice::Current&)
+template <class T>
+class GenericAMDCallback : public QueuedTask
+{
+public:
+ GenericAMDCallback(const T& cb, const SessionsTrackerPtr& tracker) :
+ QueuedTask("GenericAMDCallback"),
+ mCallback(cb),
+ mTracker(tracker)
{
}
+protected:
- void ringing(const AsteriskSCF::SessionCommunications::V1::SessionPrx& source, const Ice::Current&)
+ bool executeImpl()
{
- std::vector<BridgeImpl::BridgeSessionPtr> sessions(mBridge->currentSessions());
- if(sessions.size() > 0)
- {
- std::for_each(sessions.begin(), sessions.end(), RingImpl(source));
- }
+ mCallback->ice_response();
+ return true;
}
- void stopped(const AsteriskSCF::SessionCommunications::V1::SessionPrx& source,
- const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& response, const Ice::Current&)
+ void failImpl()
{
- size_t endpointCount = mBridge->sessionStopped(source, response);
- if(endpointCount < 2)
+ SessionErrorSeq errors(mTracker->getExceptions());
+ if (!errors.empty())
{
- mBridge->spawnShutdown();
+ mCallback->ice_exception(BridgeSessionOperationFailed(errors));
+ }
+ else
+ {
+ mCallback->ice_exception();
}
}
- void unheld(const AsteriskSCF::SessionCommunications::V1::SessionPrx&, const Ice::Current&)
+private:
+ T mCallback;
+ SessionsTrackerPtr mTracker;
+};
+
+class UpdateTask : public QueuedTask
+{
+public:
+ UpdateTask(const BridgeImplPtr& bridge) :
+ QueuedTask("UpdateTask"),
+ mBridge(bridge)
{
}
+protected:
+ bool executeImpl()
+ {
+ mBridge->forceUpdate();
+ return true;
+ }
private:
BridgeImplPtr mBridge;
};
-} // End of namespace BridgeService
-} // End of namespace AsteriskSCF
-
-AsteriskSCF::BridgeService::BridgeImpl::BridgeImpl(const Ice::ObjectAdapterPtr& adapter,
- const std::vector<AsteriskSCF::SessionCommunications::V1::BridgeListenerPrx>& listeners,
- const AsteriskSCF::BridgeService::BridgeListenerMgrPtr& listenerMgr,
- const AsteriskSCF::SessionCommunications::V1::BridgePrx& prx) :
- mState(Running),
+} // End of anonymous namespace
+
+BridgeImpl::BridgeImpl(const string& name, const Ice::ObjectAdapterPtr& adapter,
+ const vector<BridgeListenerPrx>& listeners,
+ const BridgeListenerMgrPtr& listenerMgr,
+ const ReplicatorSmartPrx& replicator,
+ const BridgeStateItemPtr& state,
+ const Logger& logger) :
+ mActivated(false),
+ mState(state),
+ mSessions(new SessionCollection(adapter->getCommunicator(), name, replicator, logger)),
+ mName(name),
mObjAdapter(adapter),
mListeners(listenerMgr),
- mSessionListener(new SessionListener(this)),
- mPrx(prx)
+ mReplicator(replicator),
+ mSessionListener(createSessionListener(mSessions, logger)),
+ mLogger(logger)
{
- for(std::vector<AsteriskSCF::SessionCommunications::V1::BridgeListenerPrx>::const_iterator i = listeners.begin();
+ mLogger(Debug) << FUNLOG << ": creating a new Bridge with " << listeners.size() << " default listeners";
+ for (vector<BridgeListenerPrx>::const_iterator i = listeners.begin();
i != listeners.end(); ++i)
{
mListeners->addListener(*i);
}
- std::string listenerId = mObjAdapter->getCommunicator()->identityToString(prx->ice_getIdentity());
- listenerId += ".sessionListener";
- mSessionListenerPrx =
- AsteriskSCF::SessionCommunications::V1::SessionListenerPrx::uncheckedCast(
- mObjAdapter->add(mSessionListener, mObjAdapter->getCommunicator()->stringToIdentity(listenerId))
- );
}
-AsteriskSCF::BridgeService::BridgeImpl::~BridgeImpl()
+BridgeImpl::~BridgeImpl()
{
//
// TODO: Determine if we need to clean up the listener manager. We may not
@@ -459,525 +522,507 @@ AsteriskSCF::BridgeService::BridgeImpl::~BridgeImpl()
//
}
-void AsteriskSCF::BridgeService::BridgeImpl::addSessions(
- const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions, const Ice::Current& current)
+void BridgeImpl::addSessions_async(const AMD_Bridge_addSessionsPtr& callback, const SessionSeq& sessions,
+ const Ice::Current& current)
{
- if(sessions.size() == 0)
- {
- return;
- }
- checkSessions(sessions);
- AsteriskSCF::SessionCommunications::V1::SessionSeq addedSessions;
+ try
{
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- statePreCheck();
- lg(Debug) << __FUNCTION__ << ": adding " << sessions.size() << " sessions to "
- << current.adapter->getCommunicator()->identityToString(current.id);
- for(AsteriskSCF::SessionCommunications::V1::SessionSeq::const_iterator i = sessions.begin();
- i != sessions.end(); ++i)
+ if (sessions.empty())
{
- //
- // TODO: how do we want to handle sessions that have already been added to the bridge. Its pretty much
- // impossible to guard against race conditions where multiple call paths might want to add a session
- // more than once for some reason. We should probably just log it and move on.
- //
- std::vector<BridgeSessionPtr>::iterator j =
- find_if(mSessions.begin(), mSessions.end(), AsteriskSCF::BridgeService::FindImpl(*i));
- if(j != mSessions.end())
- {
- lg(Debug) << __FUNCTION__ << ": " << (*i)->ice_toString()
- << " is already registered with this bridge.";
- continue;
- }
-
- AsteriskSCF::SessionCommunications::V1::SessionInfoPtr info;
- try
+ if (callback)
{
- RetryPolicy policy(5, 500);
- //
- // canRetry should never return false since we throw ourselves out of this loop. But
- // we'll do it here in case we decide to do something else.
- //
- while(policy.canRetry())
- {
- try
- {
- info = (*i)->setBridge(mPrx, mSessionListenerPrx);
- break;
- }
- catch(const Ice::ConnectionLostException&)
- {
- if(!policy.retry())
- {
- throw;
- }
- }
- }
+ callback->ice_response();
}
- catch(const Ice::Exception& ex)
- {
- lg(Debug) << __FUNCTION__ << ": " << (*i)->ice_toString() << " threw " << ex.what()
- << " continuing";
- }
- //
- // We need to define these states! Especially the ones that define when start is called or not.
- //
- if(info->currentState == "ready")
- {
- lg(Debug) << __FUNCTION__ << ": " << (*i)->ice_toString()
- << " current state is ready (not yet connected), not establishing media connections.";
- mSessions.push_back(new BridgeSession(*i, 0, false));
- }
- else
- {
- lg(Debug) << __FUNCTION__ << ": " << (*i)->ice_toString()
- << " media is expected to be establishing, plugging media into bridge.";
- mSessions.push_back(new BridgeSession(*i, mSplicer.connect(*i), false));;
- }
-
- addedSessions.push_back(*i);
+ return;
}
+ checkSessions(sessions);
+ statePreCheck();
+ mLogger(Debug) << FUNLOG << ": adding " << sessions.size() << " sessions";
+
+ SessionsTrackerPtr tracker(new SessionsTracker);
+ QueuedTasks tasks;
+ tasks.push_back(new SetBridgeTask(mSessions, mPrx, mSessionListenerPrx, sessions, tracker));
+ tasks.push_back(new AddToListeners(mListeners, tracker));
+ tasks.push_back(new GenericAMDCallback<AMD_Bridge_addSessionsPtr>(callback, tracker));
+ tasks.push_back(new UpdateTask(this));
+ ExecutorPtr executor(new Executor(tasks, mLogger));
+ executor->start();
+ //
+ // When the operations have all completed, that last task will take care of handling
+ // the callback. It's all left within the try/catch in the event something happens during
+ // task startup.
+ //
}
- if(addedSessions.size())
+ catch (const std::exception& ex)
{
- mListeners->sessionsAdded(addedSessions);
+ callback->ice_exception(ex);
+ }
+ catch (...)
+ {
+ callback->ice_exception();
}
}
-void AsteriskSCF::BridgeService::BridgeImpl::removeSessions(
- const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions, const Ice::Current&)
+void BridgeImpl::removeSessions_async(const AMD_Bridge_removeSessionsPtr& callback, const SessionSeq& sessions,
+ const Ice::Current&)
{
- if(sessions.size() == 0)
- {
- return;
- }
- checkSessions(sessions);
- AsteriskSCF::SessionCommunications::V1::SessionSeq removedSessions;
+ try
{
- boost::unique_lock<boost::shared_mutex> lock(mLock);
+ if (sessions.empty())
+ {
+ callback->ice_response();
+ return;
+ }
+ checkSessions(sessions);
statePreCheck();
- for(AsteriskSCF::SessionCommunications::V1::SessionSeq::const_iterator i = sessions.begin();
- i != sessions.end(); ++i)
+
+ //
+ // The shutdown of individual sessions is implemented as a series of AMI requests. Once initiated,
+ // we allow them to proceed asynchronously and do not concern ourselves with the result.
+ // The logic of shutdown should remove them either because the operations succeeded or
+ // *couldn't* be accomplished because of some terminal condition. At any rate, waiting around for them
+ // is pointless.
+ //
+ SessionsTrackerPtr removed(new SessionsTracker);
+ for (SessionSeq::const_iterator i = sessions.begin(); i != sessions.end(); ++i)
{
- std::vector<BridgeSessionPtr>::iterator j = std::find_if(mSessions.begin(),
- mSessions.end(), AsteriskSCF::BridgeService::FindImpl(*i));
- if(j != mSessions.end())
+ SessionWrapperPtr session = mSessions->getSession(*i);
+ if (session)
{
- try
- {
- (*j)->getSession()->removeBridge(mSessionListenerPrx);
- }
- catch(const Ice::Exception& ex)
- {
- lg(Info) << __FUNCTION__ << ": removingthe bridge from " << (*j)->getSession() << " threw "
- << ex.what();
- }
- (*j)->disconnect();
- mSessions.erase(j);
- removedSessions.push_back(*i);
- }
- else
- {
- //
- // TODO: how do we want to handle sessions that aren't there.
- // Its pretty much impossible to guard against race conditions
- // where an expected session may no longer be present (device
- // hung up, etc). In other words, this should probably be
- // expected and maybe just logged.
- //
+ session->shutdown(mSessionListenerPrx, new ResponseCode);
+ removed->add(session->getSession());
}
}
+ QueuedTasks tasks;
+ tasks.push_back(new RemoveSessionsNotify(mListeners, removed));
+ tasks.push_back(new GenericAMDCallback<AMD_Bridge_removeSessionsPtr>(callback, removed));
+ tasks.push_back(new CheckShutdown(this, mPrx));
+ ExecutorPtr runner(new Executor(tasks, mLogger));
+ runner->start();
}
-
- if(removedSessions.size() != 0)
+ catch (const std::exception& ex)
{
- mListeners->sessionsRemoved(removedSessions);
+ callback->ice_exception(ex);
}
- //
- // TODO: Should be policy driven.
- //
- if(mSessions.size() < 2)
+ catch (...)
{
- spawnShutdown();
+ callback->ice_exception();
}
}
-AsteriskSCF::SessionCommunications::V1::SessionSeq AsteriskSCF::BridgeService::BridgeImpl::listSessions(
- const Ice::Current& current)
+SessionSeq BridgeImpl::listSessions(const Ice::Current& current)
{
- lg(Debug) << __FUNCTION__ << ":" << current.adapter->getCommunicator()->identityToString(current.id) ;
- boost::shared_lock<boost::shared_mutex> lock(mLock);
+ mLogger(Debug) << FUNLOG << ":" << objectIdFromCurrent(current);
statePreCheck();
- lg(Debug) << __FUNCTION__ << ": working with " << mSessions.size() << " sessions." ;
- AsteriskSCF::SessionCommunications::V1::SessionSeq result;
- for(std::vector<AsteriskSCF::BridgeService::BridgeImpl::BridgeSessionPtr>::const_iterator i = mSessions.begin(); i != mSessions.end();
- ++i)
- {
- result.push_back((*i)->getSession());
- }
- return result;
+ return mSessions->getSessionSeq();
}
-void AsteriskSCF::BridgeService::BridgeImpl::shutdown(const Ice::Current& current)
+void BridgeImpl::shutdown(const Ice::Current& current)
{
//
+ // In an effort to allow some consistency with replicas, the shutdown operation is broken into
+ // two parts. In an asynchronous version, this would probably be a couple of queued tasks.
+ //
+
+ //
// When shutting down, the bridge makes a copy of its current state and unlocks, proceeding with
// no other internal locks.
//
- lg(Debug) << __FUNCTION__ << ":" << current.adapter->getCommunicator()->identityToString(current.id) ;
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- if(mState == ShuttingDown)
- {
- lg(Debug) << __FUNCTION__ << ": called when shutting down." ;
- return;
- }
- if(mState == Destroyed)
- {
- lg(Debug) << __FUNCTION__ << ": called when destroyed." ;
- throw Ice::ObjectNotExistException(__FILE__, __LINE__);
- }
- mState = ShuttingDown;
+ mLogger(Debug) << FUNLOG << ":" << objectIdFromCurrent(current);
- mListeners->stopping();
+ BridgeStateItemPtr update;
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ if (mState->runningState == ShuttingDown)
+ {
+ mLogger(Debug) << FUNLOG << ": called when shutting down." ;
+ return;
+ }
+ if (mState->runningState == Destroyed)
+ {
+ mLogger(Debug) << FUNLOG << ": called when destroyed." ;
+ throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+ }
+ mState->runningState = ShuttingDown;
- //
- // TODO: Response code for termination messages for bridges shutting down should come from configuration
- //
- if(mSessions.size() > 0)
+ mListeners->stopping();
+ update = createUpdate();
+ }
+ pushUpdate(update);
+ update = 0;
{
- std::for_each(mSessions.begin(), mSessions.end(),
- AsteriskSCF::BridgeService::ShutdownImpl(mSessionListenerPrx,
- new AsteriskSCF::SessionCommunications::V1::ResponseCode));
+ ShutdownSessionOperation shutdownOp(mSessionListenerPrx, new ResponseCode, mLogger);
+ mSessions->visitSessions(shutdownOp);
+ mListeners->stopped();
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ mLogger(Info) << objectIdFromCurrent(current) << ": is shutdown." ;
+ mState->runningState = Destroyed;
+ //
+ // Remove references to the session listener implementation.
+ //
+ update = createUpdate();
+ mObjAdapter->remove(mSessionListenerPrx->ice_getIdentity());
+ mSessionListener = 0;
}
-
- lg(Info) << current.adapter->getCommunicator()->identityToString(current.id) << ": is shutdown." ;
- mListeners->stopped();
- mState = Destroyed;
-
- //
- // Remove references to the session listener implementation.
- //
- mObjAdapter->remove(mSessionListenerPrx->ice_getIdentity());
- mSessionListener = 0;
+ mSessions = 0;
+ pushUpdate(update);
}
-void AsteriskSCF::BridgeService::BridgeImpl::destroy(const Ice::Current& current)
+void BridgeImpl::destroy(const Ice::Current& current)
{
+ mLogger(Debug) << FUNLOG << ":" << objectIdFromCurrent(current);
+ BridgeStateItemPtr update;
{
- lg(Debug) << __FUNCTION__ << ":" << current.adapter->getCommunicator()->identityToString(current.id) ;
boost::unique_lock<boost::shared_mutex> lock(mLock);
- if(mState == ShuttingDown)
+ if (mState->runningState == ShuttingDown)
{
- lg(Debug) << __FUNCTION__ << ": called when shutting down." ;
+ mLogger(Debug) << FUNLOG << ": called when shutting down." ;
throw AsteriskSCF::System::Component::V1::ShuttingDown();
}
- if(mState == Destroyed)
+ if (mState->runningState == Destroyed)
{
- lg(Debug) << __FUNCTION__ << ": called when destroyed." ;
+ mLogger(Debug) << FUNLOG << ": called when destroyed." ;
throw Ice::ObjectNotExistException(__FILE__, __LINE__);
}
- mState = Destroyed;
- lg(Info) << current.adapter->getCommunicator()->identityToString(current.id) << ": is now destroyed." ;
+ mState->runningState = Destroyed;
+ mLogger(Info) << objectIdFromCurrent(current) << ": is now destroyed." ;
mListeners->stopped();
mSessionListener = 0;
-
- //
- // Remove references to the session listener implementation.
- //
- mObjAdapter->remove(mSessionListenerPrx->ice_getIdentity());
+ update = createUpdate();
}
+ pushUpdate(update);
//
- // The bridge manager removes us from the object adapter on reaping.
+ // Remove references to the session listener implementation.
//
+ mObjAdapter->remove(mSessionListenerPrx->ice_getIdentity());
}
-void AsteriskSCF::BridgeService::BridgeImpl::addListener(
- const AsteriskSCF::SessionCommunications::V1::BridgeListenerPrx& listener,
- const Ice::Current&)
-{
- mListeners->addListener(listener);
-}
-
-void AsteriskSCF::BridgeService::BridgeImpl::removeListener(
- const AsteriskSCF::SessionCommunications::V1::BridgeListenerPrx& listener,
- const Ice::Current&)
-{
- mListeners->removeListener(listener);
-}
-
-void AsteriskSCF::BridgeService::BridgeImpl::replaceSession(
- const AsteriskSCF::SessionCommunications::V1::SessionPrx& oldSession,
- const AsteriskSCF::SessionCommunications::V1::SessionSeq& newSessions,
- const Ice::Current& current)
+//
+// Registers a bridge listener with this bridge. A nil proxy is logged and ignored. When
+// mListeners->addListener() returns true and replicate() is true, the initial state item
+// for the listener is sent to the replicator.
+//
+void BridgeImpl::addListener(const BridgeListenerPrx& listener, const Ice::Current& current)
{
- lg(Debug) << __FUNCTION__ << ":" << current.adapter->getCommunicator()->identityToString(current.id) ;
- BridgeSessionPtr toRemove;
- std::vector<BridgeSessionPtr> newMembers;
-
- checkSessions(newSessions);
+ //
+ // Only dereference the proxy when it is non-nil; a nil proxy is logged as "(nil)" so that
+ // the guard below is actually reached.
+ //
+ mLogger(Debug) << FUNLOG << ":" << objectIdFromCurrent(current) << " with bridge listener: " <<
+ (listener ? mObjAdapter->getCommunicator()->identityToString(listener->ice_getIdentity()) : "(nil)");
+ if (!listener)
{
//
- // TODO:Need to find a way to make this as exception safe as possible!
+ // TODO should probably throw an exception here.
//
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- statePreCheck();
+ return;
+ }
- std::vector<BridgeSessionPtr>::iterator i =
- find_if(mSessions.begin(), mSessions.end(), AsteriskSCF::BridgeService::FindImpl(oldSession));
+ //
+ // Careful about ordering lest short-circuit become an issue
+ //
+ if (mListeners->addListener(listener) && replicate())
+ {
+ ReplicatedStateItemSeq seq;
+ seq.push_back(createFirstListenerUpdate(listener));
+ mReplicator->setState(seq);
+ }
+}
+//
+// Unregisters a bridge listener from this bridge. A nil proxy is logged and ignored. When
+// mListeners->removeListener() returns true and replicate() is true, the listener's state
+// item is removed from the replicator; a failure of that call is logged and not propagated.
+//
+void BridgeImpl::removeListener(const BridgeListenerPrx& listener, const Ice::Current& current)
+{
+ //
+ // Only dereference the proxy when it is non-nil; a nil proxy is logged as "(nil)" so that
+ // the guard below is actually reached.
+ //
+ mLogger(Debug) << FUNLOG << ":" << objectIdFromCurrent(current) << " with bridge listener: " <<
+ (listener ? mObjAdapter->getCommunicator()->identityToString(listener->ice_getIdentity()) : "(nil)");
+ if (!listener)
+ {
//
- // We need to disconnect the media while the bridge is locked because the media splicer does not currently
- // have any mutex protection of its own.
- //
- if(i != mSessions.end())
- {
- lg(Debug) << __FUNCTION__ << ": found session to replace : " << oldSession->ice_toString();
- toRemove = *i;
- toRemove->disconnect();
- mSessions.erase(i);
- }
- //
- // We don't do anything else with the new members just yet. We are
- // going to release the lock and continue with things.
+ // TODO should probably throw an exception here.
//
+ return;
}
- std::vector<AsteriskSCF::SessionCommunications::V1::SessionInfoPtr> infoSeq;
- for(AsteriskSCF::SessionCommunications::V1::SessionSeq::const_iterator i = newSessions.begin();
- i != newSessions.end(); ++i)
+ if (mListeners->removeListener(listener))
{
- try
+ string key = mState->key + ".listener." +
+ mObjAdapter->getCommunicator()->identityToString(listener->ice_getIdentity());
+ if (replicate())
{
- RetryPolicy policy(5, 500);
- //
- // canRetry should never return false since we throw ourselves out of this loop. But
- // we'll do it here in case we decide to do something else.
- //
- while(policy.canRetry())
+ try
{
- try
- {
- infoSeq.push_back((*i)->setBridge(mPrx, mSessionListenerPrx));
- newMembers.push_back(new BridgeSession(*i, mSplicer.connect(*i), false));
- break;
- }
- catch(const Ice::ConnectionLostException&)
- {
- if(!policy.retry())
- {
- throw;
- }
- }
+ Ice::StringSeq keys;
+ keys.push_back(key);
+ mReplicator->removeState(keys);
+ }
+ catch (const std::exception& ex)
+ {
+ mLogger(Error) << "call to remove state item " << key << " failed with exception " << ex.what();
}
- }
- catch(const Ice::Exception& ex)
- {
- //
- // We need to continue if setBridge fails for some reason. Rolling
- // back the other sessions would be difficult. The bridge does not
- // know enough to try for system wide consistency. An OTS would
- // really be required if things like replaceSessions() were to be
- // atomic.
- //
- lg(Info) << __FUNCTION__ << ": setting the bridge on " << *i << " threw " << ex.what();
}
}
- assert(infoSeq.size() == newMembers.size());
+}
+void BridgeImpl::replaceSession_async(const AMD_Bridge_replaceSessionPtr& callback, const SessionPrx& sessionToReplace,
+ const SessionSeq& newSessions, const Ice::Current& current)
+{
+ try
{
+ mLogger(Debug) << FUNLOG << ":" << objectIdFromCurrent(current);
+
+ checkSessions(newSessions);
+ statePreCheck();
+
+ SessionWrapperPtr session = mSessions->getSession(sessionToReplace);
//
- // Now that is all over with, let's add them to the bridge's list.
+ // If the session did not exist on this bridge, then this operation should not proceed.
//
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- for(std::vector<BridgeSessionPtr>::iterator j = newMembers.begin(); j != newMembers.end() ; ++j)
+ if (!session)
{
- mSessions.push_back(*j);
+ throw SessionNotFound(sessionToReplace);
}
- }
-
- //
- // The call on the session to disassociate it from the bridge is deferred
- // until after the lock on the bridge has been released.
- //
- if(toRemove)
- {
- RetryPolicy policy(5, 500);
//
- // canRetry should never return false since we throw ourselves out of this loop. But
- // we'll do it here in case we decide to do something else.
+ // Shutdown is inherently asynchronous (see SessionWrapper::shutdown())
//
- while(policy.canRetry())
- {
- try
- {
- toRemove->getSession()->removeBridge(mSessionListenerPrx);
- break;
- }
- catch(const AsteriskSCF::SessionCommunications::V1::NotBridged&)
- {
- lg(Info) << __FUNCTION__ <<
- ": removeBridge on session being replaced threw a `NotBridged' exception";
- break;
- }
- catch(const Ice::ConnectionLostException&)
- {
- if(!policy.retry())
- {
- throw;
- }
- }
- catch(const Ice::Exception& ex)
- {
- lg(Info) << __FUNCTION__ << ": removeBridge resulted in : " << ex.what();
- break;
- }
- }
+ SessionsTrackerPtr removeTracker(new SessionsTracker);
+ removeTracker->add(session->getSession());
+ session->shutdown(mSessionListenerPrx, new ResponseCode);
+
+ SessionsTrackerPtr tracker(new SessionsTracker);
+ QueuedTasks tasks;
+ tasks.push_back(new RemoveSessionsNotify(mListeners, removeTracker));
+ tasks.push_back(new SetBridgeTask(mSessions, mPrx, mSessionListenerPrx, newSessions, tracker));
+ tasks.push_back(new AddToListeners(mListeners, tracker));
+ tasks.push_back(new GenericAMDCallback<AMD_Bridge_replaceSessionPtr>(callback, tracker));
+ tasks.push_back(new UpdateTask(this));
+ ExecutorPtr executor(new Executor(tasks, mLogger));
+ executor->start();
}
-
- //
- // Now update the listeners.
- //
- AsteriskSCF::SessionCommunications::V1::SessionSeq sessions(newSessions);
- mListeners->sessionsAdded(sessions);
- if(toRemove)
+ catch (const std::exception& ex)
{
- sessions.clear();
- sessions.push_back(oldSession);
- mListeners->sessionsRemoved(sessions);
+ callback->ice_exception(ex);
+ }
+ catch (...)
+ {
+ callback->ice_exception();
}
}
-bool AsteriskSCF::BridgeService::BridgeImpl::destroyed()
+bool BridgeImpl::destroyed()
{
boost::shared_lock<boost::shared_mutex> lock(mLock);
- lg(Debug) << __FUNCTION__ << ": " << (mState == Destroyed ? "yes, I am destroyed." : "no, I am not destroyed") ;
- return mState == Destroyed;
+ mLogger(Debug) << FUNLOG << ": " << (mState->runningState == Destroyed ? "yes, I am destroyed." : "no, I am not destroyed") ;
+ return (mState->runningState == Destroyed);
}
-void AsteriskSCF::BridgeService::BridgeImpl::destroyImpl()
+void BridgeImpl::destroyImpl()
{
- boost::shared_lock<boost::shared_mutex> lock(mLock);
try
{
- mObjAdapter->remove(mPrx->ice_getIdentity());
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ mState->runningState = Destroyed;
+ mSessions = 0;
+
+ if (mPrx)
+ {
+ mObjAdapter->remove(mPrx->ice_getIdentity());
+ }
}
- catch(const Ice::Exception&)
+ catch (const Ice::Exception&)
{
}
-}
-void AsteriskSCF::BridgeService::BridgeImpl::sessionConnected(
- const AsteriskSCF::SessionCommunications::V1::SessionPrx& session)
-{
- lg(Debug) << __FUNCTION__ << ": session connected " << session->ice_toString() ;
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- std::vector<BridgeSessionPtr>::iterator i =
- find_if(mSessions.begin(), mSessions.end(), AsteriskSCF::BridgeService::FindImpl(session));
-#ifndef _NDEBUG
- if(i == mSessions.end())
+ try
{
- lg(Debug) << __FUNCTION__ << ": did not find " << session->ice_toString();
+ if (replicate())
+ {
+ Ice::StringSeq keys;
+ keys.push_back(mState->key);
+ mReplicator->removeState(keys);
+ }
}
-#endif
- if(i != mSessions.end())
+ catch (const Ice::Exception&)
{
- (*i)->setConnected();
- (*i)->setConnector(mSplicer.connect(session));
}
}
-size_t AsteriskSCF::BridgeService::BridgeImpl::sessionStopped(
- const AsteriskSCF::SessionCommunications::V1::SessionPrx& session,
- const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr&)
+void BridgeImpl::shutdownImpl(const Ice::Current& current)
{
- lg(Debug) << __FUNCTION__ << ": session terminated from " << session->ice_toString() ;
- try
- {
- session->removeBridge(mSessionListenerPrx);
- }
- catch(Ice::ObjectNotExistException& ex)
- {
- lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
- }
- catch(Ice::Exception& ex)
+ shutdown(current);
+}
+
+void BridgeImpl::activate(const BridgePrx& proxy)
+{
+ mPrx = proxy;
+ string listenerId = mObjAdapter->getCommunicator()->identityToString(mPrx->ice_getIdentity());
+ mLogger(Debug) << FUNLOG << " : activating with " << listenerId;
+ mState->key = listenerId;
+ mState->bridgeId = listenerId;
+ listenerId += ".sessionListener";
+ mActivated = true;
+ mSessionListenerPrx =
+ SessionListenerPrx::uncheckedCast(
+ mObjAdapter->add(mSessionListener, mObjAdapter->getCommunicator()->stringToIdentity(listenerId))
+ );
+
+ ReplicatedStateItemSeq initialUpdates;
+ BridgeStateItemPtr update = createUpdate();
+ initialUpdates.push_back(update);
+
+ vector<BridgeListenerPrx> listeners = mListeners->getListeners();
+ for (vector<BridgeListenerPrx>::iterator i = listeners.begin(); i != listeners.end(); ++i)
{
- lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
- throw;
+ initialUpdates.push_back(createFirstListenerUpdate(*i));
}
+
+ pushUpdates(initialUpdates);
+}
- BridgeSessionPtr b;
- size_t sizeAfter;
+void BridgeImpl::updateState(const BridgeStateItemPtr& state)
+{
+ mLogger(Debug) << FUNLOG;
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ //
+ // We perform a deep copy because there are no guarantees about the thread safety of the memory
+ // pointed to by "state" over time. We could "say" that the call acquires ownership, but it's
+ // safer to take the added cost of the copy.
+ //
+ *mState = *state;
+}
+
+void BridgeImpl::addListener(const BridgeListenerStateItemPtr& update)
+{
+ mListeners->addListener(update->listener);
+}
+
+void BridgeImpl::removeListener(const BridgeListenerStateItemPtr& update)
+{
+ mListeners->removeListener(update->listener);
+}
+
+void BridgeImpl::activate()
+{
+ mLogger(Debug) << FUNLOG;
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ mActivated = true;
+ mPrx = BridgePrx::uncheckedCast(mObjAdapter->add(this, mObjAdapter->getCommunicator()->stringToIdentity(mState->key)));
+ string listenerId = mState->key + ".sessionListener";
+ mSessionListenerPrx = SessionListenerPrx::uncheckedCast(
+ mObjAdapter->add(mSessionListener, mObjAdapter->getCommunicator()->stringToIdentity(listenerId)));
+ //
+ // Now this replica should be ready to go!
+ //
+}
+
+string BridgeImpl::id()
+{
+ return mState->bridgeId;
+}
+
+SessionCollectionPtr BridgeImpl::sessions()
+{
+ return mSessions;
+}
+
+void BridgeImpl::forceUpdate()
+{
+ BridgeStateItemPtr update;
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
- std::vector<BridgeSessionPtr>::iterator i =
- find_if(mSessions.begin(), mSessions.end(), AsteriskSCF::BridgeService::FindImpl(session));
- if(i != mSessions.end())
- {
- b = *i;
- mSessions.erase(i);
- }
- sizeAfter = mSessions.size();
- }
- if(b)
- {
- b->disconnect();
+ update = createUpdate();
}
+ pushUpdate(update);
+}
- return sizeAfter;
+void BridgeImpl::getAddSessionsTasks(QueuedTasks& tasks,
+ const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions)
+{
+ SessionsTrackerPtr tracker(new SessionsTracker);
+ tasks.push_back(new SetBridgeTask(mSessions, mPrx, mSessionListenerPrx, sessions, tracker));
+ tasks.push_back(new AddToListeners(mListeners, tracker));
+ tasks.push_back(new UpdateTask(this));
}
-void AsteriskSCF::BridgeService::BridgeImpl::statePreCheck()
+void BridgeImpl::statePreCheck()
{
- if(mState == ShuttingDown)
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+
+ if (mState->runningState == ShuttingDown)
{
- lg(Debug) << __FUNCTION__ << ": called when shutting down." ;
+ mLogger(Debug) << FUNLOG << ": called when shutting down." ;
throw AsteriskSCF::System::Component::V1::ShuttingDown();
}
- if(mState == Destroyed)
+ if (mState->runningState == Destroyed)
{
- lg(Debug) << __FUNCTION__ << ": called when destroyed." ;
+ mLogger(Debug) << FUNLOG << ": called when destroyed." ;
throw Ice::ObjectNotExistException(__FILE__, __LINE__);
}
}
-namespace
+BridgeStateItemPtr BridgeImpl::createUpdate()
{
-class ShutdownThread : public IceUtil::Thread
-{
-public:
- ShutdownThread(const AsteriskSCF::SessionCommunications::V1::BridgePrx& bridge) :
- mBridge(bridge)
+ ++mState->serial;
+
+ //
+ // There is no point in going through the cost of the copy if it is not going to be replicated. There is a slight
+ // race condition with pushUpdate(s) and the replicator "appearing", in which case the update will just go out the
+ // next time the state changes.
+ //
+ if (replicate())
{
+ BridgeStateItemPtr result = new BridgeStateItem(*mState.get());
+ return result;
}
+ return 0;
+}
- void run()
+void BridgeImpl::pushUpdate(const BridgeStateItemPtr& update)
+{
+ if (update && replicate())
{
- try
- {
- mBridge->shutdown();
- }
- catch(...)
- {
- }
+ ReplicatedStateItemSeq seq;
+ seq.push_back(update);
+ pushUpdates(seq);
}
-private:
- AsteriskSCF::SessionCommunications::V1::BridgePrx mBridge;
-};
}
-void AsteriskSCF::BridgeService::BridgeImpl::spawnShutdown()
+void BridgeImpl::pushUpdates(const ReplicatedStateItemSeq& update)
{
- boost::shared_lock<boost::shared_mutex> lock(mLock);
- if(!mShutdownThread)
+ if (!update.empty() && replicate())
{
- mShutdownThread = new ShutdownThread(mPrx);
- mShutdownThread->start();
+ mReplicator->setState(update);
}
}
-std::vector<AsteriskSCF::BridgeService::BridgeImpl::BridgeSessionPtr>
-AsteriskSCF::BridgeService::BridgeImpl::currentSessions()
+BridgeListenerStateItemPtr BridgeImpl::createFirstListenerUpdate(const BridgeListenerPrx& listener)
{
- boost::shared_lock<boost::shared_mutex> lock(mLock);
- return mSessions;
+ BridgeListenerStateItemPtr listenerUpdate(new BridgeListenerStateItem);
+ listenerUpdate->key = mState->key + ".listener." + mObjAdapter->getCommunicator()->identityToString(listener->ice_getIdentity());
+ listenerUpdate->serial = SerialCounterStart;
+ listenerUpdate->listener = listener;
+ listenerUpdate->bridgeId = mState->key;
+ return listenerUpdate;
+}
+
+IceUtil::Handle<AsteriskSCF::BridgeService::BridgeServant>
+AsteriskSCF::BridgeService::BridgeServant::create(const string& name, const Ice::ObjectAdapterPtr& objectAdapter,
+ const vector<BridgeListenerPrx>& listeners,
+ const AsteriskSCF::BridgeService::BridgeListenerMgrPtr& listenerMgr,
+ const ReplicatorSmartPrx& replicator,
+ const Logger& logger)
+{
+ BridgeStateItemPtr state(new AsteriskSCF::Bridge::V1::BridgeStateItem);
+ state->runningState = Running;
+ state->serial = SerialCounterStart;
+ //
+ // TODO: "replicate" is the only replication policy currently supported by the bridge service.
+ // In the future it may be possible for the bridge replica to reconstruct its media operations
+ // allowing localized resources to be used.
+ //
+ state->mediaReplicationPolicy = Replicate;
+ return new BridgeImpl(name, objectAdapter, listeners, listenerMgr, replicator, state, logger);
+}
+
+IceUtil::Handle<AsteriskSCF::BridgeService::BridgeServant>
+AsteriskSCF::BridgeService::BridgeServant::create(const Ice::ObjectAdapterPtr& objectAdapter,
+ const AsteriskSCF::BridgeService::BridgeListenerMgrPtr& listenerMgr,
+ const ReplicatorSmartPrx& replicator,
+ const Logger& logger,
+ const AsteriskSCF::Bridge::V1::BridgeStateItemPtr& state)
+{
+ logger(Debug) << FUNLOG << ": creating replica for " << state->bridgeId;
+ IceUtil::Handle<AsteriskSCF::BridgeService::BridgeServant> bridge(
+ new BridgeImpl(state->bridgeId, objectAdapter, vector<BridgeListenerPrx>(),
+ listenerMgr, replicator, state, logger));
... 7533 lines suppressed ...
--
asterisk-scf/release/bridging.git
More information about the asterisk-scf-commits
mailing list