[asterisk-scf-commits] asterisk-scf/integration/routing.git branch "route_replica" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Wed Apr 13 20:13:38 CDT 2011


branch "route_replica" has been updated
       via  c0da173bc8199919b8da1a1f79be484f7c894098 (commit)
      from  cc3b28f14b514d647fa4933a0a54c6c13705191b (commit)

Summary of changes:
 local-slice/BasicRoutingStateReplicationIf.ice     |   20 +++-
 src/ConnectBridgedSessionsOperation.cpp            |    8 +-
 src/ConnectBridgedSessionsOperation.h              |    2 -
 ...nectBridgedSessionsWithDestinationOperation.cpp |  105 +++++++++++++++-----
 ...onnectBridgedSessionsWithDestinationOperation.h |   22 ++++-
 src/OperationReplicaCache.cpp                      |   47 ++++-----
 src/OperationReplicaCache.h                        |    3 +-
 src/RouteSessionOperation.cpp                      |  109 +++++++++++++++-----
 src/RouteSessionOperation.h                        |   26 ++++-
 src/SessionRouter.cpp                              |   42 ++++++++
 src/SessionRouterOperation.h                       |   21 +++-
 11 files changed, 305 insertions(+), 100 deletions(-)


- Log -----------------------------------------------------------------
commit c0da173bc8199919b8da1a1f79be484f7c894098
Author: Ken Hunt <ken.hunt at digium.com>
Date:   Wed Apr 13 20:12:06 2011 -0500

    - Getting consistency among replicas of different operations.
    - Implemented replica fast-forward when pulling from cache.

diff --git a/local-slice/BasicRoutingStateReplicationIf.ice b/local-slice/BasicRoutingStateReplicationIf.ice
index 4b696c1..676538d 100644
--- a/local-slice/BasicRoutingStateReplicationIf.ice
+++ b/local-slice/BasicRoutingStateReplicationIf.ice
@@ -103,23 +103,24 @@ module V1
     const string RouteSessionOpStartKeyMod = ".START";
 
 	/**
-	 * Indicates the RouteSessionOperation is waiting for an AMI endpoint lookup() reply. 
+	 * Indicates the RouteSessionOperation has completed an AMI endpoint lookup(). 
 	 * The key (in the base state item) is the transactionId of this 
 	 * operation + RouteSessionOpWaitLookupKeyMod
 	 */
     class RouteSessionOpWaitLookupState extends OperationStateItem
     {
+       AsteriskSCF::Core::Endpoint::V1::EndpointSeq endpoints;
     };
     const string RouteSessionOpWaitLookupKeyMod = ".WAITLOOKUP";
 
 	/**
-	 * Indicates the RouteSessionOperation is going to create the bridge. 
+	 * Indicates the RouteSessionOperation has created the bridge. 
 	 * The key (in the base state item) is the transactionId of this 
 	 * operation + RouteSessionOpBridgingKeyMod
 	 */
     class RouteSessionOpBridgingState extends OperationStateItem
     {
-        AsteriskSCF::Core::Endpoint::V1::EndpointSeq endpoints;
+        AsteriskSCF::SessionCommunications::V1::Bridge* bridge;
     };
     const string RouteSessionOpBridgingKeyMod = ".BRIDGING";
 
@@ -139,14 +140,25 @@ module V1
     };
 	const string ConnectBridgedSessionsWithDestStartKeyMod = ".START";
 
+	/**
+	 * Indicates the ConnectBridgedSessionsWithDestinationOperation completed an AMI endpoint lookup(). 
+	 * The key (in the base state item) is the transactionId of this 
+	 * operation + ConnectBridgedSessionsWithDestWaitLookupKeyMod
+	 */
     class ConnectBridgedSessionsWithDestinationOpWaitLookupState extends OperationStateItem
     {
+        AsteriskSCF::Core::Endpoint::V1::EndpointSeq endpoints;
     };
     const string ConnectBridgedSessionsWithDestWaitLookupKeyMod = ".WAITLOOKUP";
 
+	/**
+	 * Indicates the ConnectBridgedSessionsWithDestinationOperation has created the bridge. 
+	 * The key (in the base state item) is the transactionId of this 
+	 * operation + ConnectBridgedSessionsWithDestBridgingKeyMod
+	 */
     class ConnectBridgedSessionsWithDestinationOpBridgingState extends OperationStateItem
     {
-        AsteriskSCF::Core::Endpoint::V1::EndpointSeq endpoints;
+	    AsteriskSCF::SessionCommunications::V1::Bridge* bridge;
     };
 	const string ConnectBridgedSessionsWithDestBridgingKeyMod = ".BRIDGING";
 
