[asterisk-scf-commits] asterisk-scf/integration/sip.git branch "threading" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Thu May 19 16:59:58 CDT 2011


branch "threading" has been updated
       via  c807bf56d40a56549bc37ea3a3e46d8f540821da (commit)
       via  7246f8002b621284be27c987248888c5f701b7de (commit)
       via  b04ec118712501b0512f7c231015dfba60d8d965 (commit)
      from  57d2734e77552204087bcddeb9fa183255a4d140 (commit)

Summary of changes:
 src/PJSipManager.h           |   52 +++++++++++++
 src/PJSipSessionModule.cpp   |  120 ++++++++++++++++++++-----------
 src/PJSipSessionModule.h     |    5 ++
 src/SipEndpoint.cpp          |    9 ++-
 src/SipSession.cpp           |  162 +++++++++++++++++++++++++----------------
 src/SipSession.h             |    8 ++-
 src/SipSessionManagerApp.cpp |   52 -------------
 7 files changed, 247 insertions(+), 161 deletions(-)


- Log -----------------------------------------------------------------
commit c807bf56d40a56549bc37ea3a3e46d8f540821da
Author: Mark Michelson <mmichelson at digium.com>
Date:   Thu May 19 16:58:31 2011 -0500

    Prevent a potential crash.
    
    During testing, the bridge would call removeBridge (or removeListener, or both) on
    a session that had already told the bridge it had stopped. This could crash:
    state replication has ceased by that point, so the pointer to the object on which
    we would run session replication commands is NULL. Skipping replication in this
    scenario does not deny replicas any session state they still need, so we now
    simply check the pointer and do nothing when it is NULL.

diff --git a/src/SipSession.cpp b/src/SipSession.cpp
index 7b35442..16930b6 100644
--- a/src/SipSession.cpp
+++ b/src/SipSession.cpp
@@ -150,8 +150,14 @@ public:
             PJSipSessionModInfo *session_mod_info =
                 static_cast<PJSipSessionModInfo*>(
                     mInviteSession->mod_data[mManager->getSessionModule()->getModule().id]);
-            session_mod_info->updateSessionState(mInviteSession);
-            mManager->getSessionModule()->replicateState(NULL, NULL, session_mod_info);
+            //session listeners seem to have a tendency to call operations like this one
+            //after we have already indicated to them that we have stopped. Since we already
+            //have ceased state replication, checking for NULLity here is safe and proper.
+            if (session_mod_info)
+            {
+                session_mod_info->updateSessionState(mInviteSession);
+                mManager->getSessionModule()->replicateState(NULL, NULL, session_mod_info);
+            }
         }
     }
 

commit 7246f8002b621284be27c987248888c5f701b7de
Author: Mark Michelson <mmichelson at digium.com>
Date:   Thu May 19 16:48:36 2011 -0500

    Fix crash problem that would occur on connection.
    
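    It helps to check for NULLity of a pointer *before* you attempt
    to actually use it, not after. The branch string was being built from
    mInv->invite_tsx ahead of the check that invite_tsx is non-NULL.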

diff --git a/src/PJSipSessionModule.cpp b/src/PJSipSessionModule.cpp
index 0768b45..45f05c0 100644
--- a/src/PJSipSessionModule.cpp
+++ b/src/PJSipSessionModule.cpp
@@ -1269,22 +1269,25 @@ public:
                 session_mod_info->updateSessionState(mInv);
             }
             mSessionModule->replicateState(NULL, NULL, session_mod_info);
-    
-            std::string invBranch(pj_strbuf(&mInv->invite_tsx->branch), pj_strlen(&mInv->invite_tsx->branch));
-            //Compare branches to see if this got handled by the transaction layer.
-            if (mInv->invite_tsx && mEventBranch != invBranch)
+
+            if (mInv->invite_tsx)
             {
-                //Mismatched branch!
-                //XXX There's a check for if session_mod_info is NULL above. I don't
-                //know why session_mod_info would be NULL here since we should have created
-                //the session_mod_info when the INVITE was received. Still, I'm being safe
-                //here.
-                if (session_mod_info)
+                std::string invBranch(pj_strbuf(&mInv->invite_tsx->branch), pj_strlen(&mInv->invite_tsx->branch));
+                //Compare branches to see if this got handled by the transaction layer.
+                if (mEventBranch != invBranch)
                 {
-                    lg(Debug) << "Queuing a TransactionStateOperation";
-                    mSessionModule->enqueueSessionWork(
-                            new TransactionStateOperation(mSessionModule, mInv->invite_tsx, mInv, mEventType, mInv->invite_tsx->state),
-                            mInv);
+                    //Mismatched branch!
+                    //XXX There's a check for if session_mod_info is NULL above. I don't
+                    //know why session_mod_info would be NULL here since we should have created
+                    //the session_mod_info when the INVITE was received. Still, I'm being safe
+                    //here.
+                    if (session_mod_info)
+                    {
+                        lg(Debug) << "Queuing a TransactionStateOperation";
+                        mSessionModule->enqueueSessionWork(
+                                new TransactionStateOperation(mSessionModule, mInv->invite_tsx, mInv, mEventType, mInv->invite_tsx->state),
+                                mInv);
+                    }
                 }
             }
         }

commit b04ec118712501b0512f7c231015dfba60d8d965
Author: Mark Michelson <mmichelson at digium.com>
Date:   Thu May 19 15:51:06 2011 -0500

    Commit a bunch of changes that have helped since testing of this started.

diff --git a/src/PJSipManager.h b/src/PJSipManager.h
index 15d4e83..f36b0ac 100644
--- a/src/PJSipManager.h
+++ b/src/PJSipManager.h
@@ -111,6 +111,58 @@ private:
 
 typedef boost::shared_ptr<PJSipManager> PJSipManagerPtr;
 
+/**
+ * Wrapper class around pj_thread_desc.
+ */
+class ThreadDescWrapper
+{
+public:
+    /**
+     * pjthread thread description information, must persist for the life of the thread
+     */
+    pj_thread_desc mDesc;
+};
+
+/**
+ * Type definition used to create a smart pointer for the above.
+ */
+typedef boost::shared_ptr<ThreadDescWrapper> ThreadDescWrapperPtr;
+
+/**
+ * Implementation of the Ice::ThreadNotification class.
+ */
+class pjlibHook : public Ice::ThreadNotification
+{
+public:
+    /**
+     * Implementation of the start function which is called when a thread starts.
+     */
+    void start()
+    {
+        ThreadDescWrapperPtr wrapper = ThreadDescWrapperPtr(new ThreadDescWrapper());
+        pj_thread_t *thread;
+        pj_thread_register("ICE Thread", wrapper->mDesc, &thread);
+        pjThreads.insert(std::make_pair(thread, wrapper));
+    }
+
+    /**
+     * Implementation of the stop function which is called when a thread stops.
+     */
+    void stop()
+    {
+        if (pj_thread_is_registered())
+        {
+            pjThreads.erase(pj_thread_this());
+        }
+    }
+
+private:
+    /**
+     * A map containing thread lifetime persistent data.
+     */
+    std::map<pj_thread_t*, ThreadDescWrapperPtr> pjThreads;
+};
+
 }; //End namespace SipSessionManager
 
 }; //End namespace AsteriskSCF
diff --git a/src/PJSipSessionModule.cpp b/src/PJSipSessionModule.cpp
index df2714e..0768b45 100644
--- a/src/PJSipSessionModule.cpp
+++ b/src/PJSipSessionModule.cpp
@@ -348,6 +348,7 @@ public:
 
     SuspendableWorkResult initial()
     {
+        std::cout << "SessionCreationOperation running. About to create a session!" << std::endl;
         try
         {
             mSession = mCaller->createSession(mDestination);
@@ -361,15 +362,11 @@ public:
         }
         mSession->setInviteSession(mInv);
         mSession->setDialog(mInv->dlg);
-        PJSipDialogModInfo *dlg_mod_info = new PJSipDialogModInfo(mInv->dlg);
-        PJSipTransactionModInfo *tsx_mod_info = new PJSipTransactionModInfo(mInv->invite_tsx);
+        PJSipDialogModInfo *dlg_mod_info =(PJSipDialogModInfo*)mInv->dlg->mod_data[mSessionModule->getModule().id];
+        PJSipTransactionModInfo *tsx_mod_info = (PJSipTransactionModInfo *)mInv->invite_tsx->mod_data[mSessionModule->getModule().id];
         PJSipSessionModInfo *session_mod_info = (PJSipSessionModInfo*)mInv->mod_data[mSessionModule->getModule().id];
         // Now we can actually set a for-real non-NULL session on the session module information.
         session_mod_info->setSessionPtr(mSession);
-        dlg_mod_info->mDialogState->mSessionId = session_mod_info->mSessionState->mSessionId;
-        tsx_mod_info->mTransactionState->mSessionId = session_mod_info->mSessionState->mSessionId;
-    
-        mInv->dlg->mod_data[mSessionModule->getModule().id] = (void *)dlg_mod_info;
     
         lg(Debug) << "Replicating state on reception of new SIP INVITE.";
         mSessionModule->replicateState(dlg_mod_info, tsx_mod_info, session_mod_info);
@@ -504,9 +501,11 @@ void PJSipSessionModule::handleNewInvite(pjsip_rx_data *rdata)
         lg(Warning) << "Unable to create UAS dialog on incoming INVITE";
         return;
     }
+
     pjsip_transaction *tsx = pjsip_rdata_get_tsx(rdata);
     PJSipTransactionModInfo *tsx_mod_info = new PJSipTransactionModInfo(tsx);
     tsx->mod_data[mModule.id] = (void *)tsx_mod_info;
+    std::cout << "We put mod info into the tsx at address " << tsx << std::endl;
 
     //XXX The sdp argument is NULL for now, but can be changed if we
     //know what has been configured for this particular caller.
@@ -522,6 +521,9 @@ void PJSipSessionModule::handleNewInvite(pjsip_rx_data *rdata)
 
     // Add our own module as a dialog usage
     pjsip_dlg_add_usage(dlg, &mModule, NULL);
+    
+    PJSipDialogModInfo *dlg_mod_info(new PJSipDialogModInfo(dlg));
+    dlg->mod_data[mModule.id] = (void *)dlg_mod_info;
 
     pjsip_timer_setting session_timer_settings;
     pjsip_timer_setting_default(&session_timer_settings);
@@ -542,6 +544,9 @@ void PJSipSessionModule::handleNewInvite(pjsip_rx_data *rdata)
     SipSessionPtr nullSession(0);
     PJSipSessionModInfo *session_mod_info(new PJSipSessionModInfo(inv_session, nullSession, sessionWork));
     inv_session->mod_data[mModule.id] = (void *) session_mod_info;
+    
+    dlg_mod_info->mDialogState->mSessionId = session_mod_info->mSessionState->mSessionId;
+    tsx_mod_info->mTransactionState->mSessionId = session_mod_info->mSessionState->mSessionId;
 
     if (pjsip_inv_send_msg(inv_session, tdata) != PJ_SUCCESS)
     {
@@ -586,6 +591,7 @@ void PJSipSessionModule::handleNewInvite(pjsip_rx_data *rdata)
         lg(Debug) << "Call is destined for " << destination;
     }
     
+    lg(Debug) << "Queueing a SessionCreationOperation";
     sessionWork->enqueueWork(new SessionCreationOperation(this, caller, mSessionRouter, inv_session, tdata, replaced_dlg, destination));
 }
 
@@ -911,6 +917,7 @@ void PJSipSessionModule::handleRefer(pjsip_inv_session *inv, pjsip_rx_data *rdat
     pjsip_tx_data *tdata;
     pjsip_dlg_create_response(inv->dlg, rdata, 200, NULL, &tdata);
 
+    lg(Debug) << "Queuing a HandleReferOperation";
     enqueueSessionWork(new HandleReferOperation(inv, tsx, tdata, target_sip_uri, session, mSessionRouter, mModule.id), inv);
 }
 
@@ -991,6 +998,7 @@ public:
 
     SuspendableWorkResult processInviteResponse()
     {
+        std::cout << "Handling response to an INVITE yo!" << std::endl;
         //Treat all 1XX messages we don't recognize the same as a 180
         if (mRespCode > 100 && mRespCode < 200 && mRespCode != 183)
         {
@@ -1122,6 +1130,7 @@ void PJSipSessionModule::handleInviteResponse(pjsip_inv_session* inv,
     //XXX There are a BUNCH of response codes we need
     //to add code to handle.
 
+    lg(Debug) << "Queuing a HandleInviteResponseOperation";
     enqueueSessionWork(new HandleInviteResponseOperation(respCode, inv->state, session), inv);
 }
 
@@ -1138,7 +1147,12 @@ public:
 
     SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
     {
+        std::cout << "The transaction has address " << mTsx << std::endl;
         PJSipTransactionModInfo *tsx_mod_info = static_cast<PJSipTransactionModInfo *> (mTsx->mod_data[mSessionModule->getModule().id]);
+        if (!tsx_mod_info)
+        {
+            return Complete;
+        }
         if (mTsxState != PJSIP_TSX_STATE_DESTROYED)
         {
             tsx_mod_info->updateTransactionState(mTsx);
@@ -1150,10 +1164,9 @@ public:
         if (mEventType == PJSIP_EVENT_TSX_STATE)
         {
             pjsip_dialog *dlg = pjsip_tsx_get_dlg(mTsx);
-            PJSipDialogModInfo *dlg_mod_info = NULL;
-            if (dlg)
+            PJSipDialogModInfo *dlg_mod_info = dlg ? static_cast<PJSipDialogModInfo*>(dlg->mod_data[mSessionModule->getModule().id]) : NULL;
+            if (dlg_mod_info)
             {
-                dlg_mod_info = static_cast<PJSipDialogModInfo*>(dlg->mod_data[mSessionModule->getModule().id]);
                 dlg_mod_info->updateDialogState(dlg);
             }
             PJSipSessionModInfo *session_mod_info = static_cast<PJSipSessionModInfo*>(mInv->mod_data[mSessionModule->getModule().id]);
@@ -1237,6 +1250,7 @@ public:
             }
             lg(Debug) << "Replicating state on DISCONNECTED inv_state.";
             mSessionModule->replicateState(dlg_mod_info, NULL, session_mod_info);
+            std::cout << "Deleting session mod info on a session" << std::endl;
             delete session_mod_info;
             mInv->mod_data[mSessionModule->getModule().id] = 0;
             if (dlg_mod_info)
@@ -1267,28 +1281,13 @@ public:
                 //here.
                 if (session_mod_info)
                 {
+                    lg(Debug) << "Queuing a TransactionStateOperation";
                     mSessionModule->enqueueSessionWork(
                             new TransactionStateOperation(mSessionModule, mInv->invite_tsx, mInv, mEventType, mInv->invite_tsx->state),
                             mInv);
                 }
             }
         }
-        if (mEventType == PJSIP_EVENT_TSX_STATE && mInvState == PJSIP_INV_STATE_CALLING && mInv->role == PJSIP_ROLE_UAC)
-        {
-            //We have sent an INVITE out. We need to set up the transaction and dialog structures
-            //to have the appropriate mod_data and initiate some state replication here as well.
-            //The inv_session's mod_info was set up in SipSession::start() because we had the SipSession
-            //information there.
-            PJSipDialogModInfo *dlg_mod_info = new PJSipDialogModInfo(mInv->dlg);
-            PJSipTransactionModInfo *tsx_mod_info = new PJSipTransactionModInfo(mInv->invite_tsx);
-            PJSipSessionModInfo *session_mod_info = static_cast<PJSipSessionModInfo *>(mInv->mod_data[mSessionModule->getModule().id]);
-            mInv->invite_tsx->mod_data[mSessionModule->getModule().id] = (void *) tsx_mod_info;
-            mInv->dlg->mod_data[mSessionModule->getModule().id] = (void *) dlg_mod_info;
-            dlg_mod_info->mDialogState->mSessionId = session_mod_info->mSessionState->mSessionId;
-            tsx_mod_info->mTransactionState->mSessionId = session_mod_info->mSessionState->mSessionId;
-            lg(Debug) << "Replicating state on new outbound INVITE.";
-            mSessionModule->replicateState(dlg_mod_info, tsx_mod_info, session_mod_info);
-        }
         return Complete;
     }
 
@@ -1313,7 +1312,25 @@ void PJSipSessionModule::invOnStateChanged(pjsip_inv_session *inv, pjsip_event *
     {
         branch = std::string(pj_strbuf(&event->body.rx_msg.rdata->msg_info.via->branch_param), pj_strlen(&event->body.rx_msg.rdata->msg_info.via->branch_param));
     }
-    
+
+    if (event->type == PJSIP_EVENT_TSX_STATE && inv->state == PJSIP_INV_STATE_CALLING && inv->role == PJSIP_ROLE_UAC)
+    {
+        //We have sent an INVITE out. We need to set up the transaction and dialog structures
+        //to have the appropriate mod_data and initiate some state replication here as well.
+        //The inv_session's mod_info was set up in SipSession::start() because we had the SipSession
+        //information there.
+        PJSipDialogModInfo *dlg_mod_info = new PJSipDialogModInfo(inv->dlg);
+        PJSipTransactionModInfo *tsx_mod_info = new PJSipTransactionModInfo(inv->invite_tsx);
+        PJSipSessionModInfo *session_mod_info = static_cast<PJSipSessionModInfo *>(inv->mod_data[mModule.id]);
+        inv->invite_tsx->mod_data[mModule.id] = (void *) tsx_mod_info;
+        inv->dlg->mod_data[mModule.id] = (void *) dlg_mod_info;
+        dlg_mod_info->mDialogState->mSessionId = session_mod_info->mSessionState->mSessionId;
+        tsx_mod_info->mTransactionState->mSessionId = session_mod_info->mSessionState->mSessionId;
+        lg(Debug) << "Replicating state on new outbound INVITE.";
+        replicateState(dlg_mod_info, tsx_mod_info, session_mod_info);
+    }
+   
+    lg(Debug) << "Queuing an InviteStateOperation";
     enqueueSessionWork(new InviteStateOperation(this, inv, event->type, inv->state, branch), inv);
 }
 
@@ -1344,6 +1361,8 @@ void PJSipSessionModule::invOnTsxStateChanged(pjsip_inv_session *inv, pjsip_tran
     PJSipSessionModInfo *session_mod_info = static_cast<PJSipSessionModInfo*>(inv->mod_data[mModule.id]);
     SipSessionPtr session = session_mod_info->getSessionPtr();
 
+    std::string method(pj_strbuf(&tsx->method.name), pj_strlen(&tsx->method.name));
+    std::cout << "Queuing a Transaction state operation for transaction " << tsx  << " Method: " << method << std::endl;
     enqueueSessionWork(new TransactionStateOperation(this, tsx, inv, e->type, tsx->state), inv);
 }
 
@@ -1463,6 +1482,7 @@ public:
                 }
                 catch (...)
                 {
+                    std::cout << "EXCEPTION DURING MEDIA UPDATE????" << std::endl;
                     // If we get here the format just isn't supported...
                 }
             }
@@ -1483,6 +1503,7 @@ void PJSipSessionModule::invOnMediaUpdate(pjsip_inv_session *inv, pj_status_t st
         return;
     }
 
+    lg(Debug) << "Queuing HandleMediaUpdate";
     enqueueSessionWork(new HandleMediaUpdate(inv, mModule.id, mServiceLocator), inv);
 }
 
@@ -1530,7 +1551,7 @@ void PJSipSessionModule::enqueueSessionWork(const SuspendableWorkPtr& work, pjsi
 }
 
 PJSipSessionModuleThreadPoolListener::PJSipSessionModuleThreadPoolListener()
-    : mActiveThreads(0) { }
+    : mActiveThreads(0), mPjLibHook(new pjlibHook()) { }
 
 void PJSipSessionModuleThreadPoolListener::stateChanged(const PoolPtr& pool, Ice::Long active, Ice::Long idle, Ice::Long)
 {
@@ -1550,7 +1571,7 @@ void PJSipSessionModuleThreadPoolListener::queueWorkAdded(const PoolPtr& pool, I
     //
     //For now, use one thread per work item.
 
-    lg(Debug) << "Detected the addition of work to SIP's thread pool";
+    lg(Debug) << "Detected the addition of work to SIP's thread pool. Setting the size to " << mActiveThreads + numNewWork;
     int newSize = (int) (mActiveThreads + numNewWork);
     pool->setSize(newSize);
 }
@@ -1560,9 +1581,23 @@ void PJSipSessionModuleThreadPoolListener::queueEmptied(const PoolPtr& pool)
     //XXX Making this behavior more customizable would be nice
     //
     //For now, kill off everything
+
+    lg(Debug) << "The queue is empty so we're killing all the threads";
     pool->setSize(0);
 }
 
+void PJSipSessionModuleThreadPoolListener::threadStart()
+{
+    lg(Debug) << "New thread created. Registering it with PJLIB";
+    mPjLibHook->start();
+}
+
+void PJSipSessionModuleThreadPoolListener::threadStop()
+{
+    lg(Debug) << "Thread has completed. Unregistering it from PJLIB";
+    mPjLibHook->stop();
+}
+
 class SipSessionSuspendableWorkListener : public SuspendableWorkListener
 {
 public:
diff --git a/src/PJSipSessionModule.h b/src/PJSipSessionModule.h
index 8e8a8e3..a8968d3 100644
--- a/src/PJSipSessionModule.h
+++ b/src/PJSipSessionModule.h
@@ -58,6 +58,8 @@ private:
     SessionWorkPtr mSessionWork;
 };
 
+class pjlibHook;
+
 /**
  * Listens to the PJSipSessionModule's thread pool.
  *
@@ -71,8 +73,11 @@ public:
     void stateChanged(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool, Ice::Long active, Ice::Long idle, Ice::Long zombie);
     void queueWorkAdded(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool, Ice::Long count, bool wasEmpty);
     void queueEmptied(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool);
+    void threadStart();
+    void threadStop();
 private:
     Ice::Long mActiveThreads;
+    pjlibHook *mPjLibHook;
 };
 
 typedef IceUtil::Handle<PJSipSessionModuleThreadPoolListener> PJSipSessionModuleThreadPoolListenerPtr;
diff --git a/src/SipEndpoint.cpp b/src/SipEndpoint.cpp
index e066cd4..889cbf8 100644
--- a/src/SipEndpoint.cpp
+++ b/src/SipEndpoint.cpp
@@ -257,6 +257,8 @@ std::string SipEndpoint::getId(const Ice::Current&)
 AsteriskSCF::SessionCommunications::V1::SessionPrx SipEndpoint::createSession(const std::string& destination,
         const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener, const Ice::Current&)
 {
+
+    std::cout << "Got call over Ice to create a session for endpoint " << mImplPriv->mName << std::endl;
     if (mImplPriv->mConfig.sessionConfig.callDirection != BOTH &&
             mImplPriv->mConfig.sessionConfig.callDirection != INBOUND)
     {
@@ -265,15 +267,16 @@ AsteriskSCF::SessionCommunications::V1::SessionPrx SipEndpoint::createSession(co
     }
 
     SipSessionPtr session = new SipSession(mImplPriv->mAdapter, this, destination, listener, mImplPriv->mManager,
-            mImplPriv->mServiceLocator, mImplPriv->mReplica);
+            mImplPriv->mServiceLocator, mImplPriv->mReplica, true);
     mImplPriv->mSessions.push_back(session);
+    std::cout << "And now we're returing a session proxy..." << std::endl;
     return session->getSessionProxy();
 }
 
 AsteriskSCF::SipSessionManager::SipSessionPtr SipEndpoint::createSession(const std::string& destination)
 {
     SipSessionPtr session = new SipSession(mImplPriv->mAdapter, this, destination, 0, mImplPriv->mManager,
-            mImplPriv->mServiceLocator, mImplPriv->mReplica);
+            mImplPriv->mServiceLocator, mImplPriv->mReplica, false);
     mImplPriv->mSessions.push_back(session);
     return session;
 }
@@ -284,7 +287,7 @@ AsteriskSCF::SipSessionManager::SipSessionPtr SipEndpoint::createSession(const s
         const AsteriskSCF::Media::V1::StreamSinkSeq& sinks)
 {
     SipSessionPtr session = new SipSession(mImplPriv->mAdapter, this, destination, sessionid, mediaid, mediasession,
-            sources, sinks, mImplPriv->mManager, mImplPriv->mServiceLocator, mImplPriv->mReplica);
+            sources, sinks, mImplPriv->mManager, mImplPriv->mServiceLocator, mImplPriv->mReplica, false);
     mImplPriv->mSessions.push_back(session);
     return session;
 }
diff --git a/src/SipSession.cpp b/src/SipSession.cpp
index 1d97045..7b35442 100644
--- a/src/SipSession.cpp
+++ b/src/SipSession.cpp
@@ -239,13 +239,59 @@ public:
     AsteriskSCF::System::Component::V1::ReplicaPtr mReplica;
 };
 
+void SipSession::initializePJSIPStructs()
+{
+    pj_str_t local_uri, remote_uri;
+    SipEndpointConfig &config = mImplPriv->mEndpoint->getConfig();
+    
+    char local[64];
+    pj_ansi_sprintf(local, "sip:%s", config.sessionConfig.sourceAddress.c_str());
+    local_uri = pj_str(local);
+    
+    char remote[64];
+    bool userDefined = mImplPriv->mDestination.size() != 0;
+    pj_ansi_sprintf(remote, "sip:%s%s%s",
+        userDefined ? mImplPriv->mDestination.c_str() : "",
+        userDefined ? "@" : "",
+        config.transportConfig.address.c_str());
+    remote_uri = pj_str(remote);
+
+    pjsip_dialog *dialog;
+    if ((pjsip_dlg_create_uac(pjsip_ua_instance(), &local_uri, &local_uri, &remote_uri, &remote_uri, &dialog)) !=
+            PJ_SUCCESS)
+    {
+        // What should we do here? Throw an exception?
+        return;
+    }
+    mImplPriv->mDialog = dialog;
+    
+    pjsip_inv_session *inviteSession;
+    pjmedia_sdp_session *sdp = createSDPOffer();
+    if ((pjsip_inv_create_uac(dialog, sdp, 0, &inviteSession)) != PJ_SUCCESS)
+    {
+        pjsip_dlg_terminate(dialog);
+        // What should we do here? Throw an exception?
+        return;
+    }
+
+    pjsip_timer_setting session_timer_settings;
+    pjsip_timer_setting_default(&session_timer_settings);
+    pjsip_timer_init_session(inviteSession, &session_timer_settings);
+
+    SessionWorkPtr sessionWork(new SessionWork(mImplPriv->mManager->getSessionModule()->getThreadPoolQueue()));
+    PJSipSessionModInfo *session_mod_info(new PJSipSessionModInfo(inviteSession, this, sessionWork));
+    inviteSession->mod_data[mImplPriv->mManager->getSessionModule()->getModule().id] = (void*)session_mod_info;
+    pjsip_dlg_add_usage(dialog, &mImplPriv->mManager->getSessionModule()->getModule(), NULL);
+    mImplPriv->mInviteSession = inviteSession;
+}
+
 /**
  * Default constructor.
  */
 SipSession::SipSession(const Ice::ObjectAdapterPtr& adapter, const SipEndpointPtr& endpoint,
         const std::string& destination,  const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
         PJSipManager *manager, const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
-        const AsteriskSCF::System::Component::V1::ReplicaPtr& replica)
+        const AsteriskSCF::System::Component::V1::ReplicaPtr& replica, bool isUAC)
     : mImplPriv(new SipSessionPriv(adapter, endpoint, destination, manager, serviceLocator, replica))
 {
     if (listener != 0)
@@ -263,6 +309,11 @@ SipSession::SipSession(const Ice::ObjectAdapterPtr& adapter, const SipEndpointPt
     // Get an RTP session capable of handling the formats we are going to offer
     AsteriskSCF::Media::V1::FormatSeq formats;
     requestRTPSessions(formats);
+
+    if (isUAC)
+    {
+        initializePJSIPStructs();
+    }
 }
 
 /**
@@ -273,7 +324,7 @@ SipSession::SipSession(const Ice::ObjectAdapterPtr& adapter, const SipEndpointPt
         const Ice::Identity& mediaid, const AsteriskSCF::Media::V1::SessionPrx& mediasession,
         const AsteriskSCF::Media::V1::StreamSourceSeq& sources, const AsteriskSCF::Media::V1::StreamSinkSeq& sinks,
         PJSipManager *manager, const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
-        const AsteriskSCF::System::Component::V1::ReplicaPtr& replica)
+        const AsteriskSCF::System::Component::V1::ReplicaPtr& replica, bool isUAC)
     : mImplPriv(new SipSessionPriv(adapter, endpoint, destination, manager, serviceLocator, replica))
 {
     mImplPriv->mSessionProxy =
@@ -289,6 +340,11 @@ SipSession::SipSession(const Ice::ObjectAdapterPtr& adapter, const SipEndpointPt
 
     mImplPriv->mSources = sources;
     mImplPriv->mSinks = sinks;
+
+    if (isUAC)
+    {
+        initializePJSIPStructs();
+    }
 }
 
 class AddListenerOperation : public SuspendableWork
@@ -302,6 +358,7 @@ public:
 
     SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
     {
+        std::cout << "Actually executing addListener logic" << std::endl;
         mImplPriv->addListener(mListener);
         AsteriskSCF::SessionCommunications::V1::SessionInfoPtr sessionInfo =
             mImplPriv->getInfo();
@@ -319,6 +376,7 @@ void SipSession::addListener_async(
         const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
         const Ice::Current&)
 {
+    std::cout << "addListener called. Queueing operation..." << std::endl;
     enqueueSessionWork(new AddListenerOperation(cb, listener, mImplPriv));
 }
 
@@ -334,6 +392,7 @@ public:
 
     SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
     {
+        std::cout << "Executing indicate operation" << std::endl;
         AsteriskSCF::SessionCommunications::V1::ConnectIndicationPtr Connect;
         AsteriskSCF::SessionCommunications::V1::FlashIndicationPtr Flash;
         AsteriskSCF::SessionCommunications::V1::HoldIndicationPtr Hold;
@@ -398,6 +457,7 @@ void SipSession::indicate_async(
         const AsteriskSCF::SessionCommunications::V1::IndicationPtr& indication,
         const Ice::Current&)
 {
+    std::cout << "Queuing an indicate operation" << std::endl;
     enqueueSessionWork(new IndicateOperation(cb, indication, mImplPriv, this));
 }
 
@@ -411,6 +471,7 @@ public:
 
     SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
     {
+        std::cout << "Executing a getEndpoint Operation" << std::endl;
         mCb->ice_response(mImplPriv->mEndpoint->getEndpointProxy());
         return Complete;
     }
@@ -426,6 +487,7 @@ void SipSession::getEndpoint_async(
         const AsteriskSCF::SessionCommunications::V1::AMD_Session_getEndpointPtr& cb,
         const Ice::Current&)
 {
+    std::cout << "Queuing a getEndpoint operation" << std::endl;
     enqueueSessionWork(new GetEndpointOperation(cb, mImplPriv));
 }
 
@@ -439,6 +501,7 @@ public:
 
     SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
     {
+        std::cout << "Executing a GetInfo operation" << std::endl;
         AsteriskSCF::SessionCommunications::V1::SessionInfoPtr sessionInfo =
             mImplPriv->getInfo();
 
@@ -457,6 +520,7 @@ void SipSession::getInfo_async(
         const AsteriskSCF::SessionCommunications::V1::AMD_Session_getInfoPtr& cb,
         const Ice::Current&)
 {
+    std::cout << "queuing a getInfo operation" << std::endl;
     enqueueSessionWork(new GetInfoOperation(cb, mImplPriv));
 }
 
@@ -470,6 +534,7 @@ public:
 
     SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
     {
+        std::cout << "Executing a GetMediaSession operation" << std::endl;
         mCb->ice_response(mImplPriv->mMediaSessionProxy);
         return Complete;
     }
@@ -485,6 +550,7 @@ void SipSession::getMediaSession_async(
         const AsteriskSCF::SessionCommunications::V1::AMD_Session_getMediaSessionPtr& cb,
         const Ice::Current&)
 {
+    std::cout << "queuing a getMediaSession operation" << std::endl;
     enqueueSessionWork(new GetMediaSessionOperation(cb, mImplPriv));
 }
 
@@ -497,6 +563,7 @@ public:
 
     SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
     {
+        std::cout << "Executing a GetBridge operation" << std::endl;
         AsteriskSCF::SessionCommunications::V1::BridgePrx bridge;
         try
         {
@@ -522,6 +589,7 @@ void SipSession::getBridge_async(
         const AsteriskSCF::SessionCommunications::V1::AMD_Session_getBridgePtr& cb,
         const Ice::Current&)
 {
+    std::cout << "queuing a getBridge operation" << std::endl;
     enqueueSessionWork(new GetBridgeOperation(cb, mImplPriv));
 }
 
@@ -539,6 +607,7 @@ public:
     
     SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
     {
+        std::cout << "Executing a SetBridge operation" << std::endl;
         mSession->setBridge(mBridge);
         mImplPriv->addListener(mListener);
         mCb->ice_response(mImplPriv->getInfo());
@@ -562,6 +631,7 @@ void SipSession::setBridge_async(
     const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
     const Ice::Current& current)
 {
+    std::cout << "queuing a setBridge operation" << std::endl;
     enqueueSessionWork(new SetBridgeOperation(cb, this, mImplPriv, bridge, listener, current));
 }
 
@@ -619,6 +689,7 @@ void SipSession::removeBridge_async(
         const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
         const Ice::Current& current)
 {
+    std::cout << "queuing a removeBridge operation" << std::endl;
     enqueueSessionWork(new RemoveBridgeOperation(cb, this, mImplPriv, listener, current));
 }
 
@@ -632,6 +703,7 @@ public:
 
     SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
     {
+        std::cout << "Actually removing listener" << std::endl;
         mImplPriv->removeListener(mListener);
         return Complete;
     }
@@ -644,88 +716,40 @@ public:
  */
 void SipSession::removeListener(const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener, const Ice::Current&)
 {
+    std::cout << "removeListener called. Queuing operation" << std::endl;
     enqueueSessionWork(new RemoveListenerOperation(listener, mImplPriv));
 }
 
 class StartOperation : public SuspendableWork
 {
 public:
-    StartOperation(const SipSessionPtr& session, const boost::shared_ptr<SipSessionPriv>& sessionPriv, PJSipSessionModInfo *session_mod_info)
-        : mSession(session), mImplPriv(sessionPriv), mSessionModInfo(session_mod_info) { }
+    StartOperation(const SipSessionPtr& session, const boost::shared_ptr<SipSessionPriv>& sessionPriv)
+        : mSession(session), mImplPriv(sessionPriv) { }
 
     SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
     {
-        pj_str_t local_uri, remote_uri;
-        pjsip_dialog *dialog;
-        SipEndpointConfig &config = mImplPriv->mEndpoint->getConfig();
-
-        char local[64];
-        pj_ansi_sprintf(local, "sip:%s", config.sessionConfig.sourceAddress.c_str());
-        local_uri = pj_str(local);
-
-        char remote[64];
-        bool userDefined = mImplPriv->mDestination.size() != 0;
-        pj_ansi_sprintf(remote, "sip:%s%s%s",
-            userDefined ? mImplPriv->mDestination.c_str() : "",
-            userDefined ? "@" : "",
-            config.transportConfig.address.c_str());
-        remote_uri = pj_str(remote);
-        lg(Debug) << "Sending new SIP INVITE to " << remote;
-
-        // Create a UAC dialog for the outgoing call
-        if ((pjsip_dlg_create_uac(pjsip_ua_instance(), &local_uri, &local_uri, &remote_uri, &remote_uri, &dialog)) !=
-                PJ_SUCCESS)
-        {
-            // What should we do here? Throw an exception?
-            return Complete;
-        }
-
-        // Since the SDP generation requires a pool we use the dialog one, so it has to be set here
-        mImplPriv->mDialog = dialog;
-
-        pjmedia_sdp_session *sdp = mSession->createSDPOffer();
-
-        // Create an INVITE session
-        pjsip_inv_session *inviteSession;
-        if ((pjsip_inv_create_uac(dialog, sdp, 0, &inviteSession)) != PJ_SUCCESS)
-        {
-            pjsip_dlg_terminate(dialog);
-            // What should we do here? Throw an exception?
-            return Complete;
-        }
-
-        pjsip_dlg_add_usage(dialog, &mImplPriv->mManager->getSessionModule()->getModule(), NULL);
-
-        pjsip_timer_setting session_timer_settings;
-        pjsip_timer_setting_default(&session_timer_settings);
-        pjsip_timer_init_session(inviteSession, &session_timer_settings);
+        //lg(Debug) << "Sending new SIP INVITE to " << remote;
 
         // Record our session within the dialog so code handling pjsip events can do STUFF
         SipSessionPtr session = new SipSession(*mSession);
 
-        inviteSession->mod_data[mImplPriv->mManager->getSessionModule()->getModule().id] = (void*)mSessionModInfo;
-
         // Create the actual INVITE packet
         pjsip_tx_data *packet;
-        if ((pjsip_inv_invite(inviteSession, &packet)) != PJ_SUCCESS)
+        if ((pjsip_inv_invite(mImplPriv->mInviteSession, &packet)) != PJ_SUCCESS)
         {
-            pjsip_inv_terminate(inviteSession, 500, 0);
-            pjsip_dlg_terminate(dialog);
+            pjsip_inv_terminate(mImplPriv->mInviteSession, 500, 0);
+            pjsip_dlg_terminate(mImplPriv->mInviteSession->dlg);
             // What should we do here? Throw an exception?
             return Complete;
         }
 
-        // Before we send the message we probably should populate the endpoint data... just in case
-        mImplPriv->mInviteSession = inviteSession;
-
         // Boom! Houston, we have transmission.
-        pjsip_inv_send_msg(inviteSession, packet);
+        pjsip_inv_send_msg(mImplPriv->mInviteSession, packet);
         return Complete;
     }
 
     SipSessionPtr mSession;
     boost::shared_ptr<SipSessionPriv> mImplPriv;
-    PJSipSessionModInfo *mSessionModInfo;
 };
 
 /**
@@ -734,9 +758,8 @@ public:
  */
 void SipSession::start(const Ice::Current&)
 {
-    SessionWorkPtr sessionWork(new SessionWork(mImplPriv->mManager->getSessionModule()->getThreadPoolQueue()));
-    PJSipSessionModInfo *session_mod_info(new PJSipSessionModInfo(NULL, this, sessionWork));
-    enqueueSessionWork(new StartOperation(this, mImplPriv, session_mod_info));
+    std::cout << "We've been asked to start a session." << std::endl;
+    enqueueSessionWork(new StartOperation(this, mImplPriv));
 }
 
 class StopOperation : public SuspendableWork
@@ -780,6 +803,7 @@ public:
  */
 void SipSession::stop(const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& response, const Ice::Current&)
 {
+    std::cout << "queuing a stop operation" << std::endl;
     enqueueSessionWork(new StopOperation(response, mImplPriv->mInviteSession));
 }
 
@@ -827,6 +851,7 @@ public:
  */
 void SipSession::destroy()
 {
+    std::cout << "queuing a destroy operation" << std::endl;
     enqueueSessionWork(new DestroyOperation(this, mImplPriv));
 }
 
@@ -1042,6 +1067,11 @@ bool SipSession::operator==(const SipSession &other) const {
 
 void SipSession::enqueueSessionWork(const SuspendableWorkPtr& task)
 {
+    if (!mImplPriv->mInviteSession)
+    {
+        std::cout << "No invite session yet. Unable to queue operation" << std::endl;
+        return;
+    }
     PJSipSessionModInfo *session_mod_info =
         static_cast<PJSipSessionModInfo*>(
                 mImplPriv->mInviteSession->mod_data[mImplPriv->mManager->getSessionModule()->getModule().id]);
diff --git a/src/SipSession.h b/src/SipSession.h
index 02527d2..495cf81 100644
--- a/src/SipSession.h
+++ b/src/SipSession.h
@@ -96,13 +96,15 @@ public:
     SipSession(const Ice::ObjectAdapterPtr&, const SipEndpointPtr&, const std::string&,
         const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx&, PJSipManager *manager,
         const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
-        const AsteriskSCF::System::Component::V1::ReplicaPtr& replica);
+        const AsteriskSCF::System::Component::V1::ReplicaPtr& replica,
+        bool isUAC);
 
     SipSession(const Ice::ObjectAdapterPtr&, const SipEndpointPtr&, const std::string&, const Ice::Identity&,
         const Ice::Identity&, const AsteriskSCF::Media::V1::SessionPrx&,
         const AsteriskSCF::Media::V1::StreamSourceSeq&, const AsteriskSCF::Media::V1::StreamSinkSeq&,
         PJSipManager *manager, const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
-        const AsteriskSCF::System::Component::V1::ReplicaPtr& replica);
+        const AsteriskSCF::System::Component::V1::ReplicaPtr& replica,
+        bool isUAC);
 
     bool operator==(const SipSession &other) const;
 
@@ -200,6 +202,8 @@ public:
 private:
     void requestRTPSessions(AsteriskSCF::Media::V1::FormatSeq& formats);
 
+    void initializePJSIPStructs();
+
     /**
      * Private implementation details.
      */
diff --git a/src/SipSessionManagerApp.cpp b/src/SipSessionManagerApp.cpp
index 2644402..d37584d 100644
--- a/src/SipSessionManagerApp.cpp
+++ b/src/SipSessionManagerApp.cpp
@@ -231,58 +231,6 @@ private:
 };
 
 /**
- * Wrapper class around pj_thread_desc.
- */
-class ThreadDescWrapper
-{
-public:
-    /**
-     * pjthread thread description information, must persist for the life of the thread
-     */
-    pj_thread_desc mDesc;
-};
-
-/**
- * Type definition used to create a smart pointer for the above.
- */
-typedef boost::shared_ptr<ThreadDescWrapper> ThreadDescWrapperPtr;
-
-/**
- * Implementation of the Ice::ThreadNotification class.
- */
-class pjlibHook : public Ice::ThreadNotification
-{
-public:
-    /**
-     * Implementation of the start function which is called when a thread starts.
-     */
-    void start()
-    {
-        ThreadDescWrapperPtr wrapper = ThreadDescWrapperPtr(new ThreadDescWrapper());
-        pj_thread_t *thread;
-        pj_thread_register("ICE Thread", wrapper->mDesc, &thread);
-        pjThreads.insert(make_pair(thread, wrapper));
-    }
-
-    /**
-     * Implementation of the stop function which is called when a thread stops.
-     */
-    void stop()
-    {
-        if (pj_thread_is_registered())
-        {
-            pjThreads.erase(pj_thread_this());
-        }
-    }
-
-private:
-    /**
-     * A map containing thread lifetime persistent data.
-     */
-    map<pj_thread_t*, ThreadDescWrapperPtr> pjThreads;
-};
-
-/**
  * Helper function to add some parameters to one of our registered interfaces in the ServiceLocator, so that
  * other components can look up our interfaces.
  */

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/sip.git



More information about the asterisk-scf-commits mailing list