[asterisk-scf-commits] asterisk-scf/integration/bridging.git branch "master" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Fri Sep 10 11:12:09 CDT 2010
branch "master" has been updated
via 1f827738e764ef4a09b669972bc65f4495373d20 (commit)
via 9edb8c0f573e555691f058d73767ac0be4797aa3 (commit)
from ece0b81c0959c8e901bc28b271a93ad2ae38433c (commit)
Summary of changes:
CMakeLists.txt | 2 +-
cmake | 2 +-
slice | 2 +-
src/BridgeFactoryImpl.cpp | 220 --------------
src/BridgeFactoryImpl.h | 55 ----
src/BridgeImpl.cpp | 699 +++++++++++++++------------------------------
src/BridgeImpl.h | 187 ++++++-------
src/BridgeListenerMgr.cpp | 37 +++
src/BridgeListenerMgr.h | 36 +++
src/BridgeManagerImpl.cpp | 148 ++++++++++
src/BridgeManagerImpl.h | 59 ++++
src/BridgeServiceImpl.h | 4 +-
src/CMakeLists.txt | 8 +-
src/InternalExceptions.h | 45 +++
src/ListenerManager.h | 141 +++++++++
src/Logger.h | 4 +-
src/MediaSplicer.cpp | 75 +++--
src/MediaSplicer.h | 7 +-
src/Service.cpp | 71 +----
test/CMakeLists.txt | 9 +-
test/TestBridging.cpp | 119 ++++----
21 files changed, 919 insertions(+), 1011 deletions(-)
delete mode 100644 src/BridgeFactoryImpl.cpp
delete mode 100644 src/BridgeFactoryImpl.h
create mode 100644 src/BridgeListenerMgr.cpp
create mode 100644 src/BridgeListenerMgr.h
create mode 100644 src/BridgeManagerImpl.cpp
create mode 100644 src/BridgeManagerImpl.h
create mode 100644 src/InternalExceptions.h
create mode 100644 src/ListenerManager.h
- Log -----------------------------------------------------------------
commit 1f827738e764ef4a09b669972bc65f4495373d20
Author: Brent Eagles <beagles at digium.com>
Date: Fri Sep 10 13:34:12 2010 -0230
More refactorings for UML redesign.
(Test suite updates underway)
diff --git a/src/BridgeListenerMgr.cpp b/src/BridgeListenerMgr.cpp
new file mode 100644
index 0000000..8aa176e
--- /dev/null
+++ b/src/BridgeListenerMgr.cpp
@@ -0,0 +1,37 @@
+/*
+* Asterisk Scalable Communications Framework
+*
+* Copyright (C) 2010 -- Digium, Inc.
+*
+* All rights reserved.
+*/
+#include "BridgeListenerMgr.h"
+// Constructs the manager: the ListenerManagerT base obtains the IceStorm topic `name`; `bridgeProxy` is kept as the event source.
+AsteriskSCF::BridgeService::BridgeListenerMgr::BridgeListenerMgr(const Ice::CommunicatorPtr& comm,
+ const std::string& name,
+ const AsteriskSCF::SessionCommunications::Bridging::V1::BridgePrx& bridgeProxy) :
+ ListenerManagerT<AsteriskSCF::SessionCommunications::Bridging::V1::BridgeListenerPrx>(comm, name),
+ mPrx(bridgeProxy)
+{
+ mPublisher = AsteriskSCF::SessionCommunications::Bridging::V1::BridgeListenerPrx::uncheckedCast(mTopic->getPublisher()); // the topic's publisher, viewed as a BridgeListener proxy
+}
+// Publishes a sessionsAdded event through the topic publisher, identifying this bridge by mPrx.
+void AsteriskSCF::BridgeService::BridgeListenerMgr::sessionsAdded(const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions)
+{
+ mPublisher->sessionsAdded(mPrx, sessions);
+}
+// Publishes a sessionsRemoved event through the topic publisher, identifying this bridge by mPrx.
+void AsteriskSCF::BridgeService::BridgeListenerMgr::sessionsRemoved(const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions)
+{
+ mPublisher->sessionsRemoved(mPrx, sessions);
+}
+// Publishes a stopped event for this bridge (mPrx) through the topic publisher.
+void AsteriskSCF::BridgeService::BridgeListenerMgr::stopped()
+{
+ mPublisher->stopped(mPrx);
+}
+// Publishes a stopping event for this bridge (mPrx) through the topic publisher.
+void AsteriskSCF::BridgeService::BridgeListenerMgr::stopping()
+{
+ mPublisher->stopping(mPrx);
+}
diff --git a/src/BridgeListenerMgr.h b/src/BridgeListenerMgr.h
new file mode 100644
index 0000000..5c4b640
--- /dev/null
+++ b/src/BridgeListenerMgr.h
@@ -0,0 +1,36 @@
+/*
+* Asterisk Scalable Communications Framework
+*
+* Copyright (C) 2010 -- Digium, Inc.
+*
+* All rights reserved.
+*/
+#pragma once
+#include <string>
+#include <Ice/Ice.h>
+#include "ListenerManager.h"
+#include <SessionCommunications/Bridging/BridgingIf.h>
+
+namespace AsteriskSCF
+{
+namespace BridgeService
+{
+ class BridgeListenerMgr : virtual public ListenerManagerT<AsteriskSCF::SessionCommunications::Bridging::V1::BridgeListenerPrx>
+ { // Listener manager specialised for BridgeListener proxies; adds helpers that publish bridge events.
+ public:
+ BridgeListenerMgr(const Ice::CommunicatorPtr& communicator, const std::string& name, // `name` is passed to the base as the topic name
+ const AsteriskSCF::SessionCommunications::Bridging::V1::BridgePrx& source); // `source` is stored and sent with every event
+ // Each helper below forwards to the same-named operation on the topic publisher, passing the stored bridge proxy.
+ void sessionsAdded(const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions);
+ void sessionsRemoved(const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions);
+ void stopped();
+ void stopping();
+ // State set up by the constructor.
+ private:
+ AsteriskSCF::SessionCommunications::Bridging::V1::BridgePrx mPrx; // proxy of the bridge the events are about
+ AsteriskSCF::SessionCommunications::Bridging::V1::BridgeListenerPrx mPublisher; // publisher proxy obtained from the base class's topic
+ };
+
+ typedef IceUtil::Handle<BridgeListenerMgr> BridgeListenerMgrPtr;
+} // End of namespace BridgeService
+} // End of namespace AsteriskSCF
diff --git a/src/InternalExceptions.h b/src/InternalExceptions.h
new file mode 100644
index 0000000..13c5d43
--- /dev/null
+++ b/src/InternalExceptions.h
@@ -0,0 +1,45 @@
+#pragma once
+
+#include <exception>
+#include <string>
+#include <sstream>
+
+// TODO: Some exceptions that might be better off in the shared codebase.
+
+namespace AsteriskSCF
+{
+namespace BridgeService
+{
+ class ConfigException : public std::exception
+ { // Exception describing a configuration problem tied to a named property.
+ public:
+ // Builds the what() text as "<propertyName> configuration error: <message>"; an empty message becomes "(no message)".
+ ConfigException(const std::string& propertyName, const std::string& message)
+ {
+ std::stringstream what;
+ what << propertyName << " configuration error: ";
+ if(message.size() != 0)
+ {
+ what << message;
+ }
+ else
+ {
+ what << "(no message)";
+ }
+ mWhat = what.str(); // keep an owned copy so what() can hand out a stable pointer
+ }
+ // Declared explicitly with throw() to match the exception specification of std::exception's destructor.
+ ~ConfigException() throw()
+ {
+ }
+ // Returns the text assembled by the constructor; the pointer refers to storage owned by this object.
+ const char* what() const throw()
+ {
+ return mWhat.c_str();
+ }
+ // Formatted description returned by what().
+ private:
+ std::string mWhat;
+ };
+}
+}
diff --git a/src/ListenerManager.h b/src/ListenerManager.h
new file mode 100644
index 0000000..a307fd3
--- /dev/null
+++ b/src/ListenerManager.h
@@ -0,0 +1,141 @@
+#pragma once
+
+#include <Ice/Ice.h>
+#include <IceStorm/IceStorm.h>
+#include <boost/thread/shared_mutex.hpp>
+#include <string>
+#include <algorithm>
+#include <vector>
+#include "InternalExceptions.h"
+
+namespace AsteriskSCF
+{
+namespace BridgeService
+{
+ //
+ // Helper template for classes that need to implement listener style interfaces. T is the listener proxy type;
+ // listeners are tracked in a local list and subscribed to an IceStorm topic that is retrieved or created here.
+ template <class T>
+ class ListenerManagerT : public IceUtil::Shared
+ {
+ typedef std::vector<T> ListenerSeq;
+ typedef typename std::vector<T>::iterator ListenerIter;
+ public:
+ ListenerManagerT(const Ice::CommunicatorPtr& communicator, const std::string& topicName) :
+ mCommunicator(communicator),
+ mTopicName(topicName)
+ { // Throws ConfigException if the "TopicManager.Proxy" property is missing or unusable, or no topic can be obtained.
+ //
+ // TODO: While this is being concocted for a single component, it would make more sense
+ // to have the topic manager passed in during construction for more general usage.
+ //
+ const std::string propertyName = "TopicManager.Proxy";
+ std::string topicManagerProperty = mCommunicator->getProperties()->getProperty(propertyName);
+ if(topicManagerProperty.size() == 0)
+ {
+ throw ConfigException(propertyName, "Topic manager proxy property missing. "
+ "Unable to initialize listener support.");
+ }
+
+ try
+ {
+ mTopicManager = IceStorm::TopicManagerPrx::checkedCast(mCommunicator->stringToProxy(topicManagerProperty));
+ }
+ catch(const Ice::Exception&)
+ { // Deliberately ignored: mTopicManager stays null and the check just below reports the problem.
+ }
+ if(!mTopicManager)
+ {
+ throw ConfigException(propertyName, "Topic manager proxy is not a valid proxy or is unreachable");
+ }
+ // Try to retrieve an existing topic first; if there is none it is created further down.
+ try
+ {
+ mTopic = mTopicManager->retrieve(mTopicName);
+ }
+ catch(const IceStorm::NoSuchTopic&)
+ { // Not an error here: mTopic stays null, which triggers the create path below.
+ }
+
+ if(!mTopic)
+ {
+ try
+ {
+ mTopic = mTopicManager->create(mTopicName);
+ }
+ catch(const IceStorm::TopicExists&)
+ {
+ //
+ // In case there is a race condition when creating the topic.
+ //
+ mTopic = mTopicManager->retrieve(mTopicName);
+ }
+ }
+
+ if(!mTopic)
+ {
+ throw ConfigException(propertyName,
+ std::string("unable to create topic with the provided configuration :") + mTopicName);
+ }
+ }
+ // Virtual so that derived managers can be destroyed through a base pointer; currently does no cleanup.
+ virtual ~ListenerManagerT()
+ {
+ //
+ // TODO: Destroy topic.
+ //
+ }
+
+ // Records the listener if it is not already known, then subscribes it to the topic with "ordered" reliability.
+ // NOTE: The current implementation is a little fast and loose here. Inconsistent conditions
+ // and whatnot are not flagged.
+ // The subscribe call is made even for an already recorded listener; AlreadySubscribed is swallowed.
+ void addListener(const T& listener)
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ if(std::find(mListeners.begin(), mListeners.end(), listener) == mListeners.end())
+ {
+ mListeners.push_back(listener);
+ }
+ IceStorm::QoS qos;
+ qos["reliability"] = "ordered";
+ try
+ {
+ mTopic->subscribeAndGetPublisher(qos, listener); // returned publisher proxy is not used
+ }
+ catch(const IceStorm::AlreadySubscribed&)
+ {
+ //
+ // This indicates some kind of inconsistent state.
+ //
+ }
+ }
+ // Removes the listener from the local list and unsubscribes it from the topic; unknown listeners are ignored.
+ void removeListener(const T& listener)
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ ListenerIter i = std::find(mListeners.begin(), mListeners.end(), listener);
+ if(i != mListeners.end())
+ {
+ mListeners.erase(i);
+ mTopic->unsubscribe(listener);
+ }
+ }
+ // Returns a copy of the recorded listeners, taken while holding a shared lock on mLock.
+ std::vector<T> getListeners()
+ {
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+ std::vector<T> result(mListeners);
+ return result;
+ }
+ // Protected so that derived managers can reach the topic and listener state directly.
+ protected:
+ boost::shared_mutex mLock; // taken by addListener, removeListener and getListeners
+ Ice::CommunicatorPtr mCommunicator;
+ std::string mTopicName;
+ IceStorm::TopicPrx mTopic;
+ IceStorm::TopicManagerPrx mTopicManager;
+ ListenerSeq mListeners;
+ };
+}
+}
commit 9edb8c0f573e555691f058d73767ac0be4797aa3
Author: Brent Eagles <beagles at digium.com>
Date: Fri Sep 10 13:26:11 2010 -0230
Majority of the service refactoring done with.
diff --git a/CMakeLists.txt b/CMakeLists.txt
index ac1e98b..534deef 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -10,7 +10,7 @@ include(cmake/Hydra_v4.cmake)
hydra_project("bridging service" 3.4 CXX)
# Take care of slice definitions
-add_subdirectory(slice EXCLUDE_FROM_ALL)
+add_subdirectory(slice)
# Include the subdirectory information. This is separate so that
# a higher-level CMakeLists.txt can act as a master for integrated
diff --git a/cmake b/cmake
index 1095aeb..1e7a172 160000
--- a/cmake
+++ b/cmake
@@ -1 +1 @@
-Subproject commit 1095aeb680f1c6c2c411d80657e2c066bf7aa6d8
+Subproject commit 1e7a1725fe9e2d176a2c26820d6bcd96c6c3939e
diff --git a/slice b/slice
index dcb271b..14413db 160000
--- a/slice
+++ b/slice
@@ -1 +1 @@
-Subproject commit dcb271baaca90fa89ed6b7d4846ea458fb303943
+Subproject commit 14413db47bfae3d1ff57d36e80cfe700d755ae0b
diff --git a/src/BridgeFactoryImpl.cpp b/src/BridgeFactoryImpl.cpp
deleted file mode 100644
index e44cd74..0000000
--- a/src/BridgeFactoryImpl.cpp
+++ /dev/null
@@ -1,220 +0,0 @@
-#include "BridgeFactoryImpl.h"
-#include <Ice/Ice.h>
-#include <boost/thread/locks.hpp>
-
-namespace Hydra
-{
-namespace BridgeService
-{
- //
- // Functor used with for_each on shutdown.
- //
- class ShutdownFunctor : public std::unary_function<Hydra::BridgeService::BridgeImplPtr, void>
- {
- public:
- ShutdownFunctor(const Ice::Current& c) :
- mCurrent(c)
- {
- }
-
- void operator()(Hydra::BridgeService::BridgeImplPtr b)
- {
- b->shutdown(mCurrent);
- }
-
- private:
- const Ice::Current mCurrent;
- };
-
-} // End of namespace BridgeService
-} // End of namespace Hydra
-
-Hydra::BridgeService::BridgeFactoryImpl::BridgeFactoryImpl(Ice::ObjectAdapterPtr adapter, const Hydra::Core::Bridging::V1::BridgeEventsPrx& ev) :
- mShuttingDown(false),
- mSuspended(false),
- mAdapter(adapter),
- mBridgeEvents(ev)
-{
- mLogger.getInfoStream() << "Created Hydra Session-Oriented Bridge Factory." << std::endl;
-}
-
-Hydra::Core::Bridging::V1::BridgePrx Hydra::BridgeService::BridgeFactoryImpl::createBridge(
- const Hydra::Core::Endpoint::V1::BaseEndpointPtr& ep,
- const Hydra::Core::Endpoint::V1::EndpointSeq& endpoints,
- const Hydra::Core::Bridging::V1::BridgeMonitorPrx& mgr,
- const Ice::Current& current)
-{
- mLogger.getTraceStream() << __FUNCTION__ << ":" << current.adapter->getCommunicator()->identityToString(current.id) << std::endl;
- //
- // Verify the endpoints are of the correct type before doing anything else.
- //
- mLogger.getDebugStream() << __FUNCTION__ << ": validating endpoints." << std::endl;
- Hydra::Session::V1::SessionEndpointPtr adminEp(Hydra::Session::V1::SessionEndpointPtr::dynamicCast(ep));
- if(ep != 0)
- {
- if(!adminEp)
- {
- mLogger.getDebugStream() << __FUNCTION__ << ": casting admin endpoint resulted in null value." << std::endl;
- throw Hydra::Core::Bridging::V1::UnsupportedEndpoint(ep->id);
- }
- if(!checkEndpointId(*ep->id))
- {
- mLogger.getDebugStream() << __FUNCTION__ << ": admin endpoint had invalid id." << std::endl;
- throw Hydra::Core::Endpoint::V1::InvalidEndpointId(ep->id);
- }
- }
-
- Hydra::Session::V1::SessionEndpointSeq eps;
- mLogger.getDebugStream() << __FUNCTION__ << ": scanning " << eps.size() << " endpoints." << std::endl;
- for(Hydra::Core::Endpoint::V1::EndpointSeq::const_iterator i = endpoints.begin(); i != endpoints.end(); ++i)
- {
- if(*i == 0)
- {
- mLogger.getDebugStream() << __FUNCTION__ << ": null endpoint encountered." << std::endl;
- throw Hydra::Core::Bridging::V1::UnsupportedEndpoint(0);
- }
- Hydra::Session::V1::SessionEndpointPtr t(Hydra::Session::V1::SessionEndpointPtr::dynamicCast(*i));
- if(!t)
- {
- mLogger.getDebugStream() << __FUNCTION__ << ": casting endpoint resulted in null value." << std::endl;
- throw Hydra::Core::Bridging::V1::UnsupportedEndpoint((*i)->id);
- }
- if(!t->id || !checkEndpointId(*t->id))
- {
- if(mLogger.debugTracing())
- {
- if(!t->id)
- {
- mLogger.getDebugStream() << __FUNCTION__ << ": endpoint has null id." << std::endl;
- }
- else
- {
- mLogger.getDebugStream() << __FUNCTION__ << ": id (" << t->id->endpointManagerId << ":" << t->id->destinationId << ") is invalid." << std::endl;
- }
- }
- throw Hydra::Core::Endpoint::V1::InvalidEndpointId(t->id);
- }
- if(adminEp != 0 && adminEp->id != 0)
- {
- if(adminEp->id->endpointManagerId == t->id->endpointManagerId && adminEp->id->destinationId == t->id->destinationId)
- {
- mLogger.getDebugStream() << __FUNCTION__ << ": endpoint id collision between admin endpoint and initial endpoint set for id " <<
- adminEp->id->endpointManagerId << ":" << adminEp->id->destinationId << std::endl;
- throw Hydra::Core::Bridging::V1::EndpointCollision(adminEp->id, t->id);
- }
- }
- //
- // Kind of icky but necessary. The bridge cannot an endpoint registered more than twice.
- //
- for(Hydra::Session::V1::SessionEndpointSeq::const_iterator j = eps.begin(); j != eps.end(); ++j)
- {
- if((*j)->id->endpointManagerId == t->id->endpointManagerId && (*j)->id->destinationId == t->id->destinationId)
- {
- mLogger.getDebugStream() << __FUNCTION__ << ": endpoint id collision between endpoints in initial endpoint set " <<
- t->id->endpointManagerId << ":" << t->id->destinationId << std::endl;
- throw Hydra::Core::Bridging::V1::EndpointCollision((*j)->id, t->id);
- }
- }
- eps.push_back(t);
- }
- mLogger.getDebugStream() << __FUNCTION__ << ": endpoints have been successfully validated." << std::endl;
-
- //
- // It is arguable that it might be better to do the shutting down check before going through all the casting work.
- // This order has been chosen as the work above is actually an argument validation check that will be required
- // for all running bridges and it was felt that it was better to arrange things so the lock was not obtained
- // under normal running conditions for the check. The alternative would be to do two locks, one to check the
- // shutdown state and then anothert to modify the state.. but that that is probably more costly still.
- //
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- if(mShuttingDown)
- {
- mLogger.getDebugStream() << __FUNCTION__ << ": called when shutting down." << std::endl;
- throw Hydra::System::Component::V1::ShuttingDown();
- }
- if(mSuspended)
- {
- mLogger.getDebugStream() << __FUNCTION__ << ": called when suspended." << std::endl;
- throw Hydra::System::Component::V1::Suspended();
- }
- reap();
-
- Hydra::BridgeService::BridgeImplPtr bridge = new Hydra::BridgeService::BridgeImpl(mAdapter, adminEp, eps, mgr, mBridgeEvents);
- Ice::ObjectPrx obj = mAdapter->addWithUUID(bridge);
- mAdapter->addFacet(bridge->signalCB(), obj->ice_getIdentity(), "SignalCallback");
- mBridges.push_back(bridge);
- mLogger.getInfoStream() << current.adapter->getCommunicator()->identityToString(current.id) << ": creating new bridge " << obj->ice_toString() << "." << std::endl;
- return Hydra::Core::Bridging::V1::BridgePrx::uncheckedCast(obj);
-}
-
-void Hydra::BridgeService::BridgeFactoryImpl::shutdown(const Ice::Current& current)
-{
- mLogger.getTraceStream() << __FUNCTION__ << ":" << current.adapter->getCommunicator()->identityToString(current.id) << std::endl;
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- if(mShuttingDown)
- {
- mLogger.getDebugStream() << __FUNCTION__ << ": called when shutting down." << std::endl;
- return;
- }
- if(mSuspended)
- {
- mLogger.getDebugStream() << __FUNCTION__ << ": called when suspended." << std::endl;
- throw Hydra::System::Component::V1::Suspended();
- }
- mLogger.getInfoStream() << current.adapter->getCommunicator()->identityToString(current.id) << ": shutting down." << std::endl;
- mShuttingDown = true;
- reap();
- std::for_each(mBridges.begin(), mBridges.end(), Hydra::BridgeService::ShutdownFunctor(current));
-
- mAdapter->getCommunicator()->shutdown();
-}
-
-void Hydra::BridgeService::BridgeFactoryImpl::suspend(const Ice::Current& current)
-{
- mLogger.getTraceStream() << __FUNCTION__ << ":" << current.adapter->getCommunicator()->identityToString(current.id) << std::endl;
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- if(mShuttingDown)
- {
- mLogger.getDebugStream() << __FUNCTION__ << ": called when shutting down." << std::endl;
- throw Hydra::System::Component::V1::ShuttingDown();
- }
- if(mSuspended)
- {
- mLogger.getDebugStream() << __FUNCTION__ << ": called when suspended." << std::endl;
- return;
- }
- mLogger.getInfoStream() << current.adapter->getCommunicator()->identityToString(current.id) << ": suspending." << std::endl;
- mSuspended = true;
-}
-
-void Hydra::BridgeService::BridgeFactoryImpl::resume(const Ice::Current& current)
-{
- mLogger.getTraceStream() << __FUNCTION__ << ":" << current.adapter->getCommunicator()->identityToString(current.id) << std::endl;
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- if(mShuttingDown)
- {
- mLogger.getDebugStream() << __FUNCTION__ << ": called when shutting down." << std::endl;
- throw Hydra::System::Component::V1::ShuttingDown();
- }
- if(!mSuspended)
- {
- mLogger.getDebugStream() << __FUNCTION__ << ": called when suspended." << std::endl;
- return;
- }
- mLogger.getInfoStream() << current.adapter->getCommunicator()->identityToString(current.id) << ": resuming." << std::endl;
- mSuspended = false;
-}
-
-void Hydra::BridgeService::BridgeFactoryImpl::reap()
-{
- mLogger.getDebugStream() << __FUNCTION__ << ": reaping bridge set of " << mBridges.size() << " bridges." << std::endl;
- for(std::vector<Hydra::BridgeService::BridgeImplPtr>::iterator i = mBridges.begin(); i != mBridges.end(); ++i)
- {
- if((*i)->destroyed())
- {
- std::vector<Hydra::BridgeService::BridgeImplPtr>::iterator t = i;
- mBridges.erase(t);
- }
- }
- mLogger.getDebugStream() << __FUNCTION__ << ": reaping completed, bridge set size is now " << mBridges.size() << "." << std::endl;
-}
diff --git a/src/BridgeFactoryImpl.h b/src/BridgeFactoryImpl.h
deleted file mode 100644
index eeb2dea..0000000
--- a/src/BridgeFactoryImpl.h
+++ /dev/null
@@ -1,55 +0,0 @@
-#pragma once
-#ifndef __HYDRA_BRIDGE_FACTORY_IMPL_H
-#define __HYDRA_BRIDGE_FACTORY_IMPL_H
-
-#include <Core/Bridging/BridgeServiceIf.h>
-#include <Core/Bridging/BridgeServiceEventsIf.h>
-#include <boost/thread/shared_mutex.hpp>
-#include <vector>
-
-#include "BridgeImpl.h"
-#include "Logger.h"
-
-namespace Hydra
-{
- namespace BridgeService
-{
- class BridgeFactoryImpl : public Core::Bridging::V1::BridgeFactory
- {
- public:
-
- BridgeFactoryImpl(Ice::ObjectAdapterPtr adapter, const Core::Bridging::V1::BridgeEventsPrx& events);
-
- //
- // Hydra::Core::Bridging::V1::BridgeFactory Interface
- //
- Core::Bridging::V1::BridgePrx createBridge(
- const Core::Endpoint::V1::BaseEndpointPtr& ep,
- const Core::Endpoint::V1::EndpointSeq& endpoints,
- const Core::Bridging::V1::BridgeMonitorPrx& monitor,
- const Ice::Current& current);
-
- //
- // Hydra::System::Component::V1::ComponentService Interface.
- //
- void suspend(const Ice::Current& current);
- void resume(const Ice::Current& current);
- void shutdown(const Ice::Current& current);
-
- private:
-
- boost::shared_mutex mLock;
- std::vector<BridgeImplPtr> mBridges;
- bool mShuttingDown;
- bool mSuspended;
- Ice::ObjectAdapterPtr mAdapter;
- Logger mLogger;
- Core::Bridging::V1::BridgeEventsPrx mBridgeEvents;
-
- void reap();
- };
-
- typedef IceUtil::Handle<BridgeFactoryImpl> BridgeFactoryImplPtr;
-};
-};
-#endif
diff --git a/src/BridgeImpl.cpp b/src/BridgeImpl.cpp
index fe66533..ee1106a 100644
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@ -1,451 +1,327 @@
#include "BridgeImpl.h"
#include <System/Component/ComponentServiceIf.h>
+#include <SessionCommunications/SessionCommunicationsIf.h>
#include <Ice/Ice.h>
#include <memory>
+#include <algorithm>
#include <boost/thread/locks.hpp>
+
+//
+// Compiled in constants.
+// TODO: Replace with configuration!
+//
+
+static const std::string TopicPrefix("AsteriskSCF.Bridge.");
+
//
// TODO:
-// Operations that are performed on all bridge endpoints might be better done as AMI requests.
+// Operations that are performed on all bridge sessions might be better done as AMI requests.
//
-namespace Hydra
+namespace AsteriskSCF
{
namespace BridgeService
{
- class EventTopicWrapper
+ //
+ // Functor to support using for_each on shutdown.
+ //
+ class ShutdownImpl : public std::unary_function<BridgeImpl::BridgeSession, void>
{
public:
- EventTopicWrapper(const Core::Bridging::V1::BridgeEventsPrx& ev) :
- mEvents(ev)
- {
- }
-
- void endpointAdded(const Core::Endpoint::V1::BaseEndpointPtr& ep)
- {
- if(mEvents)
- {
- try
- {
- mEvents->endpointAdded(ep);
- }
- catch(const Ice::Exception& ex)
- {
- mLogger.getDebugStream() << __FUNCTION__ << ": publishing failed with " << ex << std::endl;
- }
- }
- }
-
- void endpointRemoved(const Core::Endpoint::V1::BaseEndpointPtr& ep)
- {
- if(mEvents)
- {
- try
- {
- mEvents->endpointRemoved(ep);
- }
- catch(const Ice::Exception& ex)
- {
- mLogger.getDebugStream() << __FUNCTION__ << ": publishing failed with " << ex << std::endl;
- }
- }
- }
-
- void shuttingDown()
+ ShutdownImpl(const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& response) :
+ mResponse(response)
{
- if(mEvents)
- {
- try
- {
- mEvents->shuttingDown();
- }
- catch(const Ice::Exception& ex)
- {
- mLogger.getDebugStream() << __FUNCTION__ << ": publishing failed with " << ex << std::endl;
- }
- }
}
- void stopped()
+ void operator()(const BridgeImpl::BridgeSession& b)
{
- if(mEvents)
+ b.session->stop(mResponse);
+ if(b.connector)
{
- try
- {
- mEvents->stopped();
- }
- catch(const Ice::Exception& ex)
- {
- mLogger.getDebugStream() << __FUNCTION__ << ": publishing failed with " << ex << std::endl;
- }
+ b.connector->unplug();
}
}
-
private:
- Core::Bridging::V1::BridgeEventsPrx mEvents;
- Logger mLogger;
+ AsteriskSCF::SessionCommunications::V1::ResponseCodePtr mResponse;
};
-
- //
- // Functor to support using for_each on shutdown.
- //
- class BridgeShutdownFunctor : public std::unary_function<BridgeImpl::BridgeEndpoint, void>
+
+ class ProgressingImpl : public std::unary_function<BridgeImpl::BridgeSession, void>
{
public:
- BridgeShutdownFunctor(const Core::Endpoint::V1::EndpointIdPtr& id) :
- mId(id)
+ ProgressingImpl(const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& response) :
+ mResponse(response)
{
}
- void operator()(const BridgeImpl::BridgeEndpoint& b)
- {
- if(b.endpoint)
- {
- Hydra::Session::V1::SessionEndpointPtr p(Hydra::Session::V1::SessionEndpointPtr::dynamicCast(b.endpoint));
- assert(p != 0);
- if(p->callback)
- {
- p->command->terminate(mId);
- }
- if(b.connector)
- {
- b.connector->unplug();
- }
- }
+ void operator()(const BridgeImpl::BridgeSession& b)
+ {
+ b.session->progress(mResponse);
}
private:
- Core::Endpoint::V1::EndpointIdPtr mId;
+ AsteriskSCF::SessionCommunications::V1::ResponseCodePtr mResponse;
};
- //
- // Functor to support broadcasting busy notifications.
- //
- struct BusyImpl : public std::binary_function<Hydra::Core::Endpoint::V1::EndpointIdPtr, Hydra::Session::V1::SignalCallbackPrx, void>
+ struct RingImpl : public std::unary_function<BridgeImpl::BridgeSession, void>
{
- void operator()(const Hydra::Core::Endpoint::V1::EndpointIdPtr source, const Hydra::Session::V1::SignalCallbackPrx& p)
- {
- p->busy(source);
+ void operator()(const BridgeImpl::BridgeSession& b)
+ {
+ b.session->ring();
}
};
- struct RingImpl : public std::binary_function<Hydra::Core::Endpoint::V1::EndpointIdPtr, Hydra::Session::V1::SignalCallbackPrx, void>
+ struct FlashImpl : public std::unary_function<BridgeImpl::BridgeSession, void>
{
- void operator()(const Hydra::Core::Endpoint::V1::EndpointIdPtr source, const Hydra::Session::V1::SignalCallbackPrx& p)
- {
- p->ring(source);
+ void operator()(const BridgeImpl::BridgeSession& b)
+ {
+ b.session->flash();
}
};
-
- class BroadcasterBase : public std::unary_function<BridgeImpl::BridgeEndpoint, void>
+
+ struct HoldImpl : public std::unary_function<BridgeImpl::BridgeSession, void>
{
- public:
- BroadcasterBase(const Core::Endpoint::V1::EndpointIdPtr& id) :
- mId(id)
- {
- }
-
- void operator()(const BridgeImpl::BridgeEndpoint& b)
- {
- if(b.endpoint)
- {
- Hydra::Session::V1::SessionEndpointPtr p(Hydra::Session::V1::SessionEndpointPtr::dynamicCast(b.endpoint));
- assert(p != 0);
- if(p->callback)
- {
- doOp(p->callback);
- }
- }
-
- }
- protected:
- Core::Endpoint::V1::EndpointIdPtr mId;
-
- virtual void doOp(const Hydra::Session::V1::SignalCallbackPrx& p)
+ void operator()(const BridgeImpl::BridgeSession& b)
{
+ b.session->hold();
}
};
-
- template <class F>
- class Broadcaster : public BroadcasterBase
+
+ struct UnholdImpl : public std::unary_function<BridgeImpl::BridgeSession, void>
{
- public:
- Broadcaster(const Core::Endpoint::V1::EndpointIdPtr& id) :
- BroadcasterBase(id)
- {
- }
- protected:
- void doOp(const Hydra::Session::V1::SignalCallbackPrx& p)
+ void operator()(const BridgeImpl::BridgeSession& b)
{
- F f;
- f(mId, p);
+ b.session->unhold();
}
};
- template <class F>
- class BroadcasterWithResponseCode : public BroadcasterBase
+ class FindImpl : public std::unary_function<BridgeImpl::BridgeSession, bool>
{
public:
- BroadcasterWithResponseCode(const Core::Endpoint::V1::EndpointIdPtr& id, const Session::V1::ResponseCodePtr& responseCode) :
- BroadcasterBase(id),
- mResponseCode(responseCode)
+ FindImpl(const AsteriskSCF::SessionCommunications::V1::SessionPrx& prx) :
+ mPrx(prx)
{
}
-
- protected:
- Session::V1::ResponseCodePtr mResponseCode;
- void doOp(const Hydra::Session::V1::SignalCallbackPrx& p)
+
+ bool operator()(const BridgeImpl::BridgeSession& b)
{
- F f;
- f(mId, p, mResponseCode);
+ return b.session == mPrx;
}
+ private:
+ AsteriskSCF::SessionCommunications::V1::SessionPrx mPrx;
};
- class DefaultBridgeSignallingCallback : public Session::V1::SignalCallback
+
+ //
+ // For events that require modification to the bridge, we use helper methods on the bridge itself.
+ // For events that result in distribution to the bridge sessions, we copy the current sessions and
+ // run the calls from the listener itself.
+ //
+ class SessionListener : public AsteriskSCF::SessionCommunications::V1::SessionListener
{
public:
-
- DefaultBridgeSignallingCallback(const BridgeImplPtr& bridge) :
- mBridge(bridge)
- {
- }
-
- void ring(const Core::Endpoint::V1::EndpointIdPtr& p, const Ice::Current&)
+ SessionListener(const BridgeImplPtr& b) :
+ mBridge(b)
{
- mBridge->endpointRinging(p);
- }
-
- void connected(const Core::Endpoint::V1::EndpointIdPtr& p, const Ice::Current&)
- {
- //
- // It's here where the actual connection of then endpoint needs to be setup.
- //
- mBridge->endpointConnected(p);
}
- void terminated(const Core::Endpoint::V1::EndpointIdPtr& p, const Session::V1::ResponseCodePtr& r, const Ice::Current& current)
+ void connected(const AsteriskSCF::SessionCommunications::V1::SessionPrx& source, const Ice::Current&)
{
- if(mBridge->endpointTerminated(p, r, current) == 1)
- {
- mBridge->shutdown(current);
- }
+ mBridge->sessionConnected(source);
}
- void busy(const Core::Endpoint::V1::EndpointIdPtr& p, const Ice::Current&)
+ void flashed(const AsteriskSCF::SessionCommunications::V1::SessionPrx& source, const Ice::Current&)
{
- mBridge->endpointBusy(p);
}
- void congestion(const Core::Endpoint::V1::EndpointIdPtr& p, const Session::V1::ResponseCodePtr& r, const Ice::Current&)
+ void held(const AsteriskSCF::SessionCommunications::V1::SessionPrx& source, const Ice::Current&)
{
}
- void hold(const Core::Endpoint::V1::EndpointIdPtr& p, const Ice::Current&)
+ void progressing(const AsteriskSCF::SessionCommunications::V1::SessionPrx& source, const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& response, const Ice::Current&)
{
}
- void unhold(const Core::Endpoint::V1::EndpointIdPtr& p, const Ice::Current&)
+ void ringing(const AsteriskSCF::SessionCommunications::V1::SessionPrx& source, const Ice::Current&)
{
+ std::vector<BridgeImpl::BridgeSession> sessions(mBridge->currentSessions());
+ std::for_each(sessions.begin(), sessions.end(), RingImpl());
}
- void flash(const Core::Endpoint::V1::EndpointIdPtr& p, const Ice::Current&)
+ void stopped(const AsteriskSCF::SessionCommunications::V1::SessionPrx& source, const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& response, const Ice::Current& current)
{
+ size_t endpointCount = mBridge->sessionStopped(source, response);
+ if(endpointCount < 2)
+ {
+ mBridge->shutdown(current);
+ }
}
- void progress(const Core::Endpoint::V1::EndpointIdPtr& p, const Session::V1::ResponseCodePtr& r, const Ice::Current&)
+ void unheld(const AsteriskSCF::SessionCommunications::V1::SessionPrx& source, const Ice::Current&)
{
}
-
private:
BridgeImplPtr mBridge;
};
-
+
} // End of namespace BridgeService
-} // End of namespace Hydra
-
-static std::string endpointIdToString(const Hydra::Core::Endpoint::V1::EndpointIdPtr& id)
-{
- return id->endpointManagerId + ":" + id->destinationId;
-}
+} // End of namespace AsteriskSCF
-Hydra::BridgeService::BridgeImpl::BridgeImpl(
+AsteriskSCF::BridgeService::BridgeImpl::BridgeImpl(
const Ice::ObjectAdapterPtr& adapter,
- const Hydra::Session::V1::SessionEndpointPtr& adminEp,
- const Hydra::Session::V1::SessionEndpointSeq& initialEndpoints,
- const Hydra::Core::Bridging::V1::BridgeMonitorPrx& manager,
- const Hydra::Core::Bridging::V1::BridgeEventsPrx& ev
+ const AsteriskSCF::SessionCommunications::V1::SessionSeq& initialSessions,
+ const AsteriskSCF::SessionCommunications::Bridging::V1::BridgeListenerPrx& ev,
+ const AsteriskSCF::BridgeService::BridgeListenerMgrPtr& listenerMgr
) :
mState(Running),
- mMonitor(manager),
- mEvents(new Hydra::BridgeService::EventTopicWrapper(ev)),
- mAdminEndpointInList(true),
- mBridgeId(new Hydra::Core::Endpoint::V1::EndpointId),
- mObjAdapter(adapter)
+ mObjAdapter(adapter),
+ mListeners(listenerMgr),
+ mSessionListener(new SessionListener(this))
{
- if(adminEp)
- {
- mAdminEndpoint = std::auto_ptr<Hydra::BridgeService::BridgeImpl::BridgeEndpoint>(new Hydra::BridgeService::BridgeImpl::BridgeEndpoint(adminEp, 0));
- mAdminEndpoint->connector = mSplicer.connect(endpointIdToString(adminEp->id), adminEp->mediaSession);
- }
- for(Hydra::Session::V1::SessionEndpointSeq::const_iterator i = initialEndpoints.begin(); i != initialEndpoints.end(); ++i)
- {
- mQueuedEndpoints.insert(std::pair<std::string, Hydra::Session::V1::SessionEndpointPtr>(endpointIdToString((*i)->id), *i));
- }
- mSignallingServant = new DefaultBridgeSignallingCallback(this);
- mSignalling = Hydra::Session::V1::SignalCallbackPrx::uncheckedCast(mObjAdapter->addWithUUID(mSignallingServant));
- for(Hydra::Session::V1::SessionEndpointSeq::const_iterator i = initialEndpoints.begin(); i != initialEndpoints.end(); ++i)
+ mListeners->addListener(ev);
+ mSessionListenerPrx = AsteriskSCF::SessionCommunications::V1::SessionListenerPrx::uncheckedCast(mObjAdapter->addWithUUID(mSessionListener));
+
+ for(AsteriskSCF::SessionCommunications::V1::SessionSeq::const_iterator i = initialSessions.begin();
+ i != initialSessions.end(); ++i)
{
- (*i)->command->call(mBridgeId, (*i)->id, mSignalling);
+ AsteriskSCF::SessionCommunications::V1::SessionInfoPtr info = (*i)->addListener(mSessionListenerPrx);
+ //
+ // We need to define these states! Especially the ones that define when start is called or not.
+ //
+ if(info->currentState == "ready")
+ {
+ (*i)->start();
+ mSessions.push_back(BridgeSession(*i, 0));
+ }
+ else
+ {
+ mSessions.push_back(BridgeSession(*i, mSplicer.connect(*i)));
+ }
}
+ mListeners->sessionsAdded(initialSessions);
}
-Hydra::BridgeService::BridgeImpl::~BridgeImpl()
+AsteriskSCF::BridgeService::BridgeImpl::~BridgeImpl()
{
try
{
- mObjAdapter->remove(mSignalling->ice_getIdentity());
+ //
+ // We don't want to remove all listener activity unless we are definitively destroyed. Any other
+ // situations where the bridge servant might be released from memory might be a temporary condition
+ // where we may want events to hold until some kind of timeout causes the listener to be removed.
+ //
+ if(mState == Destroyed)
+ {
+ mObjAdapter->remove(mSessionListenerPrx->ice_getIdentity());
+ }
}
catch(...)
{
}
}
-void Hydra::BridgeService::BridgeImpl::addEndpoint(const Hydra::Core::Endpoint::V1::BaseEndpointPtr& ep, const Ice::Current& current)
+void AsteriskSCF::BridgeService::BridgeImpl::addSessions(const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions, const Ice::Current& current)
{
- mLogger.getTraceStream() << __FUNCTION__ << ":" << current.adapter->getCommunicator()->identityToString(current.id) << std::endl;
- if(ep == 0 || ep->id == 0)
- {
- mLogger.getDebugStream() << __FUNCTION__ << ": attempting to null endpoint or endpoint with invalid id." << std::endl;
- throw Hydra::Core::Endpoint::V1::InvalidEndpointId(new Hydra::Core::Endpoint::V1::EndpointId);
- }
-
- if(!checkEndpointId(*ep->id))
- {
- mLogger.getDebugStream() << __FUNCTION__ << ": attempting to add endpoint with invalid id " << ep->id->endpointManagerId << ":" << ep->id->destinationId << "." << std::endl;
- throw Hydra::Core::Endpoint::V1::InvalidEndpointId(ep->id);
- }
-
- Hydra::Session::V1::SessionEndpointPtr newEndpoint(Hydra::Session::V1::SessionEndpointPtr::dynamicCast(ep));
- if(!newEndpoint)
- {
- mLogger.getDebugStream() << __FUNCTION__ << ": endpoint with id " << ep->id->endpointManagerId << ":" << ep->id->destinationId << " failed dynamic cast." << std::endl;
- throw Hydra::Core::Bridging::V1::UnsupportedEndpoint(ep->id);
- }
+ AsteriskSCF::SessionCommunications::V1::SessionSeq addedSessions;
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
statePreCheck();
- std::vector<Hydra::BridgeService::BridgeImpl::BridgeEndpoint>::iterator i = find(ep->id);
- if(i != mEndpoints.end())
+ for(AsteriskSCF::SessionCommunications::V1::SessionSeq::const_iterator i = sessions.begin();
+ i != sessions.end(); ++i)
{
- mLogger.getDebugStream() << __FUNCTION__ << ": endpoint with id " << ep->id->endpointManagerId << ":" << ep->id->destinationId << " already registered." << std::endl;
- throw Hydra::Core::Bridging::V1::EndpointAlreadyRegistered(ep->id);
- }
-
- if(mMonitor)
- {
- bool result = mMonitor->onAddEndpoint(ep);
- mLogger.getDebugStream() << __FUNCTION__ << "onAddEndpoint() returned " << result << std::endl;
- }
+ //
+ // TODO: how do we want to handle sessions that have already been added to the bridge? It's pretty much
+ // impossible to guard against race conditions where multiple call paths might want to add a session
+ // more than once for some reason. We should probably just log it and move on.
+ //
+ std::vector<BridgeSession>::iterator j = find_if(mSessions.begin(), mSessions.end(), AsteriskSCF::BridgeService::FindImpl(*i));
+ if(j != mSessions.end())
+ {
+ continue;
+ }
- if(newEndpoint->command)
- {
- mQueuedEndpoints.insert(std::pair<std::string, Hydra::Session::V1::SessionEndpointPtr>(endpointIdToString(newEndpoint->id), newEndpoint));
- }
- else
- {
- mEndpoints.push_back(BridgeEndpoint(newEndpoint, mSplicer.connect(endpointIdToString(newEndpoint->id), newEndpoint->mediaSession)));
+ AsteriskSCF::SessionCommunications::V1::SessionInfoPtr info = (*i)->addListener(mSessionListenerPrx);
+ //
+ // We need to define these states! Especially the ones that define when start is called or not.
+ //
+ if(info->currentState == "ready")
+ {
+ (*i)->start();
+ mSessions.push_back(BridgeSession(*i, 0));
+ }
+ else
+ {
+ mSessions.push_back(BridgeSession(*i, mSplicer.connect(*i)));;
+ }
+
+ addedSessions.push_back(*i);
}
}
- //
- // Don't perform the RPC with the lock held!
- //
- if(newEndpoint->command)
+ if(addedSessions.size())
{
- try
- {
- newEndpoint->command->call(mBridgeId, newEndpoint->id, mSignalling);
- }
- catch(...)
- {
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- mQueuedEndpoints.erase(endpointIdToString(newEndpoint->id));
- throw;
- }
+ mListeners->sessionsAdded(addedSessions);
}
-
- mLogger.getDebugStream() << __FUNCTION__ << ": bridge " << current.adapter->getCommunicator()->identityToString(current.id) <<
- " now has " << mEndpoints.size() << " endpoints." << std::endl;
- mEvents->endpointAdded(newEndpoint);
}
-void Hydra::BridgeService::BridgeImpl::removeEndpoint(const Hydra::Core::Endpoint::V1::EndpointIdPtr& ep, const Ice::Current& current)
+//
+// Removes the given sessions from the bridge. For every session that is currently bridged, the bridge's
+// session listener is detached from it, its media connector (if one has been set up) is unplugged and
+// the session is dropped from mSessions. Sessions that are not found are skipped. Bridge listeners are
+// told through sessionsRemoved() only when something was actually removed, and the bridge shuts itself
+// down when fewer than two sessions remain. statePreCheck() runs first and throws ShuttingDown when the
+// bridge is already shutting down.
+//
+void AsteriskSCF::BridgeService::BridgeImpl::removeSessions(const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions, const Ice::Current& current)
{
- mLogger.getTraceStream() << __FUNCTION__ << ":" << current.adapter->getCommunicator()->identityToString(current.id) << std::endl;
- statePreCheck();
- if(ep == 0)
- {
- throw Hydra::Core::Endpoint::V1::InvalidEndpointId(0);
- }
- if(!checkEndpointId(*ep))
+ AsteriskSCF::SessionCommunications::V1::SessionSeq removedSessions;
+ size_t remainingSessions(0);
{
- throw Hydra::Core::Endpoint::V1::InvalidEndpointId(ep);
- }
- std::vector<Hydra::BridgeService::BridgeImpl::BridgeEndpoint>::iterator i = find(ep);
- if(i == mEndpoints.end())
- {
- throw Hydra::Core::Bridging::V1::UnknownEndpoint(ep);
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ statePreCheck();
+ for(AsteriskSCF::SessionCommunications::V1::SessionSeq::const_iterator i = sessions.begin(); i != sessions.end(); ++i)
+ {
+ std::vector<BridgeSession>::iterator j = std::find_if(mSessions.begin(), mSessions.end(), AsteriskSCF::BridgeService::FindImpl(*i));
+ if(j != mSessions.end())
+ {
+ j->session->removeListener(mSessionListenerPrx);
+ //
+ // A session that was added while in the "ready" state is stored with a 0 connector until
+ // sessionConnected() splices its media, so there may be nothing to unplug yet.
+ //
+ if(j->connector)
+ {
+ j->connector->unplug();
+ }
+ mSessions.erase(j);
+ removedSessions.push_back(*i);
+ }
+ else
+ {
+ //
+ // TODO: how do we want to handle sessions that aren't there? It's pretty much impossible to guard against race conditions where
+ // an expected session may no longer be present (device hung up, etc). In other words, this should probably be expected and maybe
+ // just logged.
+ //
+ }
+ }
+ //
+ // Take the count while the lock is still held; mSessions must not be read once it is released.
+ //
+ remainingSessions = mSessions.size();
}
- if(mMonitor)
+ if(removedSessions.size() != 0)
{
- bool result = mMonitor->onRemoveEndpoint(i->endpoint);
- mLogger.getDebugStream() << __FUNCTION__ << "onRemoveEndpoint() returned " << result << std::endl;
+ mListeners->sessionsRemoved(removedSessions);
}
-
- mEvents->endpointRemoved(i->endpoint);
- if(i->endpoint->command)
+ //
+ // TODO: Should be policy driven.
+ //
+ if(remainingSessions < 2)
{
- i->endpoint->command->terminate(mBridgeId);
+ shutdown(current);
}
- i->connector->unplug();
- mEndpoints.erase(i);
-
- mLogger.getDebugStream() << __FUNCTION__ << ": bridge " << current.adapter->getCommunicator()->identityToString(current.id) <<
- " now has " << mEndpoints.size() << " endpoints." << std::endl;
}
-Hydra::Core::Endpoint::V1::EndpointSeq Hydra::BridgeService::BridgeImpl::listEndpoints(const Ice::Current& current)
+AsteriskSCF::SessionCommunications::V1::SessionSeq AsteriskSCF::BridgeService::BridgeImpl::listSessions(const Ice::Current& current)
{
mLogger.getTraceStream() << __FUNCTION__ << ":" << current.adapter->getCommunicator()->identityToString(current.id) << std::endl;
boost::shared_lock<boost::shared_mutex> lock(mLock);
statePreCheck();
- mLogger.getDebugStream() << __FUNCTION__ << ": working with " << mEndpoints.size() << " endpoints." << std::endl;
-
- Hydra::Core::Endpoint::V1::EndpointSeq eps;
- if(mAdminEndpointInList && mAdminEndpoint.get() != 0)
- {
- mLogger.getDebugStream() << __FUNCTION__ << ": adding admin endpoint to result set." << std::endl;
- eps.push_back(mAdminEndpoint->endpoint);
- }
- for(std::vector<Hydra::BridgeService::BridgeImpl::BridgeEndpoint>::const_iterator i = mEndpoints.begin(); i != mEndpoints.end(); ++i)
- {
- eps.push_back(i->endpoint);
- }
- if(mMonitor)
+ mLogger.getDebugStream() << __FUNCTION__ << ": working with " << mSessions.size() << " sessions." << std::endl;
+ AsteriskSCF::SessionCommunications::V1::SessionSeq result;
+ for(std::vector<AsteriskSCF::BridgeService::BridgeImpl::BridgeSession>::const_iterator i = mSessions.begin(); i != mSessions.end(); ++i)
{
- bool result = mMonitor->onListEndpoints(eps, eps);
- mLogger.getDebugStream() << __FUNCTION__ << "onListEndpoints() returned " << result << std::endl;
+ result.push_back(i->session);
}
- mLogger.getDebugStream() << __FUNCTION__ << ": returning " << eps.size() << " endpoints." << std::endl;
- return eps;
+ return result;
}
-void Hydra::BridgeService::BridgeImpl::shutdown(const Ice::Current& current)
+void AsteriskSCF::BridgeService::BridgeImpl::shutdown(const Ice::Current& current)
{
//
// When shutting down, the bridge makes a copy of its current state and unlocks, proceeding with
// no other internal locks.
//
- std::vector<BridgeEndpoint> copyOfEndpoints;
- Hydra::Session::V1::SignalCommandsPrx adminCmd;
+ std::vector<BridgeSession> copyOfSessions;
mLogger.getTraceStream() << __FUNCTION__ << ":" << current.adapter->getCommunicator()->identityToString(current.id) << std::endl;
{
@@ -461,49 +337,33 @@ void Hydra::BridgeService::BridgeImpl::shutdown(const Ice::Current& current)
throw Ice::ObjectNotExistException(__FILE__, __LINE__);
}
mState = ShuttingDown;
- std::swap(copyOfEndpoints, mEndpoints);
- if(mAdminEndpoint.get() != 0 && mAdminEndpoint->endpoint->command)
- {
- adminCmd = mAdminEndpoint->endpoint->command;
- }
+ std::swap(copyOfSessions, mSessions);
}
- if(mMonitor)
- {
- bool result = mMonitor->onShutdown();
- mLogger.getDebugStream() << __FUNCTION__ << "onShuttingDown() returned " << result << std::endl;
- }
- mEvents->shuttingDown();
+ mListeners->stopping();
//
// TODO: Response code for termination messages for bridges shutting down should come from configuration
//
- std::for_each(copyOfEndpoints.begin(), copyOfEndpoints.end(), Hydra::BridgeService::BridgeShutdownFunctor(mBridgeId));
- try
- {
- adminCmd->terminate(mBridgeId);
- }
- catch(const Ice::Exception&)
- {
- //
- // Swallow this. The admin endpoint going away might be the cause for the shutdown, in which case an exception from a terminate request would
- // be expected.
- //
- }
-
- //
- // Now call terminate on the bridge endpoint.
- //
+ std::for_each(copyOfSessions.begin(), copyOfSessions.end(),
+ AsteriskSCF::BridgeService::ShutdownImpl(new AsteriskSCF::SessionCommunications::V1::ResponseCode));
mLogger.getInfoStream() << current.adapter->getCommunicator()->identityToString(current.id) << ": is shutdown." << std::endl;
- mEvents->stopped();
- if(mMonitor)
- {
- bool result = mMonitor->onShutdownComplete();
- mLogger.getDebugStream() << __FUNCTION__ << "onShutdownComplete() returned " << result << std::endl;
- }
+ mListeners->stopped();
}
-void Hydra::BridgeService::BridgeImpl::destroy(const Ice::Current& current)
+void AsteriskSCF::BridgeService::BridgeImpl::addListener(const AsteriskSCF::SessionCommunications::Bridging::V1::BridgeListenerPrx& listener,
+ const Ice::Current&)
+{
+ mListeners->addListener(listener);
+}
+
+void AsteriskSCF::BridgeService::BridgeImpl::removeListener(const AsteriskSCF::SessionCommunications::Bridging::V1::BridgeListenerPrx& listener,
+ const Ice::Current&)
+{
+ mListeners->removeListener(listener);
+}
+
+void AsteriskSCF::BridgeService::BridgeImpl::destroy(const Ice::Current& current)
{
{
mLogger.getTraceStream() << __FUNCTION__ << ":" << current.adapter->getCommunicator()->identityToString(current.id) << std::endl;
@@ -511,148 +371,68 @@ void Hydra::BridgeService::BridgeImpl::destroy(const Ice::Current& current)
if(mState == ShuttingDown)
{
mLogger.getDebugStream() << __FUNCTION__ << ": called when shutting down." << std::endl;
- throw Hydra::System::Component::V1::ShuttingDown();
+ throw AsteriskSCF::System::Component::V1::ShuttingDown();
}
if(mState == Destroyed)
{
mLogger.getDebugStream() << __FUNCTION__ << ": called when destroyed." << std::endl;
throw Ice::ObjectNotExistException(__FILE__, __LINE__);
}
- if(mMonitor)
- {
- bool result = mMonitor->onDestroy();
- mLogger.getDebugStream() << __FUNCTION__ << "onDestroy() returned " << result << std::endl;
- }
mState = Destroyed;
mLogger.getInfoStream() << current.adapter->getCommunicator()->identityToString(current.id) << ": is now destroyed." << std::endl;
- mEvents->stopped();
+ mListeners->stopped();
}
+
//
// Last act is to remove the servant from the map.
//
mObjAdapter->remove(current.id);
}
-bool Hydra::BridgeService::BridgeImpl::destroyed()
+bool AsteriskSCF::BridgeService::BridgeImpl::destroyed()
{
boost::shared_lock<boost::shared_mutex> lock(mLock);
mLogger.getDebugStream() << __FUNCTION__ << ": " << (mState == Destroyed ? "yes, I am destroyed." : "no, I am not destroyed") << std::endl;
return mState == Destroyed;
}
-void Hydra::BridgeService::BridgeImpl::endpointConnected(const Hydra::Core::Endpoint::V1::EndpointIdPtr& id)
+void AsteriskSCF::BridgeService::BridgeImpl::sessionConnected(const AsteriskSCF::SessionCommunications::V1::SessionPrx& session)
{
- mLogger.getDebugStream() << __FUNCTION__ << ": endpoint connected " << endpointIdToString(id) << std::endl;
+ mLogger.getDebugStream() << __FUNCTION__ << ": session connected " << session->ice_toString() << std::endl;
boost::unique_lock<boost::shared_mutex> lock(mLock);
- std::map<std::string, Hydra::Session::V1::SessionEndpointPtr>::iterator i = mQueuedEndpoints.find(endpointIdToString(id));
- if(i != mQueuedEndpoints.end())
+ std::vector<BridgeSession>::iterator i = find_if(mSessions.begin(), mSessions.end(), AsteriskSCF::BridgeService::FindImpl(session));
+ if(i != mSessions.end())
{
- mEndpoints.push_back(BridgeEndpoint(i->second, mSplicer.connect(endpointIdToString(id), i->second->mediaSession)));
- mQueuedEndpoints.erase(i);
- }
- if(mAdminEndpoint.get() != 0 && mAdminEndpoint->endpoint && mAdminEndpoint->endpoint->callback)
- {
- mAdminEndpoint->endpoint->callback->connected(id);
+ i->connector = mSplicer.connect(session);
}
}
-size_t Hydra::BridgeService::BridgeImpl::endpointTerminated(const Hydra::Core::Endpoint::V1::EndpointIdPtr& id,
- const Hydra::Session::V1::ResponseCodePtr& response,
- const Ice::Current& current)
+size_t AsteriskSCF::BridgeService::BridgeImpl::sessionStopped(const AsteriskSCF::SessionCommunications::V1::SessionPrx& session,
+ const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& response)
{
- mLogger.getDebugStream() << __FUNCTION__ << ": endpoint terminated from " << endpointIdToString(id) << std::endl;
- size_t endpointCount(0);
-
- Hydra::Session::V1::SignalCallbackPrx callback;
+ mLogger.getDebugStream() << __FUNCTION__ << ": session terminated from " << session->ice_toString() << std::endl;
+ session->removeListener(mSessionListenerPrx);
- //
- // Find out if we have an endpoint we need to "unplug"
- //
- std::auto_ptr<BridgeEndpoint> ep;
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ std::vector<BridgeSession>::iterator i = find_if(mSessions.begin(), mSessions.end(), AsteriskSCF::BridgeService::FindImpl(session));
+ if(i != mSessions.end())
{
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- ep = removeEndpointImpl(id);
- endpointCount = mEndpoints.size();
- if(ep.get() == 0)
- {
- mLogger.getDebugStream() << __FUNCTION__ << ": unable to locate terminated endpoint in list of known endpoints! Checking for match on admin endpoint" << std::endl;
- if(mAdminEndpoint.get() != 0)
- {
- if(id->endpointManagerId == mAdminEndpoint->endpoint->id->endpointManagerId && id->destinationId == mAdminEndpoint->endpoint->id->destinationId)
- {
- mLogger.getDebugStream() << __FUNCTION__ << ": terminated endpoint is the admin endpoint. Resetting admin endpoint!" << std::endl;
- ep = mAdminEndpoint;
- mAdminEndpoint.reset(0);
- }
- }
- }
- if(mAdminEndpoint.get() != 0)
+ if(i->connector)
{
- callback = mAdminEndpoint->endpoint->callback;
- ++endpointCount;
+ i->connector->unplug();
}
+ mSessions.erase(i);
}
- if(callback)
- {
- callback->terminated(id, response);
- }
-
- //
- // This endpoint is now "out there" and not known to any other part of the bridge. Disconnecting the media is the last thing we can do.
- //
- if(ep.get() != 0 && ep->connector)
- {
- try
- {
- ep->connector->unplug();
- }
- catch(const Ice::Exception&)
- {
- }
- }
- return endpointCount;
+ return mSessions.size();
}
-void Hydra::BridgeService::BridgeImpl::endpointRinging(const Hydra::Core::Endpoint::V1::EndpointIdPtr& id)
-{
- mLogger.getDebugStream() << __FUNCTION__ << ": endpoint ringing from " << endpointIdToString(id) << std::endl;
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- if(mAdminEndpoint.get() != 0)
- {
- mAdminEndpoint->endpoint->callback->ring(id);
- }
- else
- {
- std::for_each(mEndpoints.begin(), mEndpoints.end(), Hydra::BridgeService::Broadcaster<Hydra::BridgeService::RingImpl>(id));
- }
-}
-
-void Hydra::BridgeService::BridgeImpl::endpointBusy(const Hydra::Core::Endpoint::V1::EndpointIdPtr& id)
-{
- mLogger.getDebugStream() << __FUNCTION__ << ": busy callback from " << endpointIdToString(id) << std::endl;
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- if(mAdminEndpoint.get() != 0)
- {
- mAdminEndpoint->endpoint->callback->busy(id);
- }
- else
- {
- std::for_each(mEndpoints.begin(), mEndpoints.end(), Hydra::BridgeService::Broadcaster<Hydra::BridgeService::BusyImpl>(id));
- }
-}
-
-Hydra::Session::V1::SignalCallbackPtr Hydra::BridgeService::BridgeImpl::signalCB()
-{
- return mSignallingServant;
-}
-
-void Hydra::BridgeService::BridgeImpl::statePreCheck()
+void AsteriskSCF::BridgeService::BridgeImpl::statePreCheck()
{
if(mState == ShuttingDown)
{
mLogger.getDebugStream() << __FUNCTION__ << ": called when shutting down." << std::endl;
- throw Hydra::System::Component::V1::ShuttingDown();
+ throw AsteriskSCF::System::Component::V1::ShuttingDown();
}
if(mState == Destroyed)
{
@@ -661,28 +441,9 @@ void Hydra::BridgeService::BridgeImpl::statePreCheck()
}
}
-std::vector<Hydra::BridgeService::BridgeImpl::BridgeEndpoint>::iterator Hydra::BridgeService::BridgeImpl::find(const Hydra::Core::Endpoint::V1::EndpointIdPtr& e)
+std::vector<AsteriskSCF::BridgeService::BridgeImpl::BridgeSession> AsteriskSCF::BridgeService::BridgeImpl::currentSessions()
{
- mLogger.getDebugStream() << __FUNCTION__ << ": searching endpoints for " << e->endpointManagerId << ":" << e->destinationId << "." << std::endl;
- for(std::vector<Hydra::BridgeService::BridgeImpl::BridgeEndpoint>::iterator i = mEndpoints.begin(); i != mEndpoints.end(); ++i)
- {
- if(e->endpointManagerId == i->endpoint->id->endpointManagerId && e->destinationId == i->endpoint->id->destinationId)
- {
- return i;
- }
- }
- mLogger.getDebugStream() << __FUNCTION__ << ": endpoint " << e->endpointManagerId << ":" << e->destinationId << " not found." << std::endl;
- return mEndpoints.end();
-}
-
-std::auto_ptr<Hydra::BridgeService::BridgeImpl::BridgeEndpoint> Hydra::BridgeService::BridgeImpl::removeEndpointImpl(const Hydra::Core::Endpoint::V1::EndpointIdPtr& e)
-{
- std::vector<Hydra::BridgeService::BridgeImpl::BridgeEndpoint>::iterator i = find(e);
- if(i != mEndpoints.end())
- {
- std::auto_ptr<Hydra::BridgeService::BridgeImpl::BridgeEndpoint> result(new BridgeEndpoint(*i));
- mEndpoints.erase(i);
- return result;
- }
- return std::auto_ptr<Hydra::BridgeService::BridgeImpl::BridgeEndpoint>(0);
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+ return mSessions;
}
+
diff --git a/src/BridgeImpl.h b/src/BridgeImpl.h
index 3d544b0..242babc 100644
--- a/src/BridgeImpl.h
+++ b/src/BridgeImpl.h
@@ -1,114 +1,99 @@
+/*
+* Asterisk Scalable Communications Framework
+*
+* Copyright (C) 2010 -- Digium, Inc.
+*
+* All rights reserved.
+*/
#pragma once
-#ifndef __HYDRA_BRIDGE_SERVICE_IMPL_H
-#define __HYDRA_BRIDGE_SERVICE_IMPL_H
-#include <Core/Bridging/BridgeServiceIf.h>
-#include <Core/Bridging/BridgeServiceEventsIf.h>
-#include <Session/SessionIf.h>
+#include <SessionCommunications/Bridging/BridgingIf.h>
+#include <SessionCommunications/SessionCommunicationsIf.h>
#include <boost/thread/shared_mutex.hpp>
#include <vector>
#include "Logger.h"
#include "MediaSplicer.h"
+#include "BridgeListenerMgr.h"
-namespace Hydra
+namespace AsteriskSCF
{
namespace BridgeService
{
- inline bool checkEndpointId(const Core::Endpoint::V1::EndpointId& e)
+ //
+ // BridgeImpl is an implementation of AsteriskSCF::SessionCommunications::Bridging::V1::Bridge.
+ //
+ class BridgeImpl : public SessionCommunications::Bridging::V1::Bridge
{
- if(e.endpointManagerId.size() == 0 || e.destinationId.size() == 0)
+ public:
+ struct BridgeSession
{
- return false;
- }
- return true;
- }
-
- class EventTopicWrapper;
-
- //
- // BridgeImpl is a session oriented bridge.
- //
- class BridgeImpl : public Core::Bridging::V1::Bridge
- {
- public:
- struct BridgeEndpoint
- {
- Session::V1::SessionEndpointPtr endpoint;
- MediaConnectorPtr connector;
-
- BridgeEndpoint(const Session::V1::SessionEndpointPtr& ep, const MediaConnectorPtr& con) :
- endpoint(ep),
- connector(con)
- {
- }
- };
-
- BridgeImpl(const Ice::ObjectAdapterPtr& objAdapter, const Session::V1::SessionEndpointPtr& adminEp, const Session::V1::SessionEndpointSeq& initialEndpoints,
- const Core::Bridging::V1::BridgeMonitorPrx& manager, const Core::Bridging::V1::BridgeEventsPrx& ev);
-
- ~BridgeImpl();
-
- //
- // Hydra::Core::Bridging::Bridge Interface
- //
- void addEndpoint(const Core::Endpoint::V1::BaseEndpointPtr& ep, const Ice::Current& current);
- void removeEndpoint(const Core::Endpoint::V1::EndpointIdPtr& ep, const Ice::Current& current);
- Core::Endpoint::V1::EndpointSeq listEndpoints(const Ice::Current& current);
- void shutdown(const Ice::Current& current);
- void destroy(const Ice::Current& current);
-
- //
- // Internal methods
- //
- bool destroyed();
-
- void endpointConnected(const Core::Endpoint::V1::EndpointIdPtr& id);
- size_t endpointTerminated(const Core::Endpoint::V1::EndpointIdPtr& id, const Session::V1::ResponseCodePtr& response, const Ice::Current&);
- void endpointRinging(const Core::Endpoint::V1::EndpointIdPtr& id);
- void endpointBusy(const Core::Endpoint::V1::EndpointIdPtr& id);
-
- Session::V1::SignalCallbackPtr signalCB();
-
- private:
-
- boost::shared_mutex mLock;
- enum ServiceStates
- {
- Running,
- ShuttingDown,
- Destroyed
- };
- ServiceStates mState;
-
- std::auto_ptr<BridgeEndpoint> mAdminEndpoint;
- std::vector<BridgeEndpoint> mEndpoints;
- Core::Bridging::V1::BridgeMonitorPrx mMonitor;
- std::auto_ptr<EventTopicWrapper> mEvents;
-
- std::map<std::string, Session::V1::SessionEndpointPtr> mQueuedEndpoints;
-
- MediaSplicer mSplicer;
-
- //
- // Policy values.
- //
- bool mAdminEndpointInList;
- Logger mLogger;
-
- Core::Endpoint::V1::EndpointIdPtr mBridgeId;
-
- Ice::ObjectAdapterPtr mObjAdapter;
- Session::V1::SignalCallbackPrx mSignalling;
- Session::V1::SignalCallbackPtr mSignallingServant;
-
- void statePreCheck();
- std::vector<BridgeEndpoint>::iterator find(const Core::Endpoint::V1::EndpointIdPtr& e);
-
- std::auto_ptr<BridgeEndpoint> removeEndpointImpl(const Core::Endpoint::V1::EndpointIdPtr& e);
- };
-
- typedef IceUtil::Handle<BridgeImpl> BridgeImplPtr;
+ SessionCommunications::V1::SessionPrx session;
+ MediaConnectorPtr connector;
+
+ BridgeSession(const SessionCommunications::V1::SessionPrx& s, const MediaConnectorPtr& con) :
+ session(s),
+ connector(con)
+ {
+ }
+ };
+
+ BridgeImpl(const Ice::ObjectAdapterPtr& objAdapter,
+ const SessionCommunications::V1::SessionSeq& initialSessions,
+ const SessionCommunications::Bridging::V1::BridgeListenerPrx& ev,
+ const AsteriskSCF::BridgeService::BridgeListenerMgrPtr& listenerMgr);
+
+ ~BridgeImpl();
+
+ //
+ // AsteriskSCF::SessionCommunications::Bridging::Bridge Interface
+ //
+ void addSessions(const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions, const Ice::Current& current);
+ void removeSessions(const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions, const Ice::Current& current);
+
+ AsteriskSCF::SessionCommunications::V1::SessionSeq listSessions(const Ice::Current&);
+ void shutdown(const Ice::Current& current);
+ void destroy(const Ice::Current& current);
+
+ void addListener(const AsteriskSCF::SessionCommunications::Bridging::V1::BridgeListenerPrx& listener, const Ice::Current& current);
+ void removeListener(const AsteriskSCF::SessionCommunications::Bridging::V1::BridgeListenerPrx& listener, const Ice::Current& current);
+
+ //
+ // Internal methods
+ //
+ bool destroyed();
+
+ void sessionConnected(const AsteriskSCF::SessionCommunications::V1::SessionPrx& session);
+ size_t sessionStopped(const AsteriskSCF::SessionCommunications::V1::SessionPrx& session,
+ const SessionCommunications::V1::ResponseCodePtr& response);
+
+ std::vector<BridgeSession> currentSessions();
+
+ private:
+
+ boost::shared_mutex mLock;
+ enum ServiceStates
+ {
+ Running,
+ ShuttingDown,
+ Destroyed
+ };
+ ServiceStates mState;
+
+ std::vector<BridgeSession> mSessions;
+
+ MediaSplicer mSplicer;
+ Logger mLogger;
+
+ const std::string mName;
+ Ice::ObjectAdapterPtr mObjAdapter;
+
+ BridgeListenerMgrPtr mListeners;
+ SessionCommunications::V1::SessionListenerPtr mSessionListener;
+ SessionCommunications::V1::SessionListenerPrx mSessionListenerPrx;
+
+ void statePreCheck();
+ };
+
+ typedef IceUtil::Handle<BridgeImpl> BridgeImplPtr;
} // End of namespace Bridging.
-} // End of namespace Hydra.
-
-#endif
+} // End of namespace AsteriskSCF.
diff --git a/src/BridgeManagerImpl.cpp b/src/BridgeManagerImpl.cpp
new file mode 100644
index 0000000..83e4bc6
--- /dev/null
+++ b/src/BridgeManagerImpl.cpp
@@ -0,0 +1,148 @@
+#include "BridgeManagerImpl.h"
+#include <Ice/Ice.h>
+#include <IceUtil/UUID.h>
+#include <boost/thread/locks.hpp>
+#include "BridgeListenerMgr.h"
+
+//
+// Compiled in constants.
+// TODO: Replace with configuration!
+//
+
+static const std::string TopicPrefix("AsteriskSCF.BridgeManager.");
+
+namespace AsteriskSCF
+{
+namespace BridgeService
+{
+//
+// The functor lives in an unnamed namespace so that it is private to this translation unit.
+// BridgeImpl.cpp also uses a functor called AsteriskSCF::BridgeService::ShutdownImpl (constructed from
+// a response code); two different externally visible classes with the same qualified name would
+// violate the one-definition rule. Qualified uses such as AsteriskSCF::BridgeService::ShutdownImpl(current)
+// in this file still resolve to this class.
+//
+namespace
+{
+ //
+ // Functor used with for_each on shutdown: calls shutdown() on the servant of each BridgeInfo,
+ // passing along the Ice::Current it was constructed with.
+ //
+ class ShutdownImpl : public std::unary_function<BridgeManagerImpl::BridgeInfo, void>
+ {
+ public:
+ explicit ShutdownImpl(const Ice::Current& c) :
+ mCurrent(c)
+ {
+ }
+
+ void operator()(const BridgeManagerImpl::BridgeInfo& b)
+ {
+ b.servant->shutdown(mCurrent);
+ }
+
+ private:
+ //
+ // Held by value, so the functor does not depend on the lifetime of the caller's Ice::Current.
+ //
+ const Ice::Current mCurrent;
+ };
+
+} // End of unnamed namespace
+} // End of namespace BridgeService
+} // End of namespace AsteriskSCF
+
+AsteriskSCF::BridgeService::BridgeManagerImpl::BridgeManagerImpl(
+ const Ice::ObjectAdapterPtr& adapter,
+ const std::string& name,
+ const AsteriskSCF::SessionCommunications::Bridging::V1::BridgeManagerListenerPrx& listener) :
+ mName(name),
+ mShuttingDown(false),
+ mSuspended(false),
+ mAdapter(adapter)
+{
+ mLogger.getInfoStream() << "Created AsteriskSCF Session-Oriented Bridge Manager." << std::endl;
+ mListeners = new ListenerManager(adapter->getCommunicator(), TopicPrefix + mName);
+ if(listener)
+ {
+ mListeners->addListener(listener);
+ }
+}
+
+AsteriskSCF::SessionCommunications::Bridging::V1::BridgePrx AsteriskSCF::BridgeService::BridgeManagerImpl::createBridge(
+ const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions,
+ const AsteriskSCF::SessionCommunications::Bridging::V1::BridgeListenerPrx& listener,
+ const Ice::Current& current)
+{
+ mLogger.getTraceStream() << __FUNCTION__ << ":" << current.adapter->getCommunicator()->identityToString(current.id) << std::endl;
+
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ if(mShuttingDown)
+ {
+ mLogger.getDebugStream() << __FUNCTION__ << ": called when shutting down." << std::endl;
+ throw AsteriskSCF::System::Component::V1::ShuttingDown();
+ }
+ if(mSuspended)
+ {
+ mLogger.getDebugStream() << __FUNCTION__ << ": called when suspended." << std::endl;
+ throw AsteriskSCF::System::Component::V1::Suspended();
+ }
+ reap();
+
+ std::string stringId = std::string("bridge.") + IceUtil::generateUUID();
+ Ice::Identity id(mAdapter->getCommunicator()->stringToIdentity(stringId));
+ AsteriskSCF::SessionCommunications::Bridging::V1::BridgePrx prx(
+ AsteriskSCF::SessionCommunications::Bridging::V1::BridgePrx::uncheckedCast(mAdapter->createProxy(id)));
+ AsteriskSCF::BridgeService::BridgeListenerMgrPtr mgr(new BridgeListenerMgr(mAdapter->getCommunicator(), stringId, prx));
+
+ AsteriskSCF::BridgeService::BridgeImplPtr bridge = new AsteriskSCF::BridgeService::BridgeImpl(mAdapter, sessions, listener, mgr);
+ Ice::ObjectPrx obj = mAdapter->add(bridge, id);
+ mLogger.getInfoStream() << current.adapter->getCommunicator()->identityToString(current.id) << ": creating new bridge " << obj->ice_toString() << "." << std::endl;
+ BridgeInfo info;
+ info.servant = bridge;
+ info.proxy = AsteriskSCF::SessionCommunications::Bridging::V1::BridgePrx::uncheckedCast(obj);
+ mBridges.push_back(info);
+ return info.proxy;
+}
+
+void AsteriskSCF::BridgeService::BridgeManagerImpl::addListener(const SessionCommunications::Bridging::V1::BridgeManagerListenerPrx& listener, const Ice::Current& current)
+{
+}
+
+void AsteriskSCF::BridgeService::BridgeManagerImpl::removeListener(const SessionCommunications::Bridging::V1::BridgeManagerListenerPrx& listener, const Ice::Current& current)
+{
+}
+
+AsteriskSCF::SessionCommunications::Bridging::V1::BridgeSeq
+AsteriskSCF::BridgeService::BridgeManagerImpl::listBridges(const Ice::Current&)
+{
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+ AsteriskSCF::SessionCommunications::Bridging::V1::BridgeSeq result;
+ for(std::vector<BridgeInfo>::iterator i = mBridges.begin(); i != mBridges.end();++i)
+ {
+ result.push_back(i->proxy);
+ }
+ return result;
+}
+
+void AsteriskSCF::BridgeService::BridgeManagerImpl::shutdown(const Ice::Current& current)
+{
+ mLogger.getTraceStream() << __FUNCTION__ << ":" << current.adapter->getCommunicator()->identityToString(current.id) << std::endl;
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ if(mShuttingDown)
+ {
+ mLogger.getDebugStream() << __FUNCTION__ << ": called when shutting down." << std::endl;
+ return;
+ }
+ if(mSuspended)
+ {
+ mLogger.getDebugStream() << __FUNCTION__ << ": called when suspended." << std::endl;
+ throw AsteriskSCF::System::Component::V1::Suspended();
+ }
+ mLogger.getInfoStream() << current.adapter->getCommunicator()->identityToString(current.id) << ": shutting down." << std::endl;
+ mShuttingDown = true;
+ reap();
+ std::for_each(mBridges.begin(), mBridges.end(), AsteriskSCF::BridgeService::ShutdownImpl(current));
+
+ mAdapter->getCommunicator()->shutdown();
+}
+
+//
+// Drops every bridge whose servant reports destroyed() from mBridges. The callers in this file
+// (createBridge() and shutdown()) invoke it while holding a unique lock on mLock.
+//
+void AsteriskSCF::BridgeService::BridgeManagerImpl::reap()
+{
+ mLogger.getDebugStream() << __FUNCTION__ << ": reaping bridge set of " << mBridges.size() << " bridges." << std::endl;
+ std::vector<BridgeInfo>::iterator i = mBridges.begin();
+ while(i != mBridges.end())
+ {
+ if(i->servant->destroyed())
+ {
+ //
+ // vector::erase() invalidates the erased iterator and everything after it, so carry on from
+ // the iterator it returns instead of incrementing the stale one (which also skipped the
+ // element that slid into the erased slot).
+ //
+ i = mBridges.erase(i);
+ }
+ else
+ {
+ ++i;
+ }
+ }
+ mLogger.getDebugStream() << __FUNCTION__ << ": reaping completed, bridge set size is now " << mBridges.size() << "." << std::endl;
+}
diff --git a/src/BridgeManagerImpl.h b/src/BridgeManagerImpl.h
new file mode 100644
index 0000000..530ecec
--- /dev/null
+++ b/src/BridgeManagerImpl.h
@@ -0,0 +1,59 @@
+#pragma once
+
+#include <SessionCommunications/SessionCommunicationsIf.h>
+#include <SessionCommunications/Bridging/BridgingIf.h>
+#include <boost/thread/shared_mutex.hpp>
+#include <vector>
+
+#include "BridgeImpl.h"
+#include "Logger.h"
+#include "ListenerManager.h"
+
+namespace AsteriskSCF
+{
+namespace BridgeService
+{
+ //
+ // Servant class implementing the
+ // SessionCommunications::Bridging::V1::BridgeManager interface. It keeps a
+ // collection of bridge entries (servant + proxy pairs) guarded by a
+ // reader/writer mutex.
+ //
+ class BridgeManagerImpl : public SessionCommunications::Bridging::V1::BridgeManager
+ {
+ public:
+
+ //
+ // NOTE(review): the constructor body is not part of this header; presumably adapter
+ // and name initialize mAdapter and mName and listener is the initial manager
+ // listener -- confirm against BridgeManagerImpl.cpp.
+ //
+ BridgeManagerImpl(const Ice::ObjectAdapterPtr& adapter, const std::string& name,
+ const SessionCommunications::Bridging::V1::BridgeManagerListenerPrx& listener);
+
+ //
+ // AsteriskSCF::SessionCommunications::Bridging::V1::BridgeManager Interface
+ //
+ SessionCommunications::Bridging::V1::BridgePrx createBridge(
+ const SessionCommunications::V1::SessionSeq& endpoints,
+ const SessionCommunications::Bridging::V1::BridgeListenerPrx& listener,
+ const Ice::Current& current);
+
+ void addListener(const SessionCommunications::Bridging::V1::BridgeManagerListenerPrx& listener, const Ice::Current& current);
+ void removeListener(const SessionCommunications::Bridging::V1::BridgeManagerListenerPrx& listener, const Ice::Current& current);
+ //
+ // Returns the proxies of the entries held in mBridges (read under a shared lock).
+ //
+ SessionCommunications::Bridging::V1::BridgeSeq listBridges(const Ice::Current& current);
+ //
+ // Marks the manager as shutting down, reaps destroyed bridges, shuts down the
+ // remaining ones and then shuts down the adapter's communicator. A repeated call
+ // while shutting down returns immediately; a call while suspended throws
+ // AsteriskSCF::System::Component::V1::Suspended.
+ //
+ void shutdown(const Ice::Current& current);
+
+ //
+ // One tracked bridge: the servant handle together with the proxy that is handed
+ // out to clients by listBridges().
+ //
+ struct BridgeInfo
+ {
+ BridgeImplPtr servant;
+ SessionCommunications::Bridging::V1::BridgePrx proxy;
+ };
+
+ private:
+
+ // Reader/writer lock: listBridges() takes it shared, shutdown() takes it exclusively.
+ boost::shared_mutex mLock;
+ std::string mName;
+ // Bridge entries tracked by this manager; pruned by reap().
+ std::vector<BridgeInfo> mBridges;
+ // Set to true by shutdown(); later shutdown() calls then return early.
+ bool mShuttingDown;
+ // When true, shutdown() throws AsteriskSCF::System::Component::V1::Suspended.
+ bool mSuspended;
+ // shutdown() uses this adapter to reach the communicator it shuts down.
+ Ice::ObjectAdapterPtr mAdapter;
+ Logger mLogger;
+ typedef ListenerManagerT<SessionCommunications::Bridging::V1::BridgeManagerListenerPrx> ListenerManager;
+ typedef IceUtil::Handle<ListenerManager> ListenerManagerPtr;
+ // Handle to the listener manager for BridgeManagerListenerPrx proxies.
+ ListenerManagerPtr mListeners;
+ //
+ // Removes entries from mBridges whose servant reports destroyed(). Does not lock
+ // mLock itself; shutdown() calls it with the exclusive lock already held.
+ //
+ void reap();
+ };
+
+ // Reference-counted handle type for BridgeManagerImpl instances.
+ typedef IceUtil::Handle<BridgeManagerImpl> BridgeManagerImplPtr;
+};
+};
diff --git a/src/BridgeServiceImpl.h b/src/BridgeServiceImpl.h
index cca9add..12aa7e6 100644
--- a/src/BridgeServiceImpl.h
+++ b/src/BridgeServiceImpl.h
@@ -2,9 +2,9 @@
#ifndef __HYDRA_BRIDGE_SERVICE_IMPL_H
#define __HYDRA_BRIDGE_SERVICE_IMPL_H
-#include <Hydra/Telephony/Bridging/BridgeService.h>
+#include <AsteriskSCF/Telephony/Bridging/BridgeService.h>
-namespace Hydra
+namespace AsteriskSCF
... 739 lines suppressed ...
--
asterisk-scf/integration/bridging.git
More information about the asterisk-scf-commits
mailing list