diff --git a/src/ConnectBridgedSessionsOperation.cpp b/src/ConnectBridgedSessionsOperation.cpp
index 9c637d0..5cd327d 100644
--- a/src/ConnectBridgedSessionsOperation.cpp
+++ b/src/ConnectBridgedSessionsOperation.cpp
@@ -55,13 +55,12 @@ ConnectBridgedSessionsOperation::ConnectBridgedSessionsOperation(const AMD_Sessi
                                                                 std::string transactionId)
         : SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsPtr, ConnectBridgedSessionsOp::OperationState>(cb, 
                                                                                                                         context, 
+                                                                                                                        current,
                                                                                                                         listener,
                                                                                                           ConnectBridgedSessionsOp::STATE_CONNECT,
                                                                                                           transactionId),
-        mInitiatorCallback(cb),
-        mSessionToReplace(sessionToReplace),
-        mBridgedSession(bridgedSession),
-        mIceCurrent(current) 
+           mSessionToReplace(sessionToReplace),
+           mBridgedSession(bridgedSession)
 {
     Ice::Context::const_iterator it = current.ctx.find(::AsteriskSCF::SessionCommunications::V1::TransactionKey);
     if (it == current.ctx.end())
@@ -190,6 +189,5 @@ void ConnectBridgedSessionsOperation::connectBridgedSessionsState()
     finishAndSendResult();
 }
 
-
 } // end BasicRoutingService
 } // end AsteriskSCF
diff --git a/src/ConnectBridgedSessionsOperation.h b/src/ConnectBridgedSessionsOperation.h
index 2484d5a..bd3e73a 100644
--- a/src/ConnectBridgedSessionsOperation.h
+++ b/src/ConnectBridgedSessionsOperation.h
@@ -89,10 +89,8 @@ private:
 
 private:
     // Operation input params. 
-     AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsPtr mInitiatorCallback;
      AsteriskSCF::SessionCommunications::V1::SessionPrx mSessionToReplace;
      AsteriskSCF::SessionCommunications::V1::SessionPrx mBridgedSession;
-    ::Ice::Current mIceCurrent;
 
 }; // class ConnectBridgedSessionsOperation
 	
diff --git a/src/ConnectBridgedSessionsWithDestinationOperation.cpp b/src/ConnectBridgedSessionsWithDestinationOperation.cpp
index 80ca61c..f624c49 100644
--- a/src/ConnectBridgedSessionsWithDestinationOperation.cpp
+++ b/src/ConnectBridgedSessionsWithDestinationOperation.cpp
@@ -71,6 +71,27 @@ public:
             }
             break;
 
