[asterisk-scf-commits] asterisk-scf/release/media_operations_core.git branch "master" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Thu Aug 18 18:13:52 CDT 2011


branch "master" has been updated
       via  df280d228311779edaa3fb6ec64d05d754539f93 (commit)
       via  91d0dc55e0eb12f481c0ea26a2162e05ff428498 (commit)
      from  b0f0497104ff0726fddd0ab626c0dab957ae610c (commit)

Summary of changes:
 .../MediaOperationsCore/MediaOperationsCoreIf.ice  |    7 +
 src/CMakeLists.txt                                 |   35 ++--
 src/MediaOperationFactoryImpl.cpp                  |   12 +
 src/MediaOperationFactoryImpl.h                    |   16 ++
 src/MediaOperationReplicationContext.h             |   89 ++++++++
 src/MediaOperationStateReplicator.h                |    2 +
 src/MediaOperationStateReplicatorApp.cpp           |   20 ++-
 src/MediaOperationStateReplicatorListener.cpp      |   31 +++
 src/MediaOperationsCore.cpp                        |  220 ++++++++++++++-----
 src/ulaw_alaw.cpp                                  |   78 ++++++--
 src/ulaw_alaw.h                                    |    7 +-
 test/CMakeLists.txt                                |   32 ++--
 12 files changed, 442 insertions(+), 107 deletions(-)
 create mode 100644 src/MediaOperationReplicationContext.h


- Log -----------------------------------------------------------------
commit df280d228311779edaa3fb6ec64d05d754539f93
Author: Mark Michelson <mmichelson at digium.com>
Date:   Thu Aug 18 18:13:50 2011 -0500

    Implement state replication in the media operation I've written.
    
    The last thing I need to do before this can be put up for review is
    to implement the stateRemovedForItems method. It shouldn't be too
    hard, I just need to leave now.

diff --git a/src/MediaOperationFactoryImpl.cpp b/src/MediaOperationFactoryImpl.cpp
index fa4f0d1..74777f4 100644
--- a/src/MediaOperationFactoryImpl.cpp
+++ b/src/MediaOperationFactoryImpl.cpp
@@ -27,9 +27,11 @@ using namespace AsteriskSCF::Media::V1;
 
 MediaOperationFactoryImpl::MediaOperationFactoryImpl(const Ice::ObjectAdapterPtr& adapter,
         const Logger& logger,
+        const MediaOperationReplicationContextPtr& replicationContext,
         const std::string& name)
     : mAdapter(adapter),
     mLogger(logger),
+    mReplicationContext(replicationContext),
     mName(name),
     mLocatorParams(new MediaOperationServiceLocatorParams)
 {
@@ -45,5 +47,15 @@ MediaOperationServiceLocatorParamsPtr MediaOperationFactoryImpl::getLocatorParam
     return mLocatorParams;
 }
 
+void MediaOperationFactoryImpl::setProxy(const MediaOperationFactoryPrx& proxy)
+{
+    mProxy = proxy;
+}
+
+MediaOperationFactoryPrx MediaOperationFactoryImpl::getProxy()
+{
+    return mProxy;
+}
+
 } //end namespace MediaOperationsCore
 } //end namespace AsteriskSCF
diff --git a/src/MediaOperationFactoryImpl.h b/src/MediaOperationFactoryImpl.h
index 48f9d9f..e37b9d0 100644
--- a/src/MediaOperationFactoryImpl.h
+++ b/src/MediaOperationFactoryImpl.h
@@ -21,6 +21,8 @@
 #include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.h>
 #include <AsteriskSCF/logger.h>
 
+#include "MediaOperationReplicationContext.h"
+
 namespace AsteriskSCF
 {
 
@@ -32,6 +34,7 @@ class MediaOperationFactoryImpl : public AsteriskSCF::Media::V1::MediaOperationF
 public:
     MediaOperationFactoryImpl(const Ice::ObjectAdapterPtr&,
             const AsteriskSCF::System::Logging::Logger&,
+            const MediaOperationReplicationContextPtr&,
             const std::string&);
 
     virtual AsteriskSCF::Media::V1::MediaOperationPrx createMediaOperation(
@@ -42,20 +45,33 @@ public:
     const std::string& getName();
 
     AsteriskSCF::Media::V1::MediaOperationServiceLocatorParamsPtr getLocatorParams();
+
+    void setProxy(const AsteriskSCF::Media::V1::MediaOperationFactoryPrx& proxy);
+
+    AsteriskSCF::Media::V1::MediaOperationFactoryPrx getProxy();
 protected:
     Ice::ObjectAdapterPtr mAdapter;
     AsteriskSCF::System::Logging::Logger mLogger;
+    
+    /**
+     * Replication context handed to the operations this factory creates so they can replicate their state.
+     */
+    MediaOperationReplicationContextPtr mReplicationContext;
+
     /**
      * The name of the factory. Used as a GUID for the service locator params
      * that we register.
      */
     const std::string mName;
+    
     /**
      * Attributes of this factory.
      * Let's the service locator know that we can translate from ulaw to alaw and
      * from alaw to ulaw.
      */
     AsteriskSCF::Media::V1::MediaOperationServiceLocatorParamsPtr mLocatorParams;
+
+    AsteriskSCF::Media::V1::MediaOperationFactoryPrx mProxy;
 };
 
 typedef IceUtil::Handle<MediaOperationFactoryImpl> MediaOperationFactoryImplPtr;
diff --git a/src/MediaOperationStateReplicatorListener.cpp b/src/MediaOperationStateReplicatorListener.cpp
index 99e391f..59ff82d 100644
--- a/src/MediaOperationStateReplicatorListener.cpp
+++ b/src/MediaOperationStateReplicatorListener.cpp
@@ -40,6 +40,37 @@ void MediaOperationStateReplicatorListenerImpl::stateRemovedForItems(
         const MediaOperationStateItemSeq&,
         const Ice::Current&)
 {
+    class Visitor : public MediaOperationStateItemVisitor
+    {
+    public:
+        Visitor(const Ice::ObjectAdapterPtr& adapter)
+            : mAdapter(adapter) { }
+    private:
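+        //XXX TODO: unfinished. This visitor creates an operation where a removal is expected, and the loop below reads "items" although the sequence parameter is unnamed.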
+        void visitUlawAlawMediaOperationStateItem(const UlawAlawMediaOperationStateItemPtr& ulawAlaw)
+        {
+            UlawAlawFactoryPtr factory = UlawAlawFactoryPtr::dynamicCast(mAdapter->find(ulawAlaw->factoryId));
+
+            if (!factory)
+            {
+                return;
+            }
+
+            factory->createMediaOperation(
+                    ulawAlaw->sourceFormat,
+                    ulawAlaw->sinkFormat,
+                    ulawAlaw->operationId);
+        }
+        Ice::ObjectAdapterPtr mAdapter;
+    };
+
+    MediaOperationStateItemVisitorPtr v = new Visitor(mAdapter);
+
+    for (MediaOperationStateItemSeq::const_iterator iter = items.begin();
+            iter != items.end(); ++iter)
+    {
+        (*iter)->visit(v);
+    }
 }
 
 void MediaOperationStateReplicatorListenerImpl::stateSet(
diff --git a/src/MediaOperationsCore.cpp b/src/MediaOperationsCore.cpp
index 2f09a20..11d244e 100644
--- a/src/MediaOperationsCore.cpp
+++ b/src/MediaOperationsCore.cpp
@@ -118,14 +118,16 @@ private:
     {
         MediaOperationFactoryPrx factoryProxy =
             MediaOperationFactoryPrx::uncheckedCast(getServiceAdapter()->add(factory, getCommunicator()->stringToIdentity(factory->getName())));
-    
+
+        factory->setProxy(factoryProxy);
         mFactories.push_back(std::make_pair(factory, factoryProxy));
     }
     
     void createOperationFactories()
     {
         lg(Debug) << "Creating UlawAlawFactory";
-        createAndRegisterFactory(new UlawAlawFactory(getServiceAdapter(), lg));
+        createAndRegisterFactory(new UlawAlawFactory(getServiceAdapter(), lg,
+                    boost::static_pointer_cast<MediaOperationReplicationContext>(getReplicationContext())));
     }
 
     void locateStateReplicator()
diff --git a/src/ulaw_alaw.cpp b/src/ulaw_alaw.cpp
index 475bce5..6d80960 100644
--- a/src/ulaw_alaw.cpp
+++ b/src/ulaw_alaw.cpp
@@ -17,6 +17,7 @@
 #include "ulaw_alaw.h"
 
 #include <AsteriskSCF/Media/Formats/AudioFormats.h>
+#include <AsteriskSCF/Replication/MediaOperationsCore/MediaOperationsCoreIf.h>
 #include <IceUtil/UUID.h>
 #include <pjmedia.h>
 
@@ -30,6 +31,7 @@ using namespace AsteriskSCF::Media::V1;
 using namespace AsteriskSCF::Core::Discovery::V1;
 using namespace AsteriskSCF::Media::Formats::Audio::V1;
 using namespace AsteriskSCF::System::Logging;
+using namespace AsteriskSCF::Replication::MediaOperationsCore::V1;
 
 class UlawAlawOperation;
 typedef IceUtil::Handle<UlawAlawOperation> UlawAlawOperationPtr;
@@ -167,14 +169,57 @@ public:
     UlawAlawOperation(const Ice::ObjectAdapterPtr& adapter,
             const Logger& logger,
             const FormatPtr& sourceFormat,
-            const FormatPtr& sinkFormat)
+            const FormatPtr& sinkFormat,
+            const Ice::Identity& factoryId,
+            const MediaOperationReplicationContextPtr& replicationContext)
         : mAdapter(adapter),
         mLogger(logger),
-        mSourceFormat(sourceFormat),
-        mSinkFormat(sinkFormat),
-        mSource(new UlawAlawSource(mLogger, mSourceFormat)),
-        mSink(new UlawAlawSink(mLogger, mSinkFormat, this))
+        mStateItem(new UlawAlawMediaOperationStateItem),
+        mSource(new UlawAlawSource(mLogger, sourceFormat)),
+        mSink(new UlawAlawSink(mLogger, sinkFormat, this)),
+        mReplicationContext(replicationContext)
     {
+        mStateItem->sourceFormat = sourceFormat;
+        mStateItem->sinkFormat = sinkFormat;
+        mStateItem->factoryId = factoryId;
+    }
+
+    void setState()
+    {
+        if (mReplicationContext->isReplicating() == false)
+        {
+            return;
+        }
+
+        try
+        {
+            MediaOperationStateItemSeq seq;
+            seq.push_back(mStateItem);
+            mReplicationContext->getReplicator().tryOneWay()->setState(seq);
+        }
+        catch (const Ice::Exception& ex)
+        {
+            mLogger(Error) << "Exception caught while attempting to replicate state: " << ex.what();
+        }
+    }
+
+    void removeState()
+    {
+        if (mReplicationContext->isReplicating() == false)
+        {
+            return;
+        }
+
+        try
+        {
+            MediaOperationStateItemSeq seq;
+            seq.push_back(mStateItem);
+            mReplicationContext->getReplicator().tryOneWay()->removeStateForItems(seq);
+        }
+        catch (const Ice::Exception& ex)
+        {
+            mLogger(Error) << "Exception caught while attempting to replicate state: " << ex.what();
+        }
     }
 
     MediaOperationPrx activate(const std::string& id)
@@ -186,6 +231,10 @@ public:
         mSourceProxy =
             StreamSourcePrx::uncheckedCast(mAdapter->add(mSource, mAdapter->getCommunicator()->stringToIdentity(id +".Source")));
 
+        mStateItem->operationId = mProxy->ice_getIdentity().name;
+
+        setState();
+
         return mProxy;
     }
 
@@ -225,7 +274,7 @@ public:
     
     FramePtr translate(const FramePtr inFrame)
     {
-        if (inFrame->mediaFormat->name != mSinkFormat->name)
+        if (inFrame->mediaFormat->name != mStateItem->sinkFormat->name)
         {
             mLogger(Error) << "Cannot translate frame because the format is not what we expect.";
             throw UnsupportedMediaFormatException();
@@ -244,7 +293,7 @@ public:
             std::transform(inFrame->payload.begin(), inFrame->payload.end(), outPayload.begin(), std::bind1st(std::mem_fun(&UlawAlawOperation::alaw2ulaw), this));
         }
     
-        outFrame = new Frame(mSourceFormat, outPayload);
+        outFrame = new Frame(mStateItem->sourceFormat, outPayload);
         return outFrame;
     }
 
@@ -260,17 +309,18 @@ private:
     Ice::ObjectAdapterPtr mAdapter;
     Logger mLogger;
     MediaOperationPrx mProxy;
-    FormatPtr mSourceFormat;
-    FormatPtr mSinkFormat;
+    UlawAlawMediaOperationStateItemPtr mStateItem;
     UlawAlawSourcePtr mSource;
     StreamSourcePrx mSourceProxy;
     UlawAlawSinkPtr mSink;
     StreamSinkPrx mSinkProxy;
+    MediaOperationReplicationContextPtr mReplicationContext;
 };
 
 UlawAlawFactory::UlawAlawFactory(const Ice::ObjectAdapterPtr& adapter,
-        const Logger& logger)
-    : MediaOperationFactoryImpl(adapter, logger, "UlawAlawFactory")
+        const Logger& logger,
+        const MediaOperationReplicationContextPtr& replicationContext)
+    : MediaOperationFactoryImpl(adapter, logger, replicationContext, "UlawAlawFactory")
 {
     MediaOperationAttributes ulawToAlaw;
     ulawToAlaw.inputFormat = new G711uLAW();
@@ -386,11 +436,11 @@ MediaOperationPrx UlawAlawFactory::createMediaOperation(
                 mAdapter,
                 mLogger,
                 sourceFormat,
-                sinkFormat));
+                sinkFormat,
+                getProxy()->ice_getIdentity(),
+                mReplicationContext));
 
     MediaOperationPrx proxy = operation->activate(operationId);
-    //Here be where state replication should go
-
     return proxy;
 }
 
diff --git a/src/ulaw_alaw.h b/src/ulaw_alaw.h
index cc2eb3d..4d4c56c 100644
--- a/src/ulaw_alaw.h
+++ b/src/ulaw_alaw.h
@@ -28,7 +28,12 @@ class UlawAlawFactory : public MediaOperationFactoryImpl
 {
 public:
     UlawAlawFactory(const Ice::ObjectAdapterPtr&,
-            const AsteriskSCF::System::Logging::Logger& logger);
+            const AsteriskSCF::System::Logging::Logger& logger,
+            const MediaOperationReplicationContextPtr& replicationContext);
+
+    void replicateState(const AsteriskSCF::Media::V1::MediaOperationPrx& proxy,
+            const AsteriskSCF::Media::V1::FormatPtr& sourceFormat,
+            const AsteriskSCF::Media::V1::FormatPtr& sinkFormat);
 
     AsteriskSCF::Media::V1::MediaOperationPrx createMediaOperation(
             const AsteriskSCF::Media::V1::StreamSourcePrx& source,

commit 91d0dc55e0eb12f481c0ea26a2162e05ff428498
Author: Mark Michelson <mmichelson at digium.com>
Date:   Thu Aug 18 16:17:17 2011 -0500

    Use the base component class.
    
    The point of this was to get easy access to a replication context so
    that I could replicate state like a boss.

diff --git a/slice/AsteriskSCF/Replication/MediaOperationsCore/MediaOperationsCoreIf.ice b/slice/AsteriskSCF/Replication/MediaOperationsCore/MediaOperationsCoreIf.ice
index d407b45..3b5d04e 100644
--- a/slice/AsteriskSCF/Replication/MediaOperationsCore/MediaOperationsCoreIf.ice
+++ b/slice/AsteriskSCF/Replication/MediaOperationsCore/MediaOperationsCoreIf.ice
@@ -33,6 +33,9 @@ module MediaOperationsCore
 module V1
 {
 
+const string StateReplicatorComponentCategory = "SipStateReplicatorComponent";
+const string StateReplicatorDiscoveryCategory = "SipStateReplicator";
+
 ["visitor"] local class MediaOperationStateItemVisitor
 {
 };
@@ -46,6 +49,10 @@ module V1
 ["visitor:MediaOperationStateItemVisitor"] class MediaOperationStateItem
 {
     /**
+     * The State replicator template in ice-util-cpp requires a key parameter.
+     */
+    string key;
+    /**
      * The identity of the factory from which this operation was made.
      * We use this to look up the factory in the object adapter so that
      * we can create a new media operation
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index a23bc93..acdcd12 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -1,24 +1,25 @@
 include_directories(${logger_dir}/include)
 include_directories(${astscf-ice-util-cpp_dir}/include)
 
-astscf_component_init(media_operations_core)
-astscf_component_add_files(media_operations_core ulaw_alaw.h)
-astscf_component_add_files(media_operations_core ulaw_alaw.cpp)
-astscf_component_add_files(media_operations_core MediaOperationFactoryImpl.h)
-astscf_component_add_files(media_operations_core MediaOperationFactoryImpl.cpp)
-astscf_component_add_files(media_operations_core MediaOperationsCore.cpp)
-astscf_component_add_files(media_operations_core MediaOperationStateReplicatorListener.cpp)
-astscf_component_add_files(media_operations_core MediaOperationStateReplicator.h)
-astscf_component_add_slices(media_operations_core PROJECT AsteriskSCF/Replication/MediaOperationsCore/MediaOperationsCoreIf.ice)
-astscf_component_add_slice_collection_libraries(media_operations_core ASTSCF)
-astscf_component_build_icebox(media_operations_core)
-target_link_libraries(media_operations_core logging-client astscf-ice-util-cpp)
+astscf_component_init(MediaOperationsCore)
+astscf_component_add_files(MediaOperationsCore ulaw_alaw.h)
+astscf_component_add_files(MediaOperationsCore ulaw_alaw.cpp)
+astscf_component_add_files(MediaOperationsCore MediaOperationFactoryImpl.h)
+astscf_component_add_files(MediaOperationsCore MediaOperationFactoryImpl.cpp)
+astscf_component_add_files(MediaOperationsCore MediaOperationsCore.cpp)
+astscf_component_add_files(MediaOperationsCore MediaOperationStateReplicatorListener.cpp)
+astscf_component_add_files(MediaOperationsCore MediaOperationReplicationContext.h)
+astscf_component_add_files(MediaOperationsCore MediaOperationStateReplicator.h)
+astscf_component_add_slices(MediaOperationsCore PROJECT AsteriskSCF/Replication/MediaOperationsCore/MediaOperationsCoreIf.ice)
+astscf_component_add_slice_collection_libraries(MediaOperationsCore ASTSCF)
+astscf_component_build_icebox(MediaOperationsCore)
+target_link_libraries(MediaOperationsCore logging-client astscf-ice-util-cpp)
 
-pjproject_link(media_operations_core pjlib)
-pjproject_link(media_operations_core pjlib-util)
-pjproject_link(media_operations_core pjmedia)
-pjproject_link(media_operations_core pjnath)
-astscf_component_install(media_operations_core)
+pjproject_link(MediaOperationsCore pjlib)
+pjproject_link(MediaOperationsCore pjlib-util)
+pjproject_link(MediaOperationsCore pjmedia)
+pjproject_link(MediaOperationsCore pjnath)
+astscf_component_install(MediaOperationsCore)
 
 astscf_component_init(MediaOperationStateReplicator)
 astscf_component_add_files(MediaOperationStateReplicator MediaOperationStateReplicatorApp.cpp)
diff --git a/src/MediaOperationReplicationContext.h b/src/MediaOperationReplicationContext.h
new file mode 100644
index 0000000..be75189
--- /dev/null
+++ b/src/MediaOperationReplicationContext.h
@@ -0,0 +1,89 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+//I pretty much copied and pasted this from the sip version, changing
+//a few relevant bits here and there.
+#pragma once
+
+#include <AsteriskSCF/Discovery/SmartProxy.h>
+#include <AsteriskSCF/Replication/ReplicationContext.h>
+#include <AsteriskSCF/Component/TestContext.h>
+
+#include <boost/shared_ptr.hpp>
+
+#include "MediaOperationStateReplicator.h"
+
+namespace AsteriskSCF
+{
+
+namespace MediaOperationsCore
+{
+
+typedef AsteriskSCF::Discovery::SmartProxy<AsteriskSCF::Replication::MediaOperationsCore::V1::MediaOperationStateReplicatorPrx> ReplicatorSmartPrx;
+
+/** 
+ * This class provides the component's classes with the context needed to perform replication. 
+ */
+class MediaOperationReplicationContext : public AsteriskSCF::Replication::ReplicationContext
+{
+public:
+    MediaOperationReplicationContext(AsteriskSCF::Replication::ReplicationStateType state) : 
+        AsteriskSCF::Replication::ReplicationContext(state)
+    {
+    }
+
+    // Override
+    virtual bool isReplicating() 
+    {
+        // If the base context says we aren't replicating, we aren't. 
+        if (ReplicationContext::isReplicating() == false)
+        {
+            return false;
+        }
+
+        // Do we have a replicator proxy?
+        if (mReplicator.isInitialized())
+        {
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Get a reference to the state replicator service. 
+     */
+    ReplicatorSmartPrx getReplicator()
+    {
+        return mReplicator;
+    }
+
+    /** 
+     * Sets the reference to the state replicator service. 
+     */
+    void setReplicator(const ReplicatorSmartPrx& replicator)
+    {
+        mReplicator = replicator;
+    }
+
+private:
+    ReplicatorSmartPrx mReplicator;
+};
+
+typedef boost::shared_ptr<MediaOperationReplicationContext> MediaOperationReplicationContextPtr;
+
+} // end MediaOperationsCore
+} // end AsteriskSCF
diff --git a/src/MediaOperationStateReplicator.h b/src/MediaOperationStateReplicator.h
index 1930ace..c802f28 100644
--- a/src/MediaOperationStateReplicator.h
+++ b/src/MediaOperationStateReplicator.h
@@ -43,5 +43,7 @@ private:
     Ice::ObjectAdapterPtr mAdapter;
 };
 
+typedef IceUtil::Handle<MediaOperationStateReplicatorListenerImpl> MediaOperationStateReplicatorListenerImplPtr;
+
 }
 }
diff --git a/src/MediaOperationStateReplicatorApp.cpp b/src/MediaOperationStateReplicatorApp.cpp
index 887bec1..ed01019 100644
--- a/src/MediaOperationStateReplicatorApp.cpp
+++ b/src/MediaOperationStateReplicatorApp.cpp
@@ -19,6 +19,8 @@
 #include <Ice/Ice.h>
 #include <IceBox/IceBox.h>
 
+#include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.h>
+#include <AsteriskSCF/Replication/MediaOperationsCore/MediaOperationsCoreIf.h>
 #include <AsteriskSCF/logger.h>
 
 using namespace AsteriskSCF::System::Logging;
@@ -30,6 +32,15 @@ Logger lg = getLoggerFactory().getLogger("AsteriskSCF.MediaOperationStateReplica
 const std::string ReplicatorId("MediaOperationStateReplicator");
 }
 
+namespace AsteriskSCF
+{
+
+namespace MediaOperationsCore
+{
+
+using namespace AsteriskSCF::Core::Discovery::V1;
+using namespace AsteriskSCF::Replication::MediaOperationsCore::V1;
+
 class MediaOperationStateReplicatorService : public IceBox::Service
 {
 public:
@@ -40,6 +51,10 @@ public:
 
     virtual void stop();
 
+    void createStateReplicator(const Ice::CommunicatorPtr& communicator);
+    void registerWithServiceLocator(const Ice::CommunicatorPtr& communicator);
+
+    ServiceLocatorManagementPrx mManagement;
     Ice::ObjectAdapterPtr mAdapter;
     MediaOperationStateReplicatorImplPtr mReplicator;
 };
@@ -73,7 +88,7 @@ void MediaOperationStateReplicatorService::start(
     mAdapter = communicator->createObjectAdapter("MediaOperationStateReplicatorAdapter");
 
     createStateReplicator(communicator);
-    registerWithServiceLocator();
+    registerWithServiceLocator(communicator);
 
     mAdapter->activate();
 }
@@ -90,3 +105,6 @@ ASTSCF_DLL_EXPORT IceBox::Service* create(Ice::CommunicatorPtr)
     return new MediaOperationStateReplicatorService;
 }
 }
+
+} //end namespace MediaOperationsCore
+} //end namespace AsteriskSCF
diff --git a/src/MediaOperationsCore.cpp b/src/MediaOperationsCore.cpp
index 12684d3..2f09a20 100644
--- a/src/MediaOperationsCore.cpp
+++ b/src/MediaOperationsCore.cpp
@@ -17,10 +17,11 @@
 #include <Ice/Ice.h>
 #include <IceBox/IceBox.h>
 
-#include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.h>
+#include <AsteriskSCF/Component/Component.h>
 #include <AsteriskSCF/Media/MediaOperationIf.h>
 #include <AsteriskSCF/logger.h>
 
+#include "MediaOperationReplicationContext.h"
 #include "MediaOperationFactoryImpl.h"
 #include "ulaw_alaw.h"
 
@@ -30,6 +31,7 @@ namespace
 {
 Logger lg = getLoggerFactory().getLogger("AsteriskSCF.MediaOperationsCore");
 const std::string CompareGuid("MediaOperationLocatorParamsCompare");
+const std::string ComponentServiceDiscoveryCategory("MediaOperationsCoreComponentService");
 };
 
 namespace AsteriskSCF
@@ -40,6 +42,10 @@ namespace MediaOperationsCore
 
 using namespace AsteriskSCF::Core::Discovery::V1;
 using namespace AsteriskSCF::Media::V1;
+using namespace AsteriskSCF::Replication;
+using namespace AsteriskSCF::Component;
+using namespace AsteriskSCF::Replication::MediaOperationsCore::V1;
+using namespace AsteriskSCF::Discovery;
 
 typedef std::vector<std::pair<MediaOperationFactoryImplPtr, MediaOperationFactoryPrx> > MediaOperationFactorySeq;
 
@@ -92,96 +98,192 @@ private:
 
 typedef IceUtil::Handle<MediaOperationsCompare> MediaOperationsComparePtr;
 
-class MediaOperationsCoreApp : public IceBox::Service
+class MediaOperationsComponent : public AsteriskSCF::Component::Component
 {
 public:
-    void start(
-            const std::string&,
-            const Ice::CommunicatorPtr&,
-            const Ice::StringSeq&);
-    void stop();
+    MediaOperationsComponent()
+        : AsteriskSCF::Component::Component(lg, ComponentServiceDiscoveryCategory),
+        mListeningToReplicator(false) { }
+
+    ~MediaOperationsComponent() { }
 
 private:
+    ReplicationContextPtr createReplicationContext(ReplicationStateType state)
+    {
+        MediaOperationReplicationContextPtr context(new MediaOperationReplicationContext(state));
+        return context;
+    };
 
-    void createAndRegisterFactory(const MediaOperationFactoryImplPtr& factory);
-    void createOperationFactories();
-    void registerWithServiceLocator();
+    void createAndRegisterFactory(const MediaOperationFactoryImplPtr& factory)
+    {
+        MediaOperationFactoryPrx factoryProxy =
+            MediaOperationFactoryPrx::uncheckedCast(getServiceAdapter()->add(factory, getCommunicator()->stringToIdentity(factory->getName())));
+    
+        mFactories.push_back(std::make_pair(factory, factoryProxy));
+    }
+    
+    void createOperationFactories()
+    {
+        lg(Debug) << "Creating UlawAlawFactory";
+        createAndRegisterFactory(new UlawAlawFactory(getServiceAdapter(), lg));
+    }
 
-    ServiceLocatorManagementPrx mManagement;
-    Ice::ObjectAdapterPtr mAdapter;
+    void locateStateReplicator()
+    {
+        if (getReplicationContext()->getState() == ACTIVE_STANDALONE)
+        {
+            return;
+        }
 
-    MediaOperationFactorySeq mFactories;
+        ServiceLocatorParamsPtr params(new ServiceLocatorParams);
+        params->category = StateReplicatorDiscoveryCategory;
+        params->service =
+            getCommunicator()->getProperties()->getPropertyWithDefault(getName() + ".StateReplicatorService", "default");
 
-    MediaOperationsComparePtr mMediaOperationsCompare;
-    ServiceLocatorParamsComparePrx mMediaOperationsComparePrx;
-};
+        try
+        {
+            AsteriskSCF::Discovery::SmartProxy<MediaOperationStateReplicatorPrx> pw(getServiceLocator(), params, lg);
+            MediaOperationReplicationContextPtr context = 
+                boost::static_pointer_cast<MediaOperationReplicationContext>(getReplicationContext());
 
-void MediaOperationsCoreApp::createAndRegisterFactory(const MediaOperationFactoryImplPtr& factory)
-{
-    MediaOperationFactoryPrx factoryProxy =
-        MediaOperationFactoryPrx::uncheckedCast(mAdapter->add(factory, mAdapter->getCommunicator()->stringToIdentity(factory->getName())));
+            context->setReplicator(pw);
+        }
+        catch (...)
+        {
+            lg(Error) << "Exception caught attempting to locate state replicator";
+        }
+    }
 
-    mFactories.push_back(std::make_pair(factory, factoryProxy));
+    void wrapServicesForRegistration()
+    {
+        //Since we use a subclass of ServiceLocatorParams, we can't use
+        //the base Component class's wrapServiceForRegistration function.
+        //
+        //XXX Would it be worthwhile to write a wrapServiceForRegistration
+        //overload in the base component class that just took locator params
+        //and an optional compareguid?
+        for (MediaOperationFactorySeq::iterator iter = mFactories.begin();
+                iter != mFactories.end(); ++iter)
+        {
+            LocatorRegistrationWrapperPtr wrapper(new LocatorRegistrationWrapper(
+                        getCommunicator(),
+                        getServiceLocatorManagementProperty(),
+                        iter->second,
+                        iter->first->getName(),
+                        iter->first->getLocatorParams(),
+                        CompareGuid));
+            
+            managePrimaryService(wrapper);
+        }
+    }
 
-}
+    // Below are overrides of the base component class.
 
-void MediaOperationsCoreApp::createOperationFactories()
-{
-    lg(Debug) << "Creating UlawAlawFactory";
-    createAndRegisterFactory(new UlawAlawFactory(mAdapter, lg));
-}
+    void createPrimaryServices()
+    {
+        createOperationFactories();
+    }
 
-void MediaOperationsCoreApp::registerWithServiceLocator()
-{
-    for (MediaOperationFactorySeq::iterator iter = mFactories.begin();
-            iter != mFactories.end(); ++iter)
+    void findRemoteServices()
     {
-        ServiceManagementPrx factoryServiceManagement =
-            mManagement->addService(iter->second, iter->first->getName());
-        factoryServiceManagement->addLocatorParams(iter->first->getLocatorParams(), CompareGuid);
+        locateStateReplicator();
     }
 
-    mMediaOperationsCompare = new MediaOperationsCompare(mFactories);
-    mMediaOperationsComparePrx = ServiceLocatorParamsComparePrx::uncheckedCast(mAdapter->addWithUUID(mMediaOperationsCompare));
+    void preparePrimaryServicesForDiscovery()
+    {
+        wrapServicesForRegistration();
+    }
 
-    mManagement->addCompare(CompareGuid, mMediaOperationsComparePrx);
-}
+    //Once the primary services are registered, we need to tell the service
+    //locator about our custom comparator.
+    void onRegisterPrimaryServices()
+    {
+        MediaOperationsComparePtr compare = new MediaOperationsCompare(mFactories);
+        ServiceLocatorParamsComparePrx compareProxy = ServiceLocatorParamsComparePrx::uncheckedCast(getServiceAdapter()->addWithUUID(compare));
 
-void MediaOperationsCoreApp::start(
-        const std::string&,
-        const Ice::CommunicatorPtr& communicator,
-        const Ice::StringSeq&)
-{
-    mManagement = ServiceLocatorManagementPrx::checkedCast(communicator->propertyToProxy("ServiceLocatorManagementProxy"));
-    mAdapter = communicator->createObjectAdapter("MediaOperationsCoreAdapter");
+        getServiceLocatorManagement()->addCompare(CompareGuid, compareProxy);
+    }
 
-    createOperationFactories();
-    registerWithServiceLocator();
+    void onUnregisterPrimaryServices()
+    {
+        getServiceLocatorManagement()->removeCompare(CompareGuid);
+    }
 
-    mAdapter->activate();
-}
+    void createReplicationStateListeners()
+    {
+        mReplicatorListener = new MediaOperationStateReplicatorListenerImpl(getServiceAdapter());
 
-void MediaOperationsCoreApp::stop()
-{
-    for (MediaOperationFactorySeq::iterator iter = mFactories.begin();
-            iter != mFactories.end(); ++iter)
+        MediaOperationStateReplicatorListenerPrx replicatorListener =
+            MediaOperationStateReplicatorListenerPrx::uncheckedCast(getBackplaneAdapter()->addWithUUID(mReplicatorListener));
+        
+        mReplicatorListenerProxy = MediaOperationStateReplicatorListenerPrx::uncheckedCast(replicatorListener->ice_oneway());
+    }
+
+    void listenToStateReplicators()
     {
-        try
+        if (mListeningToReplicator == true)
         {
-            mAdapter->remove(iter->second->ice_getIdentity());
+            return;
+        }
+
+        MediaOperationReplicationContextPtr context =
+            boost::static_pointer_cast<MediaOperationReplicationContext>(getReplicationContext());
+
+        if (!context->getReplicator().isInitialized())
+        {
+            return;
+        }
+
+        try
+        { 
+            if (context->getState() == STANDBY_IN_REPLICA_GROUP)
+            {
+                context->getReplicator()->addListener(mReplicatorListenerProxy);
+                mListeningToReplicator = true;
+            }
         }
         catch (const Ice::Exception& ex)
         {
-            lg(Error) << "Error attempting to remove a factory from the adapter" << ex.what();
+            lg(Error) << ex.what();
+            throw;
         }
     }
-}
+
+    void stopListeningToStateReplicators()
+    {
+        MediaOperationReplicationContextPtr context = 
+            boost::static_pointer_cast<MediaOperationReplicationContext>(getReplicationContext());
+    
+        if ((!context->getReplicator().isInitialized()) || (mListeningToReplicator == false))
+        {
+            return;
+        }
+    
+        try
+        {
+            context->getReplicator()->removeListener(mReplicatorListenerProxy);
+            mListeningToReplicator = false;
+        }
+        catch (const Ice::Exception& e)
+        {
+            lg(Error) << e.what();
+            throw;
+        }
+    }
+
+    bool mListeningToReplicator;
+
+    MediaOperationStateReplicatorListenerImplPtr mReplicatorListener;
+    MediaOperationStateReplicatorListenerPrx mReplicatorListenerProxy;
+
+    MediaOperationFactorySeq mFactories;
+};
 
 extern "C"
 {
 ASTSCF_DLL_EXPORT IceBox::Service* create(Ice::CommunicatorPtr)
 {
-    return new MediaOperationsCoreApp;
+    return new AsteriskSCF::MediaOperationsCore::MediaOperationsComponent;
 }
 }
 
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index d30992f..82b0a08 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -1,21 +1,21 @@
 include_directories(${astscf-ice-util-cpp_dir}/include)
 
-astscf_component_init(media_operations_core_test)
-astscf_component_add_files(media_operations_core_test TestMediaOperations.cpp)
-astscf_component_add_files(media_operations_core_test TestStreamSource.cpp)
-astscf_component_add_files(media_operations_core_test TestStreamSource.h)
-astscf_component_add_files(media_operations_core_test TestStreamSink.cpp)
-astscf_component_add_files(media_operations_core_test TestStreamSink.h)
-astscf_component_add_boost_libraries(media_operations_core_test unit_test_framework)
-astscf_component_add_slice_collection_libraries(media_operations_core_test ASTSCF)
-astscf_component_build_icebox(media_operations_core_test)
+astscf_component_init(MediaOperationsCoreTest)
+astscf_component_add_files(MediaOperationsCoreTest TestMediaOperations.cpp)
+astscf_component_add_files(MediaOperationsCoreTest TestStreamSource.cpp)
+astscf_component_add_files(MediaOperationsCoreTest TestStreamSource.h)
+astscf_component_add_files(MediaOperationsCoreTest TestStreamSink.cpp)
+astscf_component_add_files(MediaOperationsCoreTest TestStreamSink.h)
+astscf_component_add_boost_libraries(MediaOperationsCoreTest unit_test_framework)
+astscf_component_add_slice_collection_libraries(MediaOperationsCoreTest ASTSCF)
+astscf_component_build_icebox(MediaOperationsCoreTest)
 
-astscf_test_icebox(media_operations_core_test config/test_component.config)
-target_link_libraries(media_operations_core_test astscf-ice-util-cpp)
+astscf_test_icebox(MediaOperationsCoreTest config/test_component.config)
+target_link_libraries(MediaOperationsCoreTest astscf-ice-util-cpp)
 
-pjproject_link(media_operations_core_test pjsip)
-pjproject_link(media_operations_core_test pjmedia)
-pjproject_link(media_operations_core_test pjnath)
-pjproject_link(media_operations_core_test pjlib-util)
-pjproject_link(media_operations_core_test pjlib)
+pjproject_link(MediaOperationsCoreTest pjsip)
+pjproject_link(MediaOperationsCoreTest pjmedia)
+pjproject_link(MediaOperationsCoreTest pjnath)
+pjproject_link(MediaOperationsCoreTest pjlib-util)
+pjproject_link(MediaOperationsCoreTest pjlib)
 

-----------------------------------------------------------------------


-- 
asterisk-scf/release/media_operations_core.git



More information about the asterisk-scf-commits mailing list