[asterisk-scf-commits] asterisk-scf/integration/bridging.git branch "basecomponentsquash" created.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Tue Aug 2 15:07:51 CDT 2011


branch "basecomponentsquash" has been created
        at  4e1df70a193f2ff3f3713d22c2c5ab1428038f4e (commit)

- Log -----------------------------------------------------------------
commit 4e1df70a193f2ff3f3713d22c2c5ab1428038f4e
Author: Ken Hunt <ken.hunt at digium.com>
Date:   Tue Jun 28 20:51:24 2011 -0500

    Mods for changes to ServiceLocatorParams and use of base Component class.

diff --git a/config/bridging.conf b/config/bridging.conf
index 9fa634f..95e9796 100644
--- a/config/bridging.conf
+++ b/config/bridging.conf
@@ -5,7 +5,7 @@ BridgeManager.BridgeService.Endpoints=default
 Ice.ThreadPool.Client.Size=4
 
 # A proxy to the service locator management service
-ServiceLocatorManagementProxy=LocatorServiceManagement:tcp -p 4422
+LocatorServiceManagement.Proxy=LocatorServiceManagement:tcp -p 4422
 LocatorService.Proxy=LocatorService:tcp -p 4411
 
 # A proxy to the IceStorm topic manager
diff --git a/config/test_bridging.conf b/config/test_bridging.conf
index e88f751..c8cb3c8 100644
--- a/config/test_bridging.conf
+++ b/config/test_bridging.conf
@@ -9,6 +9,10 @@
 #
 Ice.Default.CollocationOptimized=0
 
+Ice.Override.Timeout=5000
+
+Ice.Warn.UnknownProperties=0
+
 #
 # We use a single file for configuration.
 #
@@ -104,7 +108,7 @@ ServiceLocatorLocalAdapter.Endpoints=tcp -p 4412
 # The proxies that clients use to access the Service Locator facilities.
 #
 LocatorService.Proxy=LocatorService:default -p 4411
-ServiceLocatorManagementProxy=LocatorServiceManagement:tcp -p 4422
+LocatorServiceManagement.Proxy=LocatorServiceManagement:tcp -p 4422
 
 #
 # The IceBox entries for loading the services.
@@ -128,15 +132,20 @@ TestChannel.InstanceName=BridgeTest
 #
 # Configuration for the test bridge instances. There are are two: one master
 # (TestBridge) and one standby (TestBridge2).
-# NOTE: These will be changed to be accessible through
-# service discovery in a later branch.
+# For testing purposes we're providing explicit endpoints for the standby 
+# component. Endpoints will be assigned random unused ports if not 
+# explicitly set. 
 #
 TestBridge.InstanceName=TestBridge
 TestBridge.ManagerId=TestBridgeManager
+# TestBridge.Endpoints=default -p 57000
+# TestBridge.Backplane.Endpoints=default -p 57001
 
 TestBridge2.InstanceName=TestBridge2
 TestBridge2.ManagerId=TestBridgeManager2
-TestBridge2.StateReplicatorListener=yes
+TestBridge2.Standby=yes
+TestBridge2.Endpoints=default -p 57010
+TestBridge2.Backplane.Endpoints=default -p 57011
 
 #
 # Configuration for the bridge state replicator.
diff --git a/src/BridgeListenerMgr.h b/src/BridgeListenerMgr.h
index 5c31156..d5a39d9 100644
--- a/src/BridgeListenerMgr.h
+++ b/src/BridgeListenerMgr.h
@@ -40,5 +40,6 @@ private:
 };
 
 typedef IceUtil::Handle<BridgeListenerMgr> BridgeListenerMgrPtr;
+
 } // End of namespace BridgeService
 } // End of namespace AsteriskSCF
diff --git a/src/BridgeManagerImpl.cpp b/src/BridgeManagerImpl.cpp
index be58b67..2be8238 100644
--- a/src/BridgeManagerImpl.cpp
+++ b/src/BridgeManagerImpl.cpp
@@ -45,7 +45,7 @@ class BridgeManagerImpl : public BridgeManagerServant
 public:
 
     BridgeManagerImpl(const Ice::ObjectAdapterPtr& adapter, const string& name,
-            const ReplicatorSmartPrx& replicator, const Logging::Logger& logger);
+            const BridgeReplicationContextPtr& replicationContext, const Logging::Logger& logger);
     ~BridgeManagerImpl();
 
     //
@@ -93,9 +93,8 @@ private:
     boost::shared_mutex mLock;
     string mName;
     vector<BridgeInfo> mBridges;
-    bool mActivated;
     Ice::ObjectAdapterPtr mAdapter;
-    ReplicatorSmartPrx mReplicator;
+    BridgeReplicationContextPtr mReplicationContext;
     BridgeManagerPrx mSourceProxy;
     BridgeManagerListenerMgrPtr mListeners;
     Logger mLogger;
@@ -110,12 +109,11 @@ private:
 
 typedef IceUtil::Handle<BridgeManagerImpl> BridgeManagerImplPtr;
 
