[asterisk-scf-commits] asterisk-scf/integration/sip.git branch "threading" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Thu May 19 19:01:39 CDT 2011


branch "threading" has been updated
       via  56969fd2d98b159e1695d77a689fb78ecbfb6ebf (commit)
       via  30f4658ce0ba340fd90667a5483714ada1e5097c (commit)
      from  c807bf56d40a56549bc37ea3a3e46d8f540821da (commit)

Summary of changes:
 src/PJSipSessionModule.cpp |  131 +++++++++-----------------------------------
 src/PJSipSessionModule.h   |   82 ++++++++++++++++++++++++++--
 2 files changed, 103 insertions(+), 110 deletions(-)


- Log -----------------------------------------------------------------
commit 56969fd2d98b159e1695d77a689fb78ecbfb6ebf
Author: Mark Michelson <mmichelson at digium.com>
Date:   Thu May 19 18:46:12 2011 -0500

    A bit of cleanup plus adding some helpful comments.

diff --git a/src/PJSipSessionModule.cpp b/src/PJSipSessionModule.cpp
index c20ac28..7229342 100644
--- a/src/PJSipSessionModule.cpp
+++ b/src/PJSipSessionModule.cpp
@@ -330,23 +330,9 @@ public:
             pjsip_tx_data *tdata,
             pjsip_dialog *replacedDialog,
             const std::string destination)
-        : mState(Initial), mSessionModule(module), mCaller(caller), mSessionRouter(router), mInv(inv), mTdata(tdata), mReplacedDialog(replacedDialog), mDestination(destination) { }
+        : mSessionModule(module), mCaller(caller), mSessionRouter(router), mInv(inv), mTdata(tdata), mReplacedDialog(replacedDialog), mDestination(destination) { }
 
-    SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
-    {
-        switch (mState)
-        {
-            case Initial:
-                return initial();
-            case CalledBack:
-                return calledBack();
-            default:
-                lg(Error) << "We're in a bad state here.";
-                return Complete;
-        }
-    }
-
-    SuspendableWorkResult initial()
+    SuspendableWorkResult initial(const SuspendableWorkListenerPtr&)
     {
         std::cout << "SessionCreationOperation running. About to create a session!" << std::endl;
         try
@@ -384,7 +370,6 @@ public:
                 SuspendableWorkListenerPtr listener = 0;
                 SipAMICallbackPtr cb(new SipAMICallback(listener, mSession, this, false, true));
                 Ice::CallbackPtr d = Ice::newCallback(cb, &SipAMICallback::callback);
-                mState = CalledBack;
                 mSessionRouter->begin_routeSession(operationId, mSession->getSessionProxy(), mDestination, d);
             }
         }
@@ -417,20 +402,6 @@ public:
         return Complete;
     }
 
-    enum states
-    {
-        /**
-         * First state.
-         * @see processRefer
-         */
-        Initial,
-        /**
-         * State after routing service completes
-         * @see processRoutingResponse
-         */
-        CalledBack,
-    } mState;
-
     PJSipSessionModulePtr mSessionModule;
     SipEndpointPtr mCaller;
     AsteriskSCF::Discovery::SmartProxy<SessionRouterPrx> mSessionRouter;
@@ -606,24 +577,10 @@ public:
             const SipSessionPtr& session,
             const AsteriskSCF::Discovery::SmartProxy<SessionRouterPrx>& sessionRouter,
             const int moduleId)
-        : mState(Initial), mInv(inv), mTsx(tsx), mTdata(tdata), 
+        : mInv(inv), mTsx(tsx), mTdata(tdata), 
         mTargetSipUri(target_sip_uri), mSession(session), mSessionRouter(sessionRouter),
         mModuleId(moduleId), mWasWithDestination(false) { }
 
-    SuspendableWorkResult execute(const SuspendableWorkListenerPtr& workListener)
-    {
-        switch (mState)
-        {
-        case Initial:
-            return processRefer(workListener);
-        case CalledBack:
-            return processRoutingResponse(workListener);
-        default:
-            lg(Error) << "We're in a bad state here...";
-            return Complete;
-        }
-    }
-
     /**
      * This is what is initially called when the operation is queued.
      *
@@ -634,7 +591,7 @@ public:
      * When this operation is successful, it will result in work for this session
      * being suspended until the routing service returns.
      */