+       } // end switch
+    }
+
+    void pushState(RoutingStateItemPtr item)
+    {
+        RoutingStateItemSeq setItems;
+
+        setItems.push_back(item);
+        mReplicator->setState(setItems);
+        
+        // Cache the keys of all pushed items.
+        mReplicatedStateKeys.push_back(item->key);
+    }
+
+    /**
+     * This callback is called just before the execution of a state machine's current state handler. 
+     */
+    void stateExecutionComplete(ConnectBridgedSessionsWithDestinationOp::OperationState state)
+    {
+        switch(state)
+        {
         case ConnectBridgedSessionsWithDestinationOp::STATE_WAIT_LOOKUP_RESULTS:
             {
             // We just completed the entire operation. 
@@ -78,6 +99,7 @@ public:
             ConnectBridgedSessionsWithDestinationOpWaitLookupStatePtr waitLookup(new ConnectBridgedSessionsWithDestinationOpWaitLookupState());
             waitLookup->transactionId = mOperation->getTransactionId();
             waitLookup->key = mOperation->getTransactionId() + AsteriskSCF::BasicRoutingService::V1::RouteSessionOpWaitLookupKeyMod;
+            waitLookup->endpoints = mOperation->getLookupResult();
 
             pushState(waitLookup);
             }
@@ -85,35 +107,18 @@ public:
 
         case ConnectBridgedSessionsWithDestinationOp::STATE_BRIDGING:
             {
-            // We just completed the entire operation. 
+            // We just completed creating a bridge. 
             // Push this information to the state replicator.
             ConnectBridgedSessionsWithDestinationOpBridgingStatePtr bridgeOp(new ConnectBridgedSessionsWithDestinationOpBridgingState());
             bridgeOp->transactionId = mOperation->getTransactionId();
             bridgeOp->key = mOperation->getTransactionId() + AsteriskSCF::BasicRoutingService::V1::RouteSessionOpBridgingKeyMod;
-            bridgeOp->endpoints = mOperation->getLookupResult();
+            bridgeOp->bridge = mOperation->getBridge();
 
             pushState(bridgeOp);
             }
             break;
-        }
-    }
-
-    void pushState(RoutingStateItemPtr item)
-    {
-        RoutingStateItemSeq setItems;
 
-        setItems.push_back(item);
-        mReplicator->setState(setItems);
-        
-        // Cache the keys of all pushed items.
-        mReplicatedStateKeys.push_back(item->key);
-    }
-
-    /**
-     * This callback is called just before the execution of a state machine's current state handler. 
-     */
-    void stateExecutionComplete(ConnectBridgedSessionsWithDestinationOp::OperationState state)
-    {
+        } // end switch
     }
 
     /**
@@ -159,17 +164,17 @@ ConnectBridgedSessionsWithDestinationOperation::ConnectBridgedSessionsWithDestin
                           std::string transactionId)
         : SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr, ConnectBridgedSessionsWithDestinationOp::OperationState>(cb, 
                                 context, 
+                                current,
                                 listener,
                                 ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP,
                                 transactionId),
-        mInitiatorCallback(cb),
         mSessionToReplace(sessionToReplace),
-        mDestination(destination),
-        mIceCurrent(current)
+        mDestination(destination)
 {
     mStateMachine.addState(ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP, boost::bind(&ConnectBridgedSessionsWithDestinationOperation::lookupState, this));
     mStateMachine.addState(ConnectBridgedSessionsWithDestinationOp::STATE_WAIT_LOOKUP_RESULTS, boost::bind(&ConnectBridgedSessionsWithDestinationOperation::waitOnLookupState, this));
     mStateMachine.addState(ConnectBridgedSessionsWithDestinationOp::STATE_BRIDGING, boost::bind(&ConnectBridgedSessionsWithDestinationOperation::establishBridgeState, this));
+    mStateMachine.addState(ConnectBridgedSessionsWithDestinationOp::STATE_SEND_RESPONSE, boost::bind(&ConnectBridgedSessionsWithDestinationOperation::sendResponseState, this));
 }
 
 /**
@@ -217,6 +222,7 @@ ConnectBridgedSessionsWithDestinationOperation::ConnectBridgedSessionsWithDestin
     mStateMachine.addState(ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP, boost::bind(&ConnectBridgedSessionsWithDestinationOperation::lookupState, this));
     mStateMachine.addState(ConnectBridgedSessionsWithDestinationOp::STATE_WAIT_LOOKUP_RESULTS, boost::bind(&ConnectBridgedSessionsWithDestinationOperation::waitOnLookupState, this));
     mStateMachine.addState(ConnectBridgedSessionsWithDestinationOp::STATE_BRIDGING, boost::bind(&ConnectBridgedSessionsWithDestinationOperation::establishBridgeState, this));
+    mStateMachine.addState(ConnectBridgedSessionsWithDestinationOp::STATE_SEND_RESPONSE, boost::bind(&ConnectBridgedSessionsWithDestinationOperation::sendResponseState, this));
 }
 
 /**
@@ -264,16 +270,55 @@ void ConnectBridgedSessionsWithDestinationOperation::reflectUpdate(AsteriskSCF::
 
 void ConnectBridgedSessionsWithDestinationOperation::reflectUpdate(AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpWaitLookupStatePtr item)
 {
+    mLookupResult = item->endpoints;
+
     mReplicatedStates.push_back(ConnectBridgedSessionsWithDestinationOp::STATE_WAIT_LOOKUP_RESULTS);
 }
 
 void ConnectBridgedSessionsWithDestinationOperation::reflectUpdate(AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpBridgingStatePtr item)
 {
-    mLookupResult = item->endpoints;
+    mBridge = item->bridge;
 
     mReplicatedStates.push_back(ConnectBridgedSessionsWithDestinationOp::STATE_BRIDGING);
 }
 
+/** 
+ * Apply as many of the state transitions as possible. It is important to note
+ * that receiving a state transition item for a particular state means that
+ * state completed. 
+ */
+bool ConnectBridgedSessionsWithDestinationOperation::fastForwardReplica()
+{
+    std::vector<ConnectBridgedSessionsWithDestinationOp::OperationState>::iterator i = find(mReplicatedStates.begin(), mReplicatedStates.end(), ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP);
+
+    // If we didn't receive the initial call params, this replica is of no use. 
+    if (i == mReplicatedStates.end())
+    {
+        return false;
+    }
+    // Set to initial state.
+    mStateMachine.setNextState(ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP);
+
+    // See if we got the results from the lookup.
+    i = find(mReplicatedStates.begin(), mReplicatedStates.end(), ConnectBridgedSessionsWithDestinationOp::STATE_WAIT_LOOKUP_RESULTS);
+    if (i == mReplicatedStates.end())
+    {
+        return true;
+    }
+    mStateMachine.setNextState(ConnectBridgedSessionsWithDestinationOp::STATE_BRIDGING);
+
+    // See if we got past bridge creation.
+    i = find(mReplicatedStates.begin(), mReplicatedStates.end(), ConnectBridgedSessionsWithDestinationOp::STATE_BRIDGING);
+    if (i == mReplicatedStates.end())
+    {
+        return true;
+    }
+
+    // Apparently nothing left to do but reply to the AMD callback. 
+    mStateMachine.setNextState(ConnectBridgedSessionsWithDestinationOp::STATE_SEND_RESPONSE);
+    return true;
+}
+
 /**
  * This is a state handler for one of this operation's states. 
  */