-BridgeManagerImpl::BridgeManagerImpl(const Ice::ObjectAdapterPtr& adapter, const string& name, const ReplicatorSmartPrx& replicator,
+BridgeManagerImpl::BridgeManagerImpl(const Ice::ObjectAdapterPtr& adapter, const string& name, const BridgeReplicationContextPtr& replicationContext,
         const Logger& logger) :
     mName(name),
-    mActivated(false),
     mAdapter(adapter),
-    mReplicator(replicator), 
+    mReplicationContext(replicationContext), 
     mLogger(logger),
     mState(new BridgeManagerStateItem)
 {
@@ -191,7 +189,7 @@ void BridgeManagerImpl::createBridge_async(const AMD_BridgeManager_createBridgeP
             listeners.push_back(listener);
         }
 
-        BridgeServantPtr bridge = BridgeServant::create(stringId, mAdapter, listeners, mgr, mReplicator, mLogger);
+        BridgeServantPtr bridge = BridgeServant::create(stringId, mAdapter, listeners, mgr, mReplicationContext->getReplicator(), mLogger);
         Ice::ObjectPrx obj = mAdapter->add(bridge, id);
 
         mLogger(Info) << objectIdFromCurrent(current) << ": creating new bridge " << obj->ice_toString() << "." ;
@@ -376,7 +374,6 @@ vector<BridgeServantPtr> BridgeManagerImpl::getBridges()
 void BridgeManagerImpl::activate()
 {
     boost::unique_lock<boost::shared_mutex> lock(mLock);
-    mActivated = true;
     for (BridgeManagerListenerSeq::iterator i = mState->listeners.begin(); i != mState->listeners.end(); ++i)
     {
         mListeners->addListener(*i);
@@ -391,7 +388,7 @@ void BridgeManagerImpl::createBridgeReplica(const BridgeStateItemPtr& state)
     BridgePrx prx(BridgePrx::uncheckedCast(mAdapter->createProxy(id)));
     BridgeListenerMgrPtr mgr(new BridgeListenerMgr(mAdapter->getCommunicator(), state->bridgeId, prx));
 
-    BridgeServantPtr bridge = BridgeServant::create(mAdapter, mgr, mReplicator, mLogger, state);
+    BridgeServantPtr bridge = BridgeServant::create(mAdapter, mgr, mReplicationContext->getReplicator(), mLogger, state);
     Ice::ObjectPrx obj = mAdapter->add(bridge, id);
 
     mLogger(Info) << ": creating bridge replica " << obj->ice_toString() << "." ;
@@ -437,7 +434,7 @@ void BridgeManagerImpl::reap()
 
 void BridgeManagerImpl::statePreCheck(const string& caller)
 {
-    if (!mActivated)
+    if (!mReplicationContext->isActive())
     {
         mLogger(Info) << caller << ": remote call invoked when not active!";
     }
@@ -455,12 +452,12 @@ void BridgeManagerImpl::statePreCheck(const string& caller)
 
 void BridgeManagerImpl::update()
 {
-    if (mActivated)
+    if (mReplicationContext->isReplicating())
     {
         ++mState->serial;
         ReplicatedStateItemSeq seq;
         seq.push_back(getState());
-        mReplicator->setState(seq);
+        mReplicationContext->getReplicator()->setState(seq);
     }
 }
 
@@ -468,8 +465,8 @@ void BridgeManagerImpl::update()
 
 BridgeManagerServantPtr 
 AsteriskSCF::BridgeService::createBridgeManager(const Ice::ObjectAdapterPtr& adapter, const string& name,
-        const ReplicatorSmartPrx& replicator,
+        const BridgeReplicationContextPtr& replicationContext,
         const Logger& logger)
 {
-    return new BridgeManagerImpl(adapter, name, replicator, logger);
+    return new BridgeManagerImpl(adapter, name, replicationContext, logger);
 }
diff --git a/src/BridgeManagerImpl.h b/src/BridgeManagerImpl.h
index dd7fad3..b5e4ec9 100644
--- a/src/BridgeManagerImpl.h
+++ b/src/BridgeManagerImpl.h
@@ -21,6 +21,7 @@
 #include <string>
 #include <vector>
 
+#include "BridgeReplicationContext.h"
 #include "BridgeServiceConfig.h"
 #include "BridgeImpl.h"
 
@@ -72,7 +73,7 @@ typedef IceUtil::Handle<BridgeManagerServant> BridgeManagerServantPtr;
  *
  **/
 BridgeManagerServantPtr createBridgeManager(const Ice::ObjectAdapterPtr& adapter,
-        const std::string& name, const ReplicatorSmartPrx& replicator,
+        const std::string& name, const BridgeReplicationContextPtr& replicationContext,
         const AsteriskSCF::System::Logging::Logger& logger);
 
 };
diff --git a/src/BridgeReplicationContext.h b/src/BridgeReplicationContext.h
new file mode 100644
index 0000000..8959336
--- /dev/null
+++ b/src/BridgeReplicationContext.h
@@ -0,0 +1,69 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010-2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#pragma once
+
+#include <boost/shared_ptr.hpp>
+#include <boost/thread/shared_mutex.hpp>
+
+#include <AsteriskSCF/Replication/ReplicationContext.h>
+#include "BridgeServiceConfig.h"
+
+namespace AsteriskSCF
+{
+namespace BridgeService
+{
+
+/** 
+ * This class provides the component with context
+ * related to state replication. 
+ */
+class BridgeReplicationContext : public AsteriskSCF::Replication::ReplicationContext
+{
+public:
+    BridgeReplicationContext(AsteriskSCF::Replication::ReplicationStateType state, 
+                             const ReplicatorSmartPrx& bridgeReplicator) : 
+        ReplicationContext(state),
+        mBridgeReplicator(bridgeReplicator)
+    {
+    }
+        
+    // Override
+     virtual bool isReplicating() 
+     {
+         // If the base context says we aren't replicating, we aren't. 
+         if (!ReplicationContext::isReplicating())
+         {
+             return false;
+         }
+
+         // Do we have a replicator proxy?
+         if (mBridgeReplicator.isInitialized())
+         {
+             return true;
+         }
+
+         return false;
+     }
+
+    ReplicatorSmartPrx getReplicator() {return mBridgeReplicator;}
+
+private:
+   ReplicatorSmartPrx mBridgeReplicator;
+};
+typedef boost::shared_ptr<BridgeReplicationContext> BridgeReplicationContextPtr;
+
+} // namespace BridgeService
+} // namespace AsteriskSCF
diff --git a/src/BridgeReplicatorIf.ice b/src/BridgeReplicatorIf.ice
index 1d3e1b8..7d4b4ee 100644
--- a/src/BridgeReplicatorIf.ice
+++ b/src/BridgeReplicatorIf.ice
@@ -60,16 +60,6 @@ sequence<ReplicatedStateItem> ReplicatedStateItemSeq;
 
 /**
  *
- * TODO: Use!
- *
- */
-unsliceable class BridgeStateReplicatorParams extends AsteriskSCF::Core::Discovery::V1::ServiceLocatorParams
-{
-    string name;
-};
-
-/**
- *
  * MediaOperationReplicationPolicy indicates whether proxies to media
  * operations should be replicated along with the media sessions or media
  * sessions only are replicated. If only the media sessions are replicated,
diff --git a/src/BridgeReplicatorService.cpp b/src/BridgeReplicatorService.cpp
index 6bda816..ca4b709 100644
--- a/src/BridgeReplicatorService.cpp
+++ b/src/BridgeReplicatorService.cpp
@@ -41,6 +41,11 @@ typedef IceUtil::Handle<ReplicatorI> ReplicatorIPtr;
 
 namespace
 {
+Logger lg = getLoggerFactory().getLogger("AsteriskSCF.BridgeService");
+}
+
+namespace
+{
 
 class BridgeReplicatorApp : public IceBox::Service
 {
@@ -117,11 +122,11 @@ void BridgeReplicatorApp::start(const string& name, const Ice::CommunicatorPtr&
     getLoggerFactory().setLogOutput(iceLogger->getLogger());
 
 
-    string property = communicator->getProperties()->getProperty("ServiceLocatorManagementProxy");
+    string property = communicator->getProperties()->getProperty("LocatorServiceManagement.Proxy");
     if(property.size() == 0)
     {
         throw IceBox::FailureException(__FILE__, __LINE__,
-                "Configuration error: Unable to locate property `ServiceLocatorManagementProxy'");
+                "Configuration error: Unable to locate property `LocatorServiceManagement.Proxy'");
     }
     
     //
@@ -141,16 +146,23 @@ void BridgeReplicatorApp::start(const string& name, const Ice::CommunicatorPtr&
     bool registered = false;
     try
     {
-        BridgeStateReplicatorParamsPtr locatorParameters = new BridgeStateReplicatorParams;
+        std::string serviceName = communicator->getProperties()->getPropertyWithDefault(
+          name + ".ServiceName", "default");
+
+        ServiceLocatorParamsPtr locatorParameters = new ServiceLocatorParams;
         locatorParameters->category = StateReplicatorDiscoveryCategory;
-        locatorParameters->name = communicator->getProperties()->getPropertyWithDefault(name + ".ReplicatorName", "default");
+        locatorParameters->service = serviceName;
+        locatorParameters->id = communicator->getProperties()->getPropertyWithDefault(name + ".ReplicatorName", "default");
         mLocator = new LocatorRegistrationWrapper<ReplicatorPrx>(communicator, property, replicatorPrx, adapterName, 
           locatorParameters);
         registered = mLocator->registerService();
     }
-    catch(const Ice::Exception&)
+    catch(const Ice::Exception& e)
     {
+        lg(Error) << "Exception starting bridge replicator: " << e.what();
+        throw;
     }
+
     if(!registered)
     {
         mRegisterThread = new RegisterThread<ReplicatorPrx>(mLocator);
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index f2c83c3..ccc9502 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -5,7 +5,7 @@ include_directories(${ice-util-cpp_dir}/include)
 asterisk_scf_slice_include_directories(${API_SLICE_DIR})
 
 asterisk_scf_component_init(bridgeservice)
-asterisk_scf_component_add_file(bridgeservice Service.cpp)
+asterisk_scf_component_add_file(bridgeservice Component.cpp)
 asterisk_scf_component_add_file(bridgeservice BridgeImpl.h)
 asterisk_scf_component_add_file(bridgeservice BridgeImpl.cpp)
 asterisk_scf_component_add_file(bridgeservice BridgeListenerMgr.h)
@@ -13,6 +13,7 @@ asterisk_scf_component_add_file(bridgeservice BridgeListenerMgr.cpp)
 asterisk_scf_component_add_file(bridgeservice BridgeManagerListenerMgr.h)
 asterisk_scf_component_add_file(bridgeservice BridgeManagerListenerMgr.cpp)
 asterisk_scf_component_add_file(bridgeservice BridgeReplicatorStateListenerI.h)
+asterisk_scf_component_add_file(bridgeservice BridgeReplicationContext.h)
 asterisk_scf_component_add_file(bridgeservice BridgeReplicatorStateListenerI.cpp)
 asterisk_scf_component_add_file(bridgeservice BridgeManagerImpl.h)
 asterisk_scf_component_add_file(bridgeservice BridgeManagerImpl.cpp)
@@ -38,6 +39,7 @@ asterisk_scf_component_add_ice_libraries(bridgeservice IceBox)
 asterisk_scf_component_add_boost_libraries(bridgeservice thread date_time)
 asterisk_scf_component_build_icebox(bridgeservice)
 target_link_libraries(bridgeservice logging-client)
+target_link_libraries(bridgeservice ice-util-cpp)
 asterisk_scf_component_install(bridgeservice)
 
 asterisk_scf_component_init(BridgeReplicator)
diff --git a/src/Component.cpp b/src/Component.cpp
new file mode 100644
index 0000000..aa7d5d7
--- /dev/null
+++ b/src/Component.cpp
@@ -0,0 +1,254 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010-2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#include <Ice/Ice.h>
+#include <IceBox/IceBox.h>
+#include <IceStorm/IceStorm.h>
+
+#include <AsteriskSCF/Logger/IceLogger.h>
+#include <AsteriskSCF/logger.h>
+#include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.h>
+#include <AsteriskSCF/System/Component/ComponentServiceIf.h>
+#include <AsteriskSCF/System/Component/ReplicaIf.h>
+#include <AsteriskSCF/Component/Component.h>
+#include <AsteriskSCF/Discovery/LocatorRegistrationWrapper.h>
+
+#include "BridgeManagerImpl.h"
+#include "BridgeReplicatorStateListenerI.h"
+#include "BridgeReplicationContext.h"
+
+using namespace AsteriskSCF::System::Logging;
+using namespace AsteriskSCF::SessionCommunications::V1;
+using namespace AsteriskSCF::Core::Discovery::V1;
+using namespace AsteriskSCF::System::Component::V1;
+using namespace AsteriskSCF::BridgeService;
+using namespace AsteriskSCF::Bridge::V1;
+using namespace AsteriskSCF::Replication;
+using namespace AsteriskSCF::Discovery;
+using namespace std;
+
+
+namespace
+{
+Logger lg = getLoggerFactory().getLogger("AsteriskSCF.BridgeService");
+}
+
+namespace
+{
+
+class Component : public AsteriskSCF::Component::Component
+{
+public:
+    Component() : 
+        AsteriskSCF::Component::Component(lg, 
+                                          "BridgeService"),
+        mListeningToReplicator(false)   {}
+
+private:
+    // Optional base Component notification overrides. 
+    virtual void onActivated();
+    virtual void onPreInitialize();
+    virtual void onPostInitialize();
+    virtual void onStop();
+
+    // Required base Component overrides
+    virtual void createPrimaryServices();
+    virtual void preparePrimaryServicesForDiscovery();
+    virtual void createReplicationStateListeners();
+    virtual void stopListeningToStateReplicators();
+    virtual void listenToStateReplicators();
+    virtual void findRemoteServices() {};  // N/A
+
+    // Other base Component overrides
+    virtual ReplicationContextPtr createReplicationContext(ReplicationStateType state);
+
+    ReplicatorSmartPrx mReplicator;
+    BridgeManagerServantPtr mBridgeManager;
+    BridgeManagerPrx mBridgeManagerPrx;
+    LocatorRegistrationWrapperPtr mBridgeManagerRegistration;
+    RegisterThreadPtr mRegisterThread;
+    ReplicatorListenerPrx mReplicatorListenerPrx;
+    bool mListeningToReplicator;
+    boost::shared_mutex mReplicatorLock;
+};
+
+/**
+ * Override of factory method of the base Component class. Our custom 
+ * version allows us to attach our replicator proxy to the context.
+ */
+ReplicationContextPtr Component::createReplicationContext(ReplicationStateType state)
+{
+    // Find our state replicator. 
+    ServiceLocatorParamsPtr searchParams = new ServiceLocatorParams;
+    searchParams->category = StateReplicatorDiscoveryCategory;
+    searchParams->service = getCommunicator()->getProperties()->getPropertyWithDefault(
+               "Bridge.StateReplicatorService", "default");
+    searchParams->id = getCommunicator()->getProperties()->getProperty("Bridge.StateReplicatorId");
+
+    ReplicatorSmartPrx replicator;
+    try
+    {
+        replicator = ReplicatorSmartPrx(getServiceLocator(), searchParams, lg);
+    }
+    catch (const std::exception& ex)
+    {
+        lg(Error) << "Bridge state replicator lookup failed. Continuing without replication. " << ex.what();
+    }
+
+    BridgeReplicationContextPtr context(new BridgeReplicationContext(state, replicator));
+    return context;
+}
+
+void Component::onPostInitialize()
+{
+    if (getReplicationContext()->isActive())
+    {
+       mBridgeManager->activate(); 
+    }
+}
+
+void Component::onActivated()
+{
+    mBridgeManager->activate(); // TBD...Is this really needed? Is there really no standby() corollary?
+}
+
+void Component::onPreInitialize()
+{
+    lg(Debug) << "Launching AsteriskSCF Session-Oriented Bridging Service " << getName();
+}
+
+/** 
+ * Create our servant implementations.
+ */
+void Component::createPrimaryServices()
+{
+     std::string managerName = 
+        getCommunicator()->getProperties()->getPropertyWithDefault(getName() + ".ManagerId", "BridgeManager");
+
+    //
+    // It's very important that uncheckedCast's be used here as there are no guarantees
+    // that the object adapter is activated yet. If it is not, then this can hang.
+    //
+    BridgeReplicationContextPtr replicationContext = boost::static_pointer_cast<BridgeReplicationContext>(getReplicationContext());
+    mBridgeManager = createBridgeManager(getServiceAdapter(), managerName, replicationContext, lg);
+    mBridgeManagerPrx = BridgeManagerPrx::uncheckedCast(getServiceAdapter()->add(mBridgeManager, 
+          getCommunicator()->stringToIdentity(managerName)));
+    assert(mBridgeManagerPrx != 0);
+    if (mBridgeManagerPrx == 0)
+    {
+        throw IceBox::FailureException(__FILE__, __LINE__, "Unable to instantiate bridge manager object");
+    }
+}
+
+/**
+ * Wrap our servants for the locator service.
+ */
+void Component::preparePrimaryServicesForDiscovery()
+{
+    try
+    {
+        mBridgeManagerRegistration = this->wrapServiceForRegistration(mBridgeManagerPrx,
+                                                                      BridgeManagerDiscoveryCategory);
+        managePrimaryService(mBridgeManagerRegistration);
+    }
+    catch (const Ice::Exception& e)
+    {
+        lg(Error) << "Exception in " << BOOST_CURRENT_FUNCTION << e.what(); 
+        throw;
+    }
+}
+
+void Component::createReplicationStateListeners()
+{
+    ReplicatorListenerPtr replicaListener = createStateListener(lg, mBridgeManager);
+    mReplicatorListenerPrx = ReplicatorListenerPrx::uncheckedCast(getBackplaneAdapter()->addWithUUID(replicaListener));
+}
+
+void Component::listenToStateReplicators()
+{
+    boost::unique_lock<boost::shared_mutex> lock(mReplicatorLock);
+    BridgeReplicationContextPtr bridgeReplicationContext = 
+        static_pointer_cast<BridgeReplicationContext>(getReplicationContext());
+
+    if (mListeningToReplicator == true)
+    {
+        return;
+    }
+
+    if (!bridgeReplicationContext->getReplicator().isInitialized())
+    {
+        lg(Error) << getName() << ": " << BOOST_CURRENT_FUNCTION << ": State replicator could not be found. Unable to listen for state updates!";
+        return;
+    }
+
+    try
+    {
+        // Are we in standby mode?
+        if (bridgeReplicationContext->getState() == STANDBY_IN_REPLICA_GROUP)
+        {
+            bridgeReplicationContext->getReplicator().tryOneWay()->addListener(mReplicatorListenerPrx);
+            mListeningToReplicator = true;
+        }
+    }
+    catch (const Ice::Exception& e)
+    {
+        lg(Error) << e.what();
+        throw;
+    }
+}
+
+void Component::stopListeningToStateReplicators()
+{
+    boost::unique_lock<boost::shared_mutex> lock(mReplicatorLock);
+    BridgeReplicationContextPtr bridgeReplicationContext = 
+        static_pointer_cast<BridgeReplicationContext>(getReplicationContext());
+
+    if ((!bridgeReplicationContext->getReplicator().isInitialized()) || (mListeningToReplicator == false))
+    {
+        return;
+    }
+
+    try
+    {
+        bridgeReplicationContext->getReplicator().tryOneWay()->removeListener(mReplicatorListenerPrx);
+        mListeningToReplicator = false;
+    }
+    catch (const Ice::Exception& e)
+    {
+        lg(Error) << e.what();
+        throw;
+    }
+}
+
+/**
+ * Override of stop notification. 
+ */
+void Component::onStop()
+{
+    if (mRegisterThread)
+    {
+        mRegisterThread->stop();
+    }
+}
+
+}
+
+extern "C" {
+ASTERISK_SCF_ICEBOX_EXPORT ::IceBox::Service* create(Ice::CommunicatorPtr)
+{
+    return new Component;
+}
+}
diff --git a/src/Service.cpp b/src/Service.cpp
deleted file mode 100644
index fd3fcd6..0000000
--- a/src/Service.cpp
+++ /dev/null
@@ -1,334 +0,0 @@
-/*
- * Asterisk SCF -- An open-source communications framework.
- *
- * Copyright (C) 2010-2011, Digium, Inc.
- *
- * See http://www.asterisk.org for more information about
- * the Asterisk SCF project. Please do not directly contact
- * any of the maintainers of this project for assistance;
- * the project provides a web site, mailing lists and IRC
- * channels for your use.
- *
- * This program is free software, distributed under the terms of
- * the GNU General Public License Version 2. See the LICENSE.txt file
- * at the top of the source tree.
- */
-
-#include <Ice/Ice.h>
-#include <IceBox/IceBox.h>
-#include <IceStorm/IceStorm.h>
-
-#include <AsteriskSCF/Logger/IceLogger.h>
-#include <AsteriskSCF/logger.h>
-#include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.h>
-#include <AsteriskSCF/System/Component/ComponentServiceIf.h>
-#include <AsteriskSCF/System/Component/ReplicaIf.h>
-#include "ServiceUtil.h"
-#include "BridgeManagerImpl.h"
-#include "BridgeReplicatorStateListenerI.h"
-
-using namespace AsteriskSCF::System::Logging;
-using namespace AsteriskSCF::SessionCommunications::V1;
-using namespace AsteriskSCF::Core::Discovery::V1;
-using namespace AsteriskSCF::System::Component::V1;
-using namespace AsteriskSCF::BridgeService;
-using namespace AsteriskSCF::Bridge::V1;
-using namespace AsteriskSCF;
-using namespace std;
-
-namespace
-{
-
-class ReplicaControl : virtual public Replica
-{
-public:
-    ReplicaControl(const Ice::ObjectAdapterPtr& adapter, const BridgeManagerServantPtr& manager, const string& id) :
-        mActive(true),
-        mAdapter(adapter),
-        mManager(manager),
-        mReplicaId(id)
-    {
-    }
-
-    bool isActive(const Ice::Current&)
-    {
-        return mActive;
-    }
-
-    bool activate(const Ice::Current&)
-    {
-        mActive = true;
-        mManager->activate();
-        for (vector<ReplicaListenerPrx>::const_iterator i = mListeners.begin(); i != mListeners.end(); ++i)
-        {
-            (*i)->activated(ReplicaPrx::uncheckedCast(mAdapter->createDirectProxy(mAdapter->getCommunicator()->stringToIdentity(mReplicaId))));
-        }
-        return true;
-    }
-
-    void standby(const Ice::Current&)
-    {
-        mActive = false;
-        for (vector<ReplicaListenerPrx>::const_iterator i = mListeners.begin(); i != mListeners.end(); ++i)
-        {
-            (*i)->onStandby(ReplicaPrx::uncheckedCast(mAdapter->createDirectProxy(mAdapter->getCommunicator()->stringToIdentity(mReplicaId))));
-        }
-    }
-
-    void addListener(const ReplicaListenerPrx& listener, const Ice::Current&)
-    {
-        mListeners.push_back(listener);
-    }
-
-    void removeListener(const ReplicaListenerPrx& listener, const Ice::Current&)
-    {
-        mListeners.erase(remove(mListeners.begin(), mListeners.end(), listener), mListeners.end());
-    }
-    
-private:
-    bool mActive;
-    Ice::ObjectAdapterPtr mAdapter;
-    vector<ReplicaListenerPrx> mListeners;
-    BridgeManagerServantPtr mManager;
-    string mReplicaId;
-};
-typedef IceUtil::Handle<ReplicaControl> ReplicaControlPtr;
-
-class BridgingApp : public IceBox::Service
-{
-public:
-    BridgingApp();
-
-protected:
-    void start(const std::string& name, const Ice::CommunicatorPtr& communicator, const Ice::StringSeq& args);
-    void stop();
-
-private:
-    /**
-     * A proxy to the service locator manager for the bridge manager service.
-     */
-    IceUtil::Handle<LocatorRegistrationWrapper<BridgeManagerPrx> > mLocator;
-    Ice::ObjectAdapterPtr mAdapter;
-    Ice::ObjectAdapterPtr mInfrastructureAdapter;
-    IceUtil::Handle<RegisterThread<BridgeManagerPrx> > mRegisterThread;
-    ReplicaControlPtr mReplicaControl;
-};
-
-BridgingApp::BridgingApp()
-{
-}
-
-void BridgingApp::start(const std::string& name, const Ice::CommunicatorPtr& communicator, const Ice::StringSeq&)
-{
-    Logger logger = getLoggerFactory().getLogger("AsteriskSCF.BridgeService");
-    logger(Debug) << "Launching AsteriskSCF Session-Oriented Bridging Service." ;
-
-    //
-    // It is standard practice to base the adapter name on the configured
-    // service instance name.
-    //
-    std::string adapterName;
-    if (name.size() == 0)
-    {
-        adapterName = "BridgeService";
-    }
-    else
-    {
-        adapterName = name + ".BridgeService";
-    }
-
-    //
-    // Check on the threadpool properties, make sure that they are compatible with the requirements of this 
-    // service. This could be moved into a helper method, but it is somehow more clear for seeing what
-    // is going on during initialization to leave it here.
-    //
-    // TODO: Are there any other properties that need to be double-checked before proceeding?
-    //
-    Ice::Int defaultPoolSize = communicator->getProperties()->getPropertyAsIntWithDefault("Ice.ThreadPool.Server.Size", 0);
-    if (communicator->getProperties()->getPropertyAsIntWithDefault(adapterName + ".ThreadPool.Size", 0) < 4)
-    {
-        if (defaultPoolSize < 4)
-        {
-            logger(Info) << "Configured thread pool size for " << adapterName + " is too small, defaulting to 4";
-            communicator->getProperties()->setProperty(adapterName + ".ThreadPool.Size", "4");
-        }
-    }
-    if (communicator->getProperties()->getPropertyAsIntWithDefault(adapterName + "Internal.ThreadPool.Size", 0) < 4)
-    {
-        if (defaultPoolSize < 4)
-        {
-            logger(Info) << "Configured thread pool size for " << adapterName + "Internal is too small, defaulting to 4";
-            communicator->getProperties()->setProperty(adapterName + "Internal.ThreadPool.Size", "4");
-        }
-    }
-    defaultPoolSize = communicator->getProperties()->getPropertyAsIntWithDefault("Ice.ThreadPool.Client.Size", 0);
-    if (defaultPoolSize < 4)
-    {
-        logger(Warning) << "Client thread pool size is too small. It should be set to 4 or greater";
-    }
-
-    
-    //
-    // TODO: All adapter ids should be globally unique. This would allow replicas to be identified.
-    // How this might be useful is that it might be used as a replicated state item to identify
-    // which adapter is currently the primary in a group of replicas.
-    //
-    mAdapter = communicator->createObjectAdapterWithEndpoints(adapterName, 
-        communicator->getProperties()->getPropertyWithDefault(adapterName + ".Endpoints", "default"));
-
-    //
-    // We do not activate the main object adapter until most everything is initialized. The infrastructure
-    // adapter is for internally accessed objects or for supporting things that will be externally
-    // accessible until later so activating it now is fine.
-    //
-    mInfrastructureAdapter = communicator->createObjectAdapterWithEndpoints(adapterName + "Internal",
-        communicator->getProperties()->getPropertyWithDefault(adapterName + "Internal.Endpoints", "default"));
-    mInfrastructureAdapter->activate();
-
-    //
-    // Configure the AsteriskSCF logger.
-    // TODO: check whether this works right if we've already created the logger object.
-    //
-    ConfiguredIceLoggerPtr iceLogger = createIceLogger(mAdapter);
-    if (iceLogger)
-    {
-        getLoggerFactory().setLogOutput(iceLogger->getLogger());
-    }
-
-    //
-    // While this property is checked early in this method, it is not used until later on.
-    // This is to avoid several initialization steps if the final steps of initialization
-    // cannot be performed.
-    //
-    std::string serviceLocatorManagementProperty = communicator->getProperties()->getProperty("ServiceLocatorManagementProxy");
-    if (serviceLocatorManagementProperty.empty())
-    {
-        throw IceBox::FailureException(__FILE__, __LINE__, 
-            "Configuration error: Unable to locate property `ServiceLocatorManagementProxy'");
-    }
-
-    string locatorPropertyString = communicator->getProperties()->getProperty("LocatorService.Proxy");
-    if (locatorPropertyString.empty())
-    {
-        throw IceBox::FailureException(__FILE__, __LINE__,
-            "Configuration error: Unable to locate property `LocatorService.Proxy`");
-    }
-    
-    ServiceLocatorPrx locator = ServiceLocatorPrx::checkedCast(communicator->stringToProxy(locatorPropertyString));
-       
-    BridgeStateReplicatorParamsPtr searchParams = new BridgeStateReplicatorParams;
-    searchParams->category = StateReplicatorDiscoveryCategory;
-    searchParams->name = communicator->getProperties()->getPropertyWithDefault("Bridge.StateReplicatorName", "default");
-  
-    ReplicatorSmartPrx replicator;
-    try
-    {
-        replicator = ReplicatorSmartPrx(locator, searchParams, logger);
-    }
-    catch (const std::exception& ex)
-    {
-        logger(Error) << "Bridge state replicator lookup failed. Continuing without replication. " << ex.what();
-    }
-   
-    std::string managerName = 
-        communicator->getProperties()->getPropertyWithDefault(name + ".ManagerId", "BridgeManager");
-
-    //
-    // It's very important that uncheckedCast's be used here as there are no guarantees
-    // that the object adapter is activated yet. If it is not, then this can hang.
-    //
-    BridgeManagerServantPtr manager = createBridgeManager(mAdapter, managerName, replicator, logger);
-    BridgeManagerPrx bridgeManagerPrx = BridgeManagerPrx::uncheckedCast(mAdapter->add(manager, 
-          mAdapter->getCommunicator()->stringToIdentity(managerName)));
-    assert(bridgeManagerPrx != 0);
-    if (bridgeManagerPrx == 0)
-    {
-        throw IceBox::FailureException(__FILE__, __LINE__, "Unable to instantiate bridge manager object");
-    }
-
-    //
-    // Configure replication connections.
-    //
-    ReplicatorListenerPtr replicaListener = createStateListener(logger, manager);
-    ReplicatorListenerPrx listenerPrx = ReplicatorListenerPrx::uncheckedCast(mInfrastructureAdapter->addWithUUID(replicaListener));
-
-    string replicaId = 
-        mAdapter->getCommunicator()->getProperties()->getPropertyWithDefault(
-            adapterName + ".ReplicaId", "BridgeReplica");
-    mReplicaControl = new ReplicaControl(mInfrastructureAdapter, manager, replicaId);
-    ReplicaPrx replicaControlPrx =
-        ReplicaPrx::uncheckedCast(mInfrastructureAdapter->add(mReplicaControl, communicator->stringToIdentity(replicaId)));
-
-    bool onStandby = false;
-    if (replicator)
-    {
-        onStandby = communicator->getProperties()->getPropertyWithDefault(name + ".StateReplicatorListener", "no") == "yes";
-    }
-    if (onStandby)
-    {
-        replicator->addListener(listenerPrx);
-        replicaControlPrx->standby();
-    }
-    else
-    {
-        manager->activate();
-    }
-
-    bool registered = false;
-    try
-    {
-        ServiceLocatorParamsPtr parameters(new ServiceLocatorParams);
-        parameters->category = BridgeServiceDiscoveryCategory;
-        mLocator = 
-            new LocatorRegistrationWrapper<BridgeManagerPrx>(communicator, serviceLocatorManagementProperty, bridgeManagerPrx, 
-               adapterName, parameters);
-        registered = mLocator->registerService();
-    }
-    catch (const Ice::Exception&)
-    {
-    }
-    if (!registered)
-    {
-        mRegisterThread = new RegisterThread<BridgeManagerPrx>(mLocator);
-        mRegisterThread->start();
-    }
-    //
-    // TODO: We need to know whether or not to activate!
-    //
-    mAdapter->activate();
-}
-
-void BridgingApp::stop()
-{
-    if (mRegisterThread)
-    {
-        mRegisterThread->stop();
-    }
-    try
-    {
-        mLocator->unregister();
-    }
-    catch (const Ice::Exception&)
-    {
-    }
-    try
-    {
-        Ice::CommunicatorPtr comm = mAdapter->getCommunicator();
-        comm->shutdown();
-        mAdapter->waitForDeactivate();
-        mInfrastructureAdapter->waitForDeactivate();
-        comm->destroy();
-    }
-    catch (...)
-    {
-        // TODO: log
-    }
-}
-}
-
-extern "C" {
-ASTERISK_SCF_ICEBOX_EXPORT ::IceBox::Service* create(Ice::CommunicatorPtr)
-{
-    return new BridgingApp;
-}
-}
diff --git a/test/TestBridging.cpp b/test/TestBridging.cpp
index f5569e9..bf88553 100644
--- a/test/TestBridging.cpp
+++ b/test/TestBridging.cpp
@@ -21,6 +21,7 @@
 #include <boost/test/debug.hpp>
 #include <AsteriskSCF/SessionCommunications/SessionCommunicationsIf.h>
 #include <AsteriskSCF/Media/MediaIf.h>
+#include <AsteriskSCF/System/Component/ReplicaIf.h>
 #include <IceUtil/UUID.h>
 
 #include "BridgeManagerListenerI.h"
@@ -39,6 +40,7 @@ using namespace AsteriskSCF::BridgingTest;
 using namespace AsteriskSCF::SessionCommunications::V1;
 using namespace AsteriskSCF::Core::Discovery::V1;
 using namespace AsteriskSCF::Core::Routing::V1;
+using namespace AsteriskSCF::System::Component::V1;
 using namespace std;
 
 /* Cache the command line arguments so that Ice can be initialized within the global fixture. */
@@ -108,7 +110,17 @@ public:
 
     BridgeManagerPrx standbyBridgeManager()
     {
-        return mStandby;
+        return BridgeManagerPrx::uncheckedCast(mCommunicator->stringToProxy("TestBridgeManager2:default -p 57010"));
+    }
+
+    ReplicaPrx primaryReplicaControl()
+    {
+        return mPrimaryReplica;
+    }
+
+    ReplicaPrx secondaryReplicaControl()
+    {
+        return mSecondaryReplica;
     }
 
 private:
@@ -126,43 +138,50 @@ private:
         // the correct one.
         //
         ServiceLocatorParamsPtr searchParams(new ServiceLocatorParams);
-        searchParams->category = BridgeServiceDiscoveryCategory;
-        Ice::ObjectProxySeq results = locator->locateAll(searchParams);
-        if (results.size() < 2)
+        searchParams->category = BridgeManagerDiscoveryCategory;
+        try
         {
-            throw IceBox::FailureException(__FILE__, __LINE__, 
-                "Configuration Error: Unable to find test bridge managers");
+            mPrimary = BridgeManagerPrx::uncheckedCast(locator->locate(searchParams));
+        }
+        catch(const AsteriskSCF::Core::Discovery::V1::ServiceNotFound&)
+        {
+            throw;
         }
 
-        for (Ice::ObjectProxySeq::const_iterator i = results.begin(); i != results.end(); ++i)
+        ServiceLocatorParamsPtr primaryReplicaParams(new ServiceLocatorParams);
+        primaryReplicaParams->category = "BridgeService.Replica";
+        primaryReplicaParams->service = "default";
+        primaryReplicaParams->id = "TestBridge";
+        try
         {
-            string idString = mCommunicator->identityToString((*i)->ice_getIdentity());
-            if (idString == "TestBridgeManager")
-            {
-                mPrimary = BridgeManagerPrx::checkedCast((*i));
-                if (!mPrimary)
-                {
-                    throw IceBox::FailureException(__FILE__, __LINE__, 
-                        "Configuration Error: Lookup for primary bridge manager failed");
-                }
-            }
-            else if (idString == "TestBridgeManager2")
-            {
-                mStandby = BridgeManagerPrx::checkedCast((*i));
-                if (!mStandby)
-                {
-                    throw IceBox::FailureException(__FILE__, __LINE__, 
-                        "Configuration Error: Lookup for standby bridge manager failed");
-                }
-            }
+            mPrimaryReplica = ReplicaPrx::uncheckedCast(locator->locate(primaryReplicaParams));
+        }
+        catch(const AsteriskSCF::Core::Discovery::V1::ServiceNotFound&)
+        {
+            throw;
+        }
+
+        ServiceLocatorParamsPtr secondaryReplicaParams(new ServiceLocatorParams);
+        secondaryReplicaParams->category = "BridgeService.Replica";
+        secondaryReplicaParams->service = "default";
+        secondaryReplicaParams->id = "TestBridge2";
+        try
+        {
+            mSecondaryReplica = ReplicaPrx::uncheckedCast(locator->locate(secondaryReplicaParams));
+        }
+        catch(const AsteriskSCF::Core::Discovery::V1::ServiceNotFound&)
+        {
+            throw;
         }
     }
 
     BridgeManagerPrx mPrimary;
-    BridgeManagerPrx mStandby;
     Ice::CommunicatorPtr mCommunicator;
     Ice::StringSeq mArgs;
     vector<char const *> mArgv;
+
+    ReplicaPrx mPrimaryReplica;
+    ReplicaPrx mSecondaryReplica;
 };
 typedef IceUtil::Handle<TestEnvironment> TestEnvironmentPtr;
 
@@ -825,15 +844,40 @@ public:
                     BOOST_MESSAGE("Wait for event expired");
                 }
                 BOOST_CHECK(servant->createCalls() == 1);
-                BridgeSeq bridges = mgrPrx2->listBridges();
-                BridgeSeq bridges2 = mgrPrx->listBridges();
-                BOOST_CHECK(bridges.size() == bridges2.size()); // XXX
+
                 mgrPrx->addListener(listenerPrx);
                 bridge = mgrPrx->createBridge(sessions, 0);
                 servant->wait(5000);
                 BOOST_CHECK(servant->createCalls() == 2);
                 bridge->shutdown();
                 mgrPrx->removeListener(listenerPrx);
+
+                BridgeSeq bridges = mgrPrx->listBridges();
+
+                // Activate the secondary component so we can 
+                // query it. 
+                ReplicaPrx secondaryReplica = env()->secondaryReplicaControl();
+                ReplicaPrx primaryReplica = env()->primaryReplicaControl();
+                BOOST_CHECK(primaryReplica->isActive() == true);
+                BOOST_CHECK(secondaryReplica->isActive() == false);
+
+                primaryReplica->standby();
+                BOOST_CHECK(primaryReplica->isActive() == false);
+
+                secondaryReplica->activate();
+                BOOST_CHECK(secondaryReplica->isActive() == true);
+
+                BridgeSeq bridges2 = mgrPrx2->listBridges();
+
+                BOOST_CHECK(bridges.size() == bridges2.size()); // XXX
+
+                // Set the components back to original state.
+                secondaryReplica->standby();
+                primaryReplica->activate();
+
+                BOOST_CHECK(primaryReplica->isActive() == true);
+                BOOST_CHECK(secondaryReplica->isActive() == false);
+
             }
             catch (const Ice::Exception& ex)
             {

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/bridging.git



More information about the asterisk-scf-commits mailing list