[asterisk-scf-commits] asterisk-scf/release/sip.git branch "master" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Wed Aug 15 14:32:47 CDT 2012


branch "master" has been updated
       via  42310a3a78e9b4eb3372c48ebb36c2698b40b906 (commit)
       via  d070d485e3030bcd5e4c1a6820e44ea0223cd754 (commit)
       via  df0edc0b3f42960b1fc2fd65ab19fb8b1f120406 (commit)
       via  3ee43d5514fc8a011b47fae23b3275c5ff5806ce (commit)
       via  3dd0ab1b6ac94f6e8cc41a70e568ab9fbcad17ff (commit)
       via  751dc5af1f264b3c13a9ae7385ed21c669a3deab (commit)
       via  a6c9a6b1d8e3e1219d4509e18d8300fa2ec38aad (commit)
       via  a872d4a38992c64a4ebecf880c202a761aabe474 (commit)
       via  34cf80f54a88569ae1894c2821d41d17945a324f (commit)
       via  e6b65c10643898b1ac0109a3f42bc3ef09753cbe (commit)
       via  dfa8647bf2e31c8de9cc0a7617049b061ed8ed9f (commit)
       via  288cdf2f20572064244732b8a897342fb9c3792a (commit)
      from  b83034471ecf88641b803585e7a9c9d0d19aa27e (commit)

Summary of changes:
 config/test_sip.conf                   |    3 +
 src/Component.cpp                      |    7 +-
 src/PJSIPManager.cpp                   |   11 +-
 src/PJSIPManager.h                     |    4 +-
 src/PJSIPSessionModule.cpp             |   88 +++++-
 src/PJSIPSessionModule.h               |   28 ++-
 src/PJSIPSessionModuleConstruction.cpp |   41 +++-
 src/SIPClientRegistration.cpp          |   27 ++-
 src/SIPEndpoint.cpp                    |   14 +-
 src/SIPSession.cpp                     |  519 +++++++++++++++++---------------
 src/SIPSession.h                       |   34 ++-
 11 files changed, 498 insertions(+), 278 deletions(-)


- Log -----------------------------------------------------------------
commit 42310a3a78e9b4eb3372c48ebb36c2698b40b906
Author: Brent Eagles <beagles at digium.com>
Date:   Wed Aug 8 17:13:13 2012 -0230

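    FINALLY uncovered a huge source of memory consumption. Once things got to a
    certain point in the call setup sequence, a lot of timers started to be created.
    Once there were 500 living calls (1000 session objects), memory would fly
    through the roof. The fix makes every SIPRegistrationClient share a single
    IceUtil::Timer (obtained from TimerPool) instead of constructing its own.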

diff --git a/src/SIPClientRegistration.cpp b/src/SIPClientRegistration.cpp
index bb5f4e1..aa6dc4d 100644
--- a/src/SIPClientRegistration.cpp
+++ b/src/SIPClientRegistration.cpp
@@ -129,6 +129,31 @@ private:
 
 using namespace AsteriskSCF::SIP::ExtensionPoint::V1;
 
+//
+// Singleton for now, but could be pool.
+//
+class TimerPool
+{
+public:
+
+    static IceUtil::TimerPtr getTimer()
+    {
+        IceUtil::Mutex::Lock lock(mMutex);
+        if (!mTimer)
+        {
+            mTimer = new IceUtil::Timer;
+        }
+        return mTimer;
+    }
+
+private:
+    static IceUtil::Mutex mMutex;
+    static IceUtil::TimerPtr mTimer;
+};
+
+IceUtil::Mutex TimerPool::mMutex;
+IceUtil::TimerPtr TimerPool::mTimer;
+
 SIPRegistrationClient::SIPRegistrationClient(
         const SIPClientRegistrationItemPtr& confItem,
         pjsip_endpoint* pjEndpoint,
@@ -140,7 +165,7 @@ SIPRegistrationClient::SIPRegistrationClient(
     : mEndpointName(sipEndpoint->getName()),
     mAOR(confItem->aor),
     mManager(manager),
-    mTimer(new IceUtil::Timer()),
+    mTimer(TimerPool::getTimer()),
     mReplicationContext(replicationContext),
     mBackplaneAdapter(backplaneAdapter),
     mReplica(replica)

commit d070d485e3030bcd5e4c1a6820e44ea0223cd754
Author: Brent Eagles <beagles at digium.com>
Date:   Wed Aug 8 16:47:00 2012 -0230

    Reverting change made in commit 3dd0ab1b6ac94f6e8cc41a70e568ab9fbcad17ff.  This
    causes race conditions on pjsip memory pools when the invite session is
    destroyed. It appears that if everything else is working properly this code
    should not be necessary anyways.
    
    The other change introduces a tiny sleep in the pjsip polling thread to encourage
    yields in a more timely manner. Otherwise, if a system manages to load up pjsip
    with a lot of waiting work, this thread will dominate things. Something to look
    at: even if this thread is running crazy, on a multi-core system thread pool
    worker threads should make some headway, but this simply isn't happening. There
    is some kind of contention occurring.

diff --git a/src/PJSIPManager.cpp b/src/PJSIPManager.cpp
index 9a0f31f..078a519 100644
--- a/src/PJSIPManager.cpp
+++ b/src/PJSIPManager.cpp
@@ -251,6 +251,11 @@ void PJSIPManager::handleEvents()
     onHandleEvents(mModules);
     const pj_time_val delay = {0, 10};
     pjsip_endpt_handle_events(mEndpoint, &delay);
+    //
+    // 10 ms is nothing.. but this is really about trying to yield so some
+    // other threads can do something.
+    //
+    pj_thread_sleep(50);
 }
 
 void PJSIPManager::setUserAgent(const string& userAgent)
diff --git a/src/SIPSession.cpp b/src/SIPSession.cpp
index a84a1c5..1b326f9 100755
--- a/src/SIPSession.cpp
+++ b/src/SIPSession.cpp
@@ -3371,23 +3371,6 @@ public:
         }
         mSessionPriv->mEndpoint->removeSession(mSession);
 
-        if (mSessionPriv->mInviteSession)
-        {
-            pjsip_tx_data* tdata;
-            pj_status_t result = pjsip_inv_end_session(mSessionPriv->mInviteSession, 200, 0, &tdata);
-            //
-            // We don't care about this data... we should've already sent everything. Really tdata should be 0 here.
-            //
-            if (result == PJ_SUCCESS && tdata)
-            {
-                result = pjsip_inv_send_msg(mSessionPriv->mInviteSession, tdata);
-                if (result != PJ_SUCCESS)
-                {
-                    lg(Debug) << "We were trying to do the right thing here...";
-                }
-            }
-        }
-
         //
         // We moved the clean up of this object to here to avoid conflicts between pjsip originating operations
         // and the pending operations in the session object. The only other way to do it would've been to have

commit df0edc0b3f42960b1fc2fd65ab19fb8b1f120406
Author: Brent Eagles <beagles at digium.com>
Date:   Tue Aug 7 17:53:37 2012 -0230

    Fix a race condition where the dialog may disappear while we are using it.

diff --git a/src/PJSIPSessionModule.cpp b/src/PJSIPSessionModule.cpp
index 5110b70..843cedf 100644
--- a/src/PJSIPSessionModule.cpp
+++ b/src/PJSIPSessionModule.cpp
@@ -442,7 +442,15 @@ public:
         mReplacedDialog(replacedDialog),
         mDestination(destination),
         mCallerID(callerID),
-        mRedirections(redirections) { }
+        mRedirections(redirections) 
+    { 
+        pjsip_dlg_inc_session(mInv->dlg, &mSessionModule->getModule());
+    }
+
+    ~SessionCreationOperation()
+    {
+        pjsip_dlg_dec_session(mInv->dlg, &mSessionModule->getModule());
+    }
 
 protected:
     SuspendableWorkResult initial(const SuspendableWorkListenerPtr&)

commit 3ee43d5514fc8a011b47fae23b3275c5ff5806ce
Merge: 751dc5a 3dd0ab1
Author: Brent Eagles <beagles at digium.com>
Date:   Mon Aug 6 17:02:03 2012 -0230

    Merge recent changes to integration branch 'x_safe_constructors' into safe_construct_w_pool branch


commit 3dd0ab1b6ac94f6e8cc41a70e568ab9fbcad17ff
Author: Brent Eagles <beagles at digium.com>
Date:   Fri Aug 3 17:10:27 2012 -0230

    Testing a fix to a possible clean up issue for SIP sessions where
    dialogs were not properly cleaned up for successful sessions.

diff --git a/src/SIPSession.cpp b/src/SIPSession.cpp
index 1b326f9..a84a1c5 100755
--- a/src/SIPSession.cpp
+++ b/src/SIPSession.cpp
@@ -3371,6 +3371,23 @@ public:
         }
         mSessionPriv->mEndpoint->removeSession(mSession);
 
+        if (mSessionPriv->mInviteSession)
+        {
+            pjsip_tx_data* tdata;
+            pj_status_t result = pjsip_inv_end_session(mSessionPriv->mInviteSession, 200, 0, &tdata);
+            //
+            // We don't care about this data... we should've already sent everything. Really tdata should be 0 here.
+            //
+            if (result == PJ_SUCCESS && tdata)
+            {
+                result = pjsip_inv_send_msg(mSessionPriv->mInviteSession, tdata);
+                if (result != PJ_SUCCESS)
+                {
+                    lg(Debug) << "We were trying to do the right thing here...";
+                }
+            }
+        }
+
         //
         // We moved the clean up of this object to here to avoid conflicts between pjsip originating operations
         // and the pending operations in the session object. The only other way to do it would've been to have
diff --git a/src/SIPSession.h b/src/SIPSession.h
index 00ea2bd..dfc23c8 100644
--- a/src/SIPSession.h
+++ b/src/SIPSession.h
@@ -322,7 +322,7 @@ public:
     void getSinks_async(
             const AsteriskSCF::SessionCommunications::V1::AMD_TelephonySession_getSinksPtr&,
             const Ice::Current&);
-
+    
     /**
      * Only called from within a queued operation
      */

commit 751dc5af1f264b3c13a9ae7385ed21c669a3deab
Merge: a6c9a6b a872d4a
Author: Ken Hunt <ken.hunt at digium.com>
Date:   Fri Jul 27 13:08:39 2012 -0500

    Merge branch 'x_safe_constructors' of git.asterisk.org:asterisk-scf/integration/sip into safe_construct_w_pool

diff --cc src/PJSIPSessionModule.cpp
index 07ac87c,5affc20..5110b70
--- a/src/PJSIPSessionModule.cpp
+++ b/src/PJSIPSessionModule.cpp
@@@ -2372,40 -2360,27 +2372,50 @@@ void PJSIPSessionModuleThreadPoolListen
      //XXX Making this behavior more customizable would be nice
      //
      //For now, what we do is kill all idle threads.
 -    if (idle > 0)
 +
 +    int poolSize = active + idle;
 +    int newPoolSize = poolSize; 
 +
 +    // Should we consider downsizing the pool?
 +    if (poolSize > mDefaultPoolSize)
      {
 -        lg(Debug) << "idle threads available, increasing pool size to match active threads. A: " << active << " I: " << idle;
 -        pool->setSize((int) active);
 +        // Are there any idle threads to shut down?
 +        if (idle > 0)
 +        {
 +           newPoolSize = std::max(mDefaultPoolSize, active);
 +           if (newPoolSize != poolSize)
 +           {
++              lg(Debug) << "Excess idle threads being shutdown. A: " << active << " I: " << idle << " default: " << mDefaultPoolSize;
++
 +               mActiveThreads = newPoolSize;
 +               pool->setSize(newPoolSize);
 +           }
 +        }
++	else
++	{
++            lg(Debug) << "No idle threads to shutdown. A: " << active << " default: " << mDefaultPoolSize;
++	}
+     }
+     else
+     { 
 -        lg(Debug) <<  "No current idle threads, the number of active threads is " << active;
++        lg(Debug) <<  "Pool size at default. Active: " << active;
      }
 -    mActiveThreads = active;
  }
  
 -void PJSIPSessionModuleThreadPoolListener::queueWorkAdded(const PoolPtr& pool, Ice::Long numNewWork, bool)
 +void PJSIPSessionModuleThreadPoolListener::queueWorkAdded(const PoolPtr& pool, Ice::Long numNewWork, Ice::Long newQueueSize)
  {
      //XXX Making this behavior more customizable would be nice
 -    //
 -    //For now, use one thread per work item.
  
 -    lg(Debug) << "Detected the addition of work to SIP's thread pool. Setting the size to " << mActiveThreads + numNewWork;
 -    int newSize = (int) (mActiveThreads + numNewWork);
 -    pool->setSize(newSize);
 +    if (mActiveThreads < mMaxPoolSize)
 +    {
 +        if (newQueueSize > mDefaultPoolSize && newQueueSize < mActiveThreads)
 +        {
 +            int newSize = std::min(mMaxPoolSize, newQueueSize);
 +            lg(Debug) << "SIP's thread pool queue size exceeds default threads. Setting pool size to " << newSize;
 +            mActiveThreads = newSize;
 +            pool->setSize(newSize);
 +        }
 +    }
  }
  
  void PJSIPSessionModuleThreadPoolListener::queueEmptied(const PoolPtr& pool)

commit a6c9a6b1d8e3e1219d4509e18d8300fa2ec38aad
Author: Ken Hunt <ken.hunt at digium.com>
Date:   Thu Jul 26 19:12:43 2012 -0500

    Limit the growth of the thread pool.

diff --git a/config/test_sip.conf b/config/test_sip.conf
index 3254e57..2a80127 100644
--- a/config/test_sip.conf
+++ b/config/test_sip.conf
@@ -27,6 +27,9 @@ SIPSessionGateway.SIP.RoutingDestinationId=pjsip
 # PJSIP Modules to register
 SIPSessionGateway.SIP.Modules=Session
 
+SIPSessionGateway.SIP.Modules.Session.ThreadPool.DefaultSize=20
+SIPSessionGateway.SIP.Modules.Session.ThreadPool.MaxSize=30
+
 # The service name of the State replicator to use
 SIPSessionGateway.SIP.StateReplicatorService=default
 
diff --git a/src/Component.cpp b/src/Component.cpp
index c38816d..3a5c6ca 100644
--- a/src/Component.cpp
+++ b/src/Component.cpp
@@ -385,6 +385,7 @@ void Component::registerPJSIPModules()
         boost::static_pointer_cast<SIPReplicationContext>(getReplicationContext());
 
     Ice::PropertiesPtr props = getCommunicator()->getProperties();
+
     Ice::StringSeq moduleNames = props->getPropertyAsList(getName() + ".SIP.Modules");
     for (Ice::StringSeq::iterator i = moduleNames.begin();
          i != moduleNames.end();
@@ -399,7 +400,11 @@ void Component::registerPJSIPModules()
         //
         if ((*i) == "Session")
         {
-            mPJSIPManager->registerSessionModule(mEndpointFactory,
+            Ice::PropertyDict sessionModuleProps = props->getPropertiesForPrefix(getName() + ".SIP.Modules.Session");
+
+            mPJSIPManager->registerSessionModule(
+                sessionModuleProps,
+                mEndpointFactory,
                 mSessionRouter,
                 getServiceLocator(),
                 sipReplicationContext,
diff --git a/src/PJSIPManager.cpp b/src/PJSIPManager.cpp
index 10fa36b..9a0f31f 100644
--- a/src/PJSIPManager.cpp
+++ b/src/PJSIPManager.cpp
@@ -128,7 +128,9 @@ PJSIPManager::~PJSIPManager()
 }
 
 
-void PJSIPManager::registerSessionModule(const boost::shared_ptr<SIPEndpointFactory>& endpointFactoryPtr,
+void PJSIPManager::registerSessionModule(
+        const Ice::PropertyDict& sessionModuleProps,
+        const boost::shared_ptr<SIPEndpointFactory>& endpointFactoryPtr,
         const AsteriskSCF::Discovery::SmartProxy<
             AsteriskSCF::SessionCommunications::V1::SessionRouterPrx>& sessionRouter,
         const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
@@ -137,7 +139,7 @@ void PJSIPManager::registerSessionModule(const boost::shared_ptr<SIPEndpointFact
         const AsteriskSCF::Core::Discovery::V1::ServiceLocatorManagementPrx& serviceLocatorManagement
     )
 {
-    mSessionModule = new PJSIPSessionModule(mEndpoint, endpointFactoryPtr, sessionRouter,
+    mSessionModule = new PJSIPSessionModule(sessionModuleProps, mEndpoint, endpointFactoryPtr, sessionRouter,
         serviceLocator, replicationContext, adapter, serviceLocatorManagement);
 }
 
diff --git a/src/PJSIPManager.h b/src/PJSIPManager.h
index e5eecef..559f4ce 100644
--- a/src/PJSIPManager.h
+++ b/src/PJSIPManager.h
@@ -95,7 +95,9 @@ public:
      * Register the PJSIPSessionModule, responsible
      * for basic call handling
      */
-    void registerSessionModule(const boost::shared_ptr<SIPEndpointFactory>& endpointFactoryPtr,
+    void registerSessionModule(
+        const Ice::PropertyDict& sessionModuleProps,
+        const boost::shared_ptr<SIPEndpointFactory>& endpointFactoryPtr,
         const AsteriskSCF::Discovery::SmartProxy<AsteriskSCF::SessionCommunications::V1::SessionRouterPrx>&
             sessionRouter,
         const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
diff --git a/src/PJSIPSessionModule.cpp b/src/PJSIPSessionModule.cpp
index bc009ae..07ac87c 100644
--- a/src/PJSIPSessionModule.cpp
+++ b/src/PJSIPSessionModule.cpp
@@ -29,6 +29,7 @@
 #include <boost/lexical_cast.hpp>
 #include <boost/algorithm/string.hpp>
 #include <boost/numeric/conversion/cast.hpp>
+#include <algorithm>
 
 #include <AsteriskSCF/Core/Endpoint/EndpointIf.h>
 #include <AsteriskSCF/Core/Routing/RoutingIf.h>
@@ -2352,40 +2353,73 @@ void PJSIPSessionModule::enqueueSessionWork(const SuspendableWorkPtr& work, pjsi
     }
 }
 
-PJSIPSessionModuleThreadPoolListener::PJSIPSessionModuleThreadPoolListener()
-    : mActiveThreads(0), mPJLIBHook(new AsteriskSCF::PJLIB::ThreadHook("SIP Session Module Pool")) { }
+PJSIPSessionModuleThreadPoolListener::PJSIPSessionModuleThreadPoolListener(Ice::Long defaultPoolSize, Ice::Long maxPoolSize)
+    : mActiveThreads(0), 
+      mPJLIBHook(new AsteriskSCF::PJLIB::ThreadHook("SIP Session Module Pool")), 
+      mDefaultPoolSize(defaultPoolSize),
+      mMaxPoolSize(maxPoolSize) 
+{ 
+}
+
+void PJSIPSessionModuleThreadPoolListener::initialized(const PoolPtr& pool)
+{
+    pool->setSize(mDefaultPoolSize);
+    mActiveThreads = mDefaultPoolSize;
+}
 
 void PJSIPSessionModuleThreadPoolListener::stateChanged(const PoolPtr& pool, Ice::Long active, Ice::Long idle, Ice::Long)
 {
     //XXX Making this behavior more customizable would be nice
     //
     //For now, what we do is kill all idle threads.
-    if (idle > 0)
+
+    int poolSize = active + idle;
+    int newPoolSize = poolSize; 
+
+    // Should we consider downsizing the pool?
+    if (poolSize > mDefaultPoolSize)
     {
-        pool->setSize((int) active);
+        // Are there any idle threads to shut down?
+        if (idle > 0)
+        {
+           newPoolSize = std::max(mDefaultPoolSize, active);
+           if (newPoolSize != poolSize)
+           {
+               mActiveThreads = newPoolSize;
+               pool->setSize(newPoolSize);
+           }
+        }
     }
-    mActiveThreads = active;
 }
 
-void PJSIPSessionModuleThreadPoolListener::queueWorkAdded(const PoolPtr& pool, Ice::Long numNewWork, bool)
+void PJSIPSessionModuleThreadPoolListener::queueWorkAdded(const PoolPtr& pool, Ice::Long numNewWork, Ice::Long newQueueSize)
 {
     //XXX Making this behavior more customizable would be nice
-    //
-    //For now, use one thread per work item.
 
-    lg(Debug) << "Detected the addition of work to SIP's thread pool. Setting the size to " << mActiveThreads + numNewWork;
-    int newSize = (int) (mActiveThreads + numNewWork);
-    pool->setSize(newSize);
+    if (mActiveThreads < mMaxPoolSize)
+    {
+        if (newQueueSize > mDefaultPoolSize && newQueueSize < mActiveThreads)
+        {
+            int newSize = std::min(mMaxPoolSize, newQueueSize);
+            lg(Debug) << "SIP's thread pool queue size exceeds default threads. Setting pool size to " << newSize;
+            mActiveThreads = newSize;
+            pool->setSize(newSize);
+        }
+    }
 }
 
 void PJSIPSessionModuleThreadPoolListener::queueEmptied(const PoolPtr& pool)
 {
     //XXX Making this behavior more customizable would be nice
     //
-    //For now, kill off everything
 
-    lg(Debug) << "The queue is empty so we're killing all the threads";
-    pool->setSize(0);
+    if (mActiveThreads > mDefaultPoolSize)
+    {
+        lg(Debug) << "The queue is empty so we're dropping thread pool back to default size.";
+
+        mActiveThreads = mDefaultPoolSize;
+        pool->setSize(mDefaultPoolSize);
+    }
 }
 
 void PJSIPSessionModuleThreadPoolListener::threadStart()
@@ -2419,12 +2453,14 @@ SessionWork::SessionWork(const QueuePtr& queue)
     : mThreadPoolQueue(queue),
     mInternalQueue(new SuspendableWorkQueue(this)) { }
 
-void SessionWork::workAdded(const QueueBasePtr&, Ice::Long, bool wasEmpty)
+void SessionWork::workAdded(const QueueBasePtr&, Ice::Long numWorkAdded, Ice::Long newQueueSize)
 {
-    if (wasEmpty)
+    // Was this WorkQueue empty?
+    if (numWorkAdded == newQueueSize)
     {
         try
         {
+            // We need to be put back into the Pool's queue to be assigned to a worker thread. 
             mThreadPoolQueue->enqueueWork(this);
         }
         catch (const ShuttingDown&)
diff --git a/src/PJSIPSessionModule.h b/src/PJSIPSessionModule.h
index d08b475..1688609 100644
--- a/src/PJSIPSessionModule.h
+++ b/src/PJSIPSessionModule.h
@@ -81,15 +81,18 @@ private:
 class PJSIPSessionModuleThreadPoolListener : public AsteriskSCF::System::ThreadPool::V1::PoolListener
 {
 public:
-    PJSIPSessionModuleThreadPoolListener();
+    PJSIPSessionModuleThreadPoolListener(Ice::Long defaultPoolSize, Ice::Long maxPoolSize);
+    void initialized(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool);
     void stateChanged(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool, Ice::Long active, Ice::Long idle, Ice::Long zombie);
-    void queueWorkAdded(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool, Ice::Long count, bool wasEmpty);
+    void queueWorkAdded(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool, Ice::Long count, Ice::Long newQueueSize);
     void queueEmptied(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool);
     void threadStart();
     void threadStop();
 private:
     Ice::Long mActiveThreads;
     AsteriskSCF::PJLIB::ThreadHook *mPJLIBHook;
+    Ice::Long mDefaultPoolSize;
+    Ice::Long mMaxPoolSize;
 };
 
 typedef IceUtil::Handle<PJSIPSessionModuleThreadPoolListener> PJSIPSessionModuleThreadPoolListenerPtr;
@@ -127,7 +130,10 @@ typedef std::vector<ReasonId> ReasonIdSeq;
 class PJSIPSessionModule : public PJSIPModule
 {
 public:
-    PJSIPSessionModule(pjsip_endpoint *endpt, const boost::shared_ptr<SIPEndpointFactory>& endpointFactoryPtr,
+    PJSIPSessionModule(
+        const Ice::PropertyDict& sessionModuleProps,
+        pjsip_endpoint *endpt, 
+        const boost::shared_ptr<SIPEndpointFactory>& endpointFactoryPtr,
         const AsteriskSCF::Discovery::SmartProxy<AsteriskSCF::SessionCommunications::V1::SessionRouterPrx>&
             sessionRouter,
         const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
diff --git a/src/PJSIPSessionModuleConstruction.cpp b/src/PJSIPSessionModuleConstruction.cpp
index f5fe146..c627778 100644
--- a/src/PJSIPSessionModuleConstruction.cpp
+++ b/src/PJSIPSessionModuleConstruction.cpp
@@ -14,6 +14,8 @@
  * at the top of the source tree.
  */
 
+#include <boost/algorithm/string/predicate.hpp>
+
 #include <AsteriskSCF/Logger.h>
 #include <AsteriskSCF/WorkQueue/WorkQueue.h>
 #include <AsteriskSCF/ThreadPool/ThreadPool.h>
@@ -130,7 +132,25 @@ static void invOnSendReinviteResponse(pjsip_inv_session *inv, pjsip_tx_data *tda
     return sessionModule->invOnSendReinviteResponse(inv, tdata);
 }
 
-PJSIPSessionModule::PJSIPSessionModule(pjsip_endpoint *endpt,
+int getIntValue(const std::string& propertyName, const std::string& propertyValue, int defaultValue)
+{
+    int value = defaultValue;
+    try
+    {
+        value = boost::lexical_cast<int>(propertyValue);
+    }
+    catch(boost::bad_lexical_cast const&)
+    {
+        lg(Error) << "Invalid value for " << propertyName << " specified. Using default of " << defaultValue;
+        return defaultValue;
+    }
+
+    return value;
+}
+
+PJSIPSessionModule::PJSIPSessionModule(
+    const Ice::PropertyDict& sessionModuleProps,
+    pjsip_endpoint *endpt,
     const boost::shared_ptr<SIPEndpointFactory>& endpointFactoryPtr,
     const AsteriskSCF::Discovery::SmartProxy<AsteriskSCF::SessionCommunications::V1::SessionRouterPrx>& sessionRouter,
     const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
@@ -142,6 +162,7 @@ PJSIPSessionModule::PJSIPSessionModule(pjsip_endpoint *endpt,
       mReplicationContext(replicationContext), mEndpoint(endpt),
       mSessionCreationExtensionPoint(new SIPSessionCreationExtensionPoint()), mAdapter(adapter)
 {
+
     sessionModule = this;
     mModule.name = pj_str(moduleName);
     mModule.priority = PJSIP_MOD_PRIORITY_APPLICATION;
@@ -163,6 +184,22 @@ PJSIPSessionModule::PJSIPSessionModule(pjsip_endpoint *endpt,
         mSessionCreationExtensionPointPrx, 
         SessionCreationExtensionPointId);
 
+    int defaultThreadPoolSize=10;
+    int maxThreadPoolSize=15;
+
+    Ice::PropertyDict::const_iterator prop;
+    for (prop = sessionModuleProps.begin(); prop != sessionModuleProps.end(); prop++)
+    {
+        if (boost::algorithm::ends_with((*prop).first, "ThreadPool.DefaultSize"))
+        {
+            defaultThreadPoolSize = getIntValue((*prop).first, (*prop).second, defaultThreadPoolSize);
+        }
+        else if (boost::algorithm::ends_with((*prop).first, "ThreadPool.MaxSize"))
+        {
+            maxThreadPoolSize = getIntValue((*prop).first, (*prop).second, maxThreadPoolSize);
+        }
+    }
+
     // TBD... how to access the Component's service and instance ids. 
     mSessionCreationExtensionPointService->addLocatorParams(
         AsteriskSCF::Operations::createContext(), 
@@ -170,7 +207,7 @@ PJSIPSessionModule::PJSIPSessionModule(pjsip_endpoint *endpt,
         "");
 
     mPoolQueue = new AsteriskSCF::WorkQueue::WorkQueue();
-    mPoolListener = new PJSIPSessionModuleThreadPoolListener(); 
+    mPoolListener = new PJSIPSessionModuleThreadPoolListener(defaultThreadPoolSize, maxThreadPoolSize); 
 
     ThreadPoolFactoryPtr factory(new ThreadPoolFactory);
     mPool = factory->createPool(mPoolListener, mPoolQueue);
diff --git a/src/SIPSession.h b/src/SIPSession.h
index e4aa05a..0464b3b 100644
--- a/src/SIPSession.h
+++ b/src/SIPSession.h
@@ -77,7 +77,7 @@ public:
      * Overrides of QueueListener interface
      */
     SessionWork(const AsteriskSCF::System::WorkQueue::V1::QueuePtr& threadPoolQueue);
-    void workAdded(const AsteriskSCF::System::WorkQueue::V1::QueueBasePtr&, Ice::Long newWork, bool wasEmpty);
+    void workAdded(const AsteriskSCF::System::WorkQueue::V1::QueueBasePtr&, Ice::Long newWork, Ice::Long newQueueSize);
     void workResumable(const AsteriskSCF::System::WorkQueue::V1::QueueBasePtr&);
     void emptied(const AsteriskSCF::System::WorkQueue::V1::QueueBasePtr&);
     void shuttingDown(const AsteriskSCF::System::WorkQueue::V1::QueueBasePtr&);

commit a872d4a38992c64a4ebecf880c202a761aabe474
Author: Ken Hunt <ken.hunt at digium.com>
Date:   Mon Jul 16 18:52:56 2012 -0500

    HYDRA-827 - Changes to make registrar thread safe.

diff --git a/src/PJSIPRegistrarModule.cpp b/src/PJSIPRegistrarModule.cpp
index 1821f41..3607167 100644
--- a/src/PJSIPRegistrarModule.cpp
+++ b/src/PJSIPRegistrarModule.cpp
@@ -678,35 +678,43 @@ bool verifyContacts(const std::vector<pjsip_contact_hdr *>& contacts)
     return true;
 }
 
-BindingWrapperSeq::iterator PJSIPRegistrarModule::findMatchingBinding(pjsip_contact_hdr *contact, BindingWrapperSeq& bindings)
+BindingWrapperSeq::iterator PJSIPRegistrarModule::findMatchingBinding(const std::string& contactURI, BindingWrapperSeq& bindings)
 {
-    char buf[512];
-    pjsip_sip_uri *contactURI = (pjsip_sip_uri *)pjsip_uri_get_uri(contact->uri);
-    pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, contactURI, buf, sizeof(buf));
-    std::string contactURIStr(buf, strlen(buf));
-
-    lg(Debug) << "Searching for binding " << contactURIStr;
+    lg(Debug) << "Searching for binding " << contactURI;
     for (BindingWrapperSeq::iterator iter = bindings.begin(); iter != bindings.end(); ++iter)
     {
-        lg(Debug) << "Comparing REGISTER contact " << contactURIStr << " with binding contact " << (*iter)->mBinding->contact;
-        if (contactURIStr == (*iter)->mBinding->contact || contactURIStr == "*")
+        lg(Debug) << "Comparing REGISTER contact " << contactURI << " with binding contact " << (*iter)->mBinding->contact;
+        if (contactURI == (*iter)->mBinding->contact || contactURI == "*")
         {
             lg(Debug) << "Found matching binding " << (*iter)->mBinding->contact;
             return iter;
         }
     }
-    lg(Debug) << "No matching binding found for " << contactURIStr;
+    lg(Debug) << "No matching binding found for " << contactURI;
     return bindings.end();
 }
 
-int PJSIPRegistrarModule::getExpiration(pjsip_contact_hdr *contact, pjsip_rx_data *rdata)
+void PJSIPRegistrarModule::addURIExpiration(ContactExpireMap& expireMap, pjsip_contact_hdr *contact, int defaultExpire)
 {
+    int expires = defaultExpire;
+
     if (contact->expires != -1)
     {
         lg(Debug) << "Contact header has expires parameter of " << contact->expires;
-        return contact->expires;
+        expires = contact->expires;
     }
 
+    char buf[512];
+    pjsip_sip_uri *contactURI = (pjsip_sip_uri *)pjsip_uri_get_uri(contact->uri);
+    pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, contactURI, buf, sizeof(buf));
+    std::string contactURIStr(buf, strlen(buf));
+
+    expireMap[contactURIStr] = expires;
+}
+
+int PJSIPRegistrarModule::getMessageExpiration(pjsip_rx_data *rdata)
+{
+
     pjsip_expires_hdr *expires = (pjsip_expires_hdr *) pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
 
     if (expires)
@@ -725,49 +733,21 @@ int PJSIPRegistrarModule::getExpiration(pjsip_contact_hdr *contact, pjsip_rx_dat
     }
 }
 
-BindingWrapperPtr PJSIPRegistrarModule::createNewBinding(pjsip_contact_hdr *contact, const std::string &callID, int cSeq, int expiration, const std::string& aor)
+BindingWrapperPtr PJSIPRegistrarModule::createNewBinding(const std::string& contactURI, 
+      const std::string &callID, int cSeq, int expiration, const std::string& aor)
 {
-    char buf[512];
-    pjsip_sip_uri *contactURI = (pjsip_sip_uri *)pjsip_uri_get_uri(contact->uri);
-    pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, contactURI, buf, sizeof(buf));
-    std::string contactURIStr(buf, strlen(buf));
-
-    lg(Debug) << "Creating new binding with Contact: " << contactURIStr
+    lg(Debug) << "Creating new binding with Contact: " << contactURI
         << ", Call-Id: " << callID
         << ", CSeq: " << cSeq
         << ", and Expires: " << expiration;
 
-    BindingPtr binding(new Binding(contactURIStr, callID, cSeq, (int)time(NULL) + expiration));
+    BindingPtr binding(new Binding(contactURI, callID, cSeq, (int)time(NULL) + expiration));
     BindingWrapperPtr wrapper(new BindingWrapper(expiration, binding, this, mEndpoint, aor));
     //We could just return binding, but using wrapper here makes the compiler
     //not complain about an unused variable.
     return wrapper;
 }
 
-class Response : public Work
-{
-public:
-    Response(pjsip_transaction *tsx, pjsip_tx_data *tdata, const std::string& aor, const RegistrarIPtr& registrar)
-        : mTsx(tsx), mTdata(tdata), mAOR(aor), mRegistrar(registrar) { }
-    void execute()
-    {
-        BindingWrapperSeq wrappers = mRegistrar->getAORBindingWrappers(mAOR);
-        for (BindingWrapperSeq::const_iterator iter = wrappers.begin(); iter != wrappers.end(); ++iter)
-        {
-            pjsip_contact_hdr *contact = pjsip_contact_hdr_create(mTdata->pool);
-            contact->uri = pjsip_parse_uri(mTdata->pool, (char*) (*iter)->mBinding->contact.c_str(), (*iter)->mBinding->contact.size(), PJSIP_URI_IN_FROMTO_HDR);
-            contact->expires = (*iter)->mBinding->expiration - time(0);
-            pjsip_msg_add_hdr(mTdata->msg, (pjsip_hdr*) contact);
-        }
-        pjsip_tsx_send_msg(mTsx, mTdata);
-    }
-private:
-    pjsip_transaction *mTsx;
-    pjsip_tx_data *mTdata;
-    const std::string mAOR;
-    RegistrarIPtr mRegistrar;
-};
-
 bool PJSIPRegistrarModule::checkAuth(pjsip_rx_data *rdata, pjsip_transaction *tsx, RequestType type)
 {
     DigestChallengeSeq digests;
@@ -804,6 +784,96 @@ void PJSIPRegistrarModule::authTimeout(pj_timer_heap_t *timer_heap, pj_timer_ent
     mAuthManager->authTimeout(timer_heap, entry);
 }
 
+class RegisterRequest : public Work
+{
+public:
+    RegisterRequest(
+            const PJSIPRegistrarModulePtr& module,
+            const RegistrarIPtr& registrar,
+            const std::string& aor,
+            const ContactExpireMap& contactExpireMap,
+            const std::string& callId,
+            int cSeq,
+            pjsip_transaction* tsx,
+            pjsip_tx_data *tdata)
+        : mModule(module),
+          mRegistrar(registrar), 
+          mAOR(aor), 
+          mContactExpireMap(contactExpireMap), 
+          mCallId(callId), 
+          mCSeq(cSeq),
+          mTsx(tsx),
+          mTdata(tdata) {}
+
+    void execute()
+    {
+        // We should attempt to determine at this point who the
+        // REGISTER is from and determine whether they have permission
+        // to be doing this sort of thing. How this is done is still murky
+        // and should perhaps be put in writing on the wiki or in some other
+        // way decided upon before trying to write any code for it.
+       
+        lg(Debug) << "AoR in REGISTER is " << mAOR;
+        BindingWrapperSeq& currentBindings = mRegistrar->getAORBindingWrappers(mAOR);
+
+        BindingWrapperSeq newBindings;
+        BindingWrapperSeq removedBindings;
+        BindingWrapperSeq existingBindings;
+        for (ContactExpireMap::iterator iter = mContactExpireMap.begin();
+                iter != mContactExpireMap.end(); ++iter)
+        {
+            BindingWrapperSeq::iterator bindingToUpdate = mModule->findMatchingBinding((*iter).first, currentBindings);
+
+            int expiration = (*iter).second;
+
+            if (bindingToUpdate != currentBindings.end())
+            {
+                if (expiration == 0)
+                {
+                    lg(Debug) << "Adding " << (*bindingToUpdate)->mBinding->contact << " to our bindings to remove";
+                    removedBindings.push_back(*bindingToUpdate);
+                }
+                else
+                {
+                    (*bindingToUpdate)->updateBinding(mCallId, mCSeq, expiration, mRegistrar->getQueue());
+                    lg(Debug) << "Maintaining " << (*bindingToUpdate)->mBinding->contact << " in our existing bindings";
+                    existingBindings.push_back(*bindingToUpdate);
+                }
+            }
+            else
+            {
+                BindingWrapperPtr wrapper = mModule->createNewBinding((*iter).first, mCallId, mCSeq, expiration, mAOR);
+                lg(Debug) << "Adding " << wrapper->mBinding->contact << " to our bindings to add";
+                newBindings.push_back(wrapper);
+            }
+        }
+
+        mRegistrar->addAndRemoveBindings(mAOR, newBindings, removedBindings, mRegistrar->getQueue());
+
+        BindingWrapperSeq wrappers = mRegistrar->getAORBindingWrappers(mAOR);
+        for (BindingWrapperSeq::const_iterator iter = wrappers.begin(); iter != wrappers.end(); ++iter)
+        {
+            pjsip_contact_hdr *contact = pjsip_contact_hdr_create(mTdata->pool);
+            contact->uri = pjsip_parse_uri(mTdata->pool, (char*) (*iter)->mBinding->contact.c_str(), (*iter)->mBinding->contact.size(), PJSIP_URI_IN_FROMTO_HDR);
+            contact->expires = (*iter)->mBinding->expiration - time(0);
+            pjsip_msg_add_hdr(mTdata->msg, (pjsip_hdr*) contact);
+        }
+        pjsip_tsx_send_msg(mTsx, mTdata);
+
+        mModule->replicateState(mAOR, existingBindings, newBindings, removedBindings);
+    }
+
+private:
+    PJSIPRegistrarModulePtr mModule;
+    RegistrarIPtr mRegistrar;
+    const std::string mAOR;
+    ContactExpireMap mContactExpireMap;
+    std::string mCallId;
+    int mCSeq;
+    pjsip_transaction *mTsx;
+    pjsip_tx_data *mTdata;
+};
+
 pj_bool_t PJSIPRegistrarModule::on_rx_request(pjsip_rx_data *rdata)
 {
     if (rdata->msg_info.msg->line.req.method.id != PJSIP_REGISTER_METHOD)
@@ -827,17 +897,8 @@ pj_bool_t PJSIPRegistrarModule::on_rx_request(pjsip_rx_data *rdata)
         return PJ_TRUE;
     }
 
-    // We should attempt to determine at this point who the
-    // REGISTER is from and determine whether they have permission
-    // to be doing this sort of thing. How this is done is still murky
-    // and should perhaps be put in writing on the wiki or in some other
-    // way decided upon before trying to write any code for it.
-
     std::string aor = getAOR(rdata);
-    lg(Debug) << "AoR in REGISTER is " << aor;
-    BindingWrapperSeq& currentBindings = mRegistrar->getAORBindingWrappers(aor);
     std::vector<pjsip_contact_hdr *> registerContacts = extractRegisterContacts(rdata);
-
     std::string callID(pj_strbuf(&rdata->msg_info.cid->id), pj_strlen(&rdata->msg_info.cid->id));
     int cSeq = rdata->msg_info.cseq->cseq;
 
@@ -858,38 +919,6 @@ pj_bool_t PJSIPRegistrarModule::on_rx_request(pjsip_rx_data *rdata)
         return PJ_TRUE;
     }
 
-    BindingWrapperSeq newBindings;
-    BindingWrapperSeq removedBindings;
-    BindingWrapperSeq existingBindings;
-    for (std::vector<pjsip_contact_hdr *>::iterator iter = registerContacts.begin();
-            iter != registerContacts.end(); ++iter)
-    {
-        BindingWrapperSeq::iterator bindingToUpdate = findMatchingBinding(*iter, currentBindings);
-
-        int expiration = getExpiration(*iter, rdata);
-
-        if (bindingToUpdate != currentBindings.end())
-        {
-            if (expiration == 0)
-            {
-                lg(Debug) << "Adding " << (*bindingToUpdate)->mBinding->contact << " to our bindings to remove";
-                removedBindings.push_back(*bindingToUpdate);
-            }
-            else
-            {
-                (*bindingToUpdate)->updateBinding(callID, cSeq, expiration, mRegistrar->getQueue());
-                lg(Debug) << "Maintaining " << (*bindingToUpdate)->mBinding->contact << " in our existing bindings";
-                existingBindings.push_back(*bindingToUpdate);
-            }
-        }
-        else
-        {
-            BindingWrapperPtr wrapper = createNewBinding(*iter, callID, cSeq, expiration, aor);
-            lg(Debug) << "Adding " << wrapper->mBinding->contact << " to our bindings to add";
-            newBindings.push_back(wrapper);
-        }
-    }
-
     // We enqueue the SIP response to make sure we send it and replicate state AFTER we have updated
     // our internal bindings.
     pjsip_tx_data *tdata;
@@ -899,9 +928,18 @@ pj_bool_t PJSIPRegistrarModule::on_rx_request(pjsip_rx_data *rdata)
         return PJ_TRUE;
     }
 
-    mRegistrar->addAndRemoveBindings(aor, newBindings, removedBindings, mRegistrar->getQueue());
-    mRegistrar->getQueue()->enqueueWork(new Response(tsx, tdata, aor, mRegistrar));
-    replicateState(aor, existingBindings, newBindings, removedBindings);
+    int defaultExpiration = getMessageExpiration(rdata);
+
+    ContactExpireMap contactExpireMap;
+    for (std::vector<pjsip_contact_hdr *>::iterator iter = registerContacts.begin();
+         iter != registerContacts.end(); ++iter)
+    {
+        addURIExpiration(contactExpireMap, (*iter), defaultExpiration);
+    }
+
+    // enqueue the work
+    getRegistrar()->getQueue()->enqueueWork(
+        new RegisterRequest(this, getRegistrar(), aor, contactExpireMap, callID, cSeq, tsx, tdata));
 
     return PJ_TRUE;
 }
diff --git a/src/PJSIPRegistrarModule.h b/src/PJSIPRegistrarModule.h
index 43dc6a6..7027da0 100644
--- a/src/PJSIPRegistrarModule.h
+++ b/src/PJSIPRegistrarModule.h
@@ -101,6 +101,8 @@ private:
 
 typedef IceUtil::Handle<RegistrarI> RegistrarIPtr;
 
+typedef std::map<std::string, int> ContactExpireMap;
+
 class PJSIPRegistrarModule : public PJSIPModule
 {
 public:
@@ -134,32 +136,35 @@ public:
         const BindingWrapperSeq& newBindings,
         const BindingWrapperSeq& removedBindings);
 
-private:
-    /**
-     * Extract the AoR from the To header of a REGISTER request
-     */
-    std::string getAOR(pjsip_rx_data *rdata);
-    /**
-     * Get the Contact URIs from a REGISTER request
-     */
-    std::vector<pjsip_contact_hdr *> extractRegisterContacts(pjsip_rx_data *rdata);
     /**
      * Try to find an existing BindingWrapper matching a Contact header from
      * a REGISTER
      */
-    BindingWrapperSeq::iterator findMatchingBinding(pjsip_contact_hdr *contact,
+    BindingWrapperSeq::iterator findMatchingBinding(const std::string& contactURI,
             BindingWrapperSeq& bindings);
-    /**
-     * Find the expiration of a binding from a REGISTER request
-     */
-    int getExpiration(pjsip_contact_hdr *contact, pjsip_rx_data *rdata);
+
     /**
      * Create a new BindingWrapper based on information extracted from a
      * REGISTER request
      */
-    BindingWrapperPtr createNewBinding(pjsip_contact_hdr *contact,
+    BindingWrapperPtr createNewBinding(const std::string& contactURI,
             const std::string& callID, int cSeq, int expiration, const std::string& aor);
 
+    /**
+     * Find the expiration of a binding from a REGISTER request
+     */
+    int getMessageExpiration(pjsip_rx_data *rdata);
+
+private:
+    /**
+     * Extract the AoR from the To header of a REGISTER request
+     */
+    std::string getAOR(pjsip_rx_data *rdata);
+    /**
+     * Get the Contact URIs from a REGISTER request
+     */
+    std::vector<pjsip_contact_hdr *> extractRegisterContacts(pjsip_rx_data *rdata);
+
     bool checkAuth(pjsip_rx_data *rdata,
             pjsip_transaction *tsx,
             AsteriskSCF::SIP::ExtensionPoint::V1::RequestType type);
@@ -169,6 +174,8 @@ private:
     pjsip_endpoint *mEndpoint;
     RegistrarIPtr mRegistrar;
     SIPReplicationContextPtr mReplicationContext;
+
+    void addURIExpiration(ContactExpireMap& contactExpireMap, pjsip_contact_hdr *contact, int defaultExpire);
 };
 
 typedef IceUtil::Handle<PJSIPRegistrarModule> PJSIPRegistrarModulePtr;
@@ -200,7 +207,10 @@ public:
      * Update the binding in the wrapper with new information. This will
      * result in rescheduling destruction of the binding.
      */
-    void updateBinding(const std::string &callID, int cSeq, int expiration, const AsteriskSCF::System::WorkQueue::V1::QueuePtr& queue);
+    void updateBinding(const std::string &callID, 
+                       int cSeq, 
+                       int expiration, 
+                       const AsteriskSCF::System::WorkQueue::V1::QueuePtr& queue);
 
     bool operator==(const BindingWrapper& rhs);
     bool operator==(const AsteriskSCF::SIP::Registration::V1::BindingPtr& rhs);

commit 34cf80f54a88569ae1894c2821d41d17945a324f
Author: Brent Eagles <beagles at digium.com>
Date:   Wed Jul 18 14:16:04 2012 -0230

    Add some logging to one of the methods that can alter the thread pool but does not yet have logging.

diff --git a/src/PJSIPSessionModule.cpp b/src/PJSIPSessionModule.cpp
index bc009ae..5affc20 100644
--- a/src/PJSIPSessionModule.cpp
+++ b/src/PJSIPSessionModule.cpp
@@ -2362,8 +2362,13 @@ void PJSIPSessionModuleThreadPoolListener::stateChanged(const PoolPtr& pool, Ice
     //For now, what we do is kill all idle threads.
     if (idle > 0)
     {
+        lg(Debug) << "idle threads available, increasing pool size to match active threads. A: " << active << " I: " << idle;
         pool->setSize((int) active);
     }
+    else
+    { 
+        lg(Debug) <<  "No current idle threads, the number of active threads is " << active;
+    }
     mActiveThreads = active;
 }
 

commit e6b65c10643898b1ac0109a3f42bc3ef09753cbe
Author: Brent Eagles <beagles at digium.com>
Date:   Tue Jul 17 20:25:43 2012 -0230

    The mSessions vector was modifiable directly by SIPSession and by whatever
    operations initiated creation of sessions; however, there was no lock around
    modifying or reading the collection.  I suspect this is what has been
    causing our nasty SIP component bug.

diff --git a/src/SIPEndpoint.cpp b/src/SIPEndpoint.cpp
index 5cd7a73..55f04c9 100644
--- a/src/SIPEndpoint.cpp
+++ b/src/SIPEndpoint.cpp
@@ -278,6 +278,11 @@ public:
     std::map<std::string, SIPRegistrationClientPtr> mClientRegistrations;
     AsteriskSCF::Collections::HandleSet<AsteriskSCF::SessionCommunications::V1::SessionCookiePtr>::SetPtr mDefaultSessionCookies;
     AsteriskSCF::Operations::OperationContextCachePtr mOperationContextCache;
+
+    //
+    // Temporary (?) fix for race conditions where multiple sessions for an Endpoint might actually be alive at a given time.
+    //
+    boost::mutex mSessionsLock;
 };
 
 SIPEndpoint::SIPEndpoint(const Ice::ObjectAdapterPtr& adapter, 
@@ -595,7 +600,10 @@ AsteriskSCF::SessionCommunications::V1::SessionPrx SIPEndpoint::createSession(
                                    mImplPriv->mConfig.sessionConfig.udptlOverICE,
                                    mImplPriv->mConfig.sessionConfig.udptlWithTURN));
 
-        mImplPriv->mSessions.push_back(session);
+        {
+            boost::lock_guard<boost::mutex> lock(mImplPriv->mSessionsLock);
+            mImplPriv->mSessions.push_back(session);
+        }
 
         std::cout << "And now we're returing a session proxy..." << std::endl;
 
@@ -639,6 +647,7 @@ AsteriskSCF::SIPSessionGateway::SIPSessionPtr SIPEndpoint::createSession(const s
                                 mImplPriv->mConfig.sessionConfig.udptlWithTURN)
 	     );
 
+    boost::lock_guard<boost::mutex> lock(mImplPriv->mSessionsLock);
     mImplPriv->mSessions.push_back(session);
     return session;
 }
@@ -683,6 +692,7 @@ SIPSessionPtr SIPEndpoint::createSession
                             mImplPriv->mConfig.sessionConfig.udptlOverICE,
                             mImplPriv->mConfig.sessionConfig.udptlWithTURN));
 
+    boost::lock_guard<boost::mutex> lock(mImplPriv->mSessionsLock);
     mImplPriv->mSessions.push_back(session);
     return session;
 }
@@ -695,6 +705,7 @@ SIPSessionPtr SIPEndpoint::createSession
 void SIPEndpoint::removeSession(const AsteriskSCF::SessionCommunications::V1::SessionPtr& session)
 {
     SIPSessionPtr sipsession = SIPSessionPtr::dynamicCast(session);
+    boost::lock_guard<boost::mutex> lock(mImplPriv->mSessionsLock);
     mImplPriv->mSessions.erase(std::remove(mImplPriv->mSessions.begin(), mImplPriv->mSessions.end(), sipsession),
             mImplPriv->mSessions.end());
 }
@@ -702,6 +713,7 @@ void SIPEndpoint::removeSession(const AsteriskSCF::SessionCommunications::V1::Se
 AsteriskSCF::SessionCommunications::V1::SessionSeq SIPEndpoint::getSessions(const Ice::Current&)
 {
     AsteriskSCF::SessionCommunications::V1::SessionSeq sessions;
+    boost::lock_guard<boost::mutex> lock(mImplPriv->mSessionsLock);
 
     for (std::vector<SIPSessionPtr>::const_iterator session = mImplPriv->mSessions.begin();
          session != mImplPriv->mSessions.end(); ++session)

commit dfa8647bf2e31c8de9cc0a7617049b061ed8ed9f
Author: Brent Eagles <beagles at digium.com>
Date:   Tue Jul 17 13:40:13 2012 -0230

    A large number of small changes:
    
     - Operation contexts are checked before enqueuing AMD ops, thereby reducing
       the amount of thread creation-per-operation bashing that the thread pool is
       currently exhibiting. This would be an improvement even if the thread pool
       were working the way we want.
    
     - Added a small "hack-like" change to SIPAMICallback to remove references to
       the SIPSession, etc. once it is done with them. The reason is that the callback
       object is actually held onto by AsyncResult, and AsyncResult is passed
       back to an enqueued operation, etc. I cannot currently see how that
       could be causing the problem we are seeing, but this change
       might help make the bug manifest itself closer to its cause.

diff --git a/src/PJSIPSessionModule.h b/src/PJSIPSessionModule.h
index d08b475..4e67cd1 100644
--- a/src/PJSIPSessionModule.h
+++ b/src/PJSIPSessionModule.h
@@ -373,6 +373,8 @@ public:
         {
             mSession->enqueueSessionWork(new SIPAMICallbackOperation(r, mOperation));
         }
+
+        clear();
     }
 
 private:
@@ -405,6 +407,20 @@ private:
      * and mIsSuspended should never both be true.
      */
     bool mNeedsRequeuing;
+
+    void clear() 
+    {
+        //
+        // kind of a hack to see if this is what is causing the problem. The
+        // ami AsyncResult holds a reference to the callback, which we pass to
+        // an operation. There is just a lot of weird final references when the
+        // stack unwinds. This might make it a bit easier to figure out what is
+        // going on.
+        //
+        mSession = 0;
+        mListener = 0;
+        mOperation = 0;
+    } 
 };
 
 typedef IceUtil::Handle<SIPAMICallback> SIPAMICallbackPtr;
diff --git a/src/SIPSession.cpp b/src/SIPSession.cpp
index e6ce9dc..1b326f9 100755
--- a/src/SIPSession.cpp
+++ b/src/SIPSession.cpp
@@ -704,24 +704,12 @@ class ChangeStreamStatesOperation : public SuspendableWork
 public:
     ChangeStreamStatesOperation(
         const AsteriskSCF::SessionCommunications::V1::AMD_SessionController_changeStreamStatesPtr& cb,
-        const OperationContextPtr& operationContext,
         const AsteriskSCF::Media::V1::StreamStateDict& streams,
         const boost::shared_ptr<SIPSessionPriv>& sessionPriv)
-        : mCb(cb), mOperationContext(operationContext), mStreams(streams), mImplPriv(sessionPriv) { }
+        : mCb(cb), mStreams(streams), mImplPriv(sessionPriv) { }
     
     SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
     {
-        AMDContextData<AMD_SessionController_changeStreamStatesPtr>::ptr_type operationCookie = 
-            getContext<AMDContextData<AMD_SessionController_changeStreamStatesPtr> >(
-                mImplPriv->mOperationContextCache,
-                mOperationContext,
-                mCb);
-        if (!operationCookie)
-        {
-            lg(Debug) << "changeStreamStates() detected retry for operation " << mOperationContext->id;
-            return Complete;
-        }
-
         lg(Debug) << "Executing a changeStreamStates Operation";
 
         try
@@ -818,11 +806,11 @@ public:
                 }
             }
 
-            operationCookie->setCompleted();
+            mCb->ice_response();
         }
         catch (const std::exception& e)
         {
-            operationCookie->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+            mCb->ice_exception(e);
             assert(false);
         }
         return Complete;
@@ -830,7 +818,6 @@ public:
     
 private:
     AsteriskSCF::SessionCommunications::V1::AMD_SessionController_changeStreamStatesPtr mCb;
-    OperationContextPtr mOperationContext;
     AsteriskSCF::Media::V1::StreamStateDict mStreams;
     boost::shared_ptr<SIPSessionPriv> mImplPriv;
 };
@@ -840,29 +827,22 @@ class AddStreamsOperation : public SuspendableWork
 public:
     AddStreamsOperation(
         const AsteriskSCF::SessionCommunications::V1::AMD_SessionController_addStreamsPtr& cb,
-        const OperationContextPtr& operationContext,
+        const OperationContextPtr& opContext,
         const AsteriskSCF::Media::V1::StreamInformationDict& streams,
         const boost::shared_ptr<SIPSessionPriv>& sessionPriv,
         const SIPSessionPtr& session)
-        : mCb(cb), mOperationContext(operationContext), mStreams(streams), mImplPriv(sessionPriv), mSession(session) { }
+        : mCb(cb), mOperationContext(opContext), mStreams(streams), mImplPriv(sessionPriv), mSession(session) { }
 
     SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
     {
-        AMDContextResultData<StreamInformationDict, AMD_SessionController_addStreamsPtr>::ptr_type operationCookie = 
-            getContext<AMDContextResultData<StreamInformationDict, AMD_SessionController_addStreamsPtr> >(
-                mImplPriv->mOperationContextCache,
-                mOperationContext,
-                mCb);
-        if (!operationCookie)
-        {
-            lg(Debug) << "addStreams() detected retry for operation " << mOperationContext->id;
-            return Complete;
-        }
-
         try
         {
             lg(Debug) << "Executing an addStreams Operation";
 
+            //
+            // XXX this is seriously screwy code!
+            //
+
             // If there is an outstanding transaction then no streams can be added at this time
             if (mImplPriv->mInviteSession->invite_tsx)
             {
@@ -883,7 +863,7 @@ public:
 
             // Store callback information so when the remote party responds with which streams were accepted we can
             // communicate it to the controller
-            mImplPriv->mAddStreamsCb = mCb;
+            mImplPriv->mAddStreamsCb = mCb; // XXX - this is wrong. And it should've been set before createSDPOffer() was called!
 
             // Okay, create and send the reinvite!
             pjsip_tx_data *packet = NULL;
@@ -897,12 +877,15 @@ public:
                 // If we couldn't create the reinvite no streams were added
                 lg(Warning) << "Unable to create reinvite when adding streams";
             }
-        
-            operationCookie->setResult(StreamInformationDict());
+       
+            //
+            // If the callback is supposed to happen in createSDPOffer, then this shouldn't be happening here!
+            //
+            mCb->ice_response(StreamInformationDict());
         }
         catch (const std::exception& e)
         {
-            operationCookie->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+            mCb->ice_exception(e);
             assert(false);
         }
         return Complete;
@@ -910,7 +893,7 @@ public:
 
 private:
     AsteriskSCF::SessionCommunications::V1::AMD_SessionController_addStreamsPtr mCb;
-    OperationContextPtr mOperationContext;
+    AsteriskSCF::System::V1::OperationContextPtr mOperationContext;
     AsteriskSCF::Media::V1::StreamInformationDict mStreams;
     boost::shared_ptr<SIPSessionPriv> mImplPriv;
     SIPSessionPtr mSession;
@@ -921,25 +904,13 @@ class RemoveStreamsOperation : public SuspendableWork
 public:
     RemoveStreamsOperation(
         const AsteriskSCF::SessionCommunications::V1::AMD_SessionController_removeStreamsPtr& cb,
-        const OperationContextPtr& operationContext,
         const AsteriskSCF::Media::V1::StreamInformationDict& streams,
         const boost::shared_ptr<SIPSessionPriv>& sessionPriv,
         const SIPSessionPtr& session)
-        : mCb(cb), mOperationContext(operationContext), mStreams(streams), mImplPriv(sessionPriv), mSession(session) { }
+        : mCb(cb), mStreams(streams), mImplPriv(sessionPriv), mSession(session) { }
 
     SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
     {
-        AMDContextData<AMD_SessionController_removeStreamsPtr>::ptr_type operationCookie = 
-            getContext<AMDContextData<AMD_SessionController_removeStreamsPtr> >(
-                mImplPriv->mOperationContextCache,
-                mOperationContext,
-                mCb);
-        if (!operationCookie)
-        {
-            lg(Debug) << "removeStreamStates() detected retry for operation " << mOperationContext->id;
-            return Complete;
-        }
-
         try
         {
             lg(Debug) << "Executing a removeStreams Operation";
@@ -964,13 +935,16 @@ public:
             {
                 pjsip_inv_set_sdp_answer(mImplPriv->mInviteSession, sdp);
             }
-
-            operationCookie->setCompleted();
+            mCb->ice_response();
         }
         catch (const std::exception& e)
         {
-            operationCookie->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+            mCb->ice_exception(e);
+        }
+        catch (...)
+        {
             assert(false);
+            mCb->ice_exception();
         }
 
         return Complete;
@@ -978,7 +952,6 @@ public:
 
 private:
     AsteriskSCF::SessionCommunications::V1::AMD_SessionController_removeStreamsPtr mCb;
-    OperationContextPtr mOperationContext;
     AsteriskSCF::Media::V1::StreamInformationDict mStreams;
     boost::shared_ptr<SIPSessionPriv> mImplPriv;
     SIPSessionPtr mSession;
@@ -1040,31 +1013,67 @@ public:
                                   const OperationContextPtr& operationContext,
                                   const AsteriskSCF::Media::V1::StreamStateDict& states, const Ice::Current&)
     {
+        AMDContextData<AMD_SessionController_changeStreamStatesPtr>::ptr_type operationCookie = 
+            getContext<AMDContextData<AMD_SessionController_changeStreamStatesPtr> >(
+                mImplPriv->mOperationContextCache,
+                operationContext,
+                cb);
+        if (!operationCookie)
+        {
+            lg(Debug) << "changeStreamStates() detected retry for operation " << operationContext->id;
+            return;
+        }
         lg(Debug) << "Queueing changeStreamStates operation";
-        mSession->enqueueSessionWork(new ChangeStreamStatesOperation(cb, operationContext, states, mImplPriv));
+        mSession->enqueueSessionWork(new ChangeStreamStatesOperation(operationCookie->getProxy(),
+                states, mImplPriv));
     }
     
     void addStreams_async(const AsteriskSCF::SessionCommunications::V1::AMD_SessionController_addStreamsPtr& cb,
                           const OperationContextPtr& operationContext,
                           const AsteriskSCF::Media::V1::StreamInformationDict& streams, const Ice::Current&)
     {
+        AMDContextResultData<StreamInformationDict, AMD_SessionController_addStreamsPtr>::ptr_type operationCookie = 
+            getContext<AMDContextResultData<StreamInformationDict, AMD_SessionController_addStreamsPtr> >(
+                mImplPriv->mOperationContextCache,
+                operationContext,
+                cb);
+        if (!operationCookie)
+        {
+            lg(Debug) << "addStreams() detected retry for operation " << operationContext->id;
+            return;
+        }
+
         lg(Debug) << "Queueing addStreams operation";
-        mSession->enqueueSessionWork(new AddStreamsOperation(cb, operationContext, streams, mImplPriv, mSession));
+        mSession->enqueueSessionWork(new AddStreamsOperation(operationCookie->getProxy(), operationContext,
+                streams, mImplPriv, mSession));
     }
     
     void removeStreams_async(const AsteriskSCF::SessionCommunications::V1::AMD_SessionController_removeStreamsPtr& cb,
                              const OperationContextPtr& operationContext,
                              const AsteriskSCF::Media::V1::StreamInformationDict& streams, const Ice::Current&)
     {
+        AMDContextData<AMD_SessionController_removeStreamsPtr>::ptr_type operationCookie = 
+            getContext<AMDContextData<AMD_SessionController_removeStreamsPtr> >(
+                mImplPriv->mOperationContextCache,
+                operationContext,
+                cb);
+        if (!operationCookie)
+        {
+            lg(Debug) << "removeStreamStates() detected retry for operation " << operationContext->id;
+            return;
+        }
+
         lg(Debug) << "Queueing removeStreams operation";
-        mSession->enqueueSessionWork(new RemoveStreamsOperation(cb, operationContext, streams, mImplPriv, mSession));
+        mSession->enqueueSessionWork(new RemoveStreamsOperation(operationCookie->getProxy(),
+                streams, mImplPriv, mSession));
     }
     
     /**
      * This operation allows the externally connected component (typically the bridge)
      * to update this session's ConnectedLine information. 
      */
-    void updateConnectedLine(const OperationContextPtr& operationContext, const ConnectedLinePtr& connected, const Ice::Current&) 
+    void updateConnectedLine(const OperationContextPtr& operationContext, const ConnectedLinePtr& connected,
+        const Ice::Current&) 
     {
         ContextDataPtr operationCookie;
         if (!(operationCookie = Operations::checkAndThrow(mOperationContextCache, operationContext)))
@@ -1199,25 +1208,13 @@ class ConnectStreamsOperation : public SuspendableWork
 public:
     ConnectStreamsOperation(
         const AsteriskSCF::Media::V1::AMD_DirectMediaConnection_connectStreamsPtr& cb,
-        const OperationContextPtr& operationContext,
         const AsteriskSCF::Media::V1::DirectMediaConnectionDict& connections,
         const boost::shared_ptr<SIPSessionPriv>& implPriv,
         const SIPSessionPtr& session)
-        : mCb(cb), mOperationContext(operationContext), mConnections(connections), mImplPriv(implPriv), mSession(session) { }
+        : mCb(cb), mConnections(connections), mImplPriv(implPriv), mSession(session) { }
 
     SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
     {
-        AMDContextData<AMD_DirectMediaConnection_connectStreamsPtr>::ptr_type operationCookie = 
-            getContext<AMDContextData<AMD_DirectMediaConnection_connectStreamsPtr> >(
-                mImplPriv->mOperationContextCache,
-                mOperationContext,
-                mCb);
-        if (!operationCookie)
-        {
-            lg(Debug) << "connectStreams() detected retry for operation " << mOperationContext->id;
-            return Complete;
-        }
-
         try
         {
             if (mSession->getInviteSession()->invite_tsx)
@@ -1238,12 +1235,16 @@ public:
                 lg(Warning) << "Unable to create reinvite to connect streams";
             }
 
-            operationCookie->setCompleted();
+            mCb->ice_response();
         }
         catch (const std::exception& e)
         {
-            operationCookie->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+            mCb->ice_exception(e);
+        }
+        catch (...)
+        {
             assert(false);
+            mCb->ice_exception();
         }
 
         return Complete;
@@ -1251,7 +1252,6 @@ public:
 
 private:
     AsteriskSCF::Media::V1::AMD_DirectMediaConnection_connectStreamsPtr mCb;
-    OperationContextPtr mOperationContext;
     AsteriskSCF::Media::V1::DirectMediaConnectionDict mConnections;
     boost::shared_ptr<SIPSessionPriv> mImplPriv;
     const SIPSessionPtr mSession;
@@ -1262,25 +1262,13 @@ class DisconnectStreamsOperation : public SuspendableWork
 public:
     DisconnectStreamsOperation(
         const AsteriskSCF::Media::V1::AMD_DirectMediaConnection_disconnectStreamsPtr& cb,
-        const OperationContextPtr& operationContext,
         const Ice::StringSeq& streams,
         const boost::shared_ptr<SIPSessionPriv>& implPriv,
         const SIPSessionPtr& session)
-        : mCb(cb), mOperationContext(operationContext), mStreams(streams), mImplPriv(implPriv), mSession(session) { }
+        : mCb(cb), mStreams(streams), mImplPriv(implPriv), mSession(session) { }
 
     SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
     {
-        AMDContextData<AMD_DirectMediaConnection_disconnectStreamsPtr>::ptr_type operationCookie = 
-            getContext<AMDContextData<AMD_DirectMediaConnection_disconnectStreamsPtr> >(
-                mImplPriv->mOperationContextCache,
-                mOperationContext,
-                mCb);
-        if (!operationCookie)
-        {
-            lg(Debug) << "removeStreamStates() detected retry for operation " << mOperationContext->id;
-            return Complete;
-        }
-
         try
         {
             pjmedia_sdp_session *sdp = mSession->modifySDP(mStreams);
@@ -1295,12 +1283,16 @@ public:
                 lg(Warning) << "Unable to create reinvite to disconnect streams";
             }
 
-            operationCookie->setCompleted();
+            mCb->ice_response();
         }
         catch (const std::exception& e)
         {
-            operationCookie->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+            mCb->ice_exception(e);
+        }
+        catch (...)
+        {
             assert(false);
+            mCb->ice_exception();
         }
 
         return Complete;
@@ -1308,7 +1300,6 @@ public:
 
 private:
     AsteriskSCF::Media::V1::AMD_DirectMediaConnection_disconnectStreamsPtr mCb;
-    OperationContextPtr mOperationContext;
     Ice::StringSeq mStreams;
     boost::shared_ptr<SIPSessionPriv> mImplPriv;
     const SIPSessionPtr mSession;
@@ -1334,16 +1325,39 @@ public:
                               const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
                               const AsteriskSCF::Media::V1::DirectMediaConnectionDict& connections, const Ice::Current&)
     {
+        AMDContextData<AMD_DirectMediaConnection_connectStreamsPtr>::ptr_type operationCookie = 
+            getContext<AMDContextData<AMD_DirectMediaConnection_connectStreamsPtr> >(
+                mImplPriv->mOperationContextCache,
+                operationContext,
+                cb);
+        if (!operationCookie)
+        {
+            lg(Debug) << "connectStreams() detected retry for operation " << operationContext->id;
+            return;
+        }
         lg(Debug) << "Queueing connectStreams operation";
-        mSession->enqueueSessionWork(new ConnectStreamsOperation(cb, operationContext, connections, mImplPriv, mSession));
+        mSession->enqueueSessionWork(new ConnectStreamsOperation(operationCookie->getProxy(),
+                connections, mImplPriv, mSession));
     }
 
     void disconnectStreams_async(const AsteriskSCF::Media::V1::AMD_DirectMediaConnection_disconnectStreamsPtr& cb,
                                  const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
                                  const Ice::StringSeq& streams, const Ice::Current&)
     {
+        AMDContextData<AMD_DirectMediaConnection_disconnectStreamsPtr>::ptr_type operationCookie = 
+            getContext<AMDContextData<AMD_DirectMediaConnection_disconnectStreamsPtr> >(
+                mImplPriv->mOperationContextCache,
+                operationContext,
+                cb);
+        if (!operationCookie)
+        {
+            lg(Debug) << "removeStreamStates() detected retry for operation " << operationContext->id;
+            return;
+        }
+
         lg(Debug) << "Queueing disconnectStreams operation";
-        mSession->enqueueSessionWork(new DisconnectStreamsOperation(cb, operationContext, streams, mImplPriv, mSession));
+        mSession->enqueueSessionWork(new DisconnectStreamsOperation(operationCookie->getProxy(),
+                streams, mImplPriv, mSession));
     }
 
 private:
@@ -1600,6 +1614,10 @@ SIPSessionPtr SIPSession::create(
         hooks.push_back(oneShotHook);
     }
     newSession->activateIceObjects(hooks, config, isUAC);
+    //
+    // TODO: introduce try/catch block to destroy the newSession object and
+    // cleanup servants on exceptions during initialization.
+    //
 
     return newSession;
 }
@@ -1652,6 +1670,15 @@ SIPSessionPtr SIPSession::create(
         hooks.push_back(oneShotHook);
     }
     newSession->activateIceObjects(hooks, config, isUAC);
+    //
+    // TODO: introduce try/catch block to destroy the newSession object and
+    // cleanup servants on exceptions during initialization.  Also, a
+    // replica that fails to initialize standby objects is in a
+    // questionable state, so it may be desirable to bounce the replica. On
+    // the other hand, we could simply add the notion that replication is
+    // best-effort and that a certain subset of replication failures is
+    // allowable.
+    //
 
     return newSession;
 }
@@ -1724,35 +1751,27 @@ class AddListenerOperation : public SuspendableWork
 public:
     AddListenerOperation(
             const AsteriskSCF::SessionCommunications::V1::AMD_Session_addListenerPtr& cb,
-            const OperationContextPtr& operationContext,
             const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
             const boost::shared_ptr<SIPSessionPriv>& implPriv,
             const SIPSessionPtr& session)
-        : mCb(cb), mOperationContext(operationContext), mListener(listener), mImplPriv(implPriv), mSession(session) { }
+        : mCb(cb), mListener(listener), mImplPriv(implPriv), mSession(session) { }
 
     SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
     {
-        AMDContextResultData<SessionInfoPtr, AMD_Session_addListenerPtr>::ptr_type operationCookie = 
-            getContext<AMDContextResultData<SessionInfoPtr, AMD_Session_addListenerPtr> >(
-                mImplPriv->mOperationContextCache,
-                mOperationContext,
-                mCb);
-        if (!operationCookie)
-        {
-            lg(Debug) << "addListener() detected retry for operation " << mOperationContext->id;
-            return Complete;
-        }
-
         try
         {
             lg(Debug) << "Executing addListener operation";
             mSession->addListener(mListener);
-            operationCookie->setResult(mSession->getInfo());
+            mCb->ice_response(mSession->getInfo());
         }
         catch (const std::exception& e)
         {
-            operationCookie->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+            mCb->ice_exception(e);
+        }
+        catch (...)
+        {
             assert(false);
+            mCb->ice_exception();
         }
 
         return Complete;
@@ -1771,8 +1790,18 @@ void SIPSession::addListener_async(
         const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
         const Ice::Current&)
 {
+    AMDContextResultData<SessionInfoPtr, AMD_Session_addListenerPtr>::ptr_type operationCookie = 
+        getContext<AMDContextResultData<SessionInfoPtr, AMD_Session_addListenerPtr> >(
+            mImplPriv->mOperationContextCache,
+            operationContext,
+            cb);
+    if (!operationCookie)
+    {
+        lg(Debug) << "addListener() detected retry for operation " << operationContext->id;
+        return;
+    }
     lg(Debug) << "Queueing addListener operation";
-    enqueueSessionWork(new AddListenerOperation(cb, operationContext, listener, mImplPriv, this));
+    enqueueSessionWork(new AddListenerOperation(operationCookie->getProxy(), listener, mImplPriv, this));
 }
 
 void SIPSession::addListener(const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener)
@@ -1796,25 +1825,13 @@ class IndicateOperation : public SuspendableWork
 public:
     IndicateOperation(
             const AsteriskSCF::SessionCommunications::V1::AMD_Session_indicatePtr& cb,
-            const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
             const AsteriskSCF::SessionCommunications::V1::IndicationPtr& indication,
             const boost::shared_ptr<SIPSessionPriv>& sessionPriv,
             const SIPSessionPtr& session)
-        : mCb(cb), mOperationContext(operationContext), mIndication(indication), mImplPriv(sessionPriv), mSession(session) { }
+        : mCb(cb), mIndication(indication), mImplPriv(sessionPriv), mSession(session) { }
 
     SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
     {
-        AMDContextData<AMD_Session_indicatePtr>::ptr_type operationCookie = 
-            getContext<AMDContextData<AMD_Session_indicatePtr> >(
-                mImplPriv->mOperationContextCache,
-                mOperationContext,
-                mCb);
-        if (!operationCookie)
-        {
-            lg(Debug) << "indicate() detected retry for operation " << mOperationContext->id;
-            return Complete;
-        }
-
         try
         {
             lg(Debug) << "Executing indicate operation";
@@ -1872,12 +1889,16 @@ public:
                 pjsip_inv_send_msg(mImplPriv->mInviteSession, packet);
             }
 
-            operationCookie->setCompleted();
+            mCb->ice_response();
         }
         catch (const std::exception& e)
         {
-            operationCookie->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+            mCb->ice_exception(e);
+        }
+        catch (...)
+        {
             assert(false);
+            mCb->ice_exception();
         }
 
         return Complete;
@@ -1915,6 +1936,9 @@ private:
                 connectedName = connectedNumber;
             }
             pj_str_t paiValue;
+            //
+            // XXX: I strongly distrust this method of accessing something that can be modified in another thread!
+            //
             SIPEndpointConfig &config = mSession->getEndpoint()->getConfig();
             std::string ss = "\"" + connectedName + "\" <" + connectedNumber + "@" + config.sessionConfig.sourceAddress + ">;party=called";
             pj_strset(&paiValue, (char *) ss.c_str(), ss.length());
@@ -1954,8 +1978,19 @@ void SIPSession::indicate_async(
         const AsteriskSCF::SessionCommunications::V1::IndicationPtr& indication,
         const Ice::Current&)
 {
+    AMDContextData<AMD_Session_indicatePtr>::ptr_type operationCookie = 
+        getContext<AMDContextData<AMD_Session_indicatePtr> >(
+            mImplPriv->mOperationContextCache,
+            operationContext,
+            cb);
+    if (!operationCookie)
+    {
+        lg(Debug) << "indicate() detected retry for operation " << operationContext->id;
+        return;
+    }
+
     lg(Debug) << "Queuing an indicate operation";
-    enqueueSessionWork(new IndicateOperation(cb, operationContext, indication, mImplPriv, this));
+    enqueueSessionWork(new IndicateOperation(operationCookie->getProxy(), indication, mImplPriv, this));
 }
 
 class GetEndpointOperation : public SuspendableWork
@@ -1985,6 +2020,11 @@ void SIPSession::getEndpoint_async(
         const AsteriskSCF::SessionCommunications::V1::AMD_Session_getEndpointPtr& cb,
         const Ice::Current&)
 {
+    //
+    // TODO: AFAICT, the endpoint proxy for a session is immutable, in which
+    // case there is no real need to queue this work, we should be able to
+    // return immediately.
+    //
     lg(Debug) << "Queuing a getEndpoint operation";
     enqueueSessionWork(new GetEndpointOperation(cb, mImplPriv));
 }
@@ -2176,22 +2216,11 @@ public:
 
     SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
     {
-        AMDContextResultData<SessionControllerPrx, AMD_Session_setAndGetSessionControllerPtr>::ptr_type operationCookie = 
-            getContext<AMDContextResultData<SessionControllerPrx, AMD_Session_setAndGetSessionControllerPtr> >(
-                mImplPriv->mOperationContextCache,
-                mOperationContext,
-                mCb);
-        if (!operationCookie)
-        {
-            lg(Debug) << "getAndSetSessionController() detected retry for operation " << mOperationContext->id;
-            return Complete;
-        }
-
         lg(Debug) << "Executing a SetAndGetSessionController operation";
             
         if (mImplPriv->mSessionController)
         {
-            operationCookie->setException(AsteriskSCF::SessionCommunications::V1::ControllerAlreadySet());
+            mCb->ice_exception(AsteriskSCF::SessionCommunications::V1::ControllerAlreadySet());
             return Complete;
         }
 
@@ -2217,12 +2246,16 @@ public:
                 lg(Info) << "Unable to set ConnectedLine party info. No Id configured for endpoint " << mImplPriv->mEndpoint->getName(); 
             }
 
-            operationCookie->setResult(mImplPriv->mOurSessionControllerProxy);
+            mCb->ice_response(mImplPriv->mOurSessionControllerProxy);
         }
         catch (const std::exception& e)
         {
-            operationCookie->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+            mCb->ice_exception(e);
+        }
+        catch (...)
+        {
             assert(false);
+            mCb->ice_exception();
         }
 
         return Complete;
@@ -2244,8 +2277,18 @@ void SIPSession::setAndGetSessionController_async(
     const AsteriskSCF::SessionCommunications::V1::SessionControllerPrx& controller,
     const Ice::Current&)
 {
+    AMDContextResultData<SessionControllerPrx, AMD_Session_setAndGetSessionControllerPtr>::ptr_type operationCookie = 
+        getContext<AMDContextResultData<SessionControllerPrx, AMD_Session_setAndGetSessionControllerPtr> >(
+            mImplPriv->mOperationContextCache,
+            operationContext,
+            cb);
+    if (!operationCookie)
+    {
+        lg(Debug) << "getAndSetSessionController() detected retry for operation " << operationContext->id;
+        return;
+    }
     lg(Debug) << "queueing a setAndGetSessionController operation";
-    enqueueSessionWork(new SetAndGetSessionControllerOperation(cb, operationContext, controller, mImplPriv));
+    enqueueSessionWork(new SetAndGetSessionControllerOperation(operationCookie->getProxy(), operationContext, controller, mImplPriv));
 }
 
 class RemoveSessionControllerOperation : public SuspendableWork
@@ -2253,24 +2296,12 @@ class RemoveSessionControllerOperation : public SuspendableWork
 public:
     RemoveSessionControllerOperation(
         const AsteriskSCF::SessionCommunications::V1::AMD_Session_removeSessionControllerPtr& cb,
-        const OperationContextPtr& operationContext,
         const AsteriskSCF::SessionCommunications::V1::SessionControllerPrx& controller,
         const boost::shared_ptr<SIPSessionPriv>& sessionPriv)
-        : mCb(cb), mOperationContext(operationContext), mController(controller), mImplPriv(sessionPriv) { }
+        : mCb(cb), mController(controller), mImplPriv(sessionPriv) { }
 
     SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
     {
-        AMDContextData<AMD_Session_removeSessionControllerPtr>::ptr_type operationCookie = 
-            getContext<AMDContextData<AMD_Session_removeSessionControllerPtr> >(
-                mImplPriv->mOperationContextCache,
-                mOperationContext,
-                mCb);
-        if (!operationCookie)
-        {
-            lg(Debug) << "removeSessionController() detected retry for operation " << mOperationContext->id;
-            return Complete;
-        }
-
         lg(Debug) << "Executing a RemoveSessionController operation";
 
         if (mImplPriv->mSessionController == mController)
@@ -2278,13 +2309,12 @@ public:
             mImplPriv->mSessionController = 0;
         }
 
-        operationCookie->setCompleted();
+        mCb->ice_response();
         return Complete;
     }
 
 private:
     AsteriskSCF::SessionCommunications::V1::AMD_Session_removeSessionControllerPtr mCb;
-    OperationContextPtr mOperationContext;
     AsteriskSCF::SessionCommunications::V1::SessionControllerPrx mController;
     boost::shared_ptr<SIPSessionPriv> mImplPriv;
 };
@@ -2298,8 +2328,18 @@ void SIPSession::removeSessionController_async(
     const AsteriskSCF::SessionCommunications::V1::SessionControllerPrx& controller,
     const Ice::Current&)
 {
+    AMDContextData<AMD_Session_removeSessionControllerPtr>::ptr_type operationCookie = 
+        getContext<AMDContextData<AMD_Session_removeSessionControllerPtr> >(
+            mImplPriv->mOperationContextCache,
+            operationContext,
+            cb);
+    if (!operationCookie)
+    {
+        lg(Debug) << "removeSessionController() detected retry for operation " << operationContext->id;
+        return;
+    }
     lg(Debug) << "queueing a removeSessionController operation";
-    enqueueSessionWork(new RemoveSessionControllerOperation(cb, operationContext, controller, mImplPriv));
+    enqueueSessionWork(new RemoveSessionControllerOperation(operationCookie->getProxy(), controller, mImplPriv));
 }
 
 class SetBridgeOperation : public SuspendableWork
@@ -2307,41 +2347,33 @@ class SetBridgeOperation : public SuspendableWork
 public:
     SetBridgeOperation(
             const AsteriskSCF::SessionCommunications::V1::AMD_Session_setBridgePtr& cb,
-            const OperationContextPtr& operationContext,
             const SIPSessionPtr& session,
             const AsteriskSCF::SessionCommunications::V1::BridgePrx& bridge,
             const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
             const boost::shared_ptr<SIPSessionPriv>& sessionPriv,
             const Ice::Current& current)
         : mCb(cb), 
-          mOperationContext(operationContext), 
           mSession(session), 
           mBridge(bridge), mListener(listener), mImplPriv(sessionPriv), mCurrent(current) { }
     
     SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
     {
-        AMDContextResultData<SessionInfoPtr, AMD_Session_setBridgePtr>::ptr_type operationCookie = 
-            getContext<AMDContextResultData<SessionInfoPtr, AMD_Session_setBridgePtr> >(
-                mImplPriv->mOperationContextCache,
-                mOperationContext,
-                mCb);
-        if (!operationCookie)
-        {
-            lg(Debug) << "setBridge() detected retry for operation " << mOperationContext->id;
-            return Complete;
-        }
 
         try
         {
             lg(Debug) << "Executing a SetBridge operation";
             mSession->setBridge(mBridge);
             mSession->addListener(mListener);
-            operationCookie->setResult(mSession->getInfo());
+            mCb->ice_response(mSession->getInfo());
         }
         catch (const std::exception& e)
         {
-            operationCookie->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+            mCb->ice_exception(e);
+        }
+        catch (...)
+        {
             assert(false);
+            mCb->ice_exception();
         }
 
         return Complete;
@@ -2349,7 +2381,6 @@ public:
 
 private:
     AsteriskSCF::SessionCommunications::V1::AMD_Session_setBridgePtr mCb;
-    OperationContextPtr mOperationContext;
     SIPSessionPtr mSession;
     AsteriskSCF::SessionCommunications::V1::BridgePrx mBridge;
     AsteriskSCF::SessionCommunications::V1::SessionListenerPrx mListener;
@@ -2367,8 +2398,18 @@ void SIPSession::setBridge_async(
     const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
     const Ice::Current& current)
 {
+    AMDContextResultData<SessionInfoPtr, AMD_Session_setBridgePtr>::ptr_type operationCookie = 
+        getContext<AMDContextResultData<SessionInfoPtr, AMD_Session_setBridgePtr> >(
+            mImplPriv->mOperationContextCache,
+            operationContext,
+            cb);
+    if (!operationCookie)
+    {
+        lg(Debug) << "setBridge() detected retry for operation " << operationContext->id;
+        return;
+    }
     lg(Debug) << "queuing a setBridge operation";
-    enqueueSessionWork(new SetBridgeOperation(cb, operationContext, this, bridge, listener, mImplPriv, current));
+    enqueueSessionWork(new SetBridgeOperation(operationCookie->getProxy(), this, bridge, listener, mImplPriv, current));
 }
 
 /**
@@ -2389,45 +2430,36 @@ class RemoveBridgeOperation : public SuspendableWork
 public:
     RemoveBridgeOperation(
             const AsteriskSCF::SessionCommunications::V1::AMD_Session_removeBridgePtr& cb,
-            const OperationContextPtr& operationContext,
             const SIPSessionPtr& session,
             const boost::shared_ptr<SIPSessionPriv>& sessionPriv,
             const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
             const Ice::Current& current)
         : mCb(cb), 
-        mOperationContext(operationContext), 
         mSession(session), mImplPriv(sessionPriv), mListener(listener), mCurrent(current) { }
 
     SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
     {
-        AMDContextData<AMD_Session_removeBridgePtr>::ptr_type operationCookie = 
-            getContext<AMDContextData<AMD_Session_removeBridgePtr> >(
-                mImplPriv->mOperationContextCache,
-                mOperationContext,
-                mCb);
-        if (!operationCookie)
-        {
-            lg(Debug) << "connectStreams() detected retry for operation " << mOperationContext->id;
-            return Complete;
-        }
-
         try
         {
             if (mImplPriv->mBridge == 0)
             {
-                operationCookie->setException(AsteriskSCF::SessionCommunications::V1::NotBridged());
+                mCb->ice_exception(AsteriskSCF::SessionCommunications::V1::NotBridged());
                 return Complete;
             }
 
             mSession->setBridge(0);
 
             mImplPriv->removeListener(mListener);
-            operationCookie->setCompleted();
+            mCb->ice_response();
         }
         catch (const std::exception& e)
         {
-            operationCookie->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+            mCb->ice_exception(e);
+        }
+        catch (...)
+        {
             assert(false);
+            mCb->ice_exception();
         }
 
         return Complete;
@@ -2435,7 +2467,6 @@ public:
... 338 lines suppressed ...


-- 
asterisk-scf/release/sip.git



More information about the asterisk-scf-commits mailing list