[asterisk-scf-commits] team/ken.hunt/route_async_routing.git branch "route_async" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Wed Dec 22 19:43:49 UTC 2010


branch "route_async" has been updated
       via  fbb061f636eff41df0a87504b6fbb7cde81d6b37 (commit)
      from  d74afeba80c8b862e7021b8378bf38f92565d317 (commit)

Summary of changes:
 src/EndpointRegistry.cpp             |   10 ++++
 src/RoutingServiceEventPublisher.cpp |   81 ++++++++++++++++++++-------------
 src/SessionRouter.cpp                |   37 ++++++++++-----
 3 files changed, 84 insertions(+), 44 deletions(-)


- Log -----------------------------------------------------------------
commit fbb061f636eff41df0a87504b6fbb7cde81d6b37
Author: Ken Hunt <ken.hunt at digium.com>
Date:   Wed Dec 22 13:43:11 2010 -0600

    Fixed deadlock issue in Event Publisher.

diff --git a/src/EndpointRegistry.cpp b/src/EndpointRegistry.cpp
index 79b96b6..8c9ba89 100644
--- a/src/EndpointRegistry.cpp
+++ b/src/EndpointRegistry.cpp
@@ -145,6 +145,11 @@ public:
         }
     }
 
+    ~LookupResultCollector()
+    {
+        lg(Debug) << "LookupResultCollector being destroyed. ";
+    }
+
     /**
      * Collect results of AMI lookups from multiple EndpointLocators. 
      */
@@ -221,6 +226,11 @@ public:
     {
     }
 
+    ~LookupCallback()
+    {
+        lg(Debug) << "LookupCallback being destroyed. ";
+    }
+
     void lookupResult(const EndpointSeq& endpoints)
     {
         // delegation to thread safe object
diff --git a/src/RoutingServiceEventPublisher.cpp b/src/RoutingServiceEventPublisher.cpp
index 73cbc18..9bceb87 100644
--- a/src/RoutingServiceEventPublisher.cpp
+++ b/src/RoutingServiceEventPublisher.cpp
@@ -44,6 +44,7 @@ public:
     RoutingServiceEventPublisherPriv(const Ice::ObjectAdapterPtr& adapter) :
         mAdapter(adapter), mInitialized(false)
     {
+        boost::lock_guard<boost::mutex> lock(mLock); 
         initialize();
     }
 
@@ -91,7 +92,17 @@ public:
         }
 
         Ice::ObjectPrx publisher = topic->getPublisher();
-        mEventTopic = Event::RoutingEventsPrx::uncheckedCast(publisher);
+        Ice::ObjectPrx oneway;
+        try
+        {
+            oneway = publisher->ice_oneway();
+        }
+        catch (const Ice::NoEndpointException&)
+        {
+            assert(0); // All operations callable via IceStorm must support oneway.
+        }
+
+        mEventTopic = Event::RoutingEventsPrx::uncheckedCast(oneway);
 
         mInitialized = true;
     }
@@ -118,7 +129,7 @@ public:
     }
 
 public:
-    Event::RoutingEventsPrx mEventTopic;
+    Event::RoutingEventsPrx mEventTopic; // Using one-way proxy. 
     boost::mutex mLock;
 
 private:
@@ -140,11 +151,12 @@ RoutingServiceEventPublisher::RoutingServiceEventPublisher(const Ice::ObjectAdap
 void RoutingServiceEventPublisher::lookupEvent(const std::string& destination,
     AsteriskSCF::Core::Routing::V1::Event::OperationResult result, const Ice::Current &)
 {
-    boost::lock_guard<boost::mutex> lock(mImpl->mLock); 
-
-    if (!mImpl->isInitialized())
-    {
-        return;
+    { // scope for the lock
+        boost::lock_guard<boost::mutex> lock(mImpl->mLock); 
+        if (!mImpl->isInitialized())
+        {
+            return;
+        }
     }
 
     try
@@ -165,11 +177,12 @@ void RoutingServiceEventPublisher::addEndpointLocatorEvent(const std::string& lo
     const ::AsteriskSCF::Core::Routing::V1::RegExSeq& regexList, AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
     const Ice::Current &)
 {
-    boost::lock_guard<boost::mutex> lock(mImpl->mLock); 
-
-    if (!mImpl->isInitialized())
-    {
-        return;
+    { // scope for the lock
+        boost::lock_guard<boost::mutex> lock(mImpl->mLock); 
+        if (!mImpl->isInitialized())
+        {
+            return;
+        }
     }
 
     try
@@ -188,11 +201,12 @@ void RoutingServiceEventPublisher::addEndpointLocatorEvent(const std::string& lo
 void RoutingServiceEventPublisher::removeEndpointLocatorEvent(const std::string& locatorId,
     AsteriskSCF::Core::Routing::V1::Event::OperationResult result, const Ice::Current &)
 {
-    boost::lock_guard<boost::mutex> lock(mImpl->mLock); 
-
-    if (!mImpl->isInitialized())
-    {
-        return;
+    { // scope for the lock
+        boost::lock_guard<boost::mutex> lock(mImpl->mLock); 
+        if (!mImpl->isInitialized())
+        {
+            return;
+        }
     }
 
     try
@@ -212,11 +226,12 @@ void RoutingServiceEventPublisher::setEndpointLocatorDestinationIdsEvent(const s
     const AsteriskSCF::Core::Routing::V1::RegExSeq& regexList, AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
     const Ice::Current &)
 {
-    boost::lock_guard<boost::mutex> lock(mImpl->mLock); 
-
-    if (!mImpl->isInitialized())
-    {
-        return;
+    { // scope for the lock
+        boost::lock_guard<boost::mutex> lock(mImpl->mLock); 
+        if (!mImpl->isInitialized())
+        {
+            return;
+        }
     }
 
     try
@@ -234,11 +249,12 @@ void RoutingServiceEventPublisher::setEndpointLocatorDestinationIdsEvent(const s
  */
 void RoutingServiceEventPublisher::clearEndpointLocatorsEvent(const Ice::Current &)
 {
-    boost::lock_guard<boost::mutex> lock(mImpl->mLock); 
-
-    if (!mImpl->isInitialized())
-    {
-        return;
+    { // scope for the lock
+        boost::lock_guard<boost::mutex> lock(mImpl->mLock); 
+        if (!mImpl->isInitialized())
+        {
+            return;
+        }
     }
 
     try
@@ -256,11 +272,12 @@ void RoutingServiceEventPublisher::clearEndpointLocatorsEvent(const Ice::Current
  */
 void RoutingServiceEventPublisher::setPolicyEvent(const std::string& policy, const Ice::Current &)
 {
-    boost::lock_guard<boost::mutex> lock(mImpl->mLock); 
-
-    if (!mImpl->isInitialized())
-    {
-        return;
+    { // scope for the lock
+        boost::lock_guard<boost::mutex> lock(mImpl->mLock); 
+        if (!mImpl->isInitialized())
+        {
+            return;
+        }
     }
 
     try
diff --git a/src/SessionRouter.cpp b/src/SessionRouter.cpp
index b160069..fb10b26 100644
--- a/src/SessionRouter.cpp
+++ b/src/SessionRouter.cpp
@@ -396,12 +396,13 @@ struct SessionContext
 {
 public:
     /**
-     * Constructor. The BridgeManager isn't initialized, but configured via a setter.
+     * Constructor. The BridgeManager isn't initialized here. It's configured later via a
+     * setter because of the component initialization sequence.
      */
     SessionContext(const Ice::ObjectAdapterPtr& adapter,
                             const EndpointRegistryPtr& registry,
                             const RoutingEventsPtr& publisher,
-                            const boost::shared_ptr<WorkQueue> &mWorkQueue)
+                            const boost::shared_ptr<WorkQueue>& workQueue)
                                   :  adapter(adapter),
                                      endpointRegistry(registry),
                                      eventPublisher(publisher),
@@ -412,8 +413,8 @@ public:
     Ice::ObjectAdapterPtr adapter;
     EndpointRegistryPtr endpointRegistry;
     RoutingEventsPtr eventPublisher;
-    AsteriskSCF::SmartProxy::SmartProxy<BridgeManagerPrx> bridgeManager;
     boost::shared_ptr<WorkQueue> workQueue;
+    AsteriskSCF::SmartProxy::SmartProxy<BridgeManagerPrx> bridgeManager;
 };
 
 /** 
@@ -687,6 +688,12 @@ protected: // These protected operations are utiltity functions.
         return removedSessions;
     }
 
+    void setState(const boost::function<void ()>& stateHandler, std::string stateName)
+    {
+        lg(Debug) << "Operation setting new state handler " << stateName;
+        mCurrentStateHandler = stateHandler; 
+    }
+
 protected:
     T mInitiatorCallback;
     SessionContext mSessionContext;
@@ -696,6 +703,8 @@ protected:
     EndpointSeq mLookupResult;
     SessionListenerAllocatorPtr mListenerManager;
     OperationsManager* mOperationsManager;
+
+private:
     boost::function<void ()> mCurrentStateHandler;     // Lightweight state machine. Current state handles doWork() for a given state.
 
 }; // class SessionRouterOperation
@@ -756,7 +765,9 @@ private:
         mListenerManager = listener;
 
         // Set the state to exectute once we've looked up our endpoints. 
-        mCurrentStateHandler = boost::bind(&RouteSessionOperation::establishBridgeState, this);
+        setState(boost::bind(&RouteSessionOperation::establishBridgeState, this), "establishBridgeState");
+
+        // mCurrentStateHandler = boost::bind(&RouteSessionOperation::establishBridgeState, this);
 
         // Lookup the destination. This will use AMI, and the callback should 
         // schedule us to execute again. 
@@ -777,10 +788,10 @@ private:
             return;
         }
 
-        assert(mEndpoints.size() > 0);
+        assert(mLookupResult.size() > 0);
 
         // Add a session to the endpoints.
-        SessionSeq newSessions = createSessionForEndpoints(mEndpoints, mDestination);
+        SessionSeq newSessions = createSessionForEndpoints(mLookupResult, mDestination);
 
         if (mListenerManager->getListener()->getNumSessions() < 2)
         {
@@ -834,9 +845,6 @@ private:
     string mDestination;
     ::Ice::Current mIceCurrent;
     
-    // Implementation state
-    EndpointSeq mEndpoints;
-
 }; // class RouteSessionOperation
 
 /**
@@ -893,7 +901,8 @@ private:
         lg(Debug) << "connectBridgedSessionsWithDestination(): Routing destination " << mDestination;
 
         // Set the state to exectute after lookup. 
-        mCurrentStateHandler = boost::bind(&ConnectBridgedSessionsWithDestinationOperation::establishBridgeState, this);
+        setState(boost::bind(&ConnectBridgedSessionsWithDestinationOperation::establishBridgeState, this), "establishBridgeState");
+       // mCurrentStateHandler = boost::bind(&ConnectBridgedSessionsWithDestinationOperation::establishBridgeState, this);
 
         // Lookup the destination. This will use AMI, and the callback should 
         // schedule us to execute again. 
@@ -914,7 +923,7 @@ private:
         }
 
         // Add a session 
-        SessionSeq newSessions = createSessionForEndpoints(mEndpoints, mDestination);
+        SessionSeq newSessions = createSessionForEndpoints(mLookupResult, mDestination);
 
         if (mListenerManager->getListener()->getNumSessions() < 2)
         {
@@ -962,7 +971,6 @@ private:
     ::Ice::Current mIceCurrent;
 
     // Implementation state
-    EndpointSeq mEndpoints;
     BridgePrx mBridge;
     SessionSeq mRemainingSessions;
 
@@ -1077,6 +1085,11 @@ public:
     {
     }
 
+    ~LookupCallback()
+    {
+        lg(Debug) << "LookupCallback destroyed."; 
+    }
+
 public: // Overrides. 
 
     virtual void ice_exception(const ::std::exception& e)

-----------------------------------------------------------------------


-- 
team/ken.hunt/route_async_routing.git



More information about the asterisk-scf-commits mailing list