@@ -403,8 +448,20 @@ void ConnectBridgedSessionsWithDestinationOperation::establishBridgeState()
         return;
 	}
 
+    // Set the state handler to exectute once we've looked up our endpoints. 
+    mStateMachine.setNextState(ConnectBridgedSessionsWithDestinationOp::STATE_SEND_RESPONSE);
+
+    // Keep executing.
+	mOperationsManager->reschedule(this);
+}
+
+void ConnectBridgedSessionsWithDestinationOperation::sendResponseState()
+{
     // This operation is complete. Send AMD responses. 
     finishAndSendResult();
+
+    // Shutdown the state machine. 
+    mStateMachine.shutdown();
 }
 
 } // end BasicRoutingService
diff --git a/src/ConnectBridgedSessionsWithDestinationOperation.h b/src/ConnectBridgedSessionsWithDestinationOperation.h
index b1d35b1..7052b48 100644
--- a/src/ConnectBridgedSessionsWithDestinationOperation.h
+++ b/src/ConnectBridgedSessionsWithDestinationOperation.h
@@ -42,7 +42,8 @@ namespace ConnectBridgedSessionsWithDestinationOp
     {
         STATE_LOOKUP,
         STATE_WAIT_LOOKUP_RESULTS,
-        STATE_BRIDGING
+        STATE_BRIDGING,
+        STATE_SEND_RESPONSE
     };
 }
 
@@ -80,6 +81,8 @@ public:
 
     std::string getDestination() {return mDestination;}
 
+    AsteriskSCF::SessionCommunications::V1::BridgePrx getBridge() {return mBridge;}
+
     /**
      * Factory method for replica objects. 
      */
@@ -90,6 +93,12 @@ public:
      */
     void reflectUpdate(AsteriskSCF::BasicRoutingService::V1::OperationStateItemPtr stateItem);
 
+    /**
+     * Set the state machine into the highest state possible based on all of the state updates
+     * that have been reflected in this replica. 
+     */
+    bool fastForwardReplica();
+
 protected:
     ConnectBridgedSessionsWithDestinationOperation(const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr& cb,
                         const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace, 
@@ -128,9 +137,18 @@ private:
      */
     void establishBridgeState();
 
+    /**
+     * This is a state handler for one of this operation's states. 
+     * Entering this state, the bridge has been created. This state
+     * completes the operation by returning the result via the AMD callback. 
+     * 
+     * This method is called via mCurrentStateHandler when doWork() is executed from the 
+     * WorkQueue. 
+     */
+    void sendResponseState();
+
 private:
     // Operation input params. 
-    AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr mInitiatorCallback;
     AsteriskSCF::SessionCommunications::V1::SessionPrx mSessionToReplace;
     std::string mDestination;
     ::Ice::Current mIceCurrent;
diff --git a/src/OperationReplicaCache.cpp b/src/OperationReplicaCache.cpp
index 3045ab8..e1c52b5 100644
--- a/src/OperationReplicaCache.cpp
+++ b/src/OperationReplicaCache.cpp
@@ -108,41 +108,36 @@ void OperationReplicaCache::cacheOperation(OperationType type, const AsteriskSCF
         }
 }
 
-bool OperationReplicaCache::fetchRouteSessionOperation(OperationType type, std::string transactionId, AsteriskSCF::Threading::WorkPtr& ref)
+bool OperationReplicaCache::fetchConnectBridgedSessionsWithDestOp(std::string transactionId, AsteriskSCF::BasicRoutingService::ConnectBridgedSessionsWithDestinationOperationPtr& ref)
 {
-    switch(type)
-    {
-    case ROUTE_SESSION_OP:
+    ConnectBridgeWithDestMapType::iterator i = mPriv->connectBridgedWithDestReplicas.find(transactionId);
+    if (i == mPriv->connectBridgedWithDestReplicas.end())
     {
-        if (mPriv->routeSessionReplicas.empty())
-        {
-            return false;
-        }
+        return false;
+    }
 
-        RouteSessionMapType::iterator i = mPriv->routeSessionReplicas.find(transactionId);
-        if (i == mPriv->routeSessionReplicas.end())
-        {
-            return false;
-        }
+    ref = (*i).second.mOperation;
+    mPriv->connectBridgedWithDestReplicas.erase(i);
 
-        ref = (*i).second.mOperation;
-        mPriv->routeSessionReplicas.erase(i);
+    ref->fastForwardReplica();
 
-        return true;
-    }
-    break;
+    return true;
+}
 
