[asterisk-scf-commits] asterisk-scf/integration/bridging.git branch "master" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Thu Oct 7 08:45:55 CDT 2010


branch "master" has been updated
       via  5ce9156b993ccb882d0dbd73f110017435b1a2aa (commit)
      from  0c08ed88b9103dec20836413cd075fae05f091c9 (commit)

Summary of changes:
 src/BridgeListenerMgr.cpp        |    2 +-
 src/BridgeManagerImpl.cpp        |   19 +++-
 src/BridgeManagerImpl.h          |    1 +
 src/BridgeManagerListenerMgr.cpp |    2 +-
 src/ListenerManager.h            |  228 ++++++++++++++++++++++++++------------
 src/Service.cpp                  |    4 +-
 6 files changed, 175 insertions(+), 81 deletions(-)


- Log -----------------------------------------------------------------
commit 5ce9156b993ccb882d0dbd73f110017435b1a2aa
Author: Brent Eagles <beagles at digium.com>
Date:   Thu Oct 7 11:13:35 2010 -0230

    Added an optional background initializer thread for the listener manager
    template.  The Bridge manager will now behave as if it is suspended if the
    listener subsystem (ie. IceStorm) is unavailable. The listener manager will
    continuously attempt to initialize until the manager is shutdown or it
    successfully contacts the IceStorm service. The attempt interval is currently
    hardcoded at 15 seconds, but should be made configurable.

diff --git a/src/BridgeListenerMgr.cpp b/src/BridgeListenerMgr.cpp
index c89d808..aca21c4 100644
--- a/src/BridgeListenerMgr.cpp
+++ b/src/BridgeListenerMgr.cpp
@@ -10,7 +10,7 @@
 AsteriskSCF::BridgeService::BridgeListenerMgr::BridgeListenerMgr(const Ice::CommunicatorPtr& comm,
         const std::string& name,
         const AsteriskSCF::SessionCommunications::Bridging::V1::BridgePrx& bridgeProxy) :
-    ListenerManagerT<AsteriskSCF::SessionCommunications::Bridging::V1::BridgeListenerPrx>(comm, name),
+    ListenerManagerT<AsteriskSCF::SessionCommunications::Bridging::V1::BridgeListenerPrx>(comm, name, false),
     mPrx(bridgeProxy)
 {
 }
diff --git a/src/BridgeManagerImpl.cpp b/src/BridgeManagerImpl.cpp
index 5d1671b..8d5cb13 100644
--- a/src/BridgeManagerImpl.cpp
+++ b/src/BridgeManagerImpl.cpp
@@ -68,6 +68,15 @@ AsteriskSCF::BridgeService::BridgeManagerImpl::BridgeManagerImpl(
     mListeners = new AsteriskSCF::BridgeService::BridgeManagerListenerMgr(mAdapter->getCommunicator(), mName, mSourceProxy);
 }
 
+AsteriskSCF::BridgeService::BridgeManagerImpl::~BridgeManagerImpl()
+{
+    if(mListeners)
+    {
+        mListeners->stop();
+    }
+    mListeners = 0;
+}
+
 AsteriskSCF::SessionCommunications::Bridging::V1::BridgePrx AsteriskSCF::BridgeService::BridgeManagerImpl::createBridge(
   const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions,
   const AsteriskSCF::SessionCommunications::Bridging::V1::BridgeListenerPrx& listener,
@@ -81,7 +90,7 @@ AsteriskSCF::SessionCommunications::Bridging::V1::BridgePrx AsteriskSCF::BridgeS
         lg(Debug) << __FUNCTION__ << ": called when shutting down." ;
         throw AsteriskSCF::System::Component::V1::ShuttingDown();
     }
