[asterisk-scf-commits] asterisk-scf/integration/routing.git branch "route_replica" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Wed Apr 13 20:13:38 CDT 2011
branch "route_replica" has been updated
via c0da173bc8199919b8da1a1f79be484f7c894098 (commit)
from cc3b28f14b514d647fa4933a0a54c6c13705191b (commit)
Summary of changes:
local-slice/BasicRoutingStateReplicationIf.ice | 20 +++-
src/ConnectBridgedSessionsOperation.cpp | 8 +-
src/ConnectBridgedSessionsOperation.h | 2 -
...nectBridgedSessionsWithDestinationOperation.cpp | 105 +++++++++++++++-----
...onnectBridgedSessionsWithDestinationOperation.h | 22 ++++-
src/OperationReplicaCache.cpp | 47 ++++-----
src/OperationReplicaCache.h | 3 +-
src/RouteSessionOperation.cpp | 109 +++++++++++++++-----
src/RouteSessionOperation.h | 26 ++++-
src/SessionRouter.cpp | 42 ++++++++
src/SessionRouterOperation.h | 21 +++-
11 files changed, 305 insertions(+), 100 deletions(-)
- Log -----------------------------------------------------------------
commit c0da173bc8199919b8da1a1f79be484f7c894098
Author: Ken Hunt <ken.hunt at digium.com>
Date: Wed Apr 13 20:12:06 2011 -0500
- Getting consistency among replicas of different operations.
- Implemented replica fast-forward when pulling from cache.
diff --git a/local-slice/BasicRoutingStateReplicationIf.ice b/local-slice/BasicRoutingStateReplicationIf.ice
index 4b696c1..676538d 100644
--- a/local-slice/BasicRoutingStateReplicationIf.ice
+++ b/local-slice/BasicRoutingStateReplicationIf.ice
@@ -103,23 +103,24 @@ module V1
const string RouteSessionOpStartKeyMod = ".START";
/**
- * Indicates the RouteSessionOperation is waiting for an AMI endpoint lookup() reply.
+ * Indicates the RouteSessionOperation has completed an AMI endpoint lookup().
* The key (in the base state item) is the transactionId of this
* operation + RouteSessionOpWaitLookupKeyMod
*/
class RouteSessionOpWaitLookupState extends OperationStateItem
{
+ AsteriskSCF::Core::Endpoint::V1::EndpointSeq endpoints;
};
const string RouteSessionOpWaitLookupKeyMod = ".WAITLOOKUP";
/**
- * Indicates the RouteSessionOperation is going to create the bridge.
+ * Indicates the RouteSessionOperation has created the bridge.
* The key (in the base state item) is the transactionId of this
* operation + RouteSessionOpBridgingKeyMod
*/
class RouteSessionOpBridgingState extends OperationStateItem
{
- AsteriskSCF::Core::Endpoint::V1::EndpointSeq endpoints;
+ AsteriskSCF::SessionCommunications::V1::Bridge* bridge;
};
const string RouteSessionOpBridgingKeyMod = ".BRIDGING";
@@ -139,14 +140,25 @@ module V1
};
const string ConnectBridgedSessionsWithDestStartKeyMod = ".START";
+ /**
+ * Indicates the ConnectBridgedSessionsWithDestinationOperation has completed an AMI endpoint lookup().
+ * The key (in the base state item) is the transactionId of this
+ * operation + ConnectBridgedSessionsWithDestWaitLookupKeyMod
+ */
class ConnectBridgedSessionsWithDestinationOpWaitLookupState extends OperationStateItem
{
+ AsteriskSCF::Core::Endpoint::V1::EndpointSeq endpoints;
};
const string ConnectBridgedSessionsWithDestWaitLookupKeyMod = ".WAITLOOKUP";
+ /**
+ * Indicates the ConnectBridgedSessionsWithDestinationOperation has created the bridge.
+ * The key (in the base state item) is the transactionId of this
+ * operation + ConnectBridgedSessionsWithDestBridgingKeyMod
+ */
class ConnectBridgedSessionsWithDestinationOpBridgingState extends OperationStateItem
{
- AsteriskSCF::Core::Endpoint::V1::EndpointSeq endpoints;
+ AsteriskSCF::SessionCommunications::V1::Bridge* bridge;
};
const string ConnectBridgedSessionsWithDestBridgingKeyMod = ".BRIDGING";
diff --git a/src/ConnectBridgedSessionsOperation.cpp b/src/ConnectBridgedSessionsOperation.cpp
index 9c637d0..5cd327d 100644
--- a/src/ConnectBridgedSessionsOperation.cpp
+++ b/src/ConnectBridgedSessionsOperation.cpp
@@ -55,13 +55,12 @@ ConnectBridgedSessionsOperation::ConnectBridgedSessionsOperation(const AMD_Sessi
std::string transactionId)
: SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsPtr, ConnectBridgedSessionsOp::OperationState>(cb,
context,
+ current,
listener,
ConnectBridgedSessionsOp::STATE_CONNECT,
transactionId),
- mInitiatorCallback(cb),
- mSessionToReplace(sessionToReplace),
- mBridgedSession(bridgedSession),
- mIceCurrent(current)
+ mSessionToReplace(sessionToReplace),
+ mBridgedSession(bridgedSession)
{
Ice::Context::const_iterator it = current.ctx.find(::AsteriskSCF::SessionCommunications::V1::TransactionKey);
if (it == current.ctx.end())
@@ -190,6 +189,5 @@ void ConnectBridgedSessionsOperation::connectBridgedSessionsState()
finishAndSendResult();
}
-
} // end BasicRoutingService
} // end AsteriskSCF
diff --git a/src/ConnectBridgedSessionsOperation.h b/src/ConnectBridgedSessionsOperation.h
index 2484d5a..bd3e73a 100644
--- a/src/ConnectBridgedSessionsOperation.h
+++ b/src/ConnectBridgedSessionsOperation.h
@@ -89,10 +89,8 @@ private:
private:
// Operation input params.
- AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsPtr mInitiatorCallback;
AsteriskSCF::SessionCommunications::V1::SessionPrx mSessionToReplace;
AsteriskSCF::SessionCommunications::V1::SessionPrx mBridgedSession;
- ::Ice::Current mIceCurrent;
}; // class ConnectBridgedSessionsOperation
diff --git a/src/ConnectBridgedSessionsWithDestinationOperation.cpp b/src/ConnectBridgedSessionsWithDestinationOperation.cpp
index 80ca61c..f624c49 100644
--- a/src/ConnectBridgedSessionsWithDestinationOperation.cpp
+++ b/src/ConnectBridgedSessionsWithDestinationOperation.cpp
@@ -71,6 +71,27 @@ public:
}
break;
+ } // end switch
+ }
+
+ void pushState(RoutingStateItemPtr item)
+ {
+ RoutingStateItemSeq setItems;
+
+ setItems.push_back(item);
+ mReplicator->setState(setItems);
+
+ // Cache the keys of all pushed items.
+ mReplicatedStateKeys.push_back(item->key);
+ }
+
+ /**
+ * This callback is called just after the execution of a state machine's current state handler.
+ */
+ void stateExecutionComplete(ConnectBridgedSessionsWithDestinationOp::OperationState state)
+ {
+ switch(state)
+ {
case ConnectBridgedSessionsWithDestinationOp::STATE_WAIT_LOOKUP_RESULTS:
{
// We just completed the entire operation.
@@ -78,6 +99,7 @@ public:
ConnectBridgedSessionsWithDestinationOpWaitLookupStatePtr waitLookup(new ConnectBridgedSessionsWithDestinationOpWaitLookupState());
waitLookup->transactionId = mOperation->getTransactionId();
waitLookup->key = mOperation->getTransactionId() + AsteriskSCF::BasicRoutingService::V1::RouteSessionOpWaitLookupKeyMod;
+ waitLookup->endpoints = mOperation->getLookupResult();
pushState(waitLookup);
}
@@ -85,35 +107,18 @@ public:
case ConnectBridgedSessionsWithDestinationOp::STATE_BRIDGING:
{
- // We just completed the entire operation.
+ // We just completed creating a bridge.
// Push this information to the state replicator.
ConnectBridgedSessionsWithDestinationOpBridgingStatePtr bridgeOp(new ConnectBridgedSessionsWithDestinationOpBridgingState());
bridgeOp->transactionId = mOperation->getTransactionId();
bridgeOp->key = mOperation->getTransactionId() + AsteriskSCF::BasicRoutingService::V1::RouteSessionOpBridgingKeyMod;
- bridgeOp->endpoints = mOperation->getLookupResult();
+ bridgeOp->bridge = mOperation->getBridge();
pushState(bridgeOp);
}
break;
- }
- }
-
- void pushState(RoutingStateItemPtr item)
- {
- RoutingStateItemSeq setItems;
- setItems.push_back(item);
- mReplicator->setState(setItems);
-
- // Cache the keys of all pushed items.
- mReplicatedStateKeys.push_back(item->key);
- }
-
- /**
- * This callback is called just before the execution of a state machine's current state handler.
- */
- void stateExecutionComplete(ConnectBridgedSessionsWithDestinationOp::OperationState state)
- {
+ } // end switch
}
/**
@@ -159,17 +164,17 @@ ConnectBridgedSessionsWithDestinationOperation::ConnectBridgedSessionsWithDestin
std::string transactionId)
: SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr, ConnectBridgedSessionsWithDestinationOp::OperationState>(cb,
context,
+ current,
listener,
ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP,
transactionId),
- mInitiatorCallback(cb),
mSessionToReplace(sessionToReplace),
- mDestination(destination),
- mIceCurrent(current)
+ mDestination(destination)
{
mStateMachine.addState(ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP, boost::bind(&ConnectBridgedSessionsWithDestinationOperation::lookupState, this));
mStateMachine.addState(ConnectBridgedSessionsWithDestinationOp::STATE_WAIT_LOOKUP_RESULTS, boost::bind(&ConnectBridgedSessionsWithDestinationOperation::waitOnLookupState, this));
mStateMachine.addState(ConnectBridgedSessionsWithDestinationOp::STATE_BRIDGING, boost::bind(&ConnectBridgedSessionsWithDestinationOperation::establishBridgeState, this));
+ mStateMachine.addState(ConnectBridgedSessionsWithDestinationOp::STATE_SEND_RESPONSE, boost::bind(&ConnectBridgedSessionsWithDestinationOperation::sendResponseState, this));
}
/**
@@ -217,6 +222,7 @@ ConnectBridgedSessionsWithDestinationOperation::ConnectBridgedSessionsWithDestin
mStateMachine.addState(ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP, boost::bind(&ConnectBridgedSessionsWithDestinationOperation::lookupState, this));
mStateMachine.addState(ConnectBridgedSessionsWithDestinationOp::STATE_WAIT_LOOKUP_RESULTS, boost::bind(&ConnectBridgedSessionsWithDestinationOperation::waitOnLookupState, this));
mStateMachine.addState(ConnectBridgedSessionsWithDestinationOp::STATE_BRIDGING, boost::bind(&ConnectBridgedSessionsWithDestinationOperation::establishBridgeState, this));
+ mStateMachine.addState(ConnectBridgedSessionsWithDestinationOp::STATE_SEND_RESPONSE, boost::bind(&ConnectBridgedSessionsWithDestinationOperation::sendResponseState, this));
}
/**
@@ -264,16 +270,55 @@ void ConnectBridgedSessionsWithDestinationOperation::reflectUpdate(AsteriskSCF::
void ConnectBridgedSessionsWithDestinationOperation::reflectUpdate(AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpWaitLookupStatePtr item)
{
+ mLookupResult = item->endpoints;
+
mReplicatedStates.push_back(ConnectBridgedSessionsWithDestinationOp::STATE_WAIT_LOOKUP_RESULTS);
}
void ConnectBridgedSessionsWithDestinationOperation::reflectUpdate(AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpBridgingStatePtr item)
{
- mLookupResult = item->endpoints;
+ mBridge = item->bridge;
mReplicatedStates.push_back(ConnectBridgedSessionsWithDestinationOp::STATE_BRIDGING);
}
+/**
+ * Apply as many of the state transitions as possible. It is important to note
+ * that receiving a state transition item for a particular state means that
+ * state completed.
+ */
+bool ConnectBridgedSessionsWithDestinationOperation::fastForwardReplica()
+{
+ std::vector<ConnectBridgedSessionsWithDestinationOp::OperationState>::iterator i = find(mReplicatedStates.begin(), mReplicatedStates.end(), ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP);
+
+ // If we didn't receive the initial call params, this replica is of no use.
+ if (i == mReplicatedStates.end())
+ {
+ return false;
+ }
+ // Set to initial state.
+ mStateMachine.setNextState(ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP);
+
+ // See if we got the results from the lookup.
+ i = find(mReplicatedStates.begin(), mReplicatedStates.end(), ConnectBridgedSessionsWithDestinationOp::STATE_WAIT_LOOKUP_RESULTS);
+ if (i == mReplicatedStates.end())
+ {
+ return true;
+ }
+ mStateMachine.setNextState(ConnectBridgedSessionsWithDestinationOp::STATE_BRIDGING);
+
+ // See if we got past bridge creation.
+ i = find(mReplicatedStates.begin(), mReplicatedStates.end(), ConnectBridgedSessionsWithDestinationOp::STATE_BRIDGING);
+ if (i == mReplicatedStates.end())
+ {
+ return true;
+ }
+
+ // Apparently nothing left to do but reply to the AMD callback.
+ mStateMachine.setNextState(ConnectBridgedSessionsWithDestinationOp::STATE_SEND_RESPONSE);
+ return true;
+}
+
/**
* This is a state handler for one of this operation's states.
*/
@@ -403,8 +448,20 @@ void ConnectBridgedSessionsWithDestinationOperation::establishBridgeState()
return;
}
+ // Set the state handler to execute now that the bridge has been established.
+ mStateMachine.setNextState(ConnectBridgedSessionsWithDestinationOp::STATE_SEND_RESPONSE);
+
+ // Keep executing.
+ mOperationsManager->reschedule(this);
+}
+
+void ConnectBridgedSessionsWithDestinationOperation::sendResponseState()
+{
// This operation is complete. Send AMD responses.
finishAndSendResult();
+
+ // Shut down the state machine.
+ mStateMachine.shutdown();
}
} // end BasicRoutingService
diff --git a/src/ConnectBridgedSessionsWithDestinationOperation.h b/src/ConnectBridgedSessionsWithDestinationOperation.h
index b1d35b1..7052b48 100644
--- a/src/ConnectBridgedSessionsWithDestinationOperation.h
+++ b/src/ConnectBridgedSessionsWithDestinationOperation.h
@@ -42,7 +42,8 @@ namespace ConnectBridgedSessionsWithDestinationOp
{
STATE_LOOKUP,
STATE_WAIT_LOOKUP_RESULTS,
- STATE_BRIDGING
+ STATE_BRIDGING,
+ STATE_SEND_RESPONSE
};
}
@@ -80,6 +81,8 @@ public:
std::string getDestination() {return mDestination;}
+ AsteriskSCF::SessionCommunications::V1::BridgePrx getBridge() {return mBridge;}
+
/**
* Factory method for replica objects.
*/
@@ -90,6 +93,12 @@ public:
*/
void reflectUpdate(AsteriskSCF::BasicRoutingService::V1::OperationStateItemPtr stateItem);
+ /**
+ * Set the state machine into the highest state possible based on all of the state updates
+ * that have been reflected in this replica.
+ */
+ bool fastForwardReplica();
+
protected:
ConnectBridgedSessionsWithDestinationOperation(const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr& cb,
const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
@@ -128,9 +137,18 @@ private:
*/
void establishBridgeState();
+ /**
+ * This is a state handler for one of this operation's states.
+ * Entering this state, the bridge has been created. This state
+ * completes the operation by returning the result via the AMD callback.
+ *
+ * This method is called via mCurrentStateHandler when doWork() is executed from the
+ * WorkQueue.
+ */
+ void sendResponseState();
+
private:
// Operation input params.
- AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr mInitiatorCallback;
AsteriskSCF::SessionCommunications::V1::SessionPrx mSessionToReplace;
std::string mDestination;
::Ice::Current mIceCurrent;
diff --git a/src/OperationReplicaCache.cpp b/src/OperationReplicaCache.cpp
index 3045ab8..e1c52b5 100644
--- a/src/OperationReplicaCache.cpp
+++ b/src/OperationReplicaCache.cpp
@@ -108,41 +108,36 @@ void OperationReplicaCache::cacheOperation(OperationType type, const AsteriskSCF
}
}
-bool OperationReplicaCache::fetchRouteSessionOperation(OperationType type, std::string transactionId, AsteriskSCF::Threading::WorkPtr& ref)
+bool OperationReplicaCache::fetchConnectBridgedSessionsWithDestOp(std::string transactionId, AsteriskSCF::BasicRoutingService::ConnectBridgedSessionsWithDestinationOperationPtr& ref)
{
- switch(type)
- {
- case ROUTE_SESSION_OP:
+ ConnectBridgeWithDestMapType::iterator i = mPriv->connectBridgedWithDestReplicas.find(transactionId);
+ if (i == mPriv->connectBridgedWithDestReplicas.end())
{
- if (mPriv->routeSessionReplicas.empty())
- {
- return false;
- }
+ return false;
+ }
- RouteSessionMapType::iterator i = mPriv->routeSessionReplicas.find(transactionId);
- if (i == mPriv->routeSessionReplicas.end())
- {
- return false;
- }
+ ref = (*i).second.mOperation;
+ mPriv->connectBridgedWithDestReplicas.erase(i);
- ref = (*i).second.mOperation;
- mPriv->routeSessionReplicas.erase(i);
+ ref->fastForwardReplica();
- return true;
- }
- break;
+ return true;
+}
- case CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP:
+bool OperationReplicaCache::fetchRouteSessionOp(std::string transactionId, AsteriskSCF::BasicRoutingService::RouteSessionOperationPtr& ref)
+{
+ RouteSessionMapType::iterator i = mPriv->routeSessionReplicas.find(transactionId);
+ if (i == mPriv->routeSessionReplicas.end())
{
- if (mPriv->connectBridgedWithDestReplicas.empty())
- {
- return false;
- }
- }
- break;
+ return false;
}
- return false;
+ ref = (*i).second.mOperation;
+ mPriv->routeSessionReplicas.erase(i);
+
+ ref->fastForwardReplica();
+
+ return true;
}
void OperationReplicaCache::dropRouteSessionOperation(OperationType type, std::string transactionId)
diff --git a/src/OperationReplicaCache.h b/src/OperationReplicaCache.h
index ff89752..cd6aa6a 100644
--- a/src/OperationReplicaCache.h
+++ b/src/OperationReplicaCache.h
@@ -47,7 +47,8 @@ public:
OperationReplicaCache(const boost::shared_ptr<SessionContext>& sessionContext);
void cacheOperation(OperationType type, const AsteriskSCF::BasicRoutingService::V1::OperationStateItemPtr& item);
- bool fetchRouteSessionOperation(OperationType type, std::string transactionId, AsteriskSCF::Threading::WorkPtr& ref);
+ bool fetchRouteSessionOp(std::string transactionId, AsteriskSCF::BasicRoutingService::RouteSessionOperationPtr& ref);
+ bool fetchConnectBridgedSessionsWithDestOp(std::string transactionId, AsteriskSCF::BasicRoutingService::ConnectBridgedSessionsWithDestinationOperationPtr& ref);
void dropRouteSessionOperation(OperationType type, std::string transactionId);
void clearCache(OperationType type);
diff --git a/src/RouteSessionOperation.cpp b/src/RouteSessionOperation.cpp
index 97d60d3..0d84966 100644
--- a/src/RouteSessionOperation.cpp
+++ b/src/RouteSessionOperation.cpp
@@ -71,13 +71,34 @@ public:
pushState(routeSessionOpStart);
}
break;
+ }
+ }
+
+ void pushState(RoutingStateItemPtr item)
+ {
+ RoutingStateItemSeq setItems;
+
+ setItems.push_back(item);
+ mReplicator->setState(setItems);
+
+ // Cache the keys of all pushed items.
+ mReplicatedStateKeys.push_back(item->key);
+ }
+ /**
+ * This callback is called just after the execution of a state machine's current state handler.
+ */
+ void stateExecutionComplete(RouteSessionOp::OperationState state)
+ {
+ switch(state)
+ {
case RouteSessionOp::STATE_WAIT_LOOKUP_RESULTS:
{
- // We've sent out our lookup request via AMI.
+ // We've obtained a result from our AMI lookup request.
RouteSessionOpWaitLookupStatePtr routeSessionOpWaitLookup(new RouteSessionOpWaitLookupState());
routeSessionOpWaitLookup->transactionId = mOperation->getTransactionId();
routeSessionOpWaitLookup->key = mOperation->getTransactionId() + AsteriskSCF::BasicRoutingService::V1::RouteSessionOpWaitLookupKeyMod;
+ routeSessionOpWaitLookup->endpoints = mOperation->getLookupResult();
pushState(routeSessionOpWaitLookup);
}
@@ -85,36 +106,21 @@ public:
case RouteSessionOp::STATE_BRIDGING:
{
- // We just completed the entire operation.
+ // We just completed the bridge creation.
// Push this information to the state replicator.
RouteSessionOpBridgingStatePtr routeSessionOpBridging(new RouteSessionOpBridgingState());
routeSessionOpBridging->transactionId = mOperation->getTransactionId();
routeSessionOpBridging->key = mOperation->getTransactionId() + AsteriskSCF::BasicRoutingService::V1::RouteSessionOpBridgingKeyMod;
- routeSessionOpBridging->endpoints = mOperation->getLookupResult();
+ routeSessionOpBridging->bridge = mOperation->getBridge();
pushState(routeSessionOpBridging);
}
break;
-
- }
- }
-
- void pushState(RoutingStateItemPtr item)
- {
- RoutingStateItemSeq setItems;
-
- setItems.push_back(item);
- mReplicator->setState(setItems);
-
- // Cache the keys of all pushed items.
- mReplicatedStateKeys.push_back(item->key);
- }
- /**
- * This callback is called just after the execution of a state machine's current state handler.
- */
- void stateExecutionComplete(RouteSessionOp::OperationState state)
- {
+ case RouteSessionOp::STATE_SEND_RESPONSE:
+ // The AMD caller has been notified. We'll remove the replica in the shutdown() handler.
+ break;
+ }
}
/**
@@ -155,22 +161,22 @@ RouteSessionOperation::RouteSessionOperation(const AMD_SessionRouter_routeSessio
const ::std::string& destination,
const ::Ice::Current& current,
const SessionContextPtr& context,
- OperationsManager* const listener,
+ OperationsManager* listener,
std::string transactionId)
: SessionRouterOperation<AMD_SessionRouter_routeSessionPtr, RouteSessionOp::OperationState>(cb,
context,
+ current,
listener,
RouteSessionOp::STATE_LOOKUP,
transactionId),
- mInitiatorCallback(cb),
mSource(source),
- mDestination(destination),
- mIceCurrent(current)
+ mDestination(destination)
{
// Configure the state machine with state handlers.
mStateMachine.addState(RouteSessionOp::STATE_LOOKUP, boost::bind(&RouteSessionOperation::lookupState, this));
mStateMachine.addState(RouteSessionOp::STATE_WAIT_LOOKUP_RESULTS, boost::bind(&RouteSessionOperation::waitOnLookupState, this));
mStateMachine.addState(RouteSessionOp::STATE_BRIDGING, boost::bind(&RouteSessionOperation::establishBridgeState, this));
+ mStateMachine.addState(RouteSessionOp::STATE_SEND_RESPONSE, boost::bind(&RouteSessionOperation::sendResponseState, this));
}
/**
@@ -216,6 +222,7 @@ RouteSessionOperation::RouteSessionOperation(const SessionContextPtr& sessionCon
mStateMachine.addState(RouteSessionOp::STATE_LOOKUP, boost::bind(&RouteSessionOperation::lookupState, this));
mStateMachine.addState(RouteSessionOp::STATE_WAIT_LOOKUP_RESULTS, boost::bind(&RouteSessionOperation::waitOnLookupState, this));
mStateMachine.addState(RouteSessionOp::STATE_BRIDGING, boost::bind(&RouteSessionOperation::establishBridgeState, this));
+ mStateMachine.addState(RouteSessionOp::STATE_SEND_RESPONSE, boost::bind(&RouteSessionOperation::sendResponseState, this));
}
/**
@@ -259,16 +266,55 @@ void RouteSessionOperation::reflectUpdate(AsteriskSCF::BasicRoutingService::V1::
void RouteSessionOperation::reflectUpdate(AsteriskSCF::BasicRoutingService::V1::RouteSessionOpWaitLookupStatePtr item)
{
+ mLookupResult = item->endpoints;
+
mReplicatedStates.push_back(RouteSessionOp::STATE_WAIT_LOOKUP_RESULTS);
}
void RouteSessionOperation::reflectUpdate(AsteriskSCF::BasicRoutingService::V1::RouteSessionOpBridgingStatePtr item)
{
- mLookupResult = item->endpoints;
+ mBridge = item->bridge;
mReplicatedStates.push_back(RouteSessionOp::STATE_BRIDGING);
}
+/**
+ * Apply as many of the state transitions as possible. It is important to note
+ * that receiving a state transition item for a particular state means that
+ * state completed.
+ */
+bool RouteSessionOperation::fastForwardReplica()
+{
+ std::vector<RouteSessionOp::OperationState>::iterator i = find(mReplicatedStates.begin(), mReplicatedStates.end(), RouteSessionOp::STATE_LOOKUP);
+
+ // If we didn't receive the initial call params, this replica is of no use.
+ if (i == mReplicatedStates.end())
+ {
+ return false;
+ }
+ // Set to initial state.
+ mStateMachine.setNextState(RouteSessionOp::STATE_LOOKUP);
+
+ // See if we got the results from the lookup.
+ i = find(mReplicatedStates.begin(), mReplicatedStates.end(), RouteSessionOp::STATE_WAIT_LOOKUP_RESULTS);
+ if (i == mReplicatedStates.end())
+ {
+ return true;
+ }
+ mStateMachine.setNextState(RouteSessionOp::STATE_BRIDGING);
+
+ // See if we got past bridge creation.
+ i = find(mReplicatedStates.begin(), mReplicatedStates.end(), RouteSessionOp::STATE_BRIDGING);
+ if (i == mReplicatedStates.end())
+ {
+ return true;
+ }
+
+ // Apparently nothing left to do but reply to the AMD callback.
+ mStateMachine.setNextState(RouteSessionOp::STATE_SEND_RESPONSE);
+ return true;
+}
+
RouteSessionOperation::~RouteSessionOperation()
{
lg(Debug) << "RouteSessionOperation() being destroyed for " << mDestination ;
@@ -401,6 +447,15 @@ void RouteSessionOperation::establishBridgeState()
return;
}
+ // Set the state handler to execute now that the bridge has been established.
+ mStateMachine.setNextState(RouteSessionOp::STATE_SEND_RESPONSE);
+
+ // Keep executing.
+ mOperationsManager->reschedule(this);
+}
+
+void RouteSessionOperation::sendResponseState()
+{
// This operation is complete. Send AMD responses.
finishAndSendResult();
diff --git a/src/RouteSessionOperation.h b/src/RouteSessionOperation.h
index 921f429..ea4a058 100644
--- a/src/RouteSessionOperation.h
+++ b/src/RouteSessionOperation.h
@@ -42,7 +42,8 @@ namespace RouteSessionOp
{
STATE_LOOKUP,
STATE_WAIT_LOOKUP_RESULTS,
- STATE_BRIDGING
+ STATE_BRIDGING,
+ STATE_SEND_RESPONSE
};
}
@@ -76,6 +77,8 @@ public:
AsteriskSCF::SessionCommunications::V1::SessionPrx getSource() {return mSource;}
std::string getDestination() {return mDestination;}
+
+ AsteriskSCF::SessionCommunications::V1::BridgePrx getBridge() {return mBridge;}
/**
* Factory method for replica objects.
@@ -87,6 +90,12 @@ public:
*/
void reflectUpdate(AsteriskSCF::BasicRoutingService::V1::OperationStateItemPtr stateItem);
+ /**
+ * Set the state machine into the highest state possible based on all of the state updates
+ * that have been reflected in this replica.
+ */
+ bool fastForwardReplica();
+
protected:
// Normal constructor
RouteSessionOperation(const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_routeSessionPtr& cb,
@@ -133,19 +142,28 @@ private:
/**
* This is a state handler for one of this operation's states.
* Entering this state, the destination endpoint has been obtained. This state
- * completes the operation by creating the bridge.
+ * creates the bridge.
*
* This method is called via mCurrentStateHandler when doWork() is executed from the
* WorkQueue.
*/
void establishBridgeState();
+ /**
+ * This is a state handler for one of this operation's states.
+ * Entering this state, the bridge has been created. This state
+ * completes the operation by returning the result via the AMD callback.
+ *
+ * This method is called via mCurrentStateHandler when doWork() is executed from the
+ * WorkQueue.
+ */
+ void sendResponseState();
+
private:
// Operation input params.
- AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_routeSessionPtr mInitiatorCallback;
AsteriskSCF::SessionCommunications::V1::SessionPrx mSource;
+ AsteriskSCF::SessionCommunications::V1::BridgePrx mBridge;
std::string mDestination;
- ::Ice::Current mIceCurrent;
std::vector<RouteSessionOp::OperationState> mReplicatedStates;
diff --git a/src/SessionRouter.cpp b/src/SessionRouter.cpp
index efc4724..9e87e48 100644
--- a/src/SessionRouter.cpp
+++ b/src/SessionRouter.cpp
@@ -32,6 +32,7 @@
#include "SessionListener.h"
#include "EndpointRegistry.h"
+#include "OperationReplicaCache.h"
using namespace AsteriskSCF;
using namespace AsteriskSCF::Core::Routing::V1;
@@ -152,6 +153,21 @@ SessionRouter::~SessionRouter()
mImpl.reset();
}
+std::string getTransactionId(const ::Ice::Current& current)
+{
+ Ice::Context::const_iterator it = current.ctx.find(::AsteriskSCF::SessionCommunications::V1::TransactionKey);
+ std::string transactionId;
+ if (it == current.ctx.end())
+ {
+ lg(Error) << "routeSession_async() called with no transaction ID set in context. ";
+ }
+ else
+ {
+ transactionId = (it->second);
+ }
+ return transactionId;
+}
+
/**
* Route the session by looking up the destination endpoint and configuring a complimentary session for the destination.
*/
@@ -160,6 +176,19 @@ void SessionRouter::routeSession_async(const ::AsteriskSCF::SessionCommunication
const ::std::string& destination,
const ::Ice::Current& current)
{
+ std::string transactionId = getTransactionId(current);
+
+ // Check the cache for a replica with this transaction Id.
+ RouteSessionOperationPtr routeSessionOp;
+ if (mImpl->mOperationReplicaCache->fetchRouteSessionOp(transactionId, routeSessionOp))
+ {
+ routeSessionOp->rehostReplica(cb, current, mImpl.get());
+ WorkPtr replicaOp(routeSessionOp);
+
+ mImpl->scheduleOperation(replicaOp);
+ return;
+ }
+
WorkPtr op(RouteSessionOperation::create(cb,
source,
destination,
@@ -181,6 +210,19 @@ void SessionRouter::connectBridgedSessionsWithDestination_async(const ::Asterisk
const ::std::string& destination,
const ::Ice::Current& current)
{
+ std::string transactionId = getTransactionId(current);
+
+ // Check the cache for a replica with this transaction Id.
+ ConnectBridgedSessionsWithDestinationOperationPtr connectBridgedSessionsWithDestOp;
+ if (mImpl->mOperationReplicaCache->fetchConnectBridgedSessionsWithDestOp(transactionId, connectBridgedSessionsWithDestOp))
+ {
+ connectBridgedSessionsWithDestOp->rehostReplica(cb, current, mImpl.get());
+ WorkPtr replicaOp(connectBridgedSessionsWithDestOp);
+
+ mImpl->scheduleOperation(replicaOp);
+ return;
+ }
+
WorkPtr op( ConnectBridgedSessionsWithDestinationOperation::create(cb,
sessionToReplace,
destination,
diff --git a/src/SessionRouterOperation.h b/src/SessionRouterOperation.h
index d257a5d..e87db54 100644
--- a/src/SessionRouterOperation.h
+++ b/src/SessionRouterOperation.h
@@ -113,11 +113,13 @@ public:
*/
SessionRouterOperation(const T& amdCallback,
const SessionContextPtr& context,
- OperationsManager* manager,
+ const ::Ice::Current& current,
+ OperationsManager* manager,
S defaultState,
std::string transactionId)
: mInitiatorCallback(amdCallback),
mSessionContext(context),
+ mIceCurrent(current),
mFinished(false),
mOperationsManager(manager),
mStateMachine(defaultState),
@@ -132,7 +134,6 @@ public:
* TEMP COMMENT: Not initialized:
* mInitiatorCallback
* mOperationsManager
- * mTransactionId
*/
SessionRouterOperation(const SessionContextPtr& context,
S defaultState)
@@ -277,19 +278,29 @@ public:
mStateMachine.removeListener(listener);
}
+ /**
+ * Sets a replicated operation's non-replicated state related to the specific host process.
+ */
+ void rehostReplica(const T& amdCallback, const ::Ice::Current& current, OperationsManager* manager)
+ {
+ mInitiatorCallback = amdCallback;
+ mIceCurrent = current;
+ mOperationsManager = manager;
+ }
+
protected:
T mInitiatorCallback;
SessionContextPtr mSessionContext;
- boost::shared_ptr<AsteriskSCF::Threading::WorkQueue::PoolId> mPoolId;
+ ::Ice::Current mIceCurrent;
bool mFinished;
- AsteriskSCF::Core::Endpoint::V1::EndpointSeq mLookupResult;
SessionListenerManagerPtr mListenerManager;
OperationsManager* mOperationsManager;
-
AsteriskSCF::StateMachine::SimpleStateMachine<S> mStateMachine;
std::string mTransactionId;
+ AsteriskSCF::Core::Endpoint::V1::EndpointSeq mLookupResult;
+
}; // class SessionRouterOperation
/**
-----------------------------------------------------------------------
--
asterisk-scf/integration/routing.git
More information about the asterisk-scf-commits
mailing list