-    case CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP:
+bool OperationReplicaCache::fetchRouteSessionOp(std::string transactionId, AsteriskSCF::BasicRoutingService::RouteSessionOperationPtr& ref)
+{
+    RouteSessionMapType::iterator i = mPriv->routeSessionReplicas.find(transactionId);
+    if (i == mPriv->routeSessionReplicas.end())
     {
-        if (mPriv->connectBridgedWithDestReplicas.empty())
-        {
-            return false;
-        }
-    }
-    break;
+        return false;
     }
 
-    return false;
+    ref = (*i).second.mOperation;
+    mPriv->routeSessionReplicas.erase(i);
+
+    ref->fastForwardReplica();
+
+    return true;
 }
 
 void OperationReplicaCache::dropRouteSessionOperation(OperationType type, std::string transactionId)
diff --git a/src/OperationReplicaCache.h b/src/OperationReplicaCache.h
index ff89752..cd6aa6a 100644
--- a/src/OperationReplicaCache.h
+++ b/src/OperationReplicaCache.h
@@ -47,7 +47,8 @@ public:
     OperationReplicaCache(const boost::shared_ptr<SessionContext>& sessionContext);
 
     void cacheOperation(OperationType type, const AsteriskSCF::BasicRoutingService::V1::OperationStateItemPtr& item);
-    bool fetchRouteSessionOperation(OperationType type, std::string transactionId, AsteriskSCF::Threading::WorkPtr& ref);
+    bool fetchRouteSessionOp(std::string transactionId, AsteriskSCF::BasicRoutingService::RouteSessionOperationPtr& ref);
+    bool fetchConnectBridgedSessionsWithDestOp(std::string transactionId, AsteriskSCF::BasicRoutingService::ConnectBridgedSessionsWithDestinationOperationPtr& ref);
     void dropRouteSessionOperation(OperationType type, std::string transactionId);
 
     void clearCache(OperationType type);
diff --git a/src/RouteSessionOperation.cpp b/src/RouteSessionOperation.cpp
index 97d60d3..0d84966 100644
--- a/src/RouteSessionOperation.cpp
+++ b/src/RouteSessionOperation.cpp
@@ -71,13 +71,34 @@ public:
             pushState(routeSessionOpStart);
             }
             break;
+        }
+    }
+
+    void pushState(RoutingStateItemPtr item)
+    {
+        RoutingStateItemSeq setItems;
+
+        setItems.push_back(item);
+        mReplicator->setState(setItems);
+        
+        // Cache the keys of all pushed items.
+        mReplicatedStateKeys.push_back(item->key);
+    }
 
+    /**
+     * This callback is called just after the execution of a state machine's current state handler. 
+     */
+    void stateExecutionComplete(RouteSessionOp::OperationState state)
+    {
+        switch(state)
+        {
         case RouteSessionOp::STATE_WAIT_LOOKUP_RESULTS:
             {
-            // We've sent out our lookup request via AMI.
+            // We've obtained a result from our AMI lookup request.
             RouteSessionOpWaitLookupStatePtr routeSessionOpWaitLookup(new RouteSessionOpWaitLookupState());
             routeSessionOpWaitLookup->transactionId =  mOperation->getTransactionId();
             routeSessionOpWaitLookup->key = mOperation->getTransactionId() + AsteriskSCF::BasicRoutingService::V1::RouteSessionOpWaitLookupKeyMod;
+            routeSessionOpWaitLookup->endpoints = mOperation->getLookupResult();
 
             pushState(routeSessionOpWaitLookup);
             }
@@ -85,36 +106,21 @@ public:
 
         case RouteSessionOp::STATE_BRIDGING:
             {
-            // We just completed the entire operation. 
+            // We just completed the bridge creation.
             // Push this information to the state replicator.
             RouteSessionOpBridgingStatePtr routeSessionOpBridging(new RouteSessionOpBridgingState());
             routeSessionOpBridging->transactionId =  mOperation->getTransactionId();
             routeSessionOpBridging->key = mOperation->getTransactionId() + AsteriskSCF::BasicRoutingService::V1::RouteSessionOpBridgingKeyMod;
-            routeSessionOpBridging->endpoints = mOperation->getLookupResult();
+            routeSessionOpBridging->bridge = mOperation->getBridge();
 
             pushState(routeSessionOpBridging);
             }
             break;
-  
-        }
-    }
-
-    void pushState(RoutingStateItemPtr item)
-    {
-        RoutingStateItemSeq setItems;
-
-        setItems.push_back(item);
-        mReplicator->setState(setItems);
-        
-        // Cache the keys of all pushed items.
-        mReplicatedStateKeys.push_back(item->key);
-    }
 
