[asterisk-scf-commits] asterisk-scf/integration/bridging.git branch "retry_deux" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Wed Apr 11 09:53:11 CDT 2012


branch "retry_deux" has been updated
       via  9b62490fe79e51d4fb804c0ccbe8ce75875ba0df (commit)
      from  439e365875ee9565130a14898529fe895b84785e (commit)

Summary of changes:
 .../BridgeService/BridgeReplicatorIf.ice           |    3 +
 src/BridgeImpl.cpp                                 |   57 ++++-
 src/BridgeImpl.h                                   |   12 +-
 src/BridgeManagerImpl.cpp                          |  238 ++++++++++++++------
 src/BridgeManagerImpl.h                            |    3 +
 src/BridgeReplicatorStateListenerI.cpp             |   32 +++-
 src/BridgeServiceConfig.h                          |   10 +
 src/SessionOperations.cpp                          |   30 ++-
 test/CMakeLists.txt                                |   30 +++
 9 files changed, 321 insertions(+), 94 deletions(-)


- Log -----------------------------------------------------------------
commit 9b62490fe79e51d4fb804c0ccbe8ce75875ba0df
Author: Brent Eagles <beagles at digium.com>
Date:   Wed Apr 11 12:19:31 2012 -0230

    Lots of replication/retry consistency implementation changes. The most
    direct way to ensure that complex operations don't lead to badness is to
    regenerate downstream context codes using a set strategy based on available
    information. If other components are implemented properly, you should be
    able to re-invoke operations all over the place, allowing partially
    completed complex operations to automatically "re-heal". There are some
    deeper operations that need to be revisited as well I think.
    
    Some context cache uses were modified to exploit recent changes to the
    operation monitor and associated classes.
    
    Some unit tests that need to be revisited... it was their behavior and
    stepping through the code that triggered most of the changes in this
    commit.

diff --git a/slice/AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice b/slice/AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice
index 8bc215a..29d974f 100644
--- a/slice/AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice
+++ b/slice/AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice
@@ -192,6 +192,7 @@ sequence<BridgedSession> BridgedSessionSeq;
  **/
 enum ServiceState
 {
+    Starting,
     Running,
     Paused,
     ShuttingDown,
@@ -256,6 +257,8 @@ class BridgeStateItem extends ReplicatedStateItem
      * its own snapshot of applicable hooks. 
      */
     PartyIdHooks partyIdHookSet;
+
+    AsteriskSCF::System::V1::OperationContext originatingContext;
 };
 
 class BridgeListenerStateItem extends ReplicatedStateItem
diff --git a/src/BridgeImpl.cpp b/src/BridgeImpl.cpp
index 647f23a..59048ee 100755
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@ -188,7 +188,7 @@ public:
     bool destroyed();
     void destroyImpl();
     void shutdownImpl(const Ice::Current& current);
-    void activate(const BridgePrx& proxy, const std::string& id);
+    void finishSetup(const BridgePrx& proxy, const std::string& id);
 
     void updateState(const BridgeStateItemPtr& state);
     void addListener(const BridgeListenerStateItemPtr& update);
