[asterisk-scf-commits] asterisk-scf/integration/sip.git branch "threading" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Thu May 19 19:01:39 CDT 2011
branch "threading" has been updated
via 56969fd2d98b159e1695d77a689fb78ecbfb6ebf (commit)
via 30f4658ce0ba340fd90667a5483714ada1e5097c (commit)
from c807bf56d40a56549bc37ea3a3e46d8f540821da (commit)
Summary of changes:
src/PJSipSessionModule.cpp | 131 +++++++++-----------------------------------
src/PJSipSessionModule.h | 82 ++++++++++++++++++++++++++--
2 files changed, 103 insertions(+), 110 deletions(-)
- Log -----------------------------------------------------------------
commit 56969fd2d98b159e1695d77a689fb78ecbfb6ebf
Author: Mark Michelson <mmichelson at digium.com>
Date: Thu May 19 18:46:12 2011 -0500
A bit of cleanup plus adding some helpful comments.
diff --git a/src/PJSipSessionModule.cpp b/src/PJSipSessionModule.cpp
index c20ac28..7229342 100644
--- a/src/PJSipSessionModule.cpp
+++ b/src/PJSipSessionModule.cpp
@@ -330,23 +330,9 @@ public:
pjsip_tx_data *tdata,
pjsip_dialog *replacedDialog,
const std::string destination)
- : mState(Initial), mSessionModule(module), mCaller(caller), mSessionRouter(router), mInv(inv), mTdata(tdata), mReplacedDialog(replacedDialog), mDestination(destination) { }
+ : mSessionModule(module), mCaller(caller), mSessionRouter(router), mInv(inv), mTdata(tdata), mReplacedDialog(replacedDialog), mDestination(destination) { }
- SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
- {
- switch (mState)
- {
- case Initial:
- return initial();
- case CalledBack:
- return calledBack();
- default:
- lg(Error) << "We're in a bad state here.";
- return Complete;
- }
- }
-
- SuspendableWorkResult initial()
+ SuspendableWorkResult initial(const SuspendableWorkListenerPtr&)
{
std::cout << "SessionCreationOperation running. About to create a session!" << std::endl;
try
@@ -384,7 +370,6 @@ public:
SuspendableWorkListenerPtr listener = 0;
SipAMICallbackPtr cb(new SipAMICallback(listener, mSession, this, false, true));
Ice::CallbackPtr d = Ice::newCallback(cb, &SipAMICallback::callback);
- mState = CalledBack;
mSessionRouter->begin_routeSession(operationId, mSession->getSessionProxy(), mDestination, d);
}
}
@@ -417,20 +402,6 @@ public:
return Complete;
}
- enum states
- {
- /**
- * First state.
- * @see processRefer
- */
- Initial,
- /**
- * State after routing service completes
- * @see processRoutingResponse
- */
- CalledBack,
- } mState;
-
PJSipSessionModulePtr mSessionModule;
SipEndpointPtr mCaller;
AsteriskSCF::Discovery::SmartProxy<SessionRouterPrx> mSessionRouter;
@@ -606,24 +577,10 @@ public:
const SipSessionPtr& session,
const AsteriskSCF::Discovery::SmartProxy<SessionRouterPrx>& sessionRouter,
const int moduleId)
- : mState(Initial), mInv(inv), mTsx(tsx), mTdata(tdata),
+ : mInv(inv), mTsx(tsx), mTdata(tdata),
mTargetSipUri(target_sip_uri), mSession(session), mSessionRouter(sessionRouter),
mModuleId(moduleId), mWasWithDestination(false) { }
- SuspendableWorkResult execute(const SuspendableWorkListenerPtr& workListener)
- {
- switch (mState)
- {
- case Initial:
- return processRefer(workListener);
- case CalledBack:
- return processRoutingResponse(workListener);
- default:
- lg(Error) << "We're in a bad state here...";
- return Complete;
- }
- }
-
/**
* This is what is initially called when the operation is queued.
*
@@ -634,7 +591,7 @@ public:
* When this operation is successful, it will result in work for this session
* being suspended until the routing service returns.
*/
- SuspendableWorkResult processRefer(const SuspendableWorkListenerPtr& workListener)
+ SuspendableWorkResult initial(const SuspendableWorkListenerPtr& workListener)
{
// Determine if this is a blind transfer or an attended transfer
pj_str_t replaces = pj_str((char*)"Replaces");
@@ -735,7 +692,6 @@ public:
Ice::CallbackPtr d = Ice::newCallback(cb, &SipAMICallback::callback);
lg(Debug) << "handleRefer() calling router connectBridgedSessions(). ";
- mState = CalledBack;
mSessionRouter->begin_connectBridgedSessions(operationId, mSession->getSessionProxy(), other_session->getSessionProxy(), d);
pjsip_dlg_dec_lock(other_dlg);
return Complete;
@@ -764,7 +720,6 @@ public:
lg(Debug) << "handleRefer() calling router connectBridgedSessionsWithDestination(). ";
mWasWithDestination = true;
- mState = CalledBack;
mSessionRouter->begin_connectBridgedSessionsWithDestination(operationId, session->getSessionProxy(), target, d);
return Complete;
}
@@ -782,7 +737,7 @@ public:
* Once the routing service has allowed for work to be resumed,
* this is where the final work is done
*/
- SuspendableWorkResult processRoutingResponse(const SuspendableWorkListenerPtr&)
+ SuspendableWorkResult calledBack(const SuspendableWorkListenerPtr&)
{
assert(mAsyncResult);
SessionRouterPrx router = SessionRouterPrx::uncheckedCast(mAsyncResult->getProxy());
@@ -820,19 +775,6 @@ public:
return Complete;
}
- enum states
- {
- /**
- * First state.
- * @see processRefer
- */
- Initial,
- /**
- * State after routing service completes
- * @see processRoutingResponse
- */
- CalledBack,
- } mState;
/**
* The INVITE session, which contains the dialog on which the
* REFER was received.
@@ -980,23 +922,9 @@ class HandleInviteResponseOperation : public SipQueueableOperation
{
public:
HandleInviteResponseOperation(int respCode, const int invState, const SipSessionPtr& session)
- : mState(Initial), mRespCode(respCode), mInvState(invState), mSession(session) { }
+ : mRespCode(respCode), mInvState(invState), mSession(session) { }
- SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
- {
- switch (mState)
- {
- case Initial:
- return processInviteResponse();
- case CalledBack:
- return processSessionListenerResult();
- default:
- lg(Error) << "We're in a bad state here...";
- return Complete;
- }
- }
-
- SuspendableWorkResult processInviteResponse()
+ SuspendableWorkResult initial(const SuspendableWorkListenerPtr&)
{
std::cout << "Handling response to an INVITE yo!" << std::endl;
//Treat all 1XX messages we don't recognize the same as a 180
@@ -1021,7 +949,6 @@ public:
{
SipAMICallbackPtr cb(new SipAMICallback(0, mSession, this, false, true));
Ice::CallbackPtr d = Ice::newCallback(cb, &SipAMICallback::callback);
- mState = CalledBack;
(*listener)->begin_indicated(mSession->getSessionProxy(), new RingingIndication(), d);
}
catch (const Ice::Exception &ex)
@@ -1046,7 +973,6 @@ public:
Ice::CallbackPtr d = Ice::newCallback(cb, &SipAMICallback::callback);
ProgressingIndicationPtr progressing(new ProgressingIndication());
progressing->response = response;
- mState = CalledBack;
(*listener)->begin_indicated(mSession->getSessionProxy(), progressing, d);
}
catch (const Ice::Exception &ex)
@@ -1069,7 +995,6 @@ public:
{
SipAMICallbackPtr cb(new SipAMICallback(0, mSession, this, false, true));
Ice::CallbackPtr d = Ice::newCallback(cb, &SipAMICallback::callback);
- mState = CalledBack;
(*listener)->begin_indicated(mSession->getSessionProxy(), new ConnectedIndication(), d);
}
catch (const Ice::Exception &ex)
@@ -1082,7 +1007,7 @@ public:
return Complete;
}
- SuspendableWorkResult processSessionListenerResult()
+ SuspendableWorkResult calledBack()
{
assert(mAsyncResult);
SessionListenerPrx listener = SessionListenerPrx::uncheckedCast(mAsyncResult->getProxy());
@@ -1096,21 +1021,7 @@ public:
}
return Complete;
}
-
- enum states
- {
- /**
- * First state.
- * @see processRefer
- */
- Initial,
- /**
- * State after routing service completes
- * @see processSessionListenerResult
- */
- CalledBack,
- } mState;
-
+
int mRespCode;
const int mInvState;
SipSessionPtr mSession;
@@ -1145,7 +1056,7 @@ public:
const int tsxState)
: mSessionModule(module), mTsx(tsx), mInv(inv), mEventType(eventType), mTsxState(tsxState) { }
- SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
+ SuspendableWorkResult initial(const SuspendableWorkListenerPtr&)
{
std::cout << "The transaction has address " << mTsx << std::endl;
PJSipTransactionModInfo *tsx_mod_info = static_cast<PJSipTransactionModInfo *> (mTsx->mod_data[mSessionModule->getModule().id]);
@@ -1198,7 +1109,7 @@ public:
InviteStateOperation(const PJSipSessionModulePtr& module, pjsip_inv_session* inv, const int eventType, const int invState, const std::string branch)
: mSessionModule(module), mInv(inv), mEventType(eventType), mInvState(invState), mEventBranch(branch) { }
- SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
+ SuspendableWorkResult initial(const SuspendableWorkListenerPtr&)
{
if (mInvState == PJSIP_INV_STATE_DISCONNECTED)
{
@@ -1399,7 +1310,7 @@ public:
HandleMediaUpdate(pjsip_inv_session *inv, const int moduleId, const ServiceLocatorPrx& serviceLocator)
: mInv(inv), mModuleId(moduleId), mServiceLocator(serviceLocator) { }
- SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
+ SuspendableWorkResult initial(const SuspendableWorkListenerPtr&)
{
const pjmedia_sdp_session *remote_sdp;
pj_status_t status;
diff --git a/src/PJSipSessionModule.h b/src/PJSipSessionModule.h
index a8968d3..0a8a872 100644
--- a/src/PJSipSessionModule.h
+++ b/src/PJSipSessionModule.h
@@ -138,33 +138,105 @@ typedef IceUtil::Handle<PJSipSessionModule> PJSipSessionModulePtr;
/**
* Operation that may be queued.
+ *
+ * Operations that are queued due to the arrival of SIP messages tend to follow a pattern. That
+ * is, they do some initial processing based on the SIP message, then they make an AMI call to
+ * a session listener, and then they process the response of the AMI call.
+ *
+ * This class makes facilitating such a pattern relatively easy.
*/
class SipQueueableOperation : virtual public AsteriskSCF::System::WorkQueue::V1::SuspendableWork
{
public:
- virtual ~SipQueueableOperation() { };
+ SipQueueableOperation() : mState(Initial) { }
+ virtual ~SipQueueableOperation() { }
/**
* Queueable operations may call out to AMI methods. AMI callbacks happen
* in Ice client threads. We want to process the result of the AMI call in one
* of our thread pool's threads, though. The AMI callback can set the operation's
- * AsyncResult so that the operation will be able to actually do something with
- * the AMI call's result.
+ * AsyncResult so that the operation may process the result of the AMI call in
+ * one of our threads.
*/
void setAsyncResult(const Ice::AsyncResultPtr& r) {mAsyncResult = r;}
+
/**
* Override of SuspendableWork::execute()
*/
- virtual AsteriskSCF::System::WorkQueue::V1::SuspendableWorkResult execute(const AsteriskSCF::System::WorkQueue::V1::SuspendableWorkListenerPtr&)
+ AsteriskSCF::System::WorkQueue::V1::SuspendableWorkResult execute(const AsteriskSCF::System::WorkQueue::V1::SuspendableWorkListenerPtr& listener)
+ {
+ AsteriskSCF::System::WorkQueue::V1::SuspendableWorkResult result =
+ AsteriskSCF::System::WorkQueue::V1::Complete;
+ switch (mState)
+ {
+ case Initial:
+ result = initial(listener);
+ // Change the state now so that
+ // when the AMI callback occurs,
+ // the proper followup will be called.
+ mState = CalledBack;
+ break;
+ case CalledBack:
+ result = calledBack(listener);
+ break;
+ default:
+ break;
+ }
+ return result;
+ }
+
+ /**
+ * All SipQueueableOperations are required to provide an override
+ * of this function. This is where initial processing takes place.
+ */
+ virtual AsteriskSCF::System::WorkQueue::V1::SuspendableWorkResult initial(
+ const AsteriskSCF::System::WorkQueue::V1::SuspendableWorkListenerPtr&) = 0;
+
+ /**
+ * If a SipQueueableOperation has made an AMI call, then when the AMI method
+ * returns, this will be where the AMI result can be processed.
+ *
+ * Since not all SipQueueableOperations make use of AMI, this method need
+ * not always be overridden.
+ */
+ virtual AsteriskSCF::System::WorkQueue::V1::SuspendableWorkResult calledBack(
+ const AsteriskSCF::System::WorkQueue::V1::SuspendableWorkListenerPtr&)
{
return AsteriskSCF::System::WorkQueue::V1::Complete;
}
protected:
+ enum states
+ {
+ /**
+ * State of the operation upon construction.
+ * @see initial
+ */
+ Initial,
+ /**
+ * State when AMI result is received.
+ * @see calledBack
+ */
+ CalledBack,
+ } mState;
+
+ /**
+ * Ice asynchronous result.
+ *
+ * Used for processing the result of AMI calls.
+ * @see setAsyncResult
+ */
Ice::AsyncResultPtr mAsyncResult;
- SipQueueableOperation() { }
};
typedef IceUtil::Handle<SipQueueableOperation> SipQueueableOperationPtr;
+/**
+ * General AMI callback class used with SipQueueableOperation
+ *
+ * This AMI callback doesn't actually process results itself. Rather,
+ * it passes the result back to the original operation that made the
+ * AMI call so that the AMI result can be processed in the proper
+ * thread.
+ */
class SipAMICallback : public IceUtil::Shared
{
public:
commit 30f4658ce0ba340fd90667a5483714ada1e5097c
Author: Mark Michelson <mmichelson at digium.com>
Date: Thu May 19 17:42:53 2011 -0500
Fix another crash that could happen if a transaction terminates after the session is stopped.
diff --git a/src/PJSipSessionModule.cpp b/src/PJSipSessionModule.cpp
index 45f05c0..c20ac28 100644
--- a/src/PJSipSessionModule.cpp
+++ b/src/PJSipSessionModule.cpp
@@ -1362,11 +1362,21 @@ void PJSipSessionModule::invOnTsxStateChanged(pjsip_inv_session *inv, pjsip_tran
//function from there.
PJSipSessionModInfo *session_mod_info = static_cast<PJSipSessionModInfo*>(inv->mod_data[mModule.id]);
- SipSessionPtr session = session_mod_info->getSessionPtr();
- std::string method(pj_strbuf(&tsx->method.name), pj_strlen(&tsx->method.name));
- std::cout << "Queuing a Transaction state operation for transaction " << tsx << " Method: " << method << std::endl;
- enqueueSessionWork(new TransactionStateOperation(this, tsx, inv, e->type, tsx->state), inv);
+ //When a transaction is terminated, it will call into here. In the situation where we are the UAS, this
+ //may happen after the session has been stopped, and therefore the session_mod_info is NULL.
+ //
+ //XXX This can be problematic with regard to transaction state replication since we may not communicate
+ //to the replica that the transaction has been terminated. This will need to be addressed when transaction
+ //state replication is worked on.
+ if (session_mod_info)
+ {
+ SipSessionPtr session = session_mod_info->getSessionPtr();
+
+ std::string method(pj_strbuf(&tsx->method.name), pj_strlen(&tsx->method.name));
+ std::cout << "Queuing a Transaction state operation for transaction " << tsx << " Method: " << method << std::endl;
+ enqueueSessionWork(new TransactionStateOperation(this, tsx, inv, e->type, tsx->state), inv);
+ }
}
/**
-----------------------------------------------------------------------
--
asterisk-scf/integration/sip.git
More information about the asterisk-scf-commits
mailing list