-    SuspendableWorkResult processRefer(const SuspendableWorkListenerPtr& workListener)
+    SuspendableWorkResult initial(const SuspendableWorkListenerPtr& workListener)
     {
         // Determine if this is a blind transfer or an attended transfer
         pj_str_t replaces = pj_str((char*)"Replaces");
@@ -735,7 +692,6 @@ public:
                 Ice::CallbackPtr d = Ice::newCallback(cb, &SipAMICallback::callback);
 
                 lg(Debug) << "handleRefer() calling router connectBridgedSessions(). ";
-                mState = CalledBack;
                 mSessionRouter->begin_connectBridgedSessions(operationId, mSession->getSessionProxy(), other_session->getSessionProxy(), d);
                 pjsip_dlg_dec_lock(other_dlg);
                 return Complete;
@@ -764,7 +720,6 @@ public:
 
                 lg(Debug) << "handleRefer() calling router connectBridgedSessionsWithDestination(). ";
                 mWasWithDestination = true;
-                mState = CalledBack;
                 mSessionRouter->begin_connectBridgedSessionsWithDestination(operationId, session->getSessionProxy(), target, d);
                 return Complete;
             }
@@ -782,7 +737,7 @@ public:
      * Once the routing service has allowed for work to be resumed,
      * this is where the final work is done
      */
-    SuspendableWorkResult processRoutingResponse(const SuspendableWorkListenerPtr&)
+    SuspendableWorkResult calledBack(const SuspendableWorkListenerPtr&)
     {
         assert(mAsyncResult);
         SessionRouterPrx router = SessionRouterPrx::uncheckedCast(mAsyncResult->getProxy());
@@ -820,19 +775,6 @@ public:
         return Complete;
     }
 
-    enum states
-    {
-        /**
-         * First state.
-         * @see processRefer
-         */
-        Initial,
-        /**
-         * State after routing service completes
-         * @see processRoutingResponse
-         */
-        CalledBack,
-    } mState;
     /**
      * The INVITE session, which contains the dialog on which the
      * REFER was received.
@@ -980,23 +922,9 @@ class HandleInviteResponseOperation : public SipQueueableOperation
 {
 public:
     HandleInviteResponseOperation(int respCode, const int invState, const SipSessionPtr& session)
-        : mState(Initial), mRespCode(respCode), mInvState(invState), mSession(session) { }
+        : mRespCode(respCode), mInvState(invState), mSession(session) { }
 
-    SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
-    {
-        switch (mState)
-        {
-        case Initial:
-            return processInviteResponse();
-        case CalledBack:
-            return processSessionListenerResult();
-        default:
-            lg(Error) << "We're in a bad state here...";
-            return Complete;
-        }
-    }
-
-    SuspendableWorkResult processInviteResponse()
+    SuspendableWorkResult initial(const SuspendableWorkListenerPtr&)
     {
         std::cout << "Handling response to an INVITE yo!" << std::endl;
         //Treat all 1XX messages we don't recognize the same as a 180
@@ -1021,7 +949,6 @@ public:
                 {
                     SipAMICallbackPtr cb(new SipAMICallback(0, mSession, this, false, true));
                     Ice::CallbackPtr d = Ice::newCallback(cb, &SipAMICallback::callback);
-                    mState = CalledBack;
                     (*listener)->begin_indicated(mSession->getSessionProxy(), new RingingIndication(), d);
                 }
                 catch (const Ice::Exception &ex)
@@ -1046,7 +973,6 @@ public:
                     Ice::CallbackPtr d = Ice::newCallback(cb, &SipAMICallback::callback);
                     ProgressingIndicationPtr progressing(new ProgressingIndication());
                     progressing->response = response;
-                    mState = CalledBack;
                     (*listener)->begin_indicated(mSession->getSessionProxy(), progressing, d);
                 }
                 catch (const Ice::Exception &ex)
@@ -1069,7 +995,6 @@ public:
                     {
                         SipAMICallbackPtr cb(new SipAMICallback(0, mSession, this, false, true));
                         Ice::CallbackPtr d = Ice::newCallback(cb, &SipAMICallback::callback);
-                        mState = CalledBack;
                         (*listener)->begin_indicated(mSession->getSessionProxy(), new ConnectedIndication(), d);
                     }
                     catch (const Ice::Exception &ex)
@@ -1082,7 +1007,7 @@ public:
         return Complete;
     }
 
-    SuspendableWorkResult processSessionListenerResult()
+    SuspendableWorkResult calledBack()
     {
         assert(mAsyncResult);
         SessionListenerPrx listener = SessionListenerPrx::uncheckedCast(mAsyncResult->getProxy());
@@ -1096,21 +1021,7 @@ public:
         }
         return Complete;
     }
-    
-    enum states
-    {
-        /**
-         * First state.
-         * @see processRefer
-         */
-        Initial,
-        /**
-         * State after routing service completes
-         * @see processSessionListenerResult
-         */
-        CalledBack,
-    } mState;
-
+   
     int mRespCode;
     const int mInvState;
     SipSessionPtr mSession;