-    /**
-     * This callback is called just after the execution of a state machine's current state handler. 
-     */
-    void stateExecutionComplete(RouteSessionOp::OperationState state)
-    {
+        case RouteSessionOp::STATE_SEND_RESPONSE:
+            // The AMD caller has been notified. We'll remove the replica in shutdown() handler. 
+            break;
+        }
     }
 
     /**
@@ -155,22 +161,22 @@ RouteSessionOperation::RouteSessionOperation(const AMD_SessionRouter_routeSessio
                                                 const ::std::string& destination, 
                                                 const ::Ice::Current& current,
                                                 const SessionContextPtr& context,
-                                                OperationsManager* const listener,
+                                                OperationsManager*  listener,
                                                 std::string transactionId) 
                 : SessionRouterOperation<AMD_SessionRouter_routeSessionPtr, RouteSessionOp::OperationState>(cb, 
                                                                                                             context, 
+                                                                                                            current,
                                                                                                             listener,
                                                                                                             RouteSessionOp::STATE_LOOKUP,
                                                                                                             transactionId),
-                mInitiatorCallback(cb),
                 mSource(source),
-                mDestination(destination),
-                mIceCurrent(current)
+                mDestination(destination)
 {
     // Configure the state machine with state handlers. 
     mStateMachine.addState(RouteSessionOp::STATE_LOOKUP, boost::bind(&RouteSessionOperation::lookupState, this));
     mStateMachine.addState(RouteSessionOp::STATE_WAIT_LOOKUP_RESULTS, boost::bind(&RouteSessionOperation::waitOnLookupState, this));
     mStateMachine.addState(RouteSessionOp::STATE_BRIDGING, boost::bind(&RouteSessionOperation::establishBridgeState, this));
+    mStateMachine.addState(RouteSessionOp::STATE_SEND_RESPONSE, boost::bind(&RouteSessionOperation::sendResponseState, this));
 }
 
 /**
@@ -216,6 +222,7 @@ RouteSessionOperation::RouteSessionOperation(const SessionContextPtr& sessionCon
     mStateMachine.addState(RouteSessionOp::STATE_LOOKUP, boost::bind(&RouteSessionOperation::lookupState, this));
     mStateMachine.addState(RouteSessionOp::STATE_WAIT_LOOKUP_RESULTS, boost::bind(&RouteSessionOperation::waitOnLookupState, this));
     mStateMachine.addState(RouteSessionOp::STATE_BRIDGING, boost::bind(&RouteSessionOperation::establishBridgeState, this));
+    mStateMachine.addState(RouteSessionOp::STATE_SEND_RESPONSE, boost::bind(&RouteSessionOperation::sendResponseState, this));
 }
 
 /**
@@ -259,16 +266,55 @@ void RouteSessionOperation::reflectUpdate(AsteriskSCF::BasicRoutingService::V1::
 
 void RouteSessionOperation::reflectUpdate(AsteriskSCF::BasicRoutingService::V1::RouteSessionOpWaitLookupStatePtr item)
 {
+    mLookupResult = item->endpoints;
+
     mReplicatedStates.push_back(RouteSessionOp::STATE_WAIT_LOOKUP_RESULTS);
 }
 
 void RouteSessionOperation::reflectUpdate(AsteriskSCF::BasicRoutingService::V1::RouteSessionOpBridgingStatePtr item)
 {
-    mLookupResult = item->endpoints;
+    mBridge = item->bridge;
 
     mReplicatedStates.push_back(RouteSessionOp::STATE_BRIDGING);
 }
 
+/** 
+ * Apply as many of the state transitions as possible. It is important to note
+ * that receiving a state transition item for a particular state means that
+ * state completed. 
+ */
+bool RouteSessionOperation::fastForwardReplica()
+{
+    std::vector<RouteSessionOp::OperationState>::iterator i = find(mReplicatedStates.begin(), mReplicatedStates.end(), RouteSessionOp::STATE_LOOKUP);
+
+    // If we didn't receive the initial call params, this replica is of no use. 
+    if (i == mReplicatedStates.end())
+    {
+        return false;
+    }
+    // Set to initial state.
+    mStateMachine.setNextState(RouteSessionOp::STATE_LOOKUP);
+
+    // See if we got the results from the lookup.
+    i = find(mReplicatedStates.begin(), mReplicatedStates.end(), RouteSessionOp::STATE_WAIT_LOOKUP_RESULTS);
+    if (i == mReplicatedStates.end())
+    {
+        return true;
+    }
+    mStateMachine.setNextState(RouteSessionOp::STATE_BRIDGING);
+
+    // See if we got past bridge creation.
+    i = find(mReplicatedStates.begin(), mReplicatedStates.end(), RouteSessionOp::STATE_BRIDGING);
+    if (i == mReplicatedStates.end())
+    {
+        return true;
+    }
+
+    // Apparently nothing left to do but reply to the AMD callback. 
+    mStateMachine.setNextState(RouteSessionOp::STATE_SEND_RESPONSE);
+    return true;
+}
+
 RouteSessionOperation::~RouteSessionOperation() 
 {
         lg(Debug) << "RouteSessionOperation() being destroyed for " << mDestination ;
@@ -401,6 +447,15 @@ void RouteSessionOperation::establishBridgeState()
         return;
 	}
 