-    if(mSuspended)
+    if(mSuspended || mListeners->isSuspended())
     {
         lg(Debug) << __FUNCTION__ << ": called when suspended." ;
         throw AsteriskSCF::System::Component::V1::Suspended();
@@ -123,7 +132,7 @@ void AsteriskSCF::BridgeService::BridgeManagerImpl::addListener(const SessionCom
         lg(Debug) << __FUNCTION__ << ": called when shutting down." ;
         throw AsteriskSCF::System::Component::V1::ShuttingDown();
     }
-    if(mSuspended)
+    if(mSuspended || mListeners->isSuspended())
     {
         lg(Debug) << __FUNCTION__ << ": called when suspended." ;
         throw AsteriskSCF::System::Component::V1::Suspended();
@@ -144,7 +153,7 @@ void AsteriskSCF::BridgeService::BridgeManagerImpl::removeListener(const Session
         lg(Debug) << __FUNCTION__ << ": called when shutting down." ;
         throw AsteriskSCF::System::Component::V1::ShuttingDown();
     }
-    if(mSuspended)
+    if(mSuspended || mListeners->isSuspended())
     {
         lg(Debug) << __FUNCTION__ << ": called when suspended." ;
         throw AsteriskSCF::System::Component::V1::Suspended();
@@ -183,7 +192,7 @@ void AsteriskSCF::BridgeService::BridgeManagerImpl::shutdown(const Ice::Current&
         lg(Debug) << __FUNCTION__ << ": called when suspended." ;
         throw AsteriskSCF::System::Component::V1::Suspended();
     }
-    if(mListeners)
+    if(mListeners && !mListeners->isSuspended())
     {
         mListeners->stopping();
     }
@@ -195,7 +204,7 @@ void AsteriskSCF::BridgeService::BridgeManagerImpl::shutdown(const Ice::Current&
         std::for_each(mBridges.begin(), mBridges.end(), AsteriskSCF::BridgeService::BridgeMgrShutdownImpl(current));
     }
 
-    if(mListeners)
+    if(mListeners && !mListeners->isSuspended())
     {
         mListeners->stopped();
     }
diff --git a/src/BridgeManagerImpl.h b/src/BridgeManagerImpl.h
index 5213532..88f34f4 100644
--- a/src/BridgeManagerImpl.h
+++ b/src/BridgeManagerImpl.h
@@ -24,6 +24,7 @@ namespace BridgeService
     public:
         
         BridgeManagerImpl(const Ice::ObjectAdapterPtr& adapter, const std::string& name, const AsteriskSCF::SessionCommunications::Bridging::V1::BridgeManagerPrx& prx);
+        ~BridgeManagerImpl();
                 
         //
         // AsteriskSCF::SessionCommunications::Bridging::V1::BridgeManager Interface
diff --git a/src/BridgeManagerListenerMgr.cpp b/src/BridgeManagerListenerMgr.cpp
index 107ad2e..725aa26 100644
--- a/src/BridgeManagerListenerMgr.cpp
+++ b/src/BridgeManagerListenerMgr.cpp
@@ -10,7 +10,7 @@
 
 AsteriskSCF::BridgeService::BridgeManagerListenerMgr::BridgeManagerListenerMgr(const Ice::CommunicatorPtr& communicator, const std::string& name,
         const AsteriskSCF::SessionCommunications::Bridging::V1::BridgeManagerPrx& source) :
-    AsteriskSCF::BridgeService::ListenerManagerT<AsteriskSCF::SessionCommunications::Bridging::V1::BridgeManagerListenerPrx>(communicator, name),
+    AsteriskSCF::BridgeService::ListenerManagerT<AsteriskSCF::SessionCommunications::Bridging::V1::BridgeManagerListenerPrx>(communicator, name, true),
     mPrx(source)
 {
 }
diff --git a/src/ListenerManager.h b/src/ListenerManager.h
index f1543aa..e034e5d 100644
--- a/src/ListenerManager.h
+++ b/src/ListenerManager.h
@@ -14,6 +14,7 @@
 #include <algorithm>
 #include <vector>
 #include "InternalExceptions.h"
+#include <IceUtil/Thread.h>
 
 namespace AsteriskSCF
 {
@@ -27,83 +28,60 @@ namespace BridgeService
     {
         typedef std::vector<T> ListenerSeq;
         typename std::vector<T>::iterator ListenerIter;
-    public:
-        ListenerManagerT(const Ice::CommunicatorPtr& communicator, const std::string& topicName) :
-            mCommunicator(communicator),
-            mTopicName(topicName)
-        {
-            //
-            // TODO: While this is being concocted for a single component, it would make more sense
-            // to have the topic manager passed in during construction for more general usage.
-            //
-            const std::string propertyName = "TopicManager.Proxy";
-            std::string topicManagerProperty = mCommunicator->getProperties()->getProperty(propertyName);
-            if(topicManagerProperty.size() == 0)
-            {
-                throw ConfigException(propertyName, "Topic manager proxy property missing. "
-                        "Unable to initialize listener support.");
-            }
-
-            try
-            {
-                mTopicManager = IceStorm::TopicManagerPrx::checkedCast(mCommunicator->stringToProxy(topicManagerProperty));
-            }
-            catch(const Ice::Exception&)
-            {
-            }
-            if(!mTopicManager)
-            {
-                throw ConfigException(propertyName, "Topic manager proxy is not a valid proxy or is unreachable");
-            }
 
-            try
-            {
-                mTopic = mTopicManager->retrieve(mTopicName);
-            }
-            catch(const IceStorm::NoSuchTopic&)
+        class InitializationThread : public IceUtil::Thread
+        {
+        public:
+            InitializationThread(const typename IceUtil::Handle<ListenerManagerT>& mgr) : 
+                mMgr(mgr),
+                mStopped(false)
             {
             }
 
-            if(!mTopic)
+            void run()
             {
-                try
-                {
-                    mTopic = mTopicManager->create(mTopicName);
-                }
-                catch(const IceStorm::TopicExists&)
+                bool initialized = false;
+                IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mMonitor);
+                while(!mStopped && !initialized)
                 {
                     //
-                    // In case there is a race condition when creating the topic.
+                    // TODO: Make configurable.
                     //
-                    mTopic = mTopicManager->retrieve(mTopicName);
+                    mMonitor.timedWait(IceUtil::Time::seconds(15));
+                    initialized = mMgr->init();
                 }
             }
 
-            if(!mTopic)
+            void stop()
             {
-                throw ConfigException(propertyName,
-                        std::string("unable to create topic with the provided configuration :") + mTopicName);
+                IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mMonitor);
+                mStopped = true;
+                mMonitor.notify();
             }
-            mPublisher = T::uncheckedCast(mTopic->getPublisher());
-        }
 
-        virtual ~ListenerManagerT()
+        private:
+            IceUtil::Monitor<IceUtil::Mutex> mMonitor;
+            typename IceUtil::Handle<ListenerManagerT> mMgr;
+            bool mStopped;
+        };
+
+    public:
+        ListenerManagerT(const Ice::CommunicatorPtr& communicator, const std::string& topicName, bool enableBackgroundInit) :
+            mCommunicator(communicator),
+            mTopicName(topicName),
+            mInitialized(false)
         {
-            //
-            // TODO: 
-            //
-            if(mTopic)
+            try
             {
-                try
-                {
-                    mTopic->destroy();
-                }
-                catch(...)
-                {
-                    //
-                    // Destructors are no-throw!
-                    //
-                }
+                init();
+            }
+            catch(const Ice::Exception&)
+            {
+            }
+            if(!mInitialized && enableBackgroundInit)
+            {
+                mInitThread = new InitializationThread(this);
+                mInitThread->start();
             }
         }
 
@@ -118,17 +96,22 @@ namespace BridgeService
             {
                 mListeners.push_back(listener);
             }
-            IceStorm::QoS qos;
-            qos["reliability"] = "ordered";
-            try
-            {
-                mTopic->subscribeAndGetPublisher(qos, listener);
-            }
-            catch(const IceStorm::AlreadySubscribed&)
+           
+            if(mInitialized)
             {
-                //
-                // This indicates some kind of inconsistent state.
-                //
+                IceStorm::QoS qos;
+                qos["reliability"] = "ordered";
+
+                try
+                {
+                    mTopic->subscribeAndGetPublisher(qos, listener);
+                }
+                catch(const IceStorm::AlreadySubscribed&)
+                {
+                    //
+                    // This indicates some kind of inconsistent state.
+                    //
+                }
             }
         }
 
@@ -139,7 +122,10 @@ namespace BridgeService
             if(i != mListeners.end())
             {
                 mListeners.erase(i);
-                mTopic->unsubscribe(listener);
+                if(mInitialized)
+                {
+                    mTopic->unsubscribe(listener);
+                }
             }
         }
 
@@ -150,6 +136,20 @@ namespace BridgeService
             return result;
         }
 
+        bool isSuspended()
+        {
+            boost::shared_lock<boost::shared_mutex> lock(mLock);
+            return !mInitialized;
+        }
+
+        void stop()
+        {
+            if(mInitThread)
+            {
+                mInitThread->stop();
+            }
+        }
+
     protected:
         boost::shared_mutex mLock;
         Ice::CommunicatorPtr mCommunicator;
@@ -158,6 +158,90 @@ namespace BridgeService
         IceStorm::TopicManagerPrx mTopicManager;
         T mPublisher;
         ListenerSeq mListeners;
+        IceUtil::Handle<InitializationThread> mInitThread;
+
+        bool mInitialized;
+
+        bool init()
+        {
+            boost::shared_lock<boost::shared_mutex> lock(mLock);
+            if(mInitialized)
+            {
+                return mInitialized;
+            }
+
+            //
+            // TODO: While this is being concocted for a single component, it would make more sense
+            // to have the topic manager passed in during construction for more general usage.
+            //
+            const std::string propertyName = "TopicManager.Proxy";
+            std::string topicManagerProperty = mCommunicator->getProperties()->getProperty(propertyName);
+            if(topicManagerProperty.size() == 0)
+            {
+                throw ConfigException(propertyName, "Topic manager proxy property missing. "
+                        "Unable to initialize listener support.");
+            }
+
+            try
+            {
+                mTopicManager = IceStorm::TopicManagerPrx::checkedCast(mCommunicator->stringToProxy(topicManagerProperty));
+            }
+            catch(const Ice::Exception&)
+            {
+                return false;
+            }
+
+            try
+            {
+                mTopic = mTopicManager->retrieve(mTopicName);
+            }
+            catch(const IceStorm::NoSuchTopic&)
+            {
+            }
+
+            if(!mTopic)
+            {
+                try
+                {
+                    mTopic = mTopicManager->create(mTopicName);
+                }
+                catch(const IceStorm::TopicExists&)
+                {
+                    //
+                    // In case there is a race condition when creating the topic.
+                    //
+                    mTopic = mTopicManager->retrieve(mTopicName);
+                }
+            }
+
+            if(!mTopic)
+            {
+                return mInitialized;
+            }
+            mPublisher = T::uncheckedCast(mTopic->getPublisher());
+
+            if(mListeners.size() > 0)
+            {
+                for(typename std::vector<T>::iterator i = mListeners.begin(); i != mListeners.end(); ++i)
+                {
+                    IceStorm::QoS qos;
+                    qos["reliability"] = "ordered";
+
+                    try
+                    {
+                        mTopic->subscribeAndGetPublisher(qos, *i);
+                    }
+                    catch(const IceStorm::AlreadySubscribed&)
+                    {
+                        //
+                        // This indicates some kind of inconsistent state.
+                        //
+                    }
+                }
+            }
+            mInitialized = true;
+            return mInitialized;
+        }
     };
 }
 }
diff --git a/src/Service.cpp b/src/Service.cpp
index cf8582c..c33b979 100644
--- a/src/Service.cpp
+++ b/src/Service.cpp
@@ -93,13 +93,13 @@ public:
     void run()
     {
         bool result = false;
+        IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mMonitor);
         do
         {
             //
             // TODO: Make configurable.
             //
-            IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mMonitor);
-            mMonitor.timedWait(IceUtil::Time::seconds(60));
+            mMonitor.timedWait(IceUtil::Time::seconds(15));
             if(mStopped)
             {
                 break;

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/bridging.git



More information about the asterisk-scf-commits mailing list