@@ -1145,7 +1056,7 @@ public:
             const int tsxState)
         : mSessionModule(module), mTsx(tsx), mInv(inv), mEventType(eventType), mTsxState(tsxState) { }
 
-    SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
+    SuspendableWorkResult initial(const SuspendableWorkListenerPtr&)
     {
         std::cout << "The transaction has address " << mTsx << std::endl;
         PJSipTransactionModInfo *tsx_mod_info = static_cast<PJSipTransactionModInfo *> (mTsx->mod_data[mSessionModule->getModule().id]);
@@ -1198,7 +1109,7 @@ public:
     InviteStateOperation(const PJSipSessionModulePtr& module, pjsip_inv_session* inv, const int eventType, const int invState, const std::string branch)
         : mSessionModule(module), mInv(inv), mEventType(eventType), mInvState(invState), mEventBranch(branch) { }
 
-    SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
+    SuspendableWorkResult initial(const SuspendableWorkListenerPtr&)
     {
         if (mInvState == PJSIP_INV_STATE_DISCONNECTED)
         {
@@ -1399,7 +1310,7 @@ public:
     HandleMediaUpdate(pjsip_inv_session *inv, const int moduleId, const ServiceLocatorPrx& serviceLocator)
         : mInv(inv), mModuleId(moduleId), mServiceLocator(serviceLocator) { }
 
-    SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
+    SuspendableWorkResult initial(const SuspendableWorkListenerPtr&)
     {
         const pjmedia_sdp_session *remote_sdp;
         pj_status_t status;
diff --git a/src/PJSipSessionModule.h b/src/PJSipSessionModule.h
index a8968d3..0a8a872 100644
--- a/src/PJSipSessionModule.h
+++ b/src/PJSipSessionModule.h
@@ -138,33 +138,105 @@ typedef IceUtil::Handle<PJSipSessionModule> PJSipSessionModulePtr;
 
 /**
  * Operation that may be queued.
+ *
+ * Operations that are queued due to the arrival of SIP messages tend to follow a pattern. That
+ * is, they do some initial processing based on the SIP message, then they make an AMI call to
+ * a session listener, and then they process the response of the AMI call.
+ *
+ * This class makes facilitating such a pattern relatively easy.
  */
 class SipQueueableOperation : virtual public AsteriskSCF::System::WorkQueue::V1::SuspendableWork
 {
 public:
-    virtual ~SipQueueableOperation() { };
+    SipQueueableOperation() : mState(Initial) { }
+    virtual ~SipQueueableOperation() { }
     /**
      * Queueable operations may call out to AMI methods. AMI callbacks happen
      * in Ice client threads. We want to process the result of the AMI call in one
      * of our thread pool's threads, though. The AMI callback can set the operation's
-     * AsyncResult so that the operation will be able to actually do something with
-     * the AMI call's result.
+     * AsyncResult so that the operation may process the result of the AMI call in
+     * one of our threads.
      */
     void setAsyncResult(const Ice::AsyncResultPtr& r) {mAsyncResult = r;}
+
     /**
      * Override of SuspendableWorkExecute()
      */
-    virtual AsteriskSCF::System::WorkQueue::V1::SuspendableWorkResult execute(const AsteriskSCF::System::WorkQueue::V1::SuspendableWorkListenerPtr&)
+    AsteriskSCF::System::WorkQueue::V1::SuspendableWorkResult execute(const AsteriskSCF::System::WorkQueue::V1::SuspendableWorkListenerPtr& listener)
+    {
+        AsteriskSCF::System::WorkQueue::V1::SuspendableWorkResult result =
+            AsteriskSCF::System::WorkQueue::V1::Complete;
+        switch (mState)
+        {
+            case Initial:
+                result = initial(listener);
+                // Change the state now so that
+                // when the AMI callback occurs,
+                // the proper followup will be called.
+                mState = CalledBack;
+                break;
+            case CalledBack:
+                result = calledBack(listener);
+                break;
+            default:
+                break;
+        }
+        return result;
+    }
+
+    /**
+     * All SipQueueableOperations are required to provide an override
+     * of this function. This is where initial processing takes place.
+     */
+    virtual AsteriskSCF::System::WorkQueue::V1::SuspendableWorkResult initial(
+            const AsteriskSCF::System::WorkQueue::V1::SuspendableWorkListenerPtr&) = 0;
+
+    /**
+     * If a SipQueueableOperation has made an AMI call, then when the AMI method
+     * returns, this will be where the AMI result can be processed.
+     *
+     * Since not all SipQueueableOperations make use of AMI, this method need
+     * not always be overridden.
+     */
+    virtual AsteriskSCF::System::WorkQueue::V1::SuspendableWorkResult calledBack(
+            const AsteriskSCF::System::WorkQueue::V1::SuspendableWorkListenerPtr&)
     {
         return AsteriskSCF::System::WorkQueue::V1::Complete;
     }
 protected:
+    enum states
+    {
+        /**
+         * State of the operation upon construction.
+         * @see initial
+         */
+        Initial,
+        /**
+         * State when AMI result is received.
+         * @see calledBack
+         */
+        CalledBack,
+    } mState;
+
+    /**
+     * Ice asynchronous result.
+     *
+     * Used for processing the result of AMI calls.
+     * @see setAsyncResult
+     */
     Ice::AsyncResultPtr mAsyncResult;
-    SipQueueableOperation() { }
 };
 
 typedef IceUtil::Handle<SipQueueableOperation> SipQueueableOperationPtr;
 
+/**
+ * General AMI callback class used with SipQueueableOperation
+ *
+ * This AMI callback doesn't actually process results itself. Rather,
+ * it passes the result back to the original operation that made the
+ * AMI call so that the AMI result can be processed in the proper
+ * thread.
+ */
 class SipAMICallback : public IceUtil::Shared
 {
 public:

commit 30f4658ce0ba340fd90667a5483714ada1e5097c
Author: Mark Michelson <mmichelson at digium.com>
Date:   Thu May 19 17:42:53 2011 -0500

    Fix another crash that could happen if a transaction terminates after the session is stopped.

diff --git a/src/PJSipSessionModule.cpp b/src/PJSipSessionModule.cpp
index 45f05c0..c20ac28 100644
--- a/src/PJSipSessionModule.cpp
+++ b/src/PJSipSessionModule.cpp
@@ -1362,11 +1362,21 @@ void PJSipSessionModule::invOnTsxStateChanged(pjsip_inv_session *inv, pjsip_tran
     //function from there.
 
     PJSipSessionModInfo *session_mod_info = static_cast<PJSipSessionModInfo*>(inv->mod_data[mModule.id]);
-    SipSessionPtr session = session_mod_info->getSessionPtr();
 
-    std::string method(pj_strbuf(&tsx->method.name), pj_strlen(&tsx->method.name));
-    std::cout << "Queuing a Transaction state operation for transaction " << tsx  << " Method: " << method << std::endl;
-    enqueueSessionWork(new TransactionStateOperation(this, tsx, inv, e->type, tsx->state), inv);
+    //When a transaction is terminated, it will call into here. In the situation where we are the UAS, this
+    //may happen after the session has been stopped, and therefore the session_mod_info is NULL.
+    //
+    //XXX This can be problematic with regards to transaction state replication since we may not communicate
+    //to the replica that the transaction has been terminated. This will need to be addressed when transaction
+    //state replication is worked on.
+    if (session_mod_info)
+    {
+        SipSessionPtr session = session_mod_info->getSessionPtr();
+
+        std::string method(pj_strbuf(&tsx->method.name), pj_strlen(&tsx->method.name));
+        std::cout << "Queuing a Transaction state operation for transaction " << tsx  << " Method: " << method << std::endl;
+        enqueueSessionWork(new TransactionStateOperation(this, tsx, inv, e->type, tsx->state), inv);
+    }
 }
 
 /**

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/sip.git



More information about the asterisk-scf-commits mailing list