+    // Set the state handler to exectute once we've looked up our endpoints. 
+    mStateMachine.setNextState(RouteSessionOp::STATE_SEND_RESPONSE);
+
+    // Keep executing.
+	mOperationsManager->reschedule(this);
+}
+
+void RouteSessionOperation::sendResponseState()
+{
     // This operation is complete. Send AMD responses. 
     finishAndSendResult();
 
diff --git a/src/RouteSessionOperation.h b/src/RouteSessionOperation.h
index 921f429..ea4a058 100644
--- a/src/RouteSessionOperation.h
+++ b/src/RouteSessionOperation.h
@@ -42,7 +42,8 @@ namespace RouteSessionOp
     {
         STATE_LOOKUP,
         STATE_WAIT_LOOKUP_RESULTS,
-        STATE_BRIDGING
+        STATE_BRIDGING,
+        STATE_SEND_RESPONSE
     };
 }
 
@@ -76,6 +77,8 @@ public:
     AsteriskSCF::SessionCommunications::V1::SessionPrx getSource() {return mSource;}
 
     std::string getDestination() {return mDestination;}
+
+    AsteriskSCF::SessionCommunications::V1::BridgePrx getBridge() {return mBridge;}
     
     /**
      * Factory method for replica objects. 
@@ -87,6 +90,12 @@ public:
      */
     void reflectUpdate(AsteriskSCF::BasicRoutingService::V1::OperationStateItemPtr stateItem);
 
+    /**
+     * Set the state machine into the highest state possible based on all of the state updates
+     * that have been reflected in this replica. 
+     */
+    bool fastForwardReplica();
+
 protected:
     // Normal constructor
     RouteSessionOperation(const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_routeSessionPtr& cb,
@@ -133,19 +142,28 @@ private:
     /**
      * This is a state handler for one of this operation's states. 
      * Entering this state, the destination endpoint has been obtained. This state
-     * completes the operation by creating the bridge. 
+     * creates the bridge. 
      * 
      * This method is called via mCurrentStateHandler when doWork() is executed from the 
      * WorkQueue. 
      */
     void establishBridgeState();
 
+    /**
+     * This is a state handler for one of this operation's states. 
+     * Entering this state, the bridge has been created. This state
+     * completes the operation by returning the result via the AMD callback. 
+     * 
+     * This method is called via mCurrentStateHandler when doWork() is executed from the 
+     * WorkQueue. 
+     */
+    void sendResponseState();
+
 private:
     // Operation input params. 
-    AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_routeSessionPtr mInitiatorCallback;
     AsteriskSCF::SessionCommunications::V1::SessionPrx mSource;
+    AsteriskSCF::SessionCommunications::V1::BridgePrx mBridge;
     std::string mDestination;
-    ::Ice::Current mIceCurrent;
 
     std::vector<RouteSessionOp::OperationState> mReplicatedStates;
 
diff --git a/src/SessionRouter.cpp b/src/SessionRouter.cpp
index efc4724..9e87e48 100644
--- a/src/SessionRouter.cpp
+++ b/src/SessionRouter.cpp
@@ -32,6 +32,7 @@
 
 #include "SessionListener.h"
 #include "EndpointRegistry.h"
+#include "OperationReplicaCache.h"
 
 using namespace AsteriskSCF;
 using namespace AsteriskSCF::Core::Routing::V1;
@@ -152,6 +153,21 @@ SessionRouter::~SessionRouter()
     mImpl.reset();
 }
 