@@ -2355,7 +2355,8 @@ BridgeCookies BridgeImpl::getCookies(const BridgeCookies& cookies, const Ice::Cu
 void BridgeImpl::setCookiesImpl(const BridgeCookies& cookies)
 {
     // 
-    // This is only used for initial setting of cookies and will not be used after the object has been activated.
+    // This is only used for initial setting of cookies and will not be used after a proxy to the object has
+    // been returned.
     //
     for (BridgeCookies::const_iterator i = cookies.begin(); i != cookies.end(); ++i)
     {
@@ -2418,7 +2419,7 @@ void BridgeImpl::shutdownImpl(const Ice::Current& current)
     shutdown(AsteriskSCF::Operations::createContext(), current);
 }
 
-void BridgeImpl::activate(const BridgePrx& proxy, const std::string& bridgeId)
+void BridgeImpl::finishSetup(const BridgePrx& proxy, const std::string& bridgeId)
 {
     mPrx = proxy;
     mState->publishedBridge = proxy;
@@ -2476,7 +2477,6 @@ void BridgeImpl::activate()
     //
     // XXX this is not correct with extension points... this could be a decorator.
     //
-
     mPrx = mState->publishedBridge;
     BridgePrx actualPrx = BridgePrx::uncheckedCast(mObjAdapter->add(this, mObjAdapter->getCommunicator()->stringToIdentity(mState->key)));
 
@@ -2517,15 +2517,46 @@ void BridgeImpl::getAddSessionsTasks(QueuedTasks& tasks,
 {
     SessionsTrackerPtr tracker(new SessionsTracker);
     SessionWrapperPtr sourceSessionWrapper = getSessions()->getSession(source);
-    tasks.push_back(new UnplugMedia(context, this));
-    tasks.push_back(new SetBridgeTask(context, mSessions, mPrx, mSessionListenerPrx, sessions, tracker));
-    tasks.push_back(new AddToListeners(context, mListeners, tracker, getCookies()));
-    tasks.push_back(new SetAndGetSessionControllerTask(context, mObjAdapter, mSessions, sessions, this, mLogger));
-    tasks.push_back(new ForwardCallerIDTask(context, this, source, callerID, mLogger));
-    tasks.push_back(new ForwardRedirectionsUpdatedTask(context, this, source, redirects, mLogger));
-    tasks.push_back(new SetupMedia(context, this));
-    tasks.push_back(new ConnectTelephonyEventsTask(context, this, sessions, mLogger));
-    tasks.push_back(new UpdateTask(context, this));
+    //
+    // FAILOVER: You don't want this to happen after failover.
+    //
+    tasks.push_back(new UnplugMedia(addContextSuffix(context, ".UnplugMedia"), this));
+    //
+    // FAILOVER: This probably isn't so bad.
+    //
+    tasks.push_back(new SetBridgeTask(addContextSuffix(context, ".SetBridgeTask"),
+                mSessions, mPrx, mSessionListenerPrx, sessions, tracker));
+    //
+    // FAILOVER: Also not so bad as long as duplicates are handled properly.
+    //
+    tasks.push_back(new AddToListeners(addContextSuffix(context, ".AddToListeners"), mListeners, tracker, getCookies()));
+    //
+    // FAILOVER: This might have side-effects. Might be best not to redo it.
+    //
+    tasks.push_back(new SetAndGetSessionControllerTask(addContextSuffix(context, ".SetAndGetSessionController"),
+            mObjAdapter, mSessions, sessions, this, mLogger));
+    //
+    // FAILOVER: Avoid redoing.
+    //
+    tasks.push_back(new ForwardCallerIDTask(addContextSuffix(context, ".ForwardCallerIDTask"), this, source, callerID, mLogger));
+    //
+    // FAILOVER: Avoid redoing.
+    //
+    tasks.push_back(new ForwardRedirectionsUpdatedTask(addContextSuffix(context, ".ForwardRedirectionsUpdateTask"),
+            this, source, redirects, mLogger));
+    //
+    // FAILOVER: Avoid redoing.
+    //
+    tasks.push_back(new SetupMedia(addContextSuffix(context, ".SetupMedia"), this));
+    //
+    // FAILOVER: Avoid redoing.
+    //
+    tasks.push_back(new ConnectTelephonyEventsTask(addContextSuffix(context, ".ConnectTelephonyEventsTask"),
+            this, sessions, mLogger));
+    //
+    // FAILOVER: No point in doing this one again.
+    //
+    tasks.push_back(new UpdateTask(addContextSuffix(context, ".UpdateTask"), this));
 }
 
 void BridgeImpl::statePreCheck()
diff --git a/src/BridgeImpl.h b/src/BridgeImpl.h
index 032592c..afa2205 100644
--- a/src/BridgeImpl.h
+++ b/src/BridgeImpl.h
@@ -85,14 +85,14 @@ public:
     /**
      *
      * A BridgeServant instance isn't ready to be used until it has been
-     * activated and handed a proxy that identifies it. In a sense, bridge
-     * servants are without identity until this has been called. Care should
-     * be taken to make sure this is called before any remote procedure calls
-     * can reasonably be made on the servant. This MUST ONLY BE CALLED
-     * ONCE per instance and is not thread safe.
+     * completely setup and handed a proxy that identifies it. In a sense,
+     * bridge servants are without identity until this has been
+     * called. Care should be taken to make sure this is called before any
+     * remote procedure calls can reasonably be made on the servant. This
+     * MUST ONLY BE CALLED ONCE per instance and is not thread safe.
      *
      **/
-    virtual void activate(const AsteriskSCF::SessionCommunications::V1::BridgePrx& proxy, 
+    virtual void finishSetup(const AsteriskSCF::SessionCommunications::V1::BridgePrx& proxy, 
         const std::string& id) = 0;
 
     /**
diff --git a/src/BridgeManagerImpl.cpp b/src/BridgeManagerImpl.cpp
index 98cac33..508aca5 100755
--- a/src/BridgeManagerImpl.cpp
+++ b/src/BridgeManagerImpl.cpp
@@ -113,6 +113,8 @@ public:
         return mName;
     }
 
+    void newCreateBridgeContext(const ContextStateItemPtr& contextItem);
+
     void createBridgeReplica(const BridgeStateItemPtr& bridgeState);
 
     void removeBridge(const string& bridgeState);
@@ -250,20 +252,60 @@ void BridgeManagerImpl::createBridge_async(const AMD_BridgeManager_createBridgeP
     const BridgeListenerPrx& listener, const CallerPtr& callerID, const RedirectionsPtr& redirects,
     const Ice::Current& current)
 {
-    CreateBridgeContextDataPtr data(
-        getContext<CreateBridgeContextDataPtr,
-                   AMD_BridgeManager_createBridgePtr>(mOperationCache, context, callback));
+    std::pair<bool, CreateBridgeContextDataPtr> data(
+        getContextSync<CreateBridgeContextDataPtr>(mOperationCache, context));
 
-    if (data)
+    //
+    // If there is a cache entry and the callback count is 0, that means that the context
+    // cookie was inserted via replication. 
+    // 
+    bool replicaRetry = data.first && (data.second->callbackCount() == 0);
+
+    //
+    // If addCB returns true, then the operation had already completed and we can simply return.
+    //
+    if (data.second->addCB(callback))
+    {
+        return;
+    }
+
+    //
+    // If we've figured that we are replica retry or we weren't in the
+    // cache, then we need to run through this stuff again.
+    //
+    // NOTE: The code
+    // for creating a bridge must generate identical object ids etc for
+    // objects that are sent to downstream objects. If we failed over
+    // before the creation of the bridge servant on the original instance
+    // then we are going to end up calling on other objects again. If ids,
+    // arguments, etc don't match up we could cause downstream objects a
+    // fair amount of grief.
+    //
+    if (replicaRetry || !data.first)
     {
         //
         // At this point, we should be the first and only thread for the
         // specific context.
         //
-        AsteriskSCF::Replication::BridgeService::V1::ContextStateItemPtr contextState(
-            new AsteriskSCF::Replication::BridgeService::V1::ContextStateItem);
-        contextState->code = AsteriskSCF::Replication::BridgeService::V1::CreateBridge;
-        contextState->operationContext = context;
+        AsteriskSCF::Replication::BridgeService::V1::ContextStateItemPtr contextState;
+        if (!replicaRetry)
+        {
+            contextState = new AsteriskSCF::Replication::BridgeService::V1::ContextStateItem;
+            contextState->key = context->id + context->transactionId;
+            contextState->serial = AsteriskSCF::Replication::BridgeService::V1::SerialCounterStart;
+            contextState->code = AsteriskSCF::Replication::BridgeService::V1::CreateBridge;
+            contextState->operationContext = context;
+            if (mReplicationContext->isReplicating())
+            {
+                ReplicatedStateItemSeq seq;
+                seq.push_back(contextState);
+                mReplicationContext->getReplicator()->setState(AsteriskSCF::Operations::createContext(context), seq);
+                //
+                // Now the replicas have a record that this thing has started.
+                //
+            }
+        }
+        
         
         try
         {
@@ -271,30 +313,50 @@ void BridgeManagerImpl::createBridge_async(const AMD_BridgeManager_createBridgeP
             boost::unique_lock<boost::shared_mutex> lock(mLock);
             statePreCheck(BOOST_CURRENT_FUNCTION);
             reap();
+            BridgeInfo info;
             try
             {
-                string stringId = string("bridge.") + IceUtil::generateUUID();
-                Ice::Identity id(mAdapter->getCommunicator()->stringToIdentity(stringId));
-                BridgePrx prx(BridgePrx::uncheckedCast(mAdapter->createProxy(id)));
+                //
+                // If we are getting retried after a failover, we should
+                // skip this block of code.  The bridge servant is already
+                // created. We just need to finish the rest of the setup
+                // tasks. It could result in duplicate calls going to other
+                // objects, but the contexts are created according to a
+                // common protocol so the other objects should recognize
+                // them as duplicates. There are other strategies that
+                // "might" work, but really setting up a bridge is WAY too
+                // complicated and alternate strategies are consequently
+                // also very complicated and hard to prove. This approach
+                // simply relies on everybody behaving as they should.
+                //
+
+                string stringId;
+                BridgePrx prx;
+                SessionSeq initialSessions(sessions);
                 AsteriskSCF::SessionCommunications::V1::BridgeListenerSeq listeners(mState->defaultBridgeListeners);
-                if (listener)
+                if (!replicaRetry)
                 {
-                    listeners.push_back(listener);
+                    stringId = string("bridge.") + IceUtil::generateUUID();
+                    Ice::Identity id(mAdapter->getCommunicator()->stringToIdentity(stringId));
+                    prx = BridgePrx::uncheckedCast(mAdapter->createProxy(id));
+                    if (listener)
+                    {
+                        listeners.push_back(listener);
+                    }
                 }
 
+                
                 //
                 // "sessions" is a const, so we need to make a local non-const copy that can be modified
                 // by the hooks if need be.
                 //
-                SessionSeq initialSessions(sessions);
-
+                    
                 //
                 // We create a local outside of the block that initializes and runs the hooks if they are
                 // present. If this is '0' later on in the code, it signifies the bridge creation extension
                 // point mechanism was not initialized.
                 //
                 BridgeCreationHookDataPtr hookData;
-                BridgeInfo info;
                 info.proxy = prx;
                 if (mCreationExtension)
                 {
@@ -306,7 +368,7 @@ void BridgeManagerImpl::createBridge_async(const AMD_BridgeManager_createBridgeP
                     hookData->listeners = listeners;
                     hookData->initialSessions = sessions;
                     hookData = mCreationExtension->runHooks(hookData);
-
+                        
                     //
                     // Update locals with the result of running the hooks.
                     //
@@ -318,61 +380,86 @@ void BridgeManagerImpl::createBridge_async(const AMD_BridgeManager_createBridgeP
                         dump(mLogger, hookData);
                     }
                 }
-        
-                //
-                // The bridge listener manager is a wrapper/helper class that manages the listeners and
-                // propogates the bridge events. We separate it's instantiation from the construction
-                // of the bridge itself as this is a natural area of refinement and extension.
-                //
-                BridgeListenerMgrPtr mgr(new BridgeListenerMgr(mAdapter->getCommunicator(), stringId, info.decoratingPrx));
-
-                //
-                // Now we can get down to the creation of the bridge servant itself. Note that the 
-                // initialization is still not really complete as we need to set up and cookies,
-                // initial sessions, etc that may have been defined or added as part of the createBridge
-                // call or the bridge creation hooks.
-                //
-                BridgeServantPtr bridge = BridgeServant::create(stringId, mAdapter, listeners, mgr, 
-                    mPartyIdExtensionPoint->getHooks(), mReplicationContext->getReplicator(), mLogger);
-
-                Ice::ObjectPrx obj = mAdapter->add(bridge, id);
 
-                mLogger(Info) << objectIdFromCurrent(current) << ": creating new bridge " << obj->ice_toString() << "." ;
-
-                //
-                // Finish updating BridgeInfo struct.
-                //
-                info.servant = bridge;
-       
-                //
-                // It's very important to note that the bridge servant will not have it's own proxy!
-                // NOTE: This method is probably misnamed and misleading. The bridge servant was added to 
-                // object adapter above. It might be a better idea to move adding the servant to the adapter
-                // into the activate method.
-                // 
-                bridge->activate(info.decoratingPrx, stringId);
-                mBridges.push_back(info);
-
-                if (hookData)
+                BridgeServantPtr bridge;
+                if (!replicaRetry)
+                {            
+                    //
+                    // The bridge listener manager is a wrapper/helper class
+                    // that manages the listeners and propogates the bridge
+                    // events. We separate it's instantiation from the
+                    // construction of the bridge itself as this is a natural
+                    // area of refinement and extension.
+                    //
+                    BridgeListenerMgrPtr mgr(new BridgeListenerMgr(mAdapter->getCommunicator(), stringId, info.decoratingPrx));
+                    
+                    //
+                    // Now we can get down to the creation of the bridge
+                    // servant itself. Note that the initialization is still
+                    // not really complete as we need to set up and cookies,
+                    // initial sessions, etc that may have been defined or
+                    // added as part of the createBridge call or the bridge
+                    // creation hooks.
+                    //
+                    bridge = BridgeServant::create(stringId, mAdapter, listeners, mgr, 
+                        mPartyIdExtensionPoint->getHooks(), mReplicationContext->getReplicator(), mLogger);
+
+                    Ice::Identity id = mAdapter->getCommunicator()->stringToIdentity(stringId);
+                    
+                    Ice::ObjectPrx obj = mAdapter->add(bridge, id);
+                    
+                    mLogger(Info) << objectIdFromCurrent(current) << ": creating new bridge " << obj->ice_toString() << "." ;
+                    
+                    //
+                    // Finish updating BridgeInfo struct.
+                    //
+                    info.servant = bridge;
+                    
+                    //
+                    // The bridge servant needs to be handed the proxy it
+                    // should publish. A direct proxy can be computed at any
+                    // time using the stringId.
+                    // 
+                    if (hookData)
+                    {
+                        bridge->setCookiesImpl(hookData->cookies);
+                    }
+                    bridge->finishSetup(info.decoratingPrx, stringId);
+                    mBridges.push_back(info);
+                }
+                else
                 {
-                    bridge->setCookiesImpl(hookData->cookies);
+                    prx = data.second->getResult();
+                    assert(prx);
+                    stringId = mAdapter->getCommunicator()->identityToString(prx->ice_getIdentity());
+
+                    for (vector<BridgeInfo>::const_iterator iter = mBridges.begin(); iter != mBridges.end() && !bridge ; ++iter)
+                    {
+                        if (iter->proxy->ice_getIdentity() == prx->ice_getIdentity())
+                        {
+                            info = *iter;
+                            bridge = iter->servant;
+                            break;
+                        }
+                    }
+                    assert(bridge);
                 }
 
-                //
-                // There are some finalization tasks that may be performed asynchronously (replication etc.)
-                //
                 QueuedTasks tasks;
-
+                AsteriskSCF::System::V1::OperationContextPtr seedContext =
+                    AsteriskSCF::System::V1::OperationContextPtr::dynamicCast(context->ice_clone());
+                seedContext->id = seedContext->id + "." + stringId;
+                
                 //
                 // If there are some sessions that need to be added to the bridge immediately
                 // upon creation, create some tasks to add to the queue.
                 //
                 if (!initialSessions.empty())
                 {
-                    bridge->getAddSessionsTasks(tasks, context, source, initialSessions, callerID, redirects);
+                    bridge->getAddSessionsTasks(tasks, seedContext, source, initialSessions, callerID, redirects);
                 }
-
-                tasks.push_back(new FinishUp(data->getProxy(), mListeners, info.decoratingPrx));
+                    
+                tasks.push_back(new FinishUp(data.second->getProxy(), mListeners, info.decoratingPrx));
                 ExecutorPtr runner(new Executor(tasks, mLogger));
                 runner->start();
             }
@@ -391,12 +478,12 @@ void BridgeManagerImpl::createBridge_async(const AMD_BridgeManager_createBridgeP
         catch (const std::exception& ex)
         {
             mLogger(Error) << "FUNLOG" << "An unhandled " << ex.what() << " is being rethrown, please contact support.";
-            data->getProxy()->ice_exception(ex);
+            data.second->getProxy()->ice_exception(ex);
         }
         catch (...)
         { 
             assert("If we got here, really bad things have happened!" == 0);
-            data->getProxy()->ice_exception();
+            data.second->getProxy()->ice_exception();
         }
     }
 }      
@@ -768,26 +855,41 @@ void BridgeManagerImpl::activate(const AsteriskSCF::Core::Discovery::V1::Service
     mCreationExtensionPointServicePrx->addLocatorParams(AsteriskSCF::Operations::createContext(), params, "");
 }
 
+void BridgeManagerImpl::newCreateBridgeContext(const ContextStateItemPtr& context)
+{
+    std::pair<bool, CreateBridgeContextDataPtr> data(
+        getContextSync<CreateBridgeContextDataPtr>(mOperationCache, context->operationContext));
+}
+
 void BridgeManagerImpl::createBridgeReplica(const BridgeStateItemPtr& state)
 {
+    std::pair<bool, CreateBridgeContextDataPtr> data(
+        getContextSync<CreateBridgeContextDataPtr>(mOperationCache, state->originatingContext));
+
+    assert(data.second);
     mLogger(Trace) << FUNLOG;
     boost::unique_lock<boost::shared_mutex> lock(mLock);
     Ice::Identity id(mAdapter->getCommunicator()->stringToIdentity(state->bridgeId));
     BridgePrx prx(BridgePrx::uncheckedCast(mAdapter->createProxy(id)));
     BridgeListenerMgrPtr mgr(new BridgeListenerMgr(mAdapter->getCommunicator(), state->bridgeId, prx));
-
-
-    BridgeServantPtr bridge = BridgeServant::create(mAdapter, mgr, 
-        mReplicationContext->getReplicator(), mLogger, state);
+    
+    BridgeServantPtr bridge = BridgeServant::create(mAdapter, mgr, mReplicationContext->getReplicator(), mLogger, state);
     Ice::ObjectPrx obj = mAdapter->add(bridge, id);
-
+    
     mLogger(Info) << ": creating bridge replica " << obj->ice_toString() << "." ;
     BridgeInfo info;
     info.servant = bridge;
     info.proxy = BridgePrx::uncheckedCast(obj);
     mBridges.push_back(info);
-}
 
+    //
+    // While we have a result for any subsequent createBridge_async
+    // requests, we need to make sure that the entire operation
+    // completes. createBridge_async needs to take special care here. 
+    //
+    data.second->setResult(info.proxy);
+}
+    
 void BridgeManagerImpl::removeBridge(const string& bridgeId)
 {
     mLogger(Trace) << FUNLOG;
diff --git a/src/BridgeManagerImpl.h b/src/BridgeManagerImpl.h
index b07fa12..885a51b 100644
--- a/src/BridgeManagerImpl.h
+++ b/src/BridgeManagerImpl.h
@@ -54,6 +54,9 @@ public:
 
     virtual std::string getID() = 0;
 
+    virtual void newCreateBridgeContext(
+        const AsteriskSCF::Replication::BridgeService::V1::ContextStateItemPtr& context) = 0;
+
     virtual void createBridgeReplica(
         const AsteriskSCF::Replication::BridgeService::V1::BridgeStateItemPtr& bridgeState) = 0;
 
diff --git a/src/BridgeReplicatorStateListenerI.cpp b/src/BridgeReplicatorStateListenerI.cpp
index c9300e6..06975bc 100644
--- a/src/BridgeReplicatorStateListenerI.cpp
+++ b/src/BridgeReplicatorStateListenerI.cpp
@@ -92,7 +92,8 @@ public:
                             // Keep the session list clean.
                             //
                             found = true;
-                            removeBridgeItem(bridgedSessionItem->bridgeId, bridgedSessionItem->key, item->ice_id());
+                            removeBridgeItem(bridgedSessionItem->bridgeId, bridgedSessionItem->key,
+                                item->ice_id());
                         }
                         //
                         // We could break here if we could be sure that there were no other updates.
@@ -331,6 +332,30 @@ public:
                 continue;
             }
 
+            ContextStateItemPtr contextItem = ContextStateItemPtr::dynamicCast((*i));
+            if (contextItem)
+            {
+                switch(contextItem->code)
+                {
+                    case CreateBridge:
+                        mManager->newCreateBridgeContext(contextItem);
+                        break;
+                    case AddSession:
+                        break;
+                    case ReplaceSession:
+                        break;
+                    case RemoveSession:
+                        break;
+                    case Shutdown:
+                        break;
+                    default:
+                        mLogger(Error) << "Received a replicated context item with an unknown code, "
+                            "possible implementation mismatch or corrupted data from active component. Please "
+                            "contact support.";
+                }
+                continue;
+            }
+
             mLogger(Error) << "Bridge replicator service received an unrecognized replication item: " << (*i)->ice_id();
         }
     }
@@ -345,6 +370,11 @@ private:
 
 }
 
+//
+// TODO: reverse the order of the arguments for consistency's sake. Nearly
+// every other method in this component puts the logger object at the end
+// of the argument list.
+//
 ReplicatorListenerPtr AsteriskSCF::BridgeService::createStateListener(const Logger& logger,
     const BridgeManagerServantPtr& manager)
 {
diff --git a/src/BridgeServiceConfig.h b/src/BridgeServiceConfig.h
index 3e6b240..b2e95ca 100644
--- a/src/BridgeServiceConfig.h
+++ b/src/BridgeServiceConfig.h
@@ -88,5 +88,15 @@ inline std::string objectIdFromCurrent(const Ice::Current& current)
     return "<na>";
 }
 
+inline AsteriskSCF::System::V1::OperationContextPtr addContextSuffix(
+    const AsteriskSCF::System::V1::OperationContextPtr& sourceContext,
+    const std::string& suffix)
+{
+    AsteriskSCF::System::V1::OperationContextPtr result =
+        AsteriskSCF::System::V1::OperationContextPtr::dynamicCast(sourceContext->ice_clone());
+    result->id = result->id + suffix;
+    return result;
+}
+
 } // End of namespace BridgeService
 } // End of namespace AsteriskSCF
diff --git a/src/SessionOperations.cpp b/src/SessionOperations.cpp
index b1daf91..0fe6ec4 100644
--- a/src/SessionOperations.cpp
+++ b/src/SessionOperations.cpp
@@ -95,7 +95,7 @@ void ShutdownSessionOperation::operator()(const SessionWrapperPtr& wrapper)
 {
     if (mSkipExclude || wrapper->getSession()->ice_getIdentity() != mExclude)
     {
-        wrapper->shutdown(AsteriskSCF::Operations::createContext(), mListener, mResponse);
+        wrapper->shutdown(addContextSuffix(mRootContext, wrapper->id()), mListener, mResponse);
     }
 }
 
@@ -120,7 +120,7 @@ void RelayIndication::operator()(const SessionWrapperPtr& session)
             //
             // TODO: AMI.. or would this be better as a oneway. Do we care if we get a response etc?
             //
-	    s->indicate(AsteriskSCF::Operations::createContext(mRootContext), mIndication);
+	    s->indicate(addContextSuffix(mRootContext, session->id()), mIndication);
         }
         catch (const Ice::ObjectNotExistException& ex)
         {
@@ -165,7 +165,7 @@ void AddStreamsOperation::operator()(const SessionWrapperPtr& session)
 
     // Go ahead and request that the streams be added
     controller->begin_addStreams(
-        AsteriskSCF::Operations::createContext(mRootContext), mStreams,
+        addContextSuffix(mRootContext, session->id()), mStreams,
         newCallback_SessionController_addStreams(this, &AddStreamsOperation::added, &AddStreamsOperation::failed),
                                  session);
 
@@ -245,7 +245,7 @@ void RemoveStreamsOperation::operator()(const SessionWrapperPtr& session)
         return;
     }
 
-    controller->begin_removeStreams(AsteriskSCF::Operations::createContext(mRootContext),
+    controller->begin_removeStreams(addContextSuffix(mRootContext, session->id()),
         remove, newCallback_SessionController_removeStreams(this, &RemoveStreamsOperation::removed,
             &RemoveStreamsOperation::failed), session);
 
@@ -321,7 +321,16 @@ void ConnectTelephonyOperation::connectSinks(const TelephonySessionPrx& sourceSe
     for(TelephonyEventSourceSeq::iterator i=toSources.begin();   
         i != toSources.end(); ++i)
     {
-        (*i)->addSinks(AsteriskSCF::Operations::createContext(mRootContext), sinksToAdd);
+        Ice::Identity id = (*i)->ice_getIdentity();
+
+        //
+        // Strictly speaking, dealing with the members of an Ice identity
+        // directly is kind of frowned upon, but it is the shortest
+        // distance between two points here.
+        //
+        string idString = id.name + id.category;
+        
+        (*i)->addSinks(addContextSuffix(mRootContext, idString), sinksToAdd);
     }
 }
 
@@ -363,7 +372,16 @@ void DisconnectTelephonyOperation::disconnectSinks(const TelephonySessionPrx& so
     for(TelephonyEventSourceSeq::iterator i=fromSources.begin();   
         i != fromSources.end(); ++i)
     {
-        (*i)->removeSinks(AsteriskSCF::Operations::createContext(mRootContext), sinksToRemove);
+        Ice::Identity id = (*i)->ice_getIdentity();
+
+        //
+        // Strictly speaking, dealing with the members of an Ice identity
+        // directly is kind of frowned upon, but it is the shortest
+        // distance between two points here.
+        //
+        string idString = id.name + id.category;
+
+        (*i)->removeSinks(addContextSuffix(mRootContext, idString), sinksToRemove);
     }
 }
 
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index eae777d..29ef936 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -42,3 +42,33 @@ astscf_component_add_slice_collection_libraries(BridgeUnitTests ASTSCF)
 astscf_component_build_standalone(BridgeUnitTests)
 target_link_libraries(BridgeUnitTests LoggingClient ASTSCFIceUtilCpp)
 astscf_test_boost(BridgeUnitTests)
+
+astscf_component_init(BridgeReplicationTests)
+astscf_component_add_files(BridgeReplicationTests
+	../src/BridgeManagerImpl.h
+	../src/BridgePartyIdExtensionPoint.h
+	../src/BridgeReplicationContext.h
+	../src/BridgeReplicatorStateListenerI.h
+	../src/BridgeCreationExtensionPointImpl.cpp
+	../src/BridgeImpl.cpp
+	../src/BridgeListenerMgr.cpp
+	../src/BridgeManagerImpl.cpp
+	../src/BridgeManagerListenerMgr.cpp
+	../src/BridgePartyIdExtensionPoint.cpp
+	../src/BridgeReplicatorStateListenerI.cpp
+	../src/MediaMixer.cpp
+	../src/MediaSplicer.cpp
+	../src/SessionCollection.cpp
+	../src/SessionListener.cpp
+	../src/SessionOperations.cpp
+	../src/SessionWrapper.cpp
+	ReplicationUnitTests.cpp
+	)
+astscf_component_add_slices(BridgeReplicationTests PROJECT AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice)
+astscf_component_add_ice_libraries(BridgeReplicationTests Ice)
+astscf_component_add_boost_libraries(BridgeReplicationTests unit_test_framework thread)
+astscf_component_add_slice_collection_libraries(BridgeReplicationTests
+ASTSCF)
+astscf_component_build_standalone(BridgeReplicationTests)
+target_link_libraries(BridgeReplicationTests LoggingClient ASTSCFIceUtilCpp)
+astscf_test_boost(BridgeReplicationTests)

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/bridging.git



More information about the asterisk-scf-commits mailing list