[asterisk-scf-commits] asterisk-scf/release/bridging.git branch "master" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Fri Jul 13 15:46:01 CDT 2012
branch "master" has been updated
via 543b2ec2d77f1d1279f40cffc1a225ed04ce1dbc (commit)
from e159074147caca84eb8d6e0785de863fe9adb19c (commit)
Summary of changes:
src/BridgeListenerMgr.cpp | 7 +-
src/BridgeListenerMgr.h | 6 +-
src/BridgeManagerImpl.cpp | 21 ++-
src/BridgeManagerListenerMgr.cpp | 6 +-
src/BridgeManagerListenerMgr.h | 4 +-
src/CMakeLists.txt | 1 -
src/ListenerManager.h | 284 --------------------------------------
7 files changed, 29 insertions(+), 300 deletions(-)
delete mode 100644 src/ListenerManager.h
- Log -----------------------------------------------------------------
commit 543b2ec2d77f1d1279f40cffc1a225ed04ce1dbc
Author: Ken Hunt <ken.hunt at digium.com>
Date: Fri Jul 13 13:16:13 2012 -0500
ASTSCF-467 - Consolidiate ListenerManagerT into ASTSCFIceUtilCpp.
diff --git a/src/BridgeListenerMgr.cpp b/src/BridgeListenerMgr.cpp
index 6ec7f3a..ee3bcf9 100644
--- a/src/BridgeListenerMgr.cpp
+++ b/src/BridgeListenerMgr.cpp
@@ -19,10 +19,11 @@
using namespace AsteriskSCF::BridgeService;
using namespace AsteriskSCF::SessionCommunications::V1;
-BridgeListenerMgr::BridgeListenerMgr(const Ice::CommunicatorPtr& comm,
- const std::string& name,
+BridgeListenerMgr::BridgeListenerMgr(
+ const IceStorm::TopicManagerPrx& topicManager,
+ const std::string& topicName,
const BridgePrx& bridgeProxy) :
- ListenerManagerT<BridgeListenerPrx>(comm, name, false),
+ ListenerManagerT<BridgeListenerPrx>(topicManager, topicName, false, IceUtil::Time::seconds(15)),
mPrx(bridgeProxy)
{
}
diff --git a/src/BridgeListenerMgr.h b/src/BridgeListenerMgr.h
index 1ddfc3c..a33c086 100644
--- a/src/BridgeListenerMgr.h
+++ b/src/BridgeListenerMgr.h
@@ -17,17 +17,17 @@
#include <string>
#include <Ice/Ice.h>
-#include "ListenerManager.h"
+#include <AsteriskSCF/Listener/ListenerManager.h>
#include <AsteriskSCF/SessionCommunications/SessionCommunicationsIf.h>
namespace AsteriskSCF
{
namespace BridgeService
{
-class BridgeListenerMgr : virtual public ListenerManagerT<AsteriskSCF::SessionCommunications::V1::BridgeListenerPrx>
+class BridgeListenerMgr : virtual public AsteriskSCF::ListenerManagerT<AsteriskSCF::SessionCommunications::V1::BridgeListenerPrx>
{
public:
- BridgeListenerMgr(const Ice::CommunicatorPtr& communicator, const std::string& name,
+ BridgeListenerMgr(const IceStorm::TopicManagerPrx& topicManager, const std::string& topicName,
const AsteriskSCF::SessionCommunications::V1::BridgePrx& source);
void sessionsAdded(const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions,
diff --git a/src/BridgeManagerImpl.cpp b/src/BridgeManagerImpl.cpp
index f1982eb..d7db327 100644
--- a/src/BridgeManagerImpl.cpp
+++ b/src/BridgeManagerImpl.cpp
@@ -139,6 +139,7 @@ private:
BridgeReplicationContextPtr mReplicationContext;
BridgeManagerPrx mSourceProxy;
BridgeManagerListenerMgrPtr mListeners;
+ IceStorm::TopicManagerPrx mTopicManager;
Logger mLogger;
BridgeCreationExtensionPointImplPtr mCreationExtension;
@@ -182,8 +183,18 @@ BridgeManagerImpl::BridgeManagerImpl(const Ice::ObjectAdapterPtr& adapter,
mState(new BridgeManagerStateItem),
mOperationCache(AsteriskSCF::Operations::OperationContextCache::create(60))
{
+ const std::string propertyName = "TopicManager.Proxy";
+ std::string topicManagerProperty = adapter->getCommunicator()->getProperties()->getProperty(propertyName);
+ if(topicManagerProperty.size() == 0)
+ {
+ mLogger(Error) << "Topic manager proxy property missing. "
+ "Unable to initialize listener support.";
+ }
+
+ mTopicManager = IceStorm::TopicManagerPrx::checkedCast(adapter->getCommunicator()->stringToProxy(topicManagerProperty));
+
mLogger(Info) << "Created AsteriskSCF Session-Oriented Bridge Manager." ;
- mListeners = new BridgeManagerListenerMgr(mAdapter->getCommunicator(), mName, mSourceProxy);
+ mListeners = new BridgeManagerListenerMgr(mTopicManager, mName, mSourceProxy);
mState->runningState = Running;
mState->key = name;
mState->serial = SerialCounterStart;
@@ -360,7 +371,7 @@ void BridgeManagerImpl::createBridge_async(const AMD_BridgeManager_createBridgeP
// construction of the bridge itself as this is a natural
// area of refinement and extension.
//
- BridgeListenerMgrPtr mgr(new BridgeListenerMgr(mAdapter->getCommunicator(), stringId, info.decoratingPrx));
+ BridgeListenerMgrPtr mgr(new BridgeListenerMgr(mTopicManager, stringId, info.decoratingPrx));
//
// Now we can get down to the creation of the bridge
@@ -473,7 +484,7 @@ void BridgeManagerImpl::addListener(const AsteriskSCF::System::V1::OperationCont
if (!mListeners)
{
mSourceProxy = BridgeManagerPrx::uncheckedCast( mAdapter->createProxy(current.id));
- mListeners = new BridgeManagerListenerMgr(mAdapter->getCommunicator(), mName, mSourceProxy);
+ mListeners = new BridgeManagerListenerMgr(mTopicManager, mName, mSourceProxy);
}
mListeners->addListener(listener);
//
@@ -527,7 +538,7 @@ void BridgeManagerImpl::removeListener(const AsteriskSCF::System::V1::OperationC
if (!mListeners)
{
mSourceProxy = BridgeManagerPrx::uncheckedCast(mAdapter->createProxy(current.id));
- mListeners = new BridgeManagerListenerMgr(mAdapter->getCommunicator(), mName, mSourceProxy);
+ mListeners = new BridgeManagerListenerMgr(mTopicManager, mName, mSourceProxy);
}
mListeners->removeListener(listener);
//
@@ -839,7 +850,7 @@ void BridgeManagerImpl::createBridgeReplica(const BridgeStateItemPtr& state)
boost::unique_lock<boost::shared_mutex> lock(mLock);
Ice::Identity id(mAdapter->getCommunicator()->stringToIdentity(state->bridgeId));
prx = BridgePrx::uncheckedCast(mAdapter->createProxy(id));
- BridgeListenerMgrPtr mgr(new BridgeListenerMgr(mAdapter->getCommunicator(), state->bridgeId, prx));
+ BridgeListenerMgrPtr mgr(new BridgeListenerMgr(mTopicManager, state->bridgeId, prx));
BridgeServantPtr bridge = BridgeServant::create(mAdapter, mgr, mReplicationContext->getReplicator(), mLogger, state);
Ice::ObjectPrx obj = mAdapter->add(bridge, id);
diff --git a/src/BridgeManagerListenerMgr.cpp b/src/BridgeManagerListenerMgr.cpp
index 2459edb..5c2eb31 100644
--- a/src/BridgeManagerListenerMgr.cpp
+++ b/src/BridgeManagerListenerMgr.cpp
@@ -19,9 +19,11 @@
using namespace AsteriskSCF::BridgeService;
using namespace AsteriskSCF::SessionCommunications::V1;
-BridgeManagerListenerMgr::BridgeManagerListenerMgr(const Ice::CommunicatorPtr& communicator, const std::string& name,
+BridgeManagerListenerMgr::BridgeManagerListenerMgr(
+ const IceStorm::TopicManagerPrx& topicManager,
+ const std::string& topicName,
const BridgeManagerPrx& source) :
- ListenerManagerT<BridgeManagerListenerPrx>(communicator, name, true),
+ ListenerManagerT<BridgeManagerListenerPrx>(topicManager, topicName, true, IceUtil::Time::seconds(15)),
mPrx(source)
{
}
diff --git a/src/BridgeManagerListenerMgr.h b/src/BridgeManagerListenerMgr.h
index 45f0435..6688ac4 100644
--- a/src/BridgeManagerListenerMgr.h
+++ b/src/BridgeManagerListenerMgr.h
@@ -17,7 +17,7 @@
#include <string>
#include <Ice/Ice.h>
-#include "ListenerManager.h"
+#include <AsteriskSCF/Listener/ListenerManager.h>
#include <AsteriskSCF/SessionCommunications/SessionCommunicationsIf.h>
namespace AsteriskSCF
@@ -27,7 +27,7 @@ namespace BridgeService
class BridgeManagerListenerMgr : virtual public ListenerManagerT<AsteriskSCF::SessionCommunications::V1::BridgeManagerListenerPrx>
{
public:
- BridgeManagerListenerMgr(const Ice::CommunicatorPtr& communicator, const std::string& name,
+ BridgeManagerListenerMgr(const IceStorm::TopicManagerPrx& topicManager, const std::string& topicName,
const AsteriskSCF::SessionCommunications::V1::BridgeManagerPrx& source);
void bridgeCreated(const AsteriskSCF::SessionCommunications::V1::BridgePrx& bridge);
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 904272c..44141cd 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -35,7 +35,6 @@ astscf_component_add_files(BridgeService MediaMixer.cpp)
astscf_component_add_files(BridgeService Tasks.h)
astscf_component_add_files(BridgeService InternalExceptions.h)
astscf_component_add_files(BridgeService BridgeServiceConfig.h)
-astscf_component_add_files(BridgeService ListenerManager.h)
astscf_component_add_files(BridgeService ReplicatorSmartProxy.h)
astscf_component_add_slices(BridgeService PROJECT AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice)
astscf_component_add_ice_libraries(BridgeService IceStorm)
diff --git a/src/ListenerManager.h b/src/ListenerManager.h
deleted file mode 100644
index 6db0404..0000000
--- a/src/ListenerManager.h
+++ /dev/null
@@ -1,284 +0,0 @@
-/*
- * Asterisk Scalable Communications Framework
- *
- * Copyright (C) 2010 -- Digium, Inc.
- *
- * All rights reserved.
- */
-#pragma once
-
-#include <Ice/Ice.h>
-#include <IceStorm/IceStorm.h>
-#include <boost/thread/shared_mutex.hpp>
-#include <boost/thread/locks.hpp>
-#include <string>
-#include <algorithm>
-#include <vector>
-#include "InternalExceptions.h"
-#include <IceUtil/Thread.h>
-
-namespace AsteriskSCF
-{
-namespace BridgeService
-{
-//
-// Helper template for classes that need to implement listener style interfaces.
-//
-template <class T>
-class ListenerManagerT : public IceUtil::Shared
-{
- typedef std::vector<T> ListenerSeq;
- typename std::vector<T>::iterator ListenerIter;
-
- class InitializationThread : public IceUtil::Thread
- {
- public:
- InitializationThread(const typename IceUtil::Handle<ListenerManagerT>& mgr) :
- mMgr(mgr),
- mStopped(false)
- {
- }
-
- void run()
- {
- bool initialized = false;
- IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mMonitor);
- while(!mStopped && !initialized)
- {
- //
- // TODO: Make configurable.
- //
- mMonitor.timedWait(IceUtil::Time::seconds(15));
- initialized = mMgr->init();
- }
- }
-
- void stop()
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mMonitor);
- mStopped = true;
- mMonitor.notify();
- }
-
- private:
- //
- // NOTE: Currently uses the monitor semantics provided by the Ice threading library. This should be switched to
- // use a mechanism implemented in Boost if there is something equivalent. (Not indicated as something that needs
- // to be done, as it really isn't a priority)
- //
- IceUtil::Monitor<IceUtil::Mutex> mMonitor;
- typename IceUtil::Handle<ListenerManagerT> mMgr;
- bool mStopped;
- };
-
-public:
- ListenerManagerT(const Ice::CommunicatorPtr& communicator, const std::string& topicName, bool enableBackgroundInit) :
- mCommunicator(communicator),
- mTopicName(topicName),
- mInitialized(false)
- {
- try
- {
- init();
- }
- catch(const Ice::Exception&)
- {
- }
- if(!mInitialized && enableBackgroundInit)
- {
- mInitThread = new InitializationThread(this);
- mInitThread->start();
- }
- }
-
- virtual ~ListenerManagerT()
- {
- stop();
-
- if(mTopic)
- {
- try
- {
- mTopic->destroy();
- }
- catch(...)
- {
- //
- // Destructors are no-throw!
- //
- }
- }
- }
-
- //
- // NOTE: The current implementation is a little fast and loose here. Inconsistent conditions
- // and whatnot are not flagged.
- //
- bool addListener(const T& listener)
- {
- bool added = false;
- {
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- if(std::find(mListeners.begin(), mListeners.end(), listener) == mListeners.end())
- {
- mListeners.push_back(listener);
- added = true;
- }
- }
-
- if(mInitialized && added)
- {
- IceStorm::QoS qos;
- qos["reliability"] = "ordered";
-
- try
- {
- mTopic->subscribeAndGetPublisher(qos, listener);
- }
- catch (const IceStorm::AlreadySubscribed&)
- {
- //
- // This indicates some kind of inconsistent state or could
- // happen if this is a replica.
- //
- }
- }
- return added;
- }
-
- bool removeListener(const T& listener)
- {
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- typename std::vector<T>::iterator i = std::find(mListeners.begin(), mListeners.end(), listener);
- if(i != mListeners.end())
- {
- mListeners.erase(i);
- if(mInitialized)
- {
- //
- // unsubscribe doesn't seem to care whether the subscriber was subscribed or not.
- //
- mTopic->unsubscribe(listener);
- }
- return true;
- }
- return false;
- }
-
- std::vector<T> getListeners()
- {
- boost::shared_lock<boost::shared_mutex> lock(mLock);
- std::vector<T> result(mListeners);
- return result;
- }
-
- bool isSuspended()
- {
- boost::shared_lock<boost::shared_mutex> lock(mLock);
- return !mInitialized;
- }
-
- void stop()
- {
- if(mInitThread)
- {
- mInitThread->stop();
- mInitThread = 0;
- }
- }
-
-protected:
- boost::shared_mutex mLock;
- Ice::CommunicatorPtr mCommunicator;
- std::string mTopicName;
- IceStorm::TopicPrx mTopic;
- IceStorm::TopicManagerPrx mTopicManager;
- T mPublisher;
- ListenerSeq mListeners;
- IceUtil::Handle<InitializationThread> mInitThread;
-
- bool mInitialized;
-
- bool init()
- {
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- if(mInitialized)
- {
- return mInitialized;
- }
-
- //
- // TODO: While this is being concocted for a single component, it would make more sense
- // to have the topic manager passed in during construction for more general usage.
- //
- const std::string propertyName = "TopicManager.Proxy";
- std::string topicManagerProperty = mCommunicator->getProperties()->getProperty(propertyName);
- if(topicManagerProperty.size() == 0)
- {
- throw ConfigException(propertyName, "Topic manager proxy property missing. "
- "Unable to initialize listener support.");
- }
-
- try
- {
- mTopicManager = IceStorm::TopicManagerPrx::checkedCast(mCommunicator->stringToProxy(topicManagerProperty));
- }
- catch(const Ice::Exception&)
- {
- return false;
- }
-
- try
- {
- mTopic = mTopicManager->retrieve(mTopicName);
- }
- catch(const IceStorm::NoSuchTopic&)
- {
- }
-
- if(!mTopic)
- {
- try
- {
- mTopic = mTopicManager->create(mTopicName);
- }
- catch(const IceStorm::TopicExists&)
- {
- //
- // In case there is a race condition when creating the topic.
- //
- mTopic = mTopicManager->retrieve(mTopicName);
- }
- }
-
- if(!mTopic)
- {
- return mInitialized;
- }
- mPublisher = T::uncheckedCast(mTopic->getPublisher());
-
- if(mListeners.size() > 0)
- {
- for(typename std::vector<T>::iterator i = mListeners.begin(); i != mListeners.end(); ++i)
- {
- IceStorm::QoS qos;
- qos["reliability"] = "ordered";
-
- try
- {
- mTopic->subscribeAndGetPublisher(qos, *i);
- }
- catch(const IceStorm::AlreadySubscribed&)
- {
- //
- // This indicates some kind of inconsistent state.
- //
- }
- }
- }
- mInitialized = true;
- return mInitialized;
- }
-};
-}
-}
-----------------------------------------------------------------------
--
asterisk-scf/release/bridging.git
More information about the asterisk-scf-commits
mailing list