[asterisk-scf-commits] asterisk-scf/release/bridging.git branch "master" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Fri Jul 13 15:46:01 CDT 2012


branch "master" has been updated
       via  543b2ec2d77f1d1279f40cffc1a225ed04ce1dbc (commit)
      from  e159074147caca84eb8d6e0785de863fe9adb19c (commit)

Summary of changes:
 src/BridgeListenerMgr.cpp        |    7 +-
 src/BridgeListenerMgr.h          |    6 +-
 src/BridgeManagerImpl.cpp        |   21 ++-
 src/BridgeManagerListenerMgr.cpp |    6 +-
 src/BridgeManagerListenerMgr.h   |    4 +-
 src/CMakeLists.txt               |    1 -
 src/ListenerManager.h            |  284 --------------------------------------
 7 files changed, 29 insertions(+), 300 deletions(-)
 delete mode 100644 src/ListenerManager.h


- Log -----------------------------------------------------------------
commit 543b2ec2d77f1d1279f40cffc1a225ed04ce1dbc
Author: Ken Hunt <ken.hunt at digium.com>
Date:   Fri Jul 13 13:16:13 2012 -0500

    ASTSCF-467 - Consolidate ListenerManagerT into ASTSCFIceUtilCpp.

diff --git a/src/BridgeListenerMgr.cpp b/src/BridgeListenerMgr.cpp
index 6ec7f3a..ee3bcf9 100644
--- a/src/BridgeListenerMgr.cpp
+++ b/src/BridgeListenerMgr.cpp
@@ -19,10 +19,11 @@
 using namespace AsteriskSCF::BridgeService;
 using namespace AsteriskSCF::SessionCommunications::V1;
 
-BridgeListenerMgr::BridgeListenerMgr(const Ice::CommunicatorPtr& comm,
-    const std::string& name,
+BridgeListenerMgr::BridgeListenerMgr(
+    const IceStorm::TopicManagerPrx& topicManager,
+    const std::string& topicName,
     const BridgePrx& bridgeProxy) :
-    ListenerManagerT<BridgeListenerPrx>(comm, name, false),
+    ListenerManagerT<BridgeListenerPrx>(topicManager, topicName, false, IceUtil::Time::seconds(15)),
     mPrx(bridgeProxy)
 {
 }
diff --git a/src/BridgeListenerMgr.h b/src/BridgeListenerMgr.h
index 1ddfc3c..a33c086 100644
--- a/src/BridgeListenerMgr.h
+++ b/src/BridgeListenerMgr.h
@@ -17,17 +17,17 @@
 
 #include <string>
 #include <Ice/Ice.h>
-#include "ListenerManager.h"
+#include <AsteriskSCF/Listener/ListenerManager.h>
 #include <AsteriskSCF/SessionCommunications/SessionCommunicationsIf.h>
 
 namespace AsteriskSCF
 {
 namespace BridgeService
 {
-class BridgeListenerMgr : virtual public ListenerManagerT<AsteriskSCF::SessionCommunications::V1::BridgeListenerPrx>
+class BridgeListenerMgr : virtual public AsteriskSCF::ListenerManagerT<AsteriskSCF::SessionCommunications::V1::BridgeListenerPrx>
 {
 public:
-    BridgeListenerMgr(const Ice::CommunicatorPtr& communicator, const std::string& name,
+    BridgeListenerMgr(const IceStorm::TopicManagerPrx& topicManager, const std::string& topicName,
         const AsteriskSCF::SessionCommunications::V1::BridgePrx& source);
 
     void sessionsAdded(const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions, 
diff --git a/src/BridgeManagerImpl.cpp b/src/BridgeManagerImpl.cpp
index f1982eb..d7db327 100644
--- a/src/BridgeManagerImpl.cpp
+++ b/src/BridgeManagerImpl.cpp
@@ -139,6 +139,7 @@ private:
     BridgeReplicationContextPtr mReplicationContext;
     BridgeManagerPrx mSourceProxy;
     BridgeManagerListenerMgrPtr mListeners;
+    IceStorm::TopicManagerPrx mTopicManager;
     Logger mLogger;
 
     BridgeCreationExtensionPointImplPtr mCreationExtension;
@@ -182,8 +183,18 @@ BridgeManagerImpl::BridgeManagerImpl(const Ice::ObjectAdapterPtr& adapter,
         mState(new BridgeManagerStateItem),
         mOperationCache(AsteriskSCF::Operations::OperationContextCache::create(60))
 {
+    const std::string propertyName = "TopicManager.Proxy";
+    std::string topicManagerProperty = adapter->getCommunicator()->getProperties()->getProperty(propertyName);
+    if(topicManagerProperty.size() == 0)
+    {
+        mLogger(Error) << "Topic manager proxy property missing. "
+            "Unable to initialize listener support.";
+    }
+
+    mTopicManager = IceStorm::TopicManagerPrx::checkedCast(adapter->getCommunicator()->stringToProxy(topicManagerProperty));
+
     mLogger(Info) << "Created AsteriskSCF Session-Oriented Bridge Manager." ;
-    mListeners = new BridgeManagerListenerMgr(mAdapter->getCommunicator(), mName, mSourceProxy);
+    mListeners = new BridgeManagerListenerMgr(mTopicManager, mName, mSourceProxy);
     mState->runningState = Running;
     mState->key = name;
     mState->serial = SerialCounterStart;
@@ -360,7 +371,7 @@ void BridgeManagerImpl::createBridge_async(const AMD_BridgeManager_createBridgeP
                     // construction of the bridge itself as this is a natural
                     // area of refinement and extension.
                     //
-                    BridgeListenerMgrPtr mgr(new BridgeListenerMgr(mAdapter->getCommunicator(), stringId, info.decoratingPrx));
+                    BridgeListenerMgrPtr mgr(new BridgeListenerMgr(mTopicManager, stringId, info.decoratingPrx));
                     
                     //
                     // Now we can get down to the creation of the bridge
@@ -473,7 +484,7 @@ void BridgeManagerImpl::addListener(const AsteriskSCF::System::V1::OperationCont
                 if (!mListeners)
                 {
                     mSourceProxy = BridgeManagerPrx::uncheckedCast( mAdapter->createProxy(current.id));
-                    mListeners = new BridgeManagerListenerMgr(mAdapter->getCommunicator(), mName, mSourceProxy);
+                    mListeners = new BridgeManagerListenerMgr(mTopicManager, mName, mSourceProxy);
                 }
                 mListeners->addListener(listener);
                 //
@@ -527,7 +538,7 @@ void BridgeManagerImpl::removeListener(const AsteriskSCF::System::V1::OperationC
                 if (!mListeners)
                 {
                     mSourceProxy = BridgeManagerPrx::uncheckedCast(mAdapter->createProxy(current.id));
-                    mListeners = new BridgeManagerListenerMgr(mAdapter->getCommunicator(), mName, mSourceProxy);
+                    mListeners = new BridgeManagerListenerMgr(mTopicManager, mName, mSourceProxy);
                 }
                 mListeners->removeListener(listener);
                 //
@@ -839,7 +850,7 @@ void BridgeManagerImpl::createBridgeReplica(const BridgeStateItemPtr& state)
         boost::unique_lock<boost::shared_mutex> lock(mLock);
         Ice::Identity id(mAdapter->getCommunicator()->stringToIdentity(state->bridgeId));
         prx = BridgePrx::uncheckedCast(mAdapter->createProxy(id));
-        BridgeListenerMgrPtr mgr(new BridgeListenerMgr(mAdapter->getCommunicator(), state->bridgeId, prx));
+        BridgeListenerMgrPtr mgr(new BridgeListenerMgr(mTopicManager, state->bridgeId, prx));
     
         BridgeServantPtr bridge = BridgeServant::create(mAdapter, mgr, mReplicationContext->getReplicator(), mLogger, state);
         Ice::ObjectPrx obj = mAdapter->add(bridge, id);
diff --git a/src/BridgeManagerListenerMgr.cpp b/src/BridgeManagerListenerMgr.cpp
index 2459edb..5c2eb31 100644
--- a/src/BridgeManagerListenerMgr.cpp
+++ b/src/BridgeManagerListenerMgr.cpp
@@ -19,9 +19,11 @@
 using namespace AsteriskSCF::BridgeService;
 using namespace AsteriskSCF::SessionCommunications::V1;
 
-BridgeManagerListenerMgr::BridgeManagerListenerMgr(const Ice::CommunicatorPtr& communicator, const std::string& name,
+BridgeManagerListenerMgr::BridgeManagerListenerMgr(
+    const IceStorm::TopicManagerPrx& topicManager, 
+    const std::string& topicName,
     const BridgeManagerPrx& source) :
-    ListenerManagerT<BridgeManagerListenerPrx>(communicator, name, true),
+    ListenerManagerT<BridgeManagerListenerPrx>(topicManager, topicName, true, IceUtil::Time::seconds(15)),
     mPrx(source)
 {
 }
diff --git a/src/BridgeManagerListenerMgr.h b/src/BridgeManagerListenerMgr.h
index 45f0435..6688ac4 100644
--- a/src/BridgeManagerListenerMgr.h
+++ b/src/BridgeManagerListenerMgr.h
@@ -17,7 +17,7 @@
 
 #include <string>
 #include <Ice/Ice.h>
-#include "ListenerManager.h"
+#include <AsteriskSCF/Listener/ListenerManager.h>
 #include <AsteriskSCF/SessionCommunications/SessionCommunicationsIf.h>
 
 namespace AsteriskSCF
@@ -27,7 +27,7 @@ namespace BridgeService
 class BridgeManagerListenerMgr : virtual public ListenerManagerT<AsteriskSCF::SessionCommunications::V1::BridgeManagerListenerPrx>
 {
 public:
-    BridgeManagerListenerMgr(const Ice::CommunicatorPtr& communicator, const std::string& name,
+    BridgeManagerListenerMgr(const IceStorm::TopicManagerPrx& topicManager, const std::string& topicName,
         const AsteriskSCF::SessionCommunications::V1::BridgeManagerPrx& source);
 
     void bridgeCreated(const AsteriskSCF::SessionCommunications::V1::BridgePrx& bridge);
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 904272c..44141cd 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -35,7 +35,6 @@ astscf_component_add_files(BridgeService MediaMixer.cpp)
 astscf_component_add_files(BridgeService Tasks.h)
 astscf_component_add_files(BridgeService InternalExceptions.h)
 astscf_component_add_files(BridgeService BridgeServiceConfig.h)
-astscf_component_add_files(BridgeService ListenerManager.h)
 astscf_component_add_files(BridgeService ReplicatorSmartProxy.h)
 astscf_component_add_slices(BridgeService PROJECT AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice)
 astscf_component_add_ice_libraries(BridgeService IceStorm)
diff --git a/src/ListenerManager.h b/src/ListenerManager.h
deleted file mode 100644
index 6db0404..0000000
--- a/src/ListenerManager.h
+++ /dev/null
@@ -1,284 +0,0 @@
-/*
- * Asterisk Scalable Communications Framework
- *
- * Copyright (C) 2010 -- Digium, Inc.
- *
- * All rights reserved.
- */
-#pragma once
-
-#include <Ice/Ice.h>
-#include <IceStorm/IceStorm.h>
-#include <boost/thread/shared_mutex.hpp>
-#include <boost/thread/locks.hpp>
-#include <string>
-#include <algorithm>
-#include <vector>
-#include "InternalExceptions.h"
-#include <IceUtil/Thread.h>
-
-namespace AsteriskSCF
-{
-namespace BridgeService
-{
-//
-// Helper template for classes that need to implement listener style interfaces.
-//
-template <class T>
-class ListenerManagerT : public IceUtil::Shared
-{
-    typedef std::vector<T> ListenerSeq;
-    typename std::vector<T>::iterator ListenerIter;
-
-    class InitializationThread : public IceUtil::Thread
-    {
-    public:
-        InitializationThread(const typename IceUtil::Handle<ListenerManagerT>& mgr) :
-            mMgr(mgr),
-            mStopped(false)
-        {
-        }
-
-        void run()
-        {
-            bool initialized = false;
-            IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mMonitor);
-            while(!mStopped && !initialized)
-            {
-                //
-                // TODO: Make configurable.
-                //
-                mMonitor.timedWait(IceUtil::Time::seconds(15));
-                initialized = mMgr->init();
-            }
-        }
-
-        void stop()
-        {
-            IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mMonitor);
-            mStopped = true;
-            mMonitor.notify();
-        }
-
-    private:
-        //
-        // NOTE: Currently uses the monitor semantics provided by the Ice threading library. This should be switched to
-        // use a mechanism implemented in Boost if there is something equivalent. (Not indicated as something that needs
-        // to be done, as it really isn't a priority)
-        //
-        IceUtil::Monitor<IceUtil::Mutex> mMonitor;
-        typename IceUtil::Handle<ListenerManagerT> mMgr;
-        bool mStopped;
-    };
-
-public:
-    ListenerManagerT(const Ice::CommunicatorPtr& communicator, const std::string& topicName, bool enableBackgroundInit) :
-        mCommunicator(communicator),
-        mTopicName(topicName),
-        mInitialized(false)
-    {
-        try
-        {
-            init();
-        }
-        catch(const Ice::Exception&)
-        {
-        }
-        if(!mInitialized && enableBackgroundInit)
-        {
-            mInitThread = new InitializationThread(this);
-            mInitThread->start();
-        }
-    }
-
-    virtual ~ListenerManagerT()
-    {
-        stop();
-
-        if(mTopic)
-        {
-            try
-            {
-                mTopic->destroy();
-            }
-            catch(...)
-            {
-                //
-                // Destructors are no-throw!
-                //
-            }
-        }
-    }
-
-    //
-    // NOTE: The current implementation is a little fast and loose here. Inconsistent conditions
-    // and whatnot are not flagged.
-    //
-    bool addListener(const T& listener)
-    {
-        bool added = false;
-        {
-            boost::unique_lock<boost::shared_mutex> lock(mLock);
-            if(std::find(mListeners.begin(), mListeners.end(), listener) == mListeners.end())
-            {
-                mListeners.push_back(listener);
-                added = true;
-            }
-        }
-
-        if(mInitialized && added)
-        {
-            IceStorm::QoS qos;
-            qos["reliability"] = "ordered";
-
-            try
-            {
-                mTopic->subscribeAndGetPublisher(qos, listener);
-            }
-            catch (const IceStorm::AlreadySubscribed&)
-            {
-                //
-                // This indicates some kind of inconsistent state or could
-                // happen if this is a replica.
-                //
-            }
-        }
-        return added;
-    }
-
-    bool removeListener(const T& listener)
-    {
-        boost::unique_lock<boost::shared_mutex> lock(mLock);
-        typename std::vector<T>::iterator i = std::find(mListeners.begin(), mListeners.end(), listener);
-        if(i != mListeners.end())
-        {
-            mListeners.erase(i);
-            if(mInitialized)
-            {
-                //
-                // unsubscribe doesn't seem to care whether the subscriber was subscribed or not.
-                //
-                mTopic->unsubscribe(listener);
-            }
-            return true;
-        }
-        return false;
-    }
-
-    std::vector<T> getListeners()
-    {
-        boost::shared_lock<boost::shared_mutex> lock(mLock);
-        std::vector<T> result(mListeners);
-        return result;
-    }
-
-    bool isSuspended()
-    {
-        boost::shared_lock<boost::shared_mutex> lock(mLock);
-        return !mInitialized;
-    }
-
-    void stop()
-    {
-        if(mInitThread)
-        {
-            mInitThread->stop();
-            mInitThread = 0;
-        }
-    }
-
-protected:
-    boost::shared_mutex mLock;
-    Ice::CommunicatorPtr mCommunicator;
-    std::string mTopicName;
-    IceStorm::TopicPrx mTopic;
-    IceStorm::TopicManagerPrx mTopicManager;
-    T mPublisher;
-    ListenerSeq mListeners;
-    IceUtil::Handle<InitializationThread> mInitThread;
-
-    bool mInitialized;
-
-    bool init()
-    {
-        boost::unique_lock<boost::shared_mutex> lock(mLock);
-        if(mInitialized)
-        {
-            return mInitialized;
-        }
-
-        //
-        // TODO: While this is being concocted for a single component, it would make more sense
-        // to have the topic manager passed in during construction for more general usage.
-        //
-        const std::string propertyName = "TopicManager.Proxy";
-        std::string topicManagerProperty = mCommunicator->getProperties()->getProperty(propertyName);
-        if(topicManagerProperty.size() == 0)
-        {
-            throw ConfigException(propertyName, "Topic manager proxy property missing. "
-                "Unable to initialize listener support.");
-        }
-
-        try
-        {
-            mTopicManager = IceStorm::TopicManagerPrx::checkedCast(mCommunicator->stringToProxy(topicManagerProperty));
-        }
-        catch(const Ice::Exception&)
-        {
-            return false;
-        }
-
-        try
-        {
-            mTopic = mTopicManager->retrieve(mTopicName);
-        }
-        catch(const IceStorm::NoSuchTopic&)
-        {
-        }
-
-        if(!mTopic)
-        {
-            try
-            {
-                mTopic = mTopicManager->create(mTopicName);
-            }
-            catch(const IceStorm::TopicExists&)
-            {
-                //
-                // In case there is a race condition when creating the topic.
-                //
-                mTopic = mTopicManager->retrieve(mTopicName);
-            }
-        }
-
-        if(!mTopic)
-        {
-            return mInitialized;
-        }
-        mPublisher = T::uncheckedCast(mTopic->getPublisher());
-
-        if(mListeners.size() > 0)
-        {
-            for(typename std::vector<T>::iterator i = mListeners.begin(); i != mListeners.end(); ++i)
-            {
-                IceStorm::QoS qos;
-                qos["reliability"] = "ordered";
-
-                try
-                {
-                    mTopic->subscribeAndGetPublisher(qos, *i);
-                }
-                catch(const IceStorm::AlreadySubscribed&)
-                {
-                    //
-                    // This indicates some kind of inconsistent state.
-                    //
-                }
-            }
-        }
-        mInitialized = true;
-        return mInitialized;
-    }
-};
-}
-}

-----------------------------------------------------------------------


-- 
asterisk-scf/release/bridging.git



More information about the asterisk-scf-commits mailing list