+std::string getTransactionId(const ::Ice::Current& current)
+{
+    Ice::Context::const_iterator it = current.ctx.find(::AsteriskSCF::SessionCommunications::V1::TransactionKey);
+    std::string transactionId;
+    if (it == current.ctx.end())
+    {
+         lg(Error) << "routeSession_async() called with no transaction ID set in context. ";
+    }
+    else
+    {
+        transactionId = (it->second);
+    }
+    return transactionId;
+}
+
 /**
  * Route the session by looking up the destination endpoint and configuring a complimentary session for the destination.
  */
@@ -160,6 +176,19 @@ void SessionRouter::routeSession_async(const ::AsteriskSCF::SessionCommunication
                                        const ::std::string& destination, 
                                        const ::Ice::Current& current)
 {
+    std::string transactionId = getTransactionId(current);
+    
+    // Check the cache for a replica with this transaction Id.
+    RouteSessionOperationPtr routeSessionOp;
+    if (mImpl->mOperationReplicaCache->fetchRouteSessionOp(transactionId, routeSessionOp))
+    {
+        routeSessionOp->rehostReplica(cb, current, mImpl.get());
+        WorkPtr replicaOp(routeSessionOp);
+
+         mImpl->scheduleOperation(replicaOp);
+         return;
+    }
+
     WorkPtr op(RouteSessionOperation::create(cb, 
                                              source, 
                                              destination, 
@@ -181,6 +210,19 @@ void SessionRouter::connectBridgedSessionsWithDestination_async(const ::Asterisk
                                                                 const ::std::string& destination, 
                                                                 const ::Ice::Current& current)
 {
+    std::string transactionId = getTransactionId(current);
+
+    // Check the cache for a replica with this transaction Id.
+    ConnectBridgedSessionsWithDestinationOperationPtr connectBridgedSessionsWithDestOp;
+    if (mImpl->mOperationReplicaCache->fetchConnectBridgedSessionsWithDestOp(transactionId, connectBridgedSessionsWithDestOp))
+    {
+        connectBridgedSessionsWithDestOp->rehostReplica(cb, current, mImpl.get());
+        WorkPtr replicaOp(connectBridgedSessionsWithDestOp);
+
+         mImpl->scheduleOperation(replicaOp);
+         return;
+    }
+
     WorkPtr op( ConnectBridgedSessionsWithDestinationOperation::create(cb, 
                                                                         sessionToReplace, 
                                                                         destination, 
diff --git a/src/SessionRouterOperation.h b/src/SessionRouterOperation.h
index d257a5d..e87db54 100644
--- a/src/SessionRouterOperation.h
+++ b/src/SessionRouterOperation.h
@@ -113,11 +113,13 @@ public:
      */
     SessionRouterOperation(const T& amdCallback,
                            const SessionContextPtr& context,
-                           OperationsManager* manager,
+                           const ::Ice::Current& current,
+                           OperationsManager* manager, 
                            S defaultState,
                            std::string transactionId) 
 		: mInitiatorCallback(amdCallback),
           mSessionContext(context),
+          mIceCurrent(current),
           mFinished(false),
 		  mOperationsManager(manager),
           mStateMachine(defaultState),
@@ -132,7 +134,6 @@ public:
      *  TEMP COMMENT: Not initialized:
      *               mInitiatorCallback
      *               mOperationsManager
-     *               mTransactionId
      */
     SessionRouterOperation(const SessionContextPtr& context,
                            S defaultState) 
@@ -277,19 +278,29 @@ public:
         mStateMachine.removeListener(listener);
     }
 
+    /** 
+     * Sets a replicated operation's non-replicated state related to the specific host process. 
+     */
+    void rehostReplica(const T& amdCallback, const ::Ice::Current& current, OperationsManager* manager)
+    {
+        mInitiatorCallback = amdCallback;
+        mIceCurrent = current;
+        mOperationsManager = manager;
+    }
+
 protected:
     T mInitiatorCallback;
     SessionContextPtr mSessionContext;
-    boost::shared_ptr<AsteriskSCF::Threading::WorkQueue::PoolId> mPoolId;
+    ::Ice::Current mIceCurrent;
 
     bool mFinished;
-    AsteriskSCF::Core::Endpoint::V1::EndpointSeq mLookupResult;
     SessionListenerManagerPtr mListenerManager;
     OperationsManager* mOperationsManager;
-
     AsteriskSCF::StateMachine::SimpleStateMachine<S> mStateMachine;
     std::string mTransactionId;
 
+    AsteriskSCF::Core::Endpoint::V1::EndpointSeq mLookupResult;
+
 }; // class SessionRouterOperation
 
 /** 

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/routing.git



More information about the asterisk-scf-commits mailing list