[asterisk-scf-commits] asterisk-scf/integration/sip.git branch "threading" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Fri May 13 15:57:34 CDT 2011


branch "threading" has been updated
       via  9353dd8149dd64b463bd207a9be0bc5bd02e1b8f (commit)
       via  673f9e8d81c71018e60c6e7117348528c679ac72 (commit)
       via  576ef02976be38ad1d9ce56a1d1a78145386acec (commit)
       via  8ca653bd739b1b898d408fc439c12cfd85fa1f81 (commit)
       via  e1847341cfa58fa1881760591e1b15c89a3b3d98 (commit)
       via  31f82419f61f057124629513bc58b08b0cbf3c0f (commit)
       via  9efb966a80d43d8472f1844e1960f7c3c4b26fc3 (commit)
      from  19dcde2cc6fabcf7abcfcc41a6987169910c70ee (commit)

Summary of changes:
 src/PJSipSessionModule.cpp |  480 ++++++++++++++++++++++++++++----------------
 src/PJSipSessionModule.h   |   46 ++++-
 2 files changed, 350 insertions(+), 176 deletions(-)


- Log -----------------------------------------------------------------
commit 9353dd8149dd64b463bd207a9be0bc5bd02e1b8f
Author: Mark Michelson <mmichelson at digium.com>
Date:   Fri May 13 15:57:24 2011 -0500

    Make the media update handler a queueable operation.
    
    For once, an easy one :)

diff --git a/src/PJSipSessionModule.cpp b/src/PJSipSessionModule.cpp
index fb08c14..d4f3883 100644
--- a/src/PJSipSessionModule.cpp
+++ b/src/PJSipSessionModule.cpp
@@ -57,6 +57,7 @@ using namespace AsteriskSCF::System::ThreadPool::V1;
 using namespace AsteriskSCF::System::WorkQueue::V1;
 using namespace AsteriskSCF::WorkQueue;
 using namespace AsteriskSCF::SmartProxy;
+using namespace AsteriskSCF::Core::Discovery::V1;
 
 class RouteSessionCallback : public IceUtil::Shared
 {
@@ -1340,111 +1341,131 @@ void PJSipSessionModule::invOnCreateOffer(pjsip_inv_session*, pjmedia_sdp_sessio
     //stub
 }
 
-void PJSipSessionModule::invOnMediaUpdate(pjsip_inv_session *inv, pj_status_t status)
+class HandleMediaUpdate : public SipQueueableOperation
 {
-    if (status != PJ_SUCCESS)
-    {
-        // We have nothing, zip, nada, kablamo, in common.
-        return;
-    }
+public:
+    HandleMediaUpdate(pjsip_inv_session *inv, const int moduleId, const ServiceLocatorPrx& serviceLocator)
+        : mInv(inv), mModuleId(moduleId), mServiceLocator(serviceLocator) { }
 
-    const pjmedia_sdp_session *remote_sdp;
-    if ((status = pjmedia_sdp_neg_get_active_remote(inv->neg, &remote_sdp)) != PJ_SUCCESS)
+    SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
     {
-        // TODO: What happens if we can't get negotiated SDP?
-        return;
-    }
+        const pjmedia_sdp_session *remote_sdp;
+        pj_status_t status;
+        if ((status = pjmedia_sdp_neg_get_active_remote(mInv->neg, &remote_sdp)) != PJ_SUCCESS)
+        {
+            // TODO: What happens if we can't get negotiated SDP?
+            return Complete;
+        }
 
-    const pjmedia_sdp_conn *remote_conn = remote_sdp->media[0]->conn ? remote_sdp->media[0]->conn : remote_sdp->conn;
+        const pjmedia_sdp_conn *remote_conn = remote_sdp->media[0]->conn ? remote_sdp->media[0]->conn : remote_sdp->conn;
 
-    PJSipSessionModInfo *session_mod_info = (PJSipSessionModInfo*)inv->mod_data[mModule.id];
-    SipSessionPtr session = session_mod_info->getSessionPtr();
-    std::string destination(pj_strbuf(&remote_conn->addr), pj_strlen(&remote_conn->addr));
-    session->setRemoteDetails(destination, remote_sdp->media[0]->desc.port);
+        PJSipSessionModInfo *session_mod_info = (PJSipSessionModInfo*)mInv->mod_data[mModuleId];
+        SipSessionPtr session = session_mod_info->getSessionPtr();
+        std::string destination(pj_strbuf(&remote_conn->addr), pj_strlen(&remote_conn->addr));
+        session->setRemoteDetails(destination, remote_sdp->media[0]->desc.port);
 
-    // Each stream has its own set of formats, so go to that granularity
-    for (unsigned int stream = 0; stream < remote_sdp->media_count; stream++)
-    {
-        // We should have the parsing of the connection information here
+        // Each stream has its own set of formats, so go to that granularity
+        for (unsigned int stream = 0; stream < remote_sdp->media_count; stream++)
+        {
+            // We should have the parsing of the connection information here
 
-        // We should have the parsing of the rtcp attribute here
+            // We should have the parsing of the rtcp attribute here
 
-        FormatSeq formats;
+            FormatSeq formats;
 
-        // Next step is to see what formats exist on this stream
-        for (unsigned int format = 0; format < remote_sdp->media[stream]->desc.fmt_count; format++)
-        {
-            FormatDiscoverySDPPtr params = new FormatDiscoverySDP();
-            params->category = "media_format";
-            std::stringstream(pj_strbuf(&remote_sdp->media[stream]->desc.fmt[format])) >> params->payload;
-            params->type = std::string(pj_strbuf(&remote_sdp->media[stream]->desc.media),
-                    pj_strlen(&remote_sdp->media[stream]->desc.media));
-
-            // Some devices rely solely on the payload for known formats (such as PCMU) so the following format
-            // parameters are optional
-            const pjmedia_sdp_attr *attr;
-            if ((attr = pjmedia_sdp_media_find_attr2(remote_sdp->media[stream], "rtpmap",
-                                    &remote_sdp->media[stream]->desc.fmt[format])))
+            // Next step is to see what formats exist on this stream
+            for (unsigned int format = 0; format < remote_sdp->media[stream]->desc.fmt_count; format++)
             {
-                pjmedia_sdp_rtpmap *rtpmap;
-                if ((pjmedia_sdp_attr_to_rtpmap(inv->pool_active, attr, &rtpmap)) == PJ_SUCCESS)
+                FormatDiscoverySDPPtr params = new FormatDiscoverySDP();
+                params->category = "media_format";
+                std::stringstream(pj_strbuf(&remote_sdp->media[stream]->desc.fmt[format])) >> params->payload;
+                params->type = std::string(pj_strbuf(&remote_sdp->media[stream]->desc.media),
+                        pj_strlen(&remote_sdp->media[stream]->desc.media));
+
+                // Some devices rely solely on the payload for known formats (such as PCMU) so the following format
+                // parameters are optional
+                const pjmedia_sdp_attr *attr;
+                if ((attr = pjmedia_sdp_media_find_attr2(remote_sdp->media[stream], "rtpmap",
+                                        &remote_sdp->media[stream]->desc.fmt[format])))
                 {
-                    params->subtype = std::string(pj_strbuf(&rtpmap->enc_name), pj_strlen(&rtpmap->enc_name));
-                    params->samplerate = rtpmap->clock_rate;
+                    pjmedia_sdp_rtpmap *rtpmap;
+                    if ((pjmedia_sdp_attr_to_rtpmap(mInv->pool_active, attr, &rtpmap)) == PJ_SUCCESS)
+                    {
+                        params->subtype = std::string(pj_strbuf(&rtpmap->enc_name), pj_strlen(&rtpmap->enc_name));
+                        params->samplerate = rtpmap->clock_rate;
+                    }
                 }
-            }
 
-            // Next we move on to the format specific parameters
-            if ((attr = pjmedia_sdp_media_find_attr2(remote_sdp->media[stream], "fmtp",
-                                    &remote_sdp->media[stream]->desc.fmt[format])))
-            {
-                pjmedia_sdp_fmtp fmtp;
-                if ((pjmedia_sdp_attr_get_fmtp(attr, &fmtp)) == PJ_SUCCESS)
+                // Next we move on to the format specific parameters
+                if ((attr = pjmedia_sdp_media_find_attr2(remote_sdp->media[stream], "fmtp",
+                                        &remote_sdp->media[stream]->desc.fmt[format])))
                 {
-                    std::string parameter = std::string(pj_strbuf(&fmtp.fmt_param), pj_strlen(&fmtp.fmt_param));
-                    params->parameters.push_back(parameter);
+                    pjmedia_sdp_fmtp fmtp;
+                    if ((pjmedia_sdp_attr_get_fmtp(attr, &fmtp)) == PJ_SUCCESS)
+                    {
+                        std::string parameter = std::string(pj_strbuf(&fmtp.fmt_param), pj_strlen(&fmtp.fmt_param));
+                        params->parameters.push_back(parameter);
+                    }
                 }
-            }
 
-            // Next up are attributes that are not specific to the format, such as ptime
-            for (unsigned int attribute = 0; attribute < remote_sdp->media[stream]->attr_count; attribute++)
-            {
-                // Attributes we already touch above OR know aren't helpful to the media format component don't need to
-                // be given to it, obviously
-                if (!pj_strcmp2(&remote_sdp->media[stream]->attr[attribute]->name, "rtpmap") ||
-                    !pj_strcmp2(&remote_sdp->media[stream]->attr[attribute]->name, "fmtp") ||
-                    !pj_strcmp2(&remote_sdp->media[stream]->attr[attribute]->name, "rtcp") ||
-                    !pj_strcmp2(&remote_sdp->media[stream]->attr[attribute]->name, "sendrecv") ||
-                    !pj_strcmp2(&remote_sdp->media[stream]->attr[attribute]->name, "sendonly") ||
-                    !pj_strcmp2(&remote_sdp->media[stream]->attr[attribute]->name, "recvonly"))
+                // Next up are attributes that are not specific to the format, such as ptime
+                for (unsigned int attribute = 0; attribute < remote_sdp->media[stream]->attr_count; attribute++)
                 {
-                    continue;
-                }
+                    // Attributes we already touch above OR know aren't helpful to the media format component don't need to
+                    // be given to it, obviously
+                    if (!pj_strcmp2(&remote_sdp->media[stream]->attr[attribute]->name, "rtpmap") ||
+                        !pj_strcmp2(&remote_sdp->media[stream]->attr[attribute]->name, "fmtp") ||
+                        !pj_strcmp2(&remote_sdp->media[stream]->attr[attribute]->name, "rtcp") ||
+                        !pj_strcmp2(&remote_sdp->media[stream]->attr[attribute]->name, "sendrecv") ||
+                        !pj_strcmp2(&remote_sdp->media[stream]->attr[attribute]->name, "sendonly") ||
+                        !pj_strcmp2(&remote_sdp->media[stream]->attr[attribute]->name, "recvonly"))
+                    {
+                        continue;
+                    }
 
-                std::string parameter = std::string(pj_strbuf(&remote_sdp->media[stream]->attr[attribute]->name),
-                        pj_strlen(&remote_sdp->media[stream]->attr[attribute]->name)) + ':' +
-                    std::string(pj_strbuf(&remote_sdp->media[stream]->attr[attribute]->value),
-                            pj_strlen(&remote_sdp->media[stream]->attr[attribute]->value));
-                params->parameters.push_back(parameter);
-            }
+                    std::string parameter = std::string(pj_strbuf(&remote_sdp->media[stream]->attr[attribute]->name),
+                            pj_strlen(&remote_sdp->media[stream]->attr[attribute]->name)) + ':' +
+                        std::string(pj_strbuf(&remote_sdp->media[stream]->attr[attribute]->value),
+                                pj_strlen(&remote_sdp->media[stream]->attr[attribute]->value));
+                    params->parameters.push_back(parameter);
+                }
 
-            try
-            {
-                MediaFormatServicePrx service = MediaFormatServicePrx::uncheckedCast(mServiceLocator->locate(params));
+                try
+                {
+                    MediaFormatServicePrx service = MediaFormatServicePrx::uncheckedCast(mServiceLocator->locate(params));
 
-                // It is entirely possible for the service locator to not find a service that knows about this media
-                // format
-                if (service != 0)
+                    // It is entirely possible for the service locator to not find a service that knows about this media
+                    // format
+                    if (service != 0)
+                    {
+                        formats.push_back(FormatPtr::dynamicCast(service->getFormat(params)));
+                    }
+                }
+                catch (...)
                 {
-                    formats.push_back(FormatPtr::dynamicCast(service->getFormat(params)));
+                    // If we get here the format just isn't supported...
                 }
             }
-            catch (...)
-            {
-                // If we get here the format just isn't supported...
-            }
         }
+        return Complete;
     }
+
+    pjsip_inv_session* mInv;
+    const int mModuleId;
+    ServiceLocatorPrx mServiceLocator;
+};
+
+void PJSipSessionModule::invOnMediaUpdate(pjsip_inv_session *inv, pj_status_t status)
+{
+    if (status != PJ_SUCCESS)
+    {
+        // We have nothing, zip, nada, kablamo, in common.
+        return;
+    }
+
+    PJSipSessionModInfo *session_mod_info = (PJSipSessionModInfo*) inv->mod_data[mModule.id];
+    SipSessionPtr session = session_mod_info->getSessionPtr();
+    session->enqueueSessionWork(new HandleMediaUpdate(inv, mModule.id, mServiceLocator));
 }
 
 pjsip_redirect_op PJSipSessionModule::invOnRedirected(pjsip_inv_session*, const pjsip_uri*,
@@ -1460,6 +1481,23 @@ pjsip_dialog *PJSipSessionModule::uaOnDialogForked(pjsip_dialog*, pjsip_rx_data*
     return NULL;
 }
 
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
 QueuePtr PJSipSessionModule::getThreadPoolQueue()
 {
     return mPoolQueue;

commit 673f9e8d81c71018e60c6e7117348528c679ac72
Author: Mark Michelson <mmichelson at digium.com>
Date:   Fri May 13 15:36:41 2011 -0500

    Fix up screwy indentation because it's bothering me.

diff --git a/src/PJSipSessionModule.cpp b/src/PJSipSessionModule.cpp
index 9baaffe..fb08c14 100644
--- a/src/PJSipSessionModule.cpp
+++ b/src/PJSipSessionModule.cpp
@@ -1193,8 +1193,8 @@ void PJSipSessionModule::invOnStateChanged(pjsip_inv_session *inv, pjsip_event *
         }
         std::vector<AsteriskSCF::SessionCommunications::V1::SessionListenerPrx> listeners = session->getListeners();
         lg(Debug) << "Relating stopped state to " << listeners.size() << " listeners";
-	AsteriskSCF::SessionCommunications::V1::StoppedIndicationPtr stopped(new AsteriskSCF::SessionCommunications::V1::StoppedIndication());
-	stopped->response = response;
+        AsteriskSCF::SessionCommunications::V1::StoppedIndicationPtr stopped(new AsteriskSCF::SessionCommunications::V1::StoppedIndication());
+        stopped->response = response;
         for (std::vector<AsteriskSCF::SessionCommunications::V1::SessionListenerPrx>::iterator listener =
                  listeners.begin();
              listener != listeners.end();
@@ -1212,19 +1212,19 @@ void PJSipSessionModule::invOnStateChanged(pjsip_inv_session *inv, pjsip_event *
         session_mod_info->mNeedsRemoval = true;
         pjsip_dialog *dlg = inv->dlg;
         PJSipDialogModInfo *dlg_mod_info = (PJSipDialogModInfo*) dlg->mod_data[mModule.id];
-	if (dlg_mod_info)
-	{
-	    dlg_mod_info->mNeedsRemoval = true;
-	}
+        if (dlg_mod_info)
+        {
+            dlg_mod_info->mNeedsRemoval = true;
+        }
         lg(Debug) << "Replicating state on DISCONNECTED inv_state.";
         replicateState(dlg_mod_info, NULL, session_mod_info);
         delete session_mod_info;
-	inv->mod_data[mModule.id] = 0;
-	if (dlg_mod_info)
-	{
-	    delete dlg_mod_info;
-	    dlg->mod_data[mModule.id] = 0;
-	}
+        inv->mod_data[mModule.id] = 0;
+        if (dlg_mod_info)
+        {
+            delete dlg_mod_info;
+            dlg->mod_data[mModule.id] = 0;
+        }
     }
     if (event->type == PJSIP_EVENT_RX_MSG && inv->state == PJSIP_INV_STATE_CONFIRMED)
     {

commit 576ef02976be38ad1d9ce56a1d1a78145386acec
Author: Mark Michelson <mmichelson at digium.com>
Date:   Fri May 13 14:56:06 2011 -0500

    Adjust HandleInviteResponseOperation to proper use SipAMICallback.

diff --git a/src/PJSipSessionModule.cpp b/src/PJSipSessionModule.cpp
index 1868603..9baaffe 100644
--- a/src/PJSipSessionModule.cpp
+++ b/src/PJSipSessionModule.cpp
@@ -896,6 +896,8 @@ public:
     const int mModuleId;
     /**
      * Helps determine which end_* method to call on the session router when AMI completes.
+     * XXX It may be more elegant to handle this by sending a cookie to the AMI method
+     * instead.
      */
     bool mWasWithDestination;
 };
@@ -1046,10 +1048,9 @@ public:
             {
                 try
                 {
-                    ListenerCallbackPtr cb(new ListenerCallback(RingingCallbackName));
-                    Callback_SessionListener_indicatedPtr ringingCB =
-                        newCallback_SessionListener_indicated(cb, &ListenerCallback::failure);
-                    (*listener)->begin_indicated(mSession->getSessionProxy(), new RingingIndication(), ringingCB);
+                    SipAMICallbackPtr cb(new SipAMICallback(0, mSession, this, false, true));
+                    Ice::CallbackPtr d = Ice::newCallback(cb, &SipAMICallback::callback);
+                    (*listener)->begin_indicated(mSession->getSessionProxy(), new RingingIndication(), d);
                 }
                 catch (const Ice::Exception &ex)
                 {
@@ -1069,12 +1070,11 @@ public:
             {
                 try
                 {
-                    ListenerCallbackPtr cb(new ListenerCallback(ProgressingCallbackName));
-                    Callback_SessionListener_indicatedPtr progressingCB =
-                        newCallback_SessionListener_indicated(cb, &ListenerCallback::failure);
-	    	ProgressingIndicationPtr progressing(new ProgressingIndication());
-	    	progressing->response = response;
-                    (*listener)->begin_indicated(mSession->getSessionProxy(), progressing, progressingCB);
+                    SipAMICallbackPtr cb(new SipAMICallback(0, mSession, this, false, true));
+                    Ice::CallbackPtr d = Ice::newCallback(cb, &SipAMICallback::callback);
+                    ProgressingIndicationPtr progressing(new ProgressingIndication());
+                    progressing->response = response;
+                    (*listener)->begin_indicated(mSession->getSessionProxy(), progressing, d);
                 }
                 catch (const Ice::Exception &ex)
                 {
@@ -1094,10 +1094,9 @@ public:
                 {
                     try
                     {
-                        ListenerCallbackPtr cb(new ListenerCallback(ConnectedCallbackName));
-                        Callback_SessionListener_indicatedPtr connectedCB =
-                            newCallback_SessionListener_indicated(cb, &ListenerCallback::failure);
-                        (*listener)->begin_indicated(mSession->getSessionProxy(), new ConnectedIndication(), connectedCB);
+                        SipAMICallbackPtr cb(new SipAMICallback(0, mSession, this, false, true));
+                        Ice::CallbackPtr d = Ice::newCallback(cb, &SipAMICallback::callback);
+                        (*listener)->begin_indicated(mSession->getSessionProxy(), new ConnectedIndication(), d);
                     }
                     catch (const Ice::Exception &ex)
                     {
@@ -1111,7 +1110,16 @@ public:
 
     SuspendableWorkResult processSessionListenerResult()
     {
-        //XXX Come back to this after fixing other crap
+        assert(mAsyncResult);
+        SessionListenerPrx listener = SessionListenerPrx::uncheckedCast(mAsyncResult->getProxy());
+        try
+        {
+            listener->end_indicated(mAsyncResult);
+        }
+        catch (const Ice::Exception& ex)
+        {
+            lg(Error) << "Ice exception when attempting to indicate something or other";
+        }
         return Complete;
     }
     

commit 8ca653bd739b1b898d408fc439c12cfd85fa1f81
Author: Mark Michelson <mmichelson at digium.com>
Date:   Fri May 13 14:20:09 2011 -0500

    Fix an error in HandleReferOperation.
    
    The problem was that I could royally screw things up by
    calling the incorrect end_* calback. Now there is a boolean
    to determine which to use.
    
    The class has gotten large enough that I may move it to its own
    file.

diff --git a/src/PJSipSessionModule.cpp b/src/PJSipSessionModule.cpp
index 135384c..1868603 100644
--- a/src/PJSipSessionModule.cpp
+++ b/src/PJSipSessionModule.cpp
@@ -638,7 +638,8 @@ public:
             const AsteriskSCF::SmartProxy::SmartProxy<SessionRouterPrx>& sessionRouter,
             const int moduleId)
         : mState(Initial), mInv(inv), mTsx(tsx), mTdata(tdata), 
-        mTargetSipUri(target_sip_uri), mSession(session), mSessionRouter(sessionRouter), mModuleId(moduleId) { }
+        mTargetSipUri(target_sip_uri), mSession(session), mSessionRouter(sessionRouter),
+        mModuleId(moduleId), mWasWithDestination(false) { }
 
     SuspendableWorkResult execute(const SuspendableWorkListenerPtr& workListener)
     {
@@ -793,6 +794,7 @@ public:
                 Ice::CallbackPtr d = Ice::newCallback(cb, &SipAMICallback::callback);
 
                 lg(Debug) << "handleRefer() calling router connectBridgedSessionsWithDestination(). ";
+                mWasWithDestination = true;
                 mState = CalledBack;
                 mSessionRouter->begin_connectBridgedSessionsWithDestination(operationId, session->getSessionProxy(), target, d);
                 return Complete;
@@ -817,7 +819,22 @@ public:
         SessionRouterPrx router = SessionRouterPrx::uncheckedCast(mAsyncResult->getProxy());
         try
         {
-            router->end_connectBridgedSessions(mAsyncResult);
+            if (mWasWithDestination)
+            {
+                router->end_connectBridgedSessionsWithDestination(mAsyncResult);
+            }
+            else
+            {
+                router->end_connectBridgedSessions(mAsyncResult);
+            }
+        }
+        catch (const AsteriskSCF::Core::Routing::V1::DestinationNotFoundException &)
+        {
+            lg(Debug) << "ConnectBridgedSessionsWithDestination sending 404 due to destination not found.";
+
+            pjsip_dlg_modify_response(mInv->dlg, mTdata, 404, NULL);
+            pjsip_dlg_send_response(mInv->dlg, mTsx, mTdata);
+            return Complete;
         }
         catch (const std::exception &e)
         {
@@ -878,9 +895,9 @@ public:
      */
     const int mModuleId;
     /**
-     * The result of any AMI calls we make
+     * Helps determine which end_* method to call on the session router when AMI completes.
      */
-    Ice::AsyncResultPtr mAsyncResult;
+    bool mWasWithDestination;
 };
 
 void PJSipSessionModule::handleRefer(pjsip_inv_session *inv, pjsip_rx_data *rdata)
@@ -991,10 +1008,24 @@ class HandleInviteResponseOperation : public SipQueueableOperation
 {
 public:
     HandleInviteResponseOperation(int respCode, const int invState, const SipSessionPtr& session)
-        : mRespCode(respCode), mInvState(invState), mSession(session) { }
+        : mState(Initial), mRespCode(respCode), mInvState(invState), mSession(session) { }
 
     SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
     {
+        switch (mState)
+        {
+        case Initial:
+            return processInviteResponse();
+        case CalledBack:
+            return processSessionListenerResult();
+        default:
+            lg(Error) << "We're in a bad state here...";
+            return Complete;
+        }
+    }
+
+    SuspendableWorkResult processInviteResponse()
+    {
         //Treat all 1XX messages we don't recognize the same as a 180
         if (mRespCode > 100 && mRespCode < 200 && mRespCode != 183)
         {
@@ -1078,6 +1109,26 @@ public:
         return Complete;
     }
 
+    SuspendableWorkResult processSessionListenerResult()
+    {
+        //XXX Come back to this after fixing other crap
+        return Complete;
+    }
+    
+    enum states
+    {
+        /**
+         * First state.
+         * @see processRefer
+         */
+        Initial,
+        /**
+         * State after routing service completes
+         * @see processSessionListenerResult
+         */
+        CalledBack,
+    } mState;
+
     int mRespCode;
     const int mInvState;
     SipSessionPtr mSession;

commit e1847341cfa58fa1881760591e1b15c89a3b3d98
Author: Mark Michelson <mmichelson at digium.com>
Date:   Fri May 13 12:28:16 2011 -0500

    Fix up some twisted logic in SipAMICallback and start down the road of queuing INVITE response handling.

diff --git a/src/PJSipSessionModule.cpp b/src/PJSipSessionModule.cpp
index be27395..135384c 100644
--- a/src/PJSipSessionModule.cpp
+++ b/src/PJSipSessionModule.cpp
@@ -626,7 +626,7 @@ void PJSipSessionModule::handleNewInvite(pjsip_rx_data *rdata)
     }
 }
 
-class HandleReferOperation : public SuspendableWork, public SipQueueableOperation 
+class HandleReferOperation : public SipQueueableOperation 
 {
 public:
     HandleReferOperation(
@@ -761,7 +761,7 @@ public:
             try
             {
                 std::string operationId = ::IceUtil::generateUUID();
-                SipAMICallbackPtr cb(new SipAMICallback(workListener, this, false));
+                SipAMICallbackPtr cb(new SipAMICallback(workListener, mSession, this, false, true));
                 Ice::CallbackPtr d = Ice::newCallback(cb, &SipAMICallback::callback);
 
                 lg(Debug) << "handleRefer() calling router connectBridgedSessions(). ";
@@ -789,7 +789,7 @@ public:
                 std::string operationId = ::IceUtil::generateUUID();
                 PJSipSessionModInfo *session_mod_info = (PJSipSessionModInfo*)mInv->mod_data[mModuleId];
                 SipSessionPtr session = session_mod_info->getSessionPtr();
-                SipAMICallbackPtr cb(new SipAMICallback(workListener, this, false));
+                SipAMICallbackPtr cb(new SipAMICallback(workListener, mSession, this, false, true));
                 Ice::CallbackPtr d = Ice::newCallback(cb, &SipAMICallback::callback);
 
                 lg(Debug) << "handleRefer() calling router connectBridgedSessionsWithDestination(). ";
@@ -987,100 +987,117 @@ void PJSipSessionModule::on_tsx_state(pjsip_transaction*, pjsip_event*)
     return;
 }
 
-void PJSipSessionModule::handleInviteResponse(pjsip_inv_session* inv,
-    pjsip_rx_data* rdata, pjsip_dialog*)
+class HandleInviteResponseOperation : public SipQueueableOperation
 {
-    int respCode = rdata->msg_info.msg->line.status.code;
-    PJSipSessionModInfo *session_mod_info = (PJSipSessionModInfo*)inv->mod_data[mModule.id];
-    SipSessionPtr session = session_mod_info->getSessionPtr();
-    //Commented because they are currently unused. They
-    //will be once the individual cases are mapped out.
-    //pjsip_dialog *dlg = pjsip_rdata_get_dlg(rdata);
-    //pjsip_module *module = pjsip_ua_instance();
-    //
-    //XXX There are a BUNCH of response codes we need
-    //to add code to handle.
+public:
+    HandleInviteResponseOperation(int respCode, const int invState, const SipSessionPtr& session)
+        : mRespCode(respCode), mInvState(invState), mSession(session) { }
 
-    //Treat all 1XX messages we don't recognize the same as a 180
-    if (respCode > 100 && respCode < 200 && respCode != 183)
+    SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
     {
-        respCode = 180;
-    }
-    if (respCode == 100)
-    {
-        //Not sure if PJSIP even bothers passing these up
-        lg(Debug) << "Got 100 response";
-    }
-    else if (respCode == 180)
-    {
-        lg(Debug) << "Got 180 response";
-        std::vector<AsteriskSCF::SessionCommunications::V1::SessionListenerPrx> listeners = session->getListeners();
-        std::vector<AsteriskSCF::SessionCommunications::V1::SessionListenerPrx>::const_iterator listener;
-        lg(Debug) << "Relating ringing state to " << listeners.size() << " listeners";
-        for (listener = listeners.begin(); listener != listeners.end(); ++listener)
+        //Treat all 1XX messages we don't recognize the same as a 180
+        if (mRespCode > 100 && mRespCode < 200 && mRespCode != 183)
         {
-            try
-            {
-                ListenerCallbackPtr cb(new ListenerCallback(RingingCallbackName));
-                Callback_SessionListener_indicatedPtr ringingCB =
-                    newCallback_SessionListener_indicated(cb, &ListenerCallback::failure);
-                (*listener)->begin_indicated(session->getSessionProxy(), new RingingIndication(), ringingCB);
-            }
-            catch (const Ice::Exception &ex)
-            {
-                lg(Error) << "Ice exception when attempting to relate ringing state: " << ex.what();
-            }
+            mRespCode = 180;
         }
-    }
-    else if (respCode == 183)
-    {
-        lg(Debug) << "Got 183 response";
-        AsteriskSCF::SessionCommunications::V1::ResponseCodePtr response = new AsteriskSCF::SessionCommunications::V1::ResponseCode();
-        response->isdnCode = 42;
-        std::vector<AsteriskSCF::SessionCommunications::V1::SessionListenerPrx> listeners = session->getListeners();
-        std::vector<AsteriskSCF::SessionCommunications::V1::SessionListenerPrx>::const_iterator listener;
-        lg(Debug) << "Relating progressing state to " << listeners.size() << " listeners";
-        for (listener = listeners.begin(); listener != listeners.end(); ++listener)
+        if (mRespCode == 100)
         {
-            try
-            {
-                ListenerCallbackPtr cb(new ListenerCallback(ProgressingCallbackName));
-                Callback_SessionListener_indicatedPtr progressingCB =
-                    newCallback_SessionListener_indicated(cb, &ListenerCallback::failure);
-		ProgressingIndicationPtr progressing(new ProgressingIndication());
-		progressing->response = response;
-                (*listener)->begin_indicated(session->getSessionProxy(), progressing, progressingCB);
-            }
-            catch (const Ice::Exception &ex)
+            //Not sure if PJSIP even bothers passing these up
+            lg(Debug) << "Got 100 response";
+        }
+        else if (mRespCode == 180)
+        {
+            lg(Debug) << "Got 180 response";
+            std::vector<AsteriskSCF::SessionCommunications::V1::SessionListenerPrx> listeners = mSession->getListeners();
+            std::vector<AsteriskSCF::SessionCommunications::V1::SessionListenerPrx>::const_iterator listener;
+            lg(Debug) << "Relating ringing state to " << listeners.size() << " listeners";
+            for (listener = listeners.begin(); listener != listeners.end(); ++listener)
             {
-                lg(Error) << "Ice exception when attempting to relate progressing state: " << ex.what();
+                try
+                {
+                    ListenerCallbackPtr cb(new ListenerCallback(RingingCallbackName));
+                    Callback_SessionListener_indicatedPtr ringingCB =
+                        newCallback_SessionListener_indicated(cb, &ListenerCallback::failure);
+                    (*listener)->begin_indicated(mSession->getSessionProxy(), new RingingIndication(), ringingCB);
+                }
+                catch (const Ice::Exception &ex)
+                {
+                    lg(Error) << "Ice exception when attempting to relate ringing state: " << ex.what();
+                }
             }
         }
-    }
-    else if (respCode == 200)
-    {
-        lg(Debug) << "Got 200 response";
-        if (inv->state != PJSIP_INV_STATE_DISCONNECTED)
+        else if (mRespCode == 183)
         {
-            std::vector<AsteriskSCF::SessionCommunications::V1::SessionListenerPrx> listeners = session->getListeners();
+            lg(Debug) << "Got 183 response";
+            AsteriskSCF::SessionCommunications::V1::ResponseCodePtr response = new AsteriskSCF::SessionCommunications::V1::ResponseCode();
+            response->isdnCode = 42;
+            std::vector<AsteriskSCF::SessionCommunications::V1::SessionListenerPrx> listeners = mSession->getListeners();
             std::vector<AsteriskSCF::SessionCommunications::V1::SessionListenerPrx>::const_iterator listener;
-            lg(Debug) << "Relating connected state to " << listeners.size() << " listeners";
+            lg(Debug) << "Relating progressing state to " << listeners.size() << " listeners";
             for (listener = listeners.begin(); listener != listeners.end(); ++listener)
             {
                 try
                 {
-                    ListenerCallbackPtr cb(new ListenerCallback(ConnectedCallbackName));
-                    Callback_SessionListener_indicatedPtr connectedCB =
+                    ListenerCallbackPtr cb(new ListenerCallback(ProgressingCallbackName));
+                    Callback_SessionListener_indicatedPtr progressingCB =
                         newCallback_SessionListener_indicated(cb, &ListenerCallback::failure);
-                    (*listener)->begin_indicated(session->getSessionProxy(), new ConnectedIndication(), connectedCB);
+	    	ProgressingIndicationPtr progressing(new ProgressingIndication());
+	    	progressing->response = response;
+                    (*listener)->begin_indicated(mSession->getSessionProxy(), progressing, progressingCB);
                 }
                 catch (const Ice::Exception &ex)
                 {
-                    lg(Error) << "Ice exception when attempting to relate connected state: " << ex.what();
+                    lg(Error) << "Ice exception when attempting to relate progressing state: " << ex.what();
                 }
             }
         }
+        else if (mRespCode == 200)
+        {
+            lg(Debug) << "Got 200 response";
+            if (mInvState != PJSIP_INV_STATE_DISCONNECTED)
+            {
+                std::vector<AsteriskSCF::SessionCommunications::V1::SessionListenerPrx> listeners = mSession->getListeners();
+                std::vector<AsteriskSCF::SessionCommunications::V1::SessionListenerPrx>::const_iterator listener;
+                lg(Debug) << "Relating connected state to " << listeners.size() << " listeners";
+                for (listener = listeners.begin(); listener != listeners.end(); ++listener)
+                {
+                    try
+                    {
+                        ListenerCallbackPtr cb(new ListenerCallback(ConnectedCallbackName));
+                        Callback_SessionListener_indicatedPtr connectedCB =
+                            newCallback_SessionListener_indicated(cb, &ListenerCallback::failure);
+                        (*listener)->begin_indicated(mSession->getSessionProxy(), new ConnectedIndication(), connectedCB);
+                    }
+                    catch (const Ice::Exception &ex)
+                    {
+                        lg(Error) << "Ice exception when attempting to relate connected state: " << ex.what();
+                    }
+                }
+            }
+        }
+        return Complete;
     }
+
+    int mRespCode;
+    const int mInvState;
+    SipSessionPtr mSession;
+};
+
+void PJSipSessionModule::handleInviteResponse(pjsip_inv_session* inv,
+    pjsip_rx_data* rdata, pjsip_dialog*)
+{
+    int respCode = rdata->msg_info.msg->line.status.code;
+    PJSipSessionModInfo *session_mod_info = (PJSipSessionModInfo*)inv->mod_data[mModule.id];
+    SipSessionPtr session = session_mod_info->getSessionPtr();
+    //Commented because they are currently unused. They
+    //will be once the individual cases are mapped out.
+    //pjsip_dialog *dlg = pjsip_rdata_get_dlg(rdata);
+    //pjsip_module *module = pjsip_ua_instance();
+    //
+    //XXX There are a BUNCH of response codes we need
+    //to add code to handle.
+
+    session->enqueueSessionWork(new HandleInviteResponseOperation(respCode, inv->state, session));
 }
 
 void PJSipSessionModule::invOnStateChanged(pjsip_inv_session *inv, pjsip_event *event)
diff --git a/src/PJSipSessionModule.h b/src/PJSipSessionModule.h
index 03fa5d4..fe24470 100644
--- a/src/PJSipSessionModule.h
+++ b/src/PJSipSessionModule.h
@@ -125,7 +125,7 @@ private:
     AsteriskSCF::System::ThreadPool::V1::PoolListenerPtr mPoolListener;
 };
 
-class SipQueueableOperation : virtual public IceUtil::Shared
+class SipQueueableOperation : virtual public AsteriskSCF::System::WorkQueue::V1::SuspendableWork
 {
 public:
     virtual ~SipQueueableOperation();
@@ -142,9 +142,12 @@ class SipAMICallback : public IceUtil::Shared
 public:
     SipAMICallback(
             const AsteriskSCF::System::WorkQueue::V1::SuspendableWorkListenerPtr& listener,
+            const SipSessionPtr& session,
             const SipQueueableOperationPtr& operation,
-            bool isSuspended)
-        : mListener(listener), mOperation(operation), mIsSuspended(isSuspended)
+            bool isSuspended,
+            bool needsRequeuing)
+        : mListener(listener), mSession(session), mOperation(operation),
+        mIsSuspended(isSuspended), mNeedsRequeuing(needsRequeuing)
     {
     }
 
@@ -155,6 +158,10 @@ public:
         {
             mListener->workResumable();
         }
+        else if (mNeedsRequeuing)
+        {
+            mSession->enqueueSessionWork(mOperation);
+        }
     }
 
 private:
@@ -165,6 +172,15 @@ private:
      */
     AsteriskSCF::System::WorkQueue::V1::SuspendableWorkListenerPtr mListener;
     /**
+     * In other cases, when an AMI RPC is made, the operation on the SIP
+     * session is not marked as "Suspended" since it's perfectly fine
+     * for other operations to intervene as necessary. However, the operation
+     * has multiple parts and thus needs to be requeued once the AMI RPC
+     * returns. The AMI callback uses the session in order to enqueue the
+     * operation again.
+     */
+    SipSessionPtr mSession;
+    /**
      * The queued operation that made an AMI call
      */
     SipQueueableOperationPtr mOperation;
@@ -173,6 +189,11 @@ private:
      * work may be resumed.
      */
     bool mIsSuspended;
+    /**
+     * Use this to determine if we need to requeue the operation. This
+     * and mIsSuspended should never both be true.
+     */
+    bool mNeedsRequeuing;
 };
 
 typedef IceUtil::Handle<SipAMICallback> SipAMICallbackPtr;

commit 31f82419f61f057124629513bc58b08b0cbf3c0f
Author: Mark Michelson <mmichelson at digium.com>
Date:   Fri May 13 11:06:00 2011 -0500

    Change the "Suspendedness" of AMI calls in HandleReferOperation.
    
    This also changes SipAMICallback to be tolerant of whether it
    needs to let the listener know work is resumable.

diff --git a/src/PJSipSessionModule.cpp b/src/PJSipSessionModule.cpp
index 550d8f0..be27395 100644
--- a/src/PJSipSessionModule.cpp
+++ b/src/PJSipSessionModule.cpp
@@ -761,14 +761,14 @@ public:
             try
             {
                 std::string operationId = ::IceUtil::generateUUID();
-                SipAMICallbackPtr cb(new SipAMICallback(workListener, this));
+                SipAMICallbackPtr cb(new SipAMICallback(workListener, this, false));
                 Ice::CallbackPtr d = Ice::newCallback(cb, &SipAMICallback::callback);
 
                 lg(Debug) << "handleRefer() calling router connectBridgedSessions(). ";
                 mState = CalledBack;
                 mSessionRouter->begin_connectBridgedSessions(operationId, mSession->getSessionProxy(), other_session->getSessionProxy(), d);
                 pjsip_dlg_dec_lock(other_dlg);
-                return Suspended;
+                return Complete;
             }
             catch (const Ice::CommunicatorDestroyedException &)
             {
@@ -789,13 +789,13 @@ public:
                 std::string operationId = ::IceUtil::generateUUID();
                 PJSipSessionModInfo *session_mod_info = (PJSipSessionModInfo*)mInv->mod_data[mModuleId];
                 SipSessionPtr session = session_mod_info->getSessionPtr();
-                SipAMICallbackPtr cb(new SipAMICallback(workListener, this));
+                SipAMICallbackPtr cb(new SipAMICallback(workListener, this, false));
                 Ice::CallbackPtr d = Ice::newCallback(cb, &SipAMICallback::callback);
 
                 lg(Debug) << "handleRefer() calling router connectBridgedSessionsWithDestination(). ";
                 mState = CalledBack;
                 mSessionRouter->begin_connectBridgedSessionsWithDestination(operationId, session->getSessionProxy(), target, d);
-                return Suspended;
+                return Complete;
             }
             catch (const Ice::CommunicatorDestroyedException &)
             {
diff --git a/src/PJSipSessionModule.h b/src/PJSipSessionModule.h
index a1c87ea..03fa5d4 100644
--- a/src/PJSipSessionModule.h
+++ b/src/PJSipSessionModule.h
@@ -142,20 +142,37 @@ class SipAMICallback : public IceUtil::Shared
 public:
     SipAMICallback(
             const AsteriskSCF::System::WorkQueue::V1::SuspendableWorkListenerPtr& listener,
-            const SipQueueableOperationPtr& operation)
-        : mListener(listener), mOperation(operation)
+            const SipQueueableOperationPtr& operation,
+            bool isSuspended)
+        : mListener(listener), mOperation(operation), mIsSuspended(isSuspended)
     {
     }
 
     void callback(const Ice::AsyncResultPtr &r)
     {
         mOperation->setAsyncResult(r);
-        mListener->workResumable();
+        if (mIsSuspended)
+        {
+            mListener->workResumable();
+        }
     }
 
 private:
+    /**
+     * In some cases, when an AMI RPC is made, operation on a SIP session is
+     * suspended pending the AMI's return. The AMI callback uses the listener
+     * to notify that work may be resumed.
+     */
     AsteriskSCF::System::WorkQueue::V1::SuspendableWorkListenerPtr mListener;
+    /**
+     * The queued operation that made an AMI call
+     */
     SipQueueableOperationPtr mOperation;
+    /**
+     * Use this to determine if we need to notify the listener that
+     * work may be resumed.
+     */
+    bool mIsSuspended;
 };
 
 typedef IceUtil::Handle<SipAMICallback> SipAMICallbackPtr;

commit 9efb966a80d43d8472f1844e1960f7c3c4b26fc3
Author: Mark Michelson <mmichelson at digium.com>
Date:   Fri May 13 09:04:39 2011 -0500

    Adding a bit of comments.

diff --git a/src/PJSipSessionModule.cpp b/src/PJSipSessionModule.cpp
index 94fe166..550d8f0 100644
--- a/src/PJSipSessionModule.cpp
+++ b/src/PJSipSessionModule.cpp
@@ -654,6 +654,16 @@ public:
         }
     }
 
+    /**
+     * This is what is initially called when the operation is queued.
+     *
+     * In this portion, we grab some essential data out of the target URI and use
+     * it to call out to the routing service, either ConnectBridgedSessions or
+     * ConnectBridgedSessionWithDestination.
+     *
+     * When this operation is successful, it will result in work for this session
+     * being suspended until the routing service returns.
+     */
     SuspendableWorkResult processRefer(const SuspendableWorkListenerPtr& workListener)
     {
         // Determine if this is a blind transfer or an attended transfer
@@ -797,6 +807,10 @@ public:
         }
     };
 
+    /**
+     * Once the routing service has allowed for work to be resumed,
+     * this is where the final work is done
+     */
     SuspendableWorkResult processRoutingResponse(const SuspendableWorkListenerPtr&)
     {
         assert(mAsyncResult);
@@ -822,7 +836,15 @@ public:
 
     enum states
     {
+        /**
+         * First state.
+         * @see processRefer
+         */
         Initial,
+        /**
+         * State after routing service completes
+         * @see processRoutingResponse
+         */
         CalledBack,
     } mState;
     /**

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/sip.git



More information about the asterisk-scf-commits mailing list