[asterisk-scf-commits] asterisk-scf/release/media_rtp_pjmedia.git branch "master" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Wed Sep 14 17:43:42 CDT 2011


branch "master" has been updated
       via  458c78b6a0b5ca95265fb2ff8ad1ebbf0c2228ac (commit)
       via  a042827c7bd657208756eb8176fb9b59c7ba60a8 (commit)
       via  7743183d3e581b8b0b0fbb13f52b593384d3ea0b (commit)
       via  6ab655ebd6b1d46172ba735b2541249a88a5092d (commit)
       via  aa81de303e27c5afb5d1d136581ead995c23d0ed (commit)
       via  5a88bcca5f0505c3f4176631315c327efe91ba11 (commit)
       via  f367da4739906648c80c678121756047a97213e8 (commit)
       via  432e98a60d5da408de623badd3cbbdd1b2d9974c (commit)
       via  27b6f831e45fcac62c47bc0a8e6346bceb41312f (commit)
       via  0950e6d32f6bdb665ad54090a324a5f93ace1b3b (commit)
       via  51a1d11783701bff96ba621e275824e0aae6454c (commit)
       via  f32a2b7829c7442bbd0bfbfeec6c4c0511b00bc4 (commit)
       via  1bf93e4bb3b668e45fb1d9227814efbce416a031 (commit)
       via  7abcf7e1c41b909a2aaf1a2c44be0baf1c3ada56 (commit)
       via  851ca93f58b1fab9a0e6349852a016bae9a76374 (commit)
       via  b6db1d069bcf3013327d8adee49e7b4d8a1d6c0a (commit)
       via  fa8e68b106cd0c1b6f49733764c0c29f6d728387 (commit)
       via  c62a7f5070867013ae428a3c9af2c83f6840a716 (commit)
       via  cd50386c941baa0c1661ccfefaeb3ef1dd51a291 (commit)
       via  68e7ebfd67f8a6f899011f6c07297e284a4a19dd (commit)
       via  018c66abb6b5a4dec2f4d4ff64f7cb3d87055ad8 (commit)
       via  bebb64eeace7496c0da09b238f293ae35b296d36 (commit)
       via  c356be141c7f6cca773a9316a1ff4b3e392258af (commit)
       via  2f6e2062d7d27b364fd48386fa4e1006f3dc6d87 (commit)
      from  415cb5b5543d7f509877a4ada82178e04d9de1c4 (commit)

Summary of changes:
 .../MediaRTPPJMedia/RtpStateReplicationIf.ice      |   51 ++
 src/CMakeLists.txt                                 |    4 +
 src/Component.cpp                                  |   24 +-
 src/MediaRTPpjmedia.cpp                            |  724 ++++++++++++++++++++
 src/RTPSession.cpp                                 |  198 +++++-
 src/RTPSession.h                                   |    8 +-
 src/RTPSink.cpp                                    |   51 ++-
 src/RTPSink.h                                      |   11 +-
 src/RTPSource.cpp                                  |   35 +
 src/RTPSource.h                                    |    5 +
 src/RTPTelephonyEventSink.cpp                      |  226 ++++++
 src/RTPTelephonyEventSink.h                        |   81 +++
 src/RTPTelephonyEventSource.cpp                    |  243 +++++++
 src/RTPTelephonyEventSource.h                      |  108 +++
 src/ReplicationAdapter.h                           |    2 +
 src/RtpStateReplicatorListener.cpp                 |  127 ++--
 src/SessionAdapter.h                               |    2 +
 test/TestRTPICE.cpp                                |    8 +-
 test/TestRTPpjmedia.cpp                            |   14 +-
 19 files changed, 1817 insertions(+), 105 deletions(-)
 create mode 100644 src/MediaRTPpjmedia.cpp
 create mode 100644 src/RTPTelephonyEventSink.cpp
 create mode 100644 src/RTPTelephonyEventSink.h
 create mode 100644 src/RTPTelephonyEventSource.cpp
 create mode 100644 src/RTPTelephonyEventSource.h


- Log -----------------------------------------------------------------
commit 458c78b6a0b5ca95265fb2ff8ad1ebbf0c2228ac
Author: Mark Michelson <mmichelson at digium.com>
Date:   Wed Sep 14 17:44:59 2011 -0500

    Fix compilation problem in RtpStateReplicatorListener

diff --git a/src/RtpStateReplicatorListener.cpp b/src/RtpStateReplicatorListener.cpp
index e3edc22..95fbef3 100644
--- a/src/RtpStateReplicatorListener.cpp
+++ b/src/RtpStateReplicatorListener.cpp
@@ -112,9 +112,11 @@ public:
                     localitem = newitem;
                     mImpl->mStateItems.insert(make_pair(item->sessionId, newitem));
 
+                    RTPAllocationOutputsPtr outputs (new RTPAllocationOutputs);
+
                     localitem->setSession(
                          AsteriskSCF::PJMediaRTP::RTPSession::create(mImpl->mAdapter, mImpl->mEnvironment, item,
-                                    mImpl->mReplicationContext, mImpl->mConfigurationService));
+                                    mImpl->mReplicationContext, mImpl->mConfigurationService, item->options, outputs));
                 }
                 else
                 {

commit a042827c7bd657208756eb8176fb9b59c7ba60a8
Merge: 415cb5b 7743183
Author: Mark Michelson <mmichelson at digium.com>
Date:   Wed Sep 14 16:52:15 2011 -0500

    Merge branch 'telephony-events2'
    
    Conflicts:
    	src/CMakeLists.txt
    	src/MediaRTPpjmedia.cpp
    	src/RTPSession.cpp
    	src/RTPSession.h
    	src/RtpStateReplicatorListener.cpp
    	test/TestRTPpjmedia.cpp

diff --cc src/CMakeLists.txt
index 28e729d,b1ca3d0..d130546
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@@ -2,14 -2,17 +2,18 @@@ include_directories(${astscf-ice-util-c
  include_directories(${logger_dir}/include)
  
  astscf_component_init(media_rtp_pjmedia)
 -astscf_component_add_files(media_rtp_pjmedia MediaRTPpjmedia.cpp)
 +astscf_component_add_files(media_rtp_pjmedia Component.cpp)
  astscf_component_add_files(media_rtp_pjmedia RTPSession.cpp)
  astscf_component_add_files(media_rtp_pjmedia RTPSource.cpp)
+ astscf_component_add_files(media_rtp_pjmedia RTPTelephonyEventSource.cpp)
  astscf_component_add_files(media_rtp_pjmedia RTPSink.cpp)
+ astscf_component_add_files(media_rtp_pjmedia RTPTelephonyEventSink.cpp)
  astscf_component_add_files(media_rtp_pjmedia RTPSession.h)
  astscf_component_add_files(media_rtp_pjmedia RTPSource.h)
+ astscf_component_add_files(media_rtp_pjmedia RTPTelephonyEventSource.h)
  astscf_component_add_files(media_rtp_pjmedia RTPSink.h)
 +astscf_component_add_files(media_rtp_pjmedia RtpReplicationContext.h)
+ astscf_component_add_files(media_rtp_pjmedia RTPTelephonyEventSink.h)
  astscf_component_add_files(media_rtp_pjmedia RtpStateReplicatorListener.cpp)
  astscf_component_add_files(media_rtp_pjmedia RtpStateReplicator.h)
  astscf_component_add_files(media_rtp_pjmedia RTPConfiguration.cpp)
diff --cc src/Component.cpp
index c029909,0000000..9530a1f
mode 100644,000000..100644
--- a/src/Component.cpp
+++ b/src/Component.cpp
@@@ -1,621 -1,0 +1,637 @@@
 +/*
 + * Asterisk SCF -- An open-source communications framework.
 + *
 + * Copyright (C) 2010, Digium, Inc.
 + *
 + * See http://www.asterisk.org for more information about
 + * the Asterisk SCF project. Please do not directly contact
 + * any of the maintainers of this project for assistance;
 + * the project provides a web site, mailing lists and IRC
 + * channels for your use.
 + *
 + * This program is free software, distributed under the terms of
 + * the GNU General Public License Version 2. See the LICENSE.txt file
 + * at the top of the source tree.
 + */
 +
 +#include <pjlib.h>
 +#include <pjmedia.h>
 +
 +#include <Ice/Ice.h>
 +#include <IceBox/IceBox.h>
 +#include <IceUtil/UUID.h>
 +
 +#include <boost/shared_ptr.hpp>
 +
 +#include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.h>
 +#include <AsteriskSCF/Media/MediaIf.h>
 +#include <AsteriskSCF/Media/RTP/MediaRTPIf.h>
 +#include <AsteriskSCF/System/Component/ConfigurationIf.h>
 +#include <AsteriskSCF/Logger/IceLogger.h>
 +#include <AsteriskSCF/logger.h>
 +#include <AsteriskSCF/Discovery/SmartProxy.h>
 +#include <AsteriskSCF/Component/Component.h>
 +
 +#include "RtpReplicationContext.h"
 +#include "RTPSession.h"
 +#include "RtpStateReplicator.h"
 +#include "RTPConfiguration.h"
 +#include "RtpConfigurationIf.h"
 +#include "PJMediaEnvironment.h"
 +
 +using namespace std;
 +using namespace AsteriskSCF::Core::Discovery::V1;
 +using namespace AsteriskSCF::Media::V1;
 +using namespace AsteriskSCF::Media::RTP::V1;
 +using namespace AsteriskSCF::Replication::MediaRTPPJMedia::V1;
 +using namespace AsteriskSCF::Configuration::MediaRTPPJMedia::V1;
 +using namespace AsteriskSCF::System::Configuration::V1;
 +using namespace AsteriskSCF::System::Component::V1;
 +using namespace AsteriskSCF::System::Logging;
 +using namespace AsteriskSCF::Discovery;
 +using namespace AsteriskSCF::Replication;
 +using namespace AsteriskSCF::PJMediaRTP;
 +
 +namespace
 +{
 +Logger lg = getLoggerFactory().getLogger("AsteriskSCF.MediaRTP");
 +}
 +
 +static const string ReplicaServiceId("MediaRtpReplica");
 +static const string MediaServiceId("RTPMediaService");
 +static const string MediaComparatorServiceId("RTPMediaServiceComparator");
 +
 +/**
 + * Implementation of the RTPMediaService interface as defined in MediaRTPIf.ice
 + */
 +class RTPMediaServiceImpl : public RTPMediaService
 +{
 +public:
 +    RTPMediaServiceImpl(const Ice::ObjectAdapterPtr&, 
 +      const RtpReplicationContextPtr& replicationContext,
 +      const ConfigurationServiceImplPtr&);
 +
-     RTPSessionPrx allocate(const RTPServiceLocatorParamsPtr&, const Ice::Current&);
++    RTPSessionPrx allocate(
++            const RTPServiceLocatorParamsPtr&,
++            const RTPOptionsPtr&,
++            RTPAllocationOutputsPtr&,
++            const Ice::Current&);
++
 +    pj_pool_factory *getPoolFactory() { return mEnvironment->poolFactory(); };
 +
 +    PJMediaEnvironmentPtr getEnvironment()
 +    {
 +        return mEnvironment;
 +    }
 +
 +private:
 +    Ice::ObjectAdapterPtr mAdapter;
 +    PJMediaEnvironmentPtr mEnvironment;
 +    RtpReplicationContextPtr mReplicationContext;
 +    ConfigurationServiceImplPtr mConfigurationService;
 +
 +#if CONTROL_POINTS_ENABLED
 +    AsteriskSCF::PJMediaRTPTesting mMediaServiceSwitchBoard;
 +#endif
 +};
 +
 +/**
 + * Typedef which gives us a smart pointer type for RTPMediaServiceImpl class.
 + */
 +typedef IceUtil::Handle<RTPMediaServiceImpl> RTPMediaServiceImplPtr;
 +
 +/**
 + * Implementation of the ServiceLocatorParamsCompare class
 + */
 +class RTPMediaServiceCompareServiceImpl : public ServiceLocatorParamsCompare
 +{
 +public:
 +    RTPMediaServiceCompareServiceImpl(const ConfigurationServiceImplPtr& config) :
 +        mConfig(config)
 +    {
 +    }
 +
 +    bool isSupported(const ServiceLocatorParamsPtr& locatorParams, const Ice::Current&)
 +    {
 +    RTPServiceLocatorParamsPtr params;
 +
 +    if (!(params = RTPServiceLocatorParamsPtr::dynamicCast(locatorParams)))
 +    {
 +        return false;
 +    }
 +
 +    bool result = true;
 +
 +    // This is done on purpose for additional checks in the future
 +    if (params->ipv6 == true)
 +    {
 +#if defined(PJ_HAS_IPV6) && PJ_HAS_IPV6!=0
 +        result = true;
 +#else
 +        result = false;
 +#endif
 +    }
 +        //
 +        // We can ignore the SRTP criteria since we support it one way or the other.
 +        //
 +        if (!result)
 +        {
 +            return false;
 +        }
 +        
 +        RTPOverICEServiceLocatorParamsPtr iceParams = RTPOverICEServiceLocatorParamsPtr::dynamicCast(locatorParams);
 +        if (iceParams)
 +        {
 +            if (iceParams->enableRTPOverICE)
 +            {
 +                NATConfigPtr natConfig = mConfig->natConfig();
 +
 +                if (natConfig && natConfig->isSTUNEnabled())
 +                {
 +                    if (iceParams->enableTURN)
 +                    {
 +                        if (!natConfig->isTURNEnabled())
 +                        {
 +                            result = false;
 +                        }
 +                    }
 +                }
 +                else
 +                {
 +                    result = false;
 +                }
 +            }
 +            //
 +            // We ignore the else case because we can definitely do non-ICE related stuff... it's not clear
 +            // that negative matches in this case should be exclusionary. Actual ICE usage will be specified
 +            // when the RTP session is allocated.
 +            //
 +        }
 +
 +    return result;
 +    }
 +
 +private:
 +    ConfigurationServiceImplPtr mConfig;
 +};
 +
 +/**
 + * Implementation of the Component class. 
 + */
 +class Component : public AsteriskSCF::Component::Component
 +{
 +public:
 +    Component() : 
 +       AsteriskSCF::Component::Component(lg, AsteriskSCF::Media::RTP::V1::ComponentServiceDiscoveryCategory), 
 +      mListeningToReplicator(false), mGeneralState(new RtpGeneralStateItem()) { mGeneralState->key = IceUtil::generateUUID(); };
 +
 +private:
 +    // Required base Component overrides
 +    virtual void createPrimaryServices();
 +    virtual void preparePrimaryServicesForDiscovery();
 +    virtual void createReplicationStateListeners();
 +    virtual void stopListeningToStateReplicators();
 +    virtual void listenToStateReplicators();
 +    virtual void findRemoteServices();
 +    virtual void onRegisterPrimaryServices();
 +
 +    // Optional base Component notification overrides
 +    virtual void onSuspend();
 +    virtual void onResume();
 +    virtual void onPreInitialize();
 +    virtual void onStop();    
 +    virtual void onStart();
 +
 +    // Other base Component overrides
 +    virtual void prepareBackplaneServicesForDiscovery();
 +    ReplicationContextPtr createReplicationContext(ReplicationStateType state);
 +
 +    // A proxy to the service locator manager for the component service.
 +    ServiceManagementPrx mComponentServiceManagement;
 +
 +    // State replicator listener.
 +    RtpStateReplicatorListenerPtr mReplicatorListener;
 +    RtpStateReplicatorListenerPrx mReplicatorListenerProxy;
 +    bool mListeningToReplicator;
 +
 +    // An instance of the general state information class.
 +    RtpGeneralStateItemPtr mGeneralState;
 +
 +    // Media service
 +    RTPMediaServiceImplPtr mRtpMediaServicePtr;
 +    RTPMediaServicePrx mRtpMediaServicePrx;
 +    LocatorRegistrationWrapperPtr mRtpMediaServiceRegistration;
 +
 +    // Media comparator service
 +    ServiceLocatorParamsComparePtr mRtpMediaComparatorService; 
 +    ServiceLocatorParamsComparePrx mRtpMediaComparatorServicePrx;
 +    RTPOverICEServiceLocatorParamsPtr mRtpOverIceLocatorParams;
 +
 +    // Configuration state 
 +    ConfigurationServiceImplPtr mConfigurationService;
 +    ConfigurationServicePrx mConfigurationServicePrx;
 +    LocatorRegistrationWrapperPtr mConfigurationRegistration;
 +};
 +
 +void Component::onSuspend() 
 +{
 +    mGeneralState->serviceManagement->suspend();
 +}
 +
 +void Component::onResume() 
 +{
 +    mGeneralState->serviceManagement->unsuspend();
 +}
 +
 +/**
 + * Wrapper class around pj_thread_desc.
 + */
 +class ThreadDescWrapper
 +{
 +public:
 +    /**
 +     * pjthread thread description information, must persist for the life of the thread
 +     */
 +    pj_thread_desc mDesc;
 +};
 +
 +/**
 + * Type definition used to create a smart pointer for the above.
 + */
 +typedef boost::shared_ptr<ThreadDescWrapper> ThreadDescWrapperPtr;
 +
 +/**
 + * Implementation of the Ice::ThreadNotification class.
 + */
 +class pjlibHook : public Ice::ThreadNotification
 +{
 +public:
 +    /**
 +     * Implementation of the start function which is called when a thread starts.
 +     */
 +    void start()
 +    {
 +        ThreadDescWrapperPtr wrapper = ThreadDescWrapperPtr(new ThreadDescWrapper());
 +        pj_thread_t *thread;
 +        pj_thread_register("ICE Thread", wrapper->mDesc, &thread);
 +        boost::lock_guard<boost::mutex> lock(mLock);
 +        pjThreads.insert(make_pair(thread, wrapper));
 +    }
 +
 +    /**
 +     * Implementation of the stop function which is called when a thread stops.
 +     */
 +    void stop()
 +    {
 +        if (pj_thread_is_registered())
 +        {
 +            boost::lock_guard<boost::mutex> lock(mLock);
 +            pjThreads.erase(pj_thread_this());
 +        }
 +    }
 +private:
 +    /**
 +     * A map containing thread lifetime persistent data.
 +     */
 +    map<pj_thread_t*, ThreadDescWrapperPtr> pjThreads;
 +
 +    /**
 +     * Mutex to protect the map
 +     */
 +    boost::mutex mLock;
 +};
 +
 +/**
 + * Constructor for the RTPMediaServiceImpl class.
 + */
 +RTPMediaServiceImpl::RTPMediaServiceImpl(const Ice::ObjectAdapterPtr& adapter,
 +    const RtpReplicationContextPtr& replicationContext,
 +    const ConfigurationServiceImplPtr& configurationService) :
 +    mAdapter(adapter), 
 +    mEnvironment(PJMediaEnvironment::create(adapter->getCommunicator()->getProperties(), configurationService)),
 +    mReplicationContext(replicationContext), 
 +    mConfigurationService(configurationService)
 +{
 +}
 +
 +/**
 + * Implementation of the allocate method as defined in MediaRTPIf.ice
 + */
- RTPSessionPrx RTPMediaServiceImpl::allocate(const RTPServiceLocatorParamsPtr& params, const Ice::Current&)
++RTPSessionPrx RTPMediaServiceImpl::allocate(
++        const RTPServiceLocatorParamsPtr& params,
++        const RTPOptionsPtr& options,
++        RTPAllocationOutputsPtr& outputs,
++        const Ice::Current&)
 +{
-     return AsteriskSCF::PJMediaRTP::RTPSession::create(mAdapter, IceUtil::generateUUID(), params, mEnvironment,
-             mReplicationContext, mConfigurationService);
++    return AsteriskSCF::PJMediaRTP::RTPSession::create(
++            mAdapter,
++            IceUtil::generateUUID(),
++            params,
++            mEnvironment,
++            mReplicationContext,
++            mConfigurationService,
++            options,
++            outputs);
 +}
 +
 +void Component::onPreInitialize() 
 +{
 +    /* Initialize pjlib as pjmedia will be using it */
 +    pj_status_t status = pj_init();
 +    if (status != PJ_SUCCESS)
 +    {
 +        lg(Error) << "PJ library initialization failed.";
 +        return;
 +    }
 +
 +    if ((status = pjlib_util_init()) != PJ_SUCCESS)
 +    {
 +        lg(Error) << "PJ Utility library initialization failed.";
 +        return;
 +    }
 +
 +    lg(Info) << "Initializing pjmedia rtp component" << endl;
 +
 +    Ice::InitializationData id;
 +    id.threadHook = new pjlibHook();
 +    id.properties = getCommunicator()->getProperties();
 +
 +    // To use our thread-hook, we need to set an alternate 
 +    // communicator in our Component base. 
 +    setCommunicator(Ice::initialize(id));
 +}
 +
 +/**
 + * Override of factory method to create our custom replication context. 
 + */
 +ReplicationContextPtr Component::createReplicationContext(ReplicationStateType state)
 +{
 +    RtpReplicationContextPtr context(new RtpReplicationContext(state));
 +    return context;
 +}
 +
 +/**
 + * Create the objects that implement the main services this component provides 
 + * the system.
 + */
 +void Component::createPrimaryServices()
 +{
 +    try
 +    {
 +        RtpReplicationContextPtr rtpReplicationContext = 
 +            boost::static_pointer_cast<RtpReplicationContext>(getReplicationContext());
 +
 +        mConfigurationService = ConfigurationServiceImpl::create();
 +        mConfigurationServicePrx = mConfigurationService->activate(getBackplaneAdapter(), IceUtil::generateUUID());
 +
 +        mRtpMediaServicePtr =
 +           new RTPMediaServiceImpl(getServiceAdapter(), rtpReplicationContext, mConfigurationService);
 +        mRtpMediaServicePrx = RTPMediaServicePrx::uncheckedCast(getServiceAdapter()->add(mRtpMediaServicePtr,
 +           getCommunicator()->stringToIdentity(MediaServiceId)));
 +
 +        mRtpMediaComparatorService = new RTPMediaServiceCompareServiceImpl(mConfigurationService);
 +        mRtpMediaComparatorServicePrx = ServiceLocatorParamsComparePrx::uncheckedCast(
 +              getServiceAdapter()->add(mRtpMediaComparatorService, 
 +                  getCommunicator()->stringToIdentity(MediaComparatorServiceId)));
 +
 +
 +        mRtpOverIceLocatorParams = new RTPOverICEServiceLocatorParams;
 +        mRtpOverIceLocatorParams->category = "rtp";
 +        PJMediaEnvironmentPtr mediaEnvironment = mRtpMediaServicePtr->getEnvironment();
 +
 +        //
 +        // Service wide configuration is done through properties allowing certain features
 +        // to be completely disabled.
 +        //
 +        NATConfigPtr natConfig = mediaEnvironment->natConfig();
 +        if (natConfig && natConfig->isSTUNEnabled())
 +        {
 +            mRtpOverIceLocatorParams->enableRTPOverICE = true;
 +            mRtpOverIceLocatorParams->enableTURN = natConfig->isTURNEnabled();
 +        }
 +        else
 +        {
 +            mRtpOverIceLocatorParams->enableRTPOverICE = false;
 +            mRtpOverIceLocatorParams->enableTURN = false;
 +        }
 +
 +        if (rtpReplicationContext->isActive() == true)
 +        {
 +            mGeneralState->comparatorId = IceUtil::generateUUID();
 +            getServiceLocatorManagement()->addCompare(mGeneralState->comparatorId, mRtpMediaComparatorServicePrx);
 +        }
 +    
 +    }
 +    catch(const Ice::Exception& e)
 +    {
 +       lg(Critical) << getName() << " : " << BOOST_CURRENT_FUNCTION << " : " << e.what();
 +    }
 +}
 +
 +/**
 + * Prepare this component's backplane interfaces for the Service Locator.
 + * This enables other Asterisk SCF components to locate our interfaces.
 + */
 +void Component::prepareBackplaneServicesForDiscovery()
 +{
 +    // Ensure the default Component services are prepped. 
 +    AsteriskSCF::Component::Component::prepareBackplaneServicesForDiscovery();
 +
 +    try
 +    {
 +        // Register our configuration interface with the Service Locator.
 +        mConfigurationRegistration = wrapServiceForRegistration(mConfigurationServicePrx, 
 +                                                                ConfigurationDiscoveryCategory);
 +        manageBackplaneService(mConfigurationRegistration);
 +    }
 +    catch(const std::exception& e)
 +    {
 +        lg(Error) << "Exception in " << getName() << ", " << BOOST_CURRENT_FUNCTION <<  " : " << e.what();
 +    }
 +}
 +
 +void Component::findRemoteServices() 
 +{
 +    if (getReplicationContext()->getState() == ACTIVE_STANDALONE)
 +    {
 +        return;
 +    }
 +
 +    // Look for the configured state replicator or default one
 +    ServiceLocatorParamsPtr replicatorParams = new ServiceLocatorParams();
 +    replicatorParams->category = StateReplicatorDiscoveryCategory;
 +    replicatorParams->service =
 +        getCommunicator()->getProperties()->getPropertyWithDefault("Rtp.StateReplicatorService", "default");
 +
 +    try
 +    {
 +        RtpReplicationContextPtr rtpReplicationContext = 
 +            boost::static_pointer_cast<RtpReplicationContext>(getReplicationContext());
 +
 +        AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx> pw(getServiceLocator(), replicatorParams, lg);
 +        rtpReplicationContext->setReplicator(pw);
 +
 +        // Since we're not in standalone mode, we'll get our configuration updates routed via the
 +        // replicator service. 
 +        ConfigurationReplicatorPrx configurationReplicator = ConfigurationReplicatorPrx::checkedCast(
 +            rtpReplicationContext->getReplicator().initialize(), ReplicatorFacet);
 +        configurationReplicator->registerConfigurationService(mConfigurationServicePrx);
 +
 +    }
 +    catch (const std::exception& e)
 +    {
 +        lg(Error) << getName() << ": " << BOOST_CURRENT_FUNCTION << " State replicator could not be found, operating without. " << e.what();
 +    }
 +}
 +
 +void Component::createReplicationStateListeners()
 +{
 +    try
 +    {
 +        RtpReplicationContextPtr rtpReplicationContext = 
 +            boost::static_pointer_cast<RtpReplicationContext>(getReplicationContext());
 +
 +       // Create and publish our state replicator listener interface.
 +        mReplicatorListener = new RtpStateReplicatorListenerI(getServiceAdapter(), mRtpMediaServicePtr->getEnvironment(),
 +            mGeneralState, rtpReplicationContext, mConfigurationService);
 +        RtpStateReplicatorListenerPrx replicatorListener = RtpStateReplicatorListenerPrx::uncheckedCast(
 +             getBackplaneAdapter()->addWithUUID(mReplicatorListener));
 +        mReplicatorListenerProxy = RtpStateReplicatorListenerPrx::uncheckedCast(replicatorListener->ice_oneway());
 +            
 +        lg(Debug) << "Got proxy to RTP state replicator";
 +    }
 +    catch(const Ice::Exception &e)
 +    {
 +        lg(Error) << getName() << " in " << BOOST_CURRENT_FUNCTION << " : " << e.what();
 +        throw;
 +    }
 +}
 +
 +void Component::listenToStateReplicators()
 +{
 +    RtpReplicationContextPtr rtpReplicationContext = 
 +        boost::static_pointer_cast<RtpReplicationContext>(getReplicationContext());
 +
 +    if (mListeningToReplicator == true)
 +    {
 +        return;
 +    }
 +
 +    if (!rtpReplicationContext->getReplicator().isInitialized())
 +    {
 +        lg(Error) << getName() << " : State replicator could not be found. Unable to listen for state updates!";
 +        return;
 +    }
 +
 +    try
 +    {
 +        // Are we in standby mode?
 +        if (rtpReplicationContext->getState() == STANDBY_IN_REPLICA_GROUP)
 +        {
 +            rtpReplicationContext->getReplicator()->addListener(mReplicatorListenerProxy);
 +            mListeningToReplicator = true;
 +        }
 +    }
 +    catch (const Ice::Exception& e)
 +    {
 +        lg(Error) << e.what();
 +        throw;
 +    }
 +}
 +
 +/** 
 + * Unregister as a listener to our state replicator. 
 + * A component in active mode doesn't need to listen to
 + * state replication data. 
 + */
 +void Component::stopListeningToStateReplicators()
 +{
 +    RtpReplicationContextPtr rtpReplicationContext = 
 +        boost::static_pointer_cast<RtpReplicationContext>(getReplicationContext());
 +
 +    if ((!rtpReplicationContext->getReplicator().isInitialized()) || (mListeningToReplicator == false))
 +    {
 +        return;
 +    }
 +
 +    try
 +    {
 +        rtpReplicationContext->getReplicator()->removeListener(mReplicatorListenerProxy);
 +        mListeningToReplicator = false;
 +    }
 +    catch (const Ice::Exception& e)
 +    {
 +        lg(Error) << e.what();
 +        throw;
 +    }
 +}
 +
 +/**
 + * Prepares this component's primary public interfaces for discovery via the Service Locator.
 + * This enables other Asterisk SCF components to locate the interfaces we are publishing.
 + */
 +void Component::preparePrimaryServicesForDiscovery()
 +{
 +    try
 +    {
 +        mRtpMediaServiceRegistration = wrapServiceForRegistration(mRtpMediaServicePrx,
 +                                                                  "rtp");
 +        managePrimaryService(mRtpMediaServiceRegistration); 
 +    }
 +        catch(const std::exception& e)
 +    {
 +        lg(Error) << "Unable to publish component interfaces in " << getName() << BOOST_CURRENT_FUNCTION << 
 +            ". Exception: " << e.what();
 +        throw; // rethrow
 +    }
 +}
 +
 +void Component::onRegisterPrimaryServices()
 +{
 +    if (getReplicationContext()->isActive() == false)
 +    {
 +        return;
 +    }
 +
 +    mGeneralState->serviceManagement = mRtpMediaServiceRegistration->getServiceManagement();
 +    mGeneralState->serviceManagement->addLocatorParams(mRtpOverIceLocatorParams, mGeneralState->comparatorId);
 +}
 +
 +void Component::onStart() 
 +{
 +    // Note: I don't think this is necessary. If we make the 
 +    // comparator computed from a "service" identifier (which could default
 +    // to "default"), there's nothing replicated here that the standby component
 +    // couldn't already determine itself. 
 +    if (getReplicationContext()->isReplicating() == true)
 +    {
 +        RtpReplicationContextPtr rtpReplicationContext = 
 +            boost::static_pointer_cast<RtpReplicationContext>(getReplicationContext());
 +
 +        RtpStateItemSeq items;
 +        items.push_back(mGeneralState);
 +        RtpStateReplicatorPrx oneway = RtpStateReplicatorPrx::uncheckedCast(rtpReplicationContext->getReplicator()->ice_oneway());
 +        oneway->setState(items);
 +    }
 +}
 +
 +void Component::onStop() 
 +{
 +    if (getReplicationContext()->isActive() == true)
 +    {
 +       mGeneralState->serviceManagement->unregister();
 +    }
 +
 +    if (!mGeneralState->comparatorId.empty())
 +    {
 +        getServiceLocatorManagement()->removeCompare(mGeneralState->comparatorId);
 +    }
 +}
 +
 +extern "C"
 +{
 +ASTSCF_DLL_EXPORT IceBox::Service* create(Ice::CommunicatorPtr)
 +{
 +    return new Component;
 +}
 +}
diff --cc src/RTPSession.cpp
index 9839a19,df04bdc..50416e8
--- a/src/RTPSession.cpp
+++ b/src/RTPSession.cpp
@@@ -144,8 -146,9 +146,8 @@@ public
              const string& id,
              const PJMediaEnvironmentPtr& env,
              const AsteriskSCF::Media::RTP::V1::RTPServiceLocatorParamsPtr& params,
- 	    const RtpReplicationContextPtr& replicationContext,
- 	    const ConfigurationServiceImplPtr&);
 -            const AsteriskSCF::System::Component::V1::ReplicaPrx& replicaControl, 
 -	        const AsteriskSCF::Discovery::SmartProxy<AsteriskSCF::Replication::MediaRTPPJMedia::V1::RtpStateReplicatorPrx>&,
++	        const RtpReplicationContextPtr& replicationContext,
+ 	        const ConfigurationServiceImplPtr&);
      
      RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter,
              const string& sessionIdentity,    
@@@ -154,7 -157,7 +156,7 @@@
              const AsteriskSCF::Media::V1::FormatSeq& formats,
              bool isIPv6,
              bool srtp,
- 	    const RtpReplicationContextPtr& replicationContext,
 -            const AsteriskSCF::Discovery::SmartProxy<AsteriskSCF::Replication::MediaRTPPJMedia::V1::RtpStateReplicatorPrx>&,
++	        const RtpReplicationContextPtr& replicationContext,
              const ConfigurationServiceImplPtr& configurationServant);
  
      ~RTPSessionImpl();
@@@ -434,8 -464,9 +465,8 @@@ RTPSessionImpl::RTPSessionImpl(const Ic
          const string& id,
          const PJMediaEnvironmentPtr& env,
          const RTPServiceLocatorParamsPtr& params,
- 	const RtpReplicationContextPtr& replicationContext,
 -        const ReplicaPrx& replicaService,
 -        const AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>& stateReplicator,
 -        const ConfigurationServiceImplPtr& configurationService) :
++	    const RtpReplicationContextPtr& replicationContext,
 +        const ConfigurationServiceImplPtr& configurationService) : 
      mEnvironment(env),
      mEndpoint(PJMediaEndpoint::create(env)),
      mId(id),
@@@ -563,85 -596,6 +594,85 @@@ std::string RTPSessionImpl::getId(cons
      return mId;
  }
  
 +/** 
 + * Local implementation. 
 + */
 +void RTPSessionImpl::setCookies(const AsteriskSCF::Media::V1::SessionCookieDict& cookieMap)
 +{
 +    boost::unique_lock<boost::shared_mutex> lock(mLock);
 +    mSessionStateItem->cookies = cookieMap;
 +}
 +
 +/** 
 + * Support for the corresponding API call. 
 + */
 +void RTPSessionImpl::setCookies(const AsteriskSCF::Media::V1::SessionCookies& cookies, 
 +                const Ice::Current&)
 +{
 +    { // scope the lock
 +        boost::unique_lock<boost::shared_mutex> lock(mLock);
 +        for (AsteriskSCF::Media::V1::SessionCookies::const_iterator i = cookies.begin();
 +             i != cookies.end();  ++i)
 +        {
 +            mSessionStateItem->cookies[(*i)->ice_id()] = (*i);
 +        }
 +    }
 +
 +    if (mReplicationContext->isReplicating() == true)
 +    {
-         replicateState(mSessionStateItem, 0, 0);
++        replicateState(mSessionStateItem, 0, 0, 0, 0);
 +    }
 +}
 +
 +/** 
 + * Implementation of the corresponding API call. 
 + */
 +void RTPSessionImpl::getCookies_async(
 +        const AsteriskSCF::Media::V1::AMD_Session_getCookiesPtr &cb,
 +        const AsteriskSCF::Media::V1::SessionCookies& cookiesToGet, 
 +        const Ice::Current&)
 +{
 +    AsteriskSCF::Media::V1::SessionCookies results;
 +
 +    boost::unique_lock<boost::shared_mutex> lock(mLock);
 +    for (AsteriskSCF::Media::V1::SessionCookies::const_iterator i = cookiesToGet.begin();
 +	    i != cookiesToGet.end();
 +	    ++i)
 +    {
 +	AsteriskSCF::Media::V1::SessionCookieDict::const_iterator cookie = mSessionStateItem->cookies.find((*i)->ice_id());
 +
 +	if (cookie == mSessionStateItem->cookies.end())
 +	{
 +	    continue;
 +	}
 +
 +	results.push_back(cookie->second);
 +    }
 +
 +    cb->ice_response(results);
 +}
 +
 +/** 
 + * Implementation of the corresponding API call. 
 + */
 +void RTPSessionImpl::removeCookies(const AsteriskSCF::Media::V1::SessionCookies& cookies, 
 +                const Ice::Current&)
 +{
 +    { // scope the lock
 +        boost::unique_lock<boost::shared_mutex> lock(mLock);
 +        for (AsteriskSCF::Media::V1::SessionCookies::const_iterator i = cookies.begin();
 +                i != cookies.end(); ++i)
 +        {
 +            mSessionStateItem->cookies.erase((*i)->ice_id());
 +        }
 +    }
 +
 +    if (mReplicationContext->isReplicating() == true)
 +    {
-         replicateState(mSessionStateItem, 0, 0);
++        replicateState(mSessionStateItem, 0, 0, 0, 0);
 +    }
 +}
 +
  /**
   * Implementation of the release method as defined in MediaRTPIf.ice
   */
@@@ -832,13 -805,17 +882,17 @@@ RTPTelephonyEventSinkPtr RTPSessionImpl
  /**
   * API call which replicates state items.
   */
- void RTPSessionImpl::replicateState(const RtpSessionStateItemPtr& session, const RtpStreamSinkStateItemPtr& sink,
-         const RtpStreamSourceStateItemPtr& source)
+ void RTPSessionImpl::replicateState(
+         const RtpSessionStateItemPtr& session,
+         const RtpStreamSinkStateItemPtr& sink,
+         const RtpStreamSourceStateItemPtr& source,
+         const RTPTelephonyEventSinkStateItemPtr& telephonyEventSink,
+         const RTPTelephonyEventSourceStateItemPtr& telephonyEventSource)
  {
      // If state replication has been disabled do nothing
 -    if (!mStateReplicator || mReplicaService->isActive() == false)
 +    if (mReplicationContext->isReplicating() == false)
      {
 -	return;
 +        return;
      }
  
      RtpStateItemSeq items;
@@@ -855,15 -832,33 +909,25 @@@
  
      if (source)
      {
 -	items.push_back(source);
 +        items.push_back(source);
      }
  
+     if (telephonyEventSink)
+     {
+         items.push_back(telephonyEventSink);
+     }
+ 
+     if (telephonyEventSource)
+     {
+         items.push_back(telephonyEventSource);
+     }
+ 
      if (items.size() == 0)
      {
 -	return;
 +        return;
      }
  
 -    try
 -    {
 -	RtpStateReplicatorPrx oneway = RtpStateReplicatorPrx::uncheckedCast(mStateReplicator->ice_oneway());
 -	oneway->setState(items);
 -    }
 -    catch (...)
 -    {
 -	mStateReplicator->setState(items);
 -    }
 +    mReplicationContext->getReplicator().tryOneWay()->setState(items);
  }
  
  /**
@@@ -1028,28 -1087,40 +1149,41 @@@ private
  RTPSessionPrx AsteriskSCF::PJMediaRTP::RTPSession::create(const Ice::ObjectAdapterPtr& adapter,
          const std::string& id, const RTPServiceLocatorParamsPtr& params,
          const PJMediaEnvironmentPtr& environment, 
 -        const AsteriskSCF::System::Component::V1::ReplicaPrx& replicaControl,
 -        const AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>& stateReplicator,
 +        const RtpReplicationContextPtr& replicationContext, 
-         const ConfigurationServiceImplPtr& configuration)
+         const ConfigurationServiceImplPtr& configuration,
+         const RTPOptionsPtr& options,
+         RTPAllocationOutputsPtr& outputs)
  {
      RTPSessionImplPtr servant(new RTPSessionImpl(adapter, id, environment, params,
 -                    replicaControl, stateReplicator, configuration));
 +                    replicationContext, configuration));
-     return servant->activate(id);
+     return servant->activate(id, options, outputs);
  }
  
  ReplicationAdapterPtr AsteriskSCF::PJMediaRTP::RTPSession::create(const Ice::ObjectAdapterPtr& adapter,
-   const PJMediaEnvironmentPtr& environment, 
-   const RtpSessionStateItemPtr& item,
-   const RtpReplicationContextPtr& replicationContext, 
-   const ConfigurationServiceImplPtr& configuration)
+         const PJMediaEnvironmentPtr& environment, 
+         const RtpSessionStateItemPtr& item,
++        const RtpReplicationContextPtr& replicationContext, 
+         const ConfigurationServiceImplPtr& configuration,
+         const RTPOptionsPtr& options,
+         RTPAllocationOutputsPtr& outputs)
  {
      RTPSessionImplPtr servant(new RTPSessionImpl(adapter, 
                      adapter->getCommunicator()->identityToString(item->sessionIdentity), 
                      environment,
                      item->port, item->formats, item->ipv6, item->srtp,
 -                    AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>(), 
 +                    replicationContext, 
                      configuration));
+ 
 +    servant->setCookies(item->cookies);
-     servant->activate(item->sessionIdentity, item->sourceIdentity, item->sinkIdentity);
++
+     servant->activate(
+             item->sessionIdentity,
+             item->sourceIdentity,
+             item->sinkIdentity,
+             item->telephonyEventSourceIdentity,
+             item->telephonyEventSinkIdentity,
+             options,
+             outputs);
+ 
      return ReplicationAdapterPtr(new ReplicationAdapterImpl(servant));
  }
--
diff --cc src/RTPSession.h
index 962d680,60f2e35..3fd558e
--- a/src/RTPSession.h
+++ b/src/RTPSession.h
@@@ -32,15 -31,20 +32,19 @@@ public
              const std::string& id,
              const AsteriskSCF::Media::RTP::V1::RTPServiceLocatorParamsPtr& params,
              const PJMediaEnvironmentPtr& environment,
 -            const AsteriskSCF::System::Component::V1::ReplicaPrx& replicaControl,
 -            const AsteriskSCF::Discovery::SmartProxy<
 -            AsteriskSCF::Replication::MediaRTPPJMedia::V1::RtpStateReplicatorPrx>& stateReplicator,
 +            const RtpReplicationContextPtr& replicationContext, 
-             const ConfigurationServiceImplPtr& configuration
+             const ConfigurationServiceImplPtr& configuration,
+             const AsteriskSCF::Media::RTP::V1::RTPOptionsPtr&,
+             AsteriskSCF::Media::RTP::V1::RTPAllocationOutputsPtr&
      );
  
      static ReplicationAdapterPtr create(const Ice::ObjectAdapterPtr& objectAdapter,
              const PJMediaEnvironmentPtr& environment,
              const AsteriskSCF::Replication::MediaRTPPJMedia::V1::RtpSessionStateItemPtr& update,
 +            const RtpReplicationContextPtr& replicationContext,
-             const ConfigurationServiceImplPtr& configuration
+             const ConfigurationServiceImplPtr& configuration,
+             const AsteriskSCF::Media::RTP::V1::RTPOptionsPtr&,
+             AsteriskSCF::Media::RTP::V1::RTPAllocationOutputsPtr&
      );
  };
  } /* End of namespace RTPMedia */
diff --cc src/RTPSink.h
index e2ffd9f,c4d8fa4..dd3970b
--- a/src/RTPSink.h
+++ b/src/RTPSink.h
@@@ -44,15 -48,9 +48,16 @@@ public
       * Internal implementation methods.
       */
      AsteriskSCF::Replication::MediaRTPPJMedia::V1::RtpStreamSinkStateItemPtr getStateItem();
+     AsteriskSCF::Replication::MediaRTPPJMedia::V1::RTPTelephonyEventSinkStateItemPtr getTelephonyEventSinkStateItem();
      void setRemoteDetailsImpl(const std::string& host, Ice::Int port);
      void setSourceImpl(const AsteriskSCF::Media::V1::StreamSourcePrx& proxy);
 +    Ice::ByteSeq encodeAudioPayload(
 +            const AsteriskSCF::Media::V1::FramePayloadPtr& framePayload,
 +            const AsteriskSCF::Media::V1::AudioFormatPtr& audioFormat);
 +
 +    Ice::ByteSeq encodeVideoPayload(
 +            const AsteriskSCF::Media::V1::FramePayloadPtr& framePayload,
 +            const AsteriskSCF::Media::V1::VideoFormatPtr&);
  
  private:
      /**
diff --cc src/RTPSource.cpp
index 793f662,01597c0..b9bf4f1
--- a/src/RTPSource.cpp
+++ b/src/RTPSource.cpp
@@@ -439,9 -418,13 +461,13 @@@ static void receiveRTP(void *userdata, 
          frame->mediaFormat = mediaformat;
          frame->timestamp = header->ts;
          frame->seqno = header->seq;
 -        copy(payload, payload + payload_size, std::back_inserter(frame->payload));
 +
          frames.push_back(frame);
      }
+     else if ((rfc4733 = RFC4733Ptr::dynamicCast(mediaformat)))
+     {
+         source->mImpl->mTelephonyEventSource->read(header, payload);
+     }
  
      if (frames.empty())
      {
diff --cc src/RTPSource.h
index 0fe640d,23383b8..991811e
--- a/src/RTPSource.h
+++ b/src/RTPSource.h
@@@ -44,16 -45,13 +45,20 @@@ public
      void setRemoteRtcpDetails(const std::string&, Ice::Int);
  
      AsteriskSCF::Replication::MediaRTPPJMedia::V1::RtpStreamSourceStateItemPtr getStateItem();
+     AsteriskSCF::Replication::MediaRTPPJMedia::V1::RTPTelephonyEventSourceStateItemPtr getTelephonyEventSourceStateItem();
  
      void setSinkImpl(const AsteriskSCF::Media::V1::StreamSinkPrx& proxy);
 +    AsteriskSCF::Media::V1::FramePayloadPtr decodeAudioPayload(
 +            const Ice::ByteSeq&,
 +            const AsteriskSCF::Media::V1::AudioFormatPtr&);
 +
 +    AsteriskSCF::Media::V1::FramePayloadPtr decodeVideoPayload(
 +            const Ice::ByteSeq&,
 +            const AsteriskSCF::Media::V1::VideoFormatPtr&);
  
+     AsteriskSCF::SessionCommunications::V1::TelephonyEventSourcePrx createTelephonyEventSource(Ice::ObjectAdapterPtr& adapter, const Ice::Identity& id);
+     RTPTelephonyEventSourcePtr getTelephonyEventSource();
+ 
      /**
       * Private implementation data for StreamSourceRTPImpl.
       * Note: This is public on purpose so that our RTP callback can access it.
diff --cc src/RtpStateReplicatorListener.cpp
index 5a39804,e35140d..e3edc22
--- a/src/RtpStateReplicatorListener.cpp
+++ b/src/RtpStateReplicatorListener.cpp
@@@ -85,74 -79,96 +85,95 @@@ public
      
      void setStateNoticeImpl(const RtpStateItemSeq& items)
      {
-     class visitor : public AsteriskSCF::Replication::MediaRTPPJMedia::V1::RtpStateItemVisitor
-     {
-     public:
-             visitor(RtpStateReplicatorListenerImpl *impl) : mImpl(impl)
 -	class visitor : public AsteriskSCF::Replication::MediaRTPPJMedia::V1::RtpStateItemVisitor
 -	{
 -	public:
 -            visitor(RtpStateReplicatorListenerImpl *impl) : mImpl(impl)
 -	    {
 -	    }
 -
 -	private:
 -	    RtpStateReplicatorListenerImpl *mImpl;
 -
 -	    void visitRtpGeneralStateItem(const RtpGeneralStateItemPtr &item)
 -	    {
 -		mImpl->mGeneralState->serviceManagement = item->serviceManagement;
 -		mImpl->mGeneralState->comparatorId = item->comparatorId;
 -	    }
 -		    
 -	    void visitRtpSessionStateItem(const RtpSessionStateItemPtr &item)
 -	    {
 -		map<string, boost::shared_ptr<RtpStateReplicatorItem> >::iterator i = mImpl->mStateItems.find(item->sessionId);
 -		boost::shared_ptr<RtpStateReplicatorItem> localitem;
 -
 -		if (i == mImpl->mStateItems.end())
 -		{
 -		    boost::shared_ptr<RtpStateReplicatorItem> newitem(new RtpStateReplicatorItem());
 -		    localitem = newitem;
 -		    mImpl->mStateItems.insert(make_pair(item->sessionId, newitem));
 -
 -            RTPAllocationOutputsPtr outputs;
 -            localitem->setSession(
 -                    AsteriskSCF::PJMediaRTP::RTPSession::create(mImpl->mAdapter, mImpl->mEnvironment, item,
 -                        mImpl->mConfigurationService, item->options, outputs));
 -		}
 -		else
 -		{
 -		    localitem = i->second;
 -		}
 -		
++        class visitor : public AsteriskSCF::Replication::MediaRTPPJMedia::V1::RtpStateItemVisitor
 +        {
-         }
++        public:
++                visitor(RtpStateReplicatorListenerImpl *impl) : mImpl(impl)
++            {
++            }
 +
-     private:
-         RtpStateReplicatorListenerImpl *mImpl;
++        private:
++            RtpStateReplicatorListenerImpl *mImpl;
 +
-         void visitRtpGeneralStateItem(const RtpGeneralStateItemPtr &item)
-         {
-             mImpl->mGeneralState->serviceManagement = item->serviceManagement;
-             mImpl->mGeneralState->comparatorId = item->comparatorId;
-         }
-             
-         void visitRtpSessionStateItem(const RtpSessionStateItemPtr &item)
-         {
-             map<string, boost::shared_ptr<RtpStateReplicatorItem> >::iterator i = mImpl->mStateItems.find(item->sessionId);
-             boost::shared_ptr<RtpStateReplicatorItem> localitem;
- 
-             if (i == mImpl->mStateItems.end())
++            void visitRtpGeneralStateItem(const RtpGeneralStateItemPtr &item)
 +            {
-                 boost::shared_ptr<RtpStateReplicatorItem> newitem(new RtpStateReplicatorItem());
-                 localitem = newitem;
-                 mImpl->mStateItems.insert(make_pair(item->sessionId, newitem));
- 
-                 localitem->setSession(
-                      AsteriskSCF::PJMediaRTP::RTPSession::create(mImpl->mAdapter, mImpl->mEnvironment, item,
-                                 mImpl->mReplicationContext, mImpl->mConfigurationService));
++                mImpl->mGeneralState->serviceManagement = item->serviceManagement;
++                mImpl->mGeneralState->comparatorId = item->comparatorId;
 +            }
-             else
++                
++            void visitRtpSessionStateItem(const RtpSessionStateItemPtr &item)
 +            {
-                 localitem = i->second;
++                map<string, boost::shared_ptr<RtpStateReplicatorItem> >::iterator i = mImpl->mStateItems.find(item->sessionId);
++                boost::shared_ptr<RtpStateReplicatorItem> localitem;
++
++                if (i == mImpl->mStateItems.end())
++                {
++                    boost::shared_ptr<RtpStateReplicatorItem> newitem(new RtpStateReplicatorItem());
++                    localitem = newitem;
++                    mImpl->mStateItems.insert(make_pair(item->sessionId, newitem));
++
++                    localitem->setSession(
++                         AsteriskSCF::PJMediaRTP::RTPSession::create(mImpl->mAdapter, mImpl->mEnvironment, item,
++                                    mImpl->mReplicationContext, mImpl->mConfigurationService));
++                }
++                else
++                {
++                    localitem = i->second;
++                }
++            
+                 //
+                 // TODO: This appears to happen in testing on occasion. Should verify if this should be
+                 // expected.
+                 //
+                 if (localitem)
+                 {
+                     localitem->getSession()->update(item);
+                 }
 -	    }
 -		    
 -	    void visitRtpStreamSinkStateItem(const RtpStreamSinkStateItemPtr &item)
 -	    {
 -		map<string, boost::shared_ptr<RtpStateReplicatorItem> >::iterator i =
 -                    mImpl->mStateItems.find(item->sessionId);
 -		if (i != mImpl->mStateItems.end())
 -		{
 +            }
-         
-             //
-             // TODO: This appears to happen in testing on occasion. Should verify if this should be
-             // expected.
-             //
-             if (localitem)
++                
++            void visitRtpStreamSinkStateItem(const RtpStreamSinkStateItemPtr &item)
 +            {
-                 localitem->getSession()->update(item);
++                map<string, boost::shared_ptr<RtpStateReplicatorItem> >::iterator i =
++                            mImpl->mStateItems.find(item->sessionId);
++                if (i != mImpl->mStateItems.end())
++                {
+                     i->second->getSession()->update(item);
 -		}
 -	    }
 -		    
 -	    void visitRtpStreamSourceStateItem(const RtpStreamSourceStateItemPtr &item)
 -	    {
 -		map<string, boost::shared_ptr<RtpStateReplicatorItem> >::iterator i =
 -                    mImpl->mStateItems.find(item->sessionId);
 -		if (i != mImpl->mStateItems.end())
 -		{
++                }
 +            }
-         }
-             
-         void visitRtpStreamSinkStateItem(const RtpStreamSinkStateItemPtr &item)
-         {
-             map<string, boost::shared_ptr<RtpStateReplicatorItem> >::iterator i =
-                         mImpl->mStateItems.find(item->sessionId);
-             if (i != mImpl->mStateItems.end())
++                
++            void visitRtpStreamSourceStateItem(const RtpStreamSourceStateItemPtr &item)
 +            {
-                 i->second->getSession()->update(item);
++                map<string, boost::shared_ptr<RtpStateReplicatorItem> >::iterator i =
++                        mImpl->mStateItems.find(item->sessionId);
++                if (i != mImpl->mStateItems.end())
++                {
+                     i->second->getSession()->update(item);
 -		}
 -	    }
++                }
 +            }
-         }
-             
-         void visitRtpStreamSourceStateItem(const RtpStreamSourceStateItemPtr &item)
-         {
-             map<string, boost::shared_ptr<RtpStateReplicatorItem> >::iterator i =
+ 
+             void visitRTPTelephonyEventSourceStateItem(const RTPTelephonyEventSourceStateItemPtr& item)
+             {
+                 map<string, boost::shared_ptr<RtpStateReplicatorItem> >::iterator i =
                      mImpl->mStateItems.find(item->sessionId);
-             if (i != mImpl->mStateItems.end())
+                 if (i != mImpl->mStateItems.end())
+                 {
+                     i->second->getSession()->update(item);
+                 }
+             }
+ 
+             void visitRTPTelephonyEventSinkStateItem(const RTPTelephonyEventSinkStateItemPtr& item)
              {
-                 i->second->getSession()->update(item);
+                 map<string, boost::shared_ptr<RtpStateReplicatorItem> >::iterator i =
+                     mImpl->mStateItems.find(item->sessionId);
+                 if (i != mImpl->mStateItems.end())
+                 {
+                     i->second->getSession()->update(item);
+                 }
              }
-         }
-     };
+ 
 -	};
++        };
  
 -	AsteriskSCF::Replication::MediaRTPPJMedia::V1::RtpStateItemVisitorPtr v = new visitor(this);
 +        AsteriskSCF::Replication::MediaRTPPJMedia::V1::RtpStateItemVisitorPtr v = new visitor(this);
  
          for (RtpStateItemSeq::const_iterator item = items.begin(); item != items.end(); ++item)
          {
diff --cc test/TestRTPpjmedia.cpp
index 6e2edec,14d3e4c..6bda9b9
--- a/test/TestRTPpjmedia.cpp
+++ b/test/TestRTPpjmedia.cpp
@@@ -483,13 -478,16 +483,16 @@@ BOOST_AUTO_TEST_CASE(AllocateRTPSession
          FormatSeq formats;
          formats.push_back(format);
  
-         // You might think "geez, this should deadlock due to state replication" but no, we use one ways for that
-         boost::mutex::scoped_lock lock(Testbed.mLock);
+ 	    // You might think "geez, this should deadlock due to state replication" but no, we use one ways for that
+ 	    boost::mutex::scoped_lock lock(Testbed.mLock);
+ 
+         RTPOptionsPtr options(new RTPOptions);
+         RTPAllocationOutputsPtr outputs(new RTPAllocationOutputs);
  
-         Testbed.session = service->allocate(params);
+         Testbed.session = service->allocate(params, options, outputs);
  
 -	// Give the RTP component time to replicate this session
 -	Testbed.mCondition.wait(lock);
 +        // Give the RTP component time to replicate this session
 +        Testbed.mCondition.wait(lock);
  
          allocated = true;
      }

commit 7743183d3e581b8b0b0fbb13f52b593384d3ea0b
Author: Mark Michelson <mmichelson at digium.com>
Date:   Thu Aug 11 11:01:04 2011 -0500

    Some state replication fixes so that IDs of telephony event sources and sinks are properly replicated.

diff --git a/slice/AsteriskSCF/Replication/MediaRTPPJMedia/RtpStateReplicationIf.ice b/slice/AsteriskSCF/Replication/MediaRTPPJMedia/RtpStateReplicationIf.ice
index 0ddeee5..675f19f 100644
--- a/slice/AsteriskSCF/Replication/MediaRTPPJMedia/RtpStateReplicationIf.ice
+++ b/slice/AsteriskSCF/Replication/MediaRTPPJMedia/RtpStateReplicationIf.ice
@@ -87,8 +87,11 @@ module V1
 	int port;
 	Ice::Identity sinkIdentity;
 	Ice::Identity sourceIdentity;
+        Ice::Identity telephonyEventSourceIdentity;
+        Ice::Identity telephonyEventSinkIdentity;
 	AsteriskSCF::Media::V1::FormatSeq formats;
 	AsteriskSCF::Media::RTP::V1::PayloadMap payloadstoFormats;
+        AsteriskSCF::Media::RTP::V1::RTPOptions options;
 	bool ipv6;
         bool srtp;
         string remoteRtcpAddress;
diff --git a/src/RTPSession.cpp b/src/RTPSession.cpp
index dd7ba39..df04bdc 100644
--- a/src/RTPSession.cpp
+++ b/src/RTPSession.cpp
@@ -223,6 +223,8 @@ public:
             const Ice::Identity& id,
             const Ice::Identity& sourceId,
             const Ice::Identity& sinkId,
+            const Ice::Identity& telephonyEventSourceId,
+            const Ice::Identity& telephonyEventSinkId,
             const RTPOptionsPtr& options,
             RTPAllocationOutputsPtr& outputs);
 
@@ -920,13 +922,25 @@ RTPSessionPrx RTPSessionImpl::activate(
     assert(id == mId);
     Ice::Identity sourceId(mAdapter->getCommunicator()->stringToIdentity(IceUtil::generateUUID()));
     Ice::Identity sinkId(mAdapter->getCommunicator()->stringToIdentity(IceUtil::generateUUID()));
-    return activate(mAdapter->getCommunicator()->stringToIdentity(id), sourceId, sinkId, options, outputs);
+    Ice::Identity telephonyEventSourceId(mAdapter->getCommunicator()->stringToIdentity(IceUtil::generateUUID()));
+    Ice::Identity telephonyEventSinkId(mAdapter->getCommunicator()->stringToIdentity(IceUtil::generateUUID()));
+
+    return activate(
+            mAdapter->getCommunicator()->stringToIdentity(id),
+            sourceId,
+            sinkId,
+            telephonyEventSourceId,
+            telephonyEventSinkId,
+            options,
+            outputs);
 }
 
 RTPSessionPrx RTPSessionImpl::activate(
         const Ice::Identity& id,
         const Ice::Identity& sourceId,
         const Ice::Identity& sinkId,
+        const Ice::Identity& telephonyEventSourceId,
+        const Ice::Identity& telephonyEventSinkId,
         const RTPOptionsPtr& options,
         RTPAllocationOutputsPtr& outputs)
 {
@@ -950,11 +964,11 @@ RTPSessionPrx RTPSessionImpl::activate(
             outputs = new RTPAllocationOutputs();
 
             mTelephonyEventSourcePrx =
-                mStreamSource->createTelephonyEventSource(mAdapter);
+                mStreamSource->createTelephonyEventSource(mAdapter, telephonyEventSourceId);
             outputs->eventSources.push_back(mTelephonyEventSourcePrx);
 
             mTelephonyEventSinkPrx =
-                mStreamSink->createTelephonyEventSink(mAdapter);
+                mStreamSink->createTelephonyEventSink(mAdapter, telephonyEventSinkId);
             outputs->eventSinks.push_back(mTelephonyEventSinkPrx);
         }
 
@@ -982,6 +996,7 @@ RTPSessionPrx RTPSessionImpl::activate(
             {
                 mSessionStateItem->port = 0;
             }
+            mSessionStateItem->options = options;
             replicateState(
                     mSessionStateItem,
                     mStreamSink->getStateItem(),
@@ -1096,7 +1111,16 @@ ReplicationAdapterPtr AsteriskSCF::PJMediaRTP::RTPSession::create(const Ice::Obj
                     item->port, item->formats, item->ipv6, item->srtp,
                     AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>(), 
                     configuration));
-    servant->activate(item->sessionIdentity, item->sourceIdentity, item->sinkIdentity, options, outputs);
+
+    servant->activate(
+            item->sessionIdentity,
+            item->sourceIdentity,
+            item->sinkIdentity,
+            item->telephonyEventSourceIdentity,
+            item->telephonyEventSinkIdentity,
+            options,
+            outputs);
+
     return ReplicationAdapterPtr(new ReplicationAdapterImpl(servant));
 }
 
diff --git a/src/RTPSink.cpp b/src/RTPSink.cpp
index e90d63a..0e00731 100644
--- a/src/RTPSink.cpp
+++ b/src/RTPSink.cpp
@@ -106,7 +106,7 @@ StreamSinkRTPImpl::StreamSinkRTPImpl(
 {
 }
 
-TelephonyEventSinkPrx StreamSinkRTPImpl::createTelephonyEventSink(Ice::ObjectAdapterPtr& adapter)
+TelephonyEventSinkPrx StreamSinkRTPImpl::createTelephonyEventSink(Ice::ObjectAdapterPtr& adapter, const Ice::Identity& id)
 {
     mImpl->mTelephonyEventSink =
         new RTPTelephonyEventSink(
@@ -115,7 +115,7 @@ TelephonyEventSinkPrx StreamSinkRTPImpl::createTelephonyEventSink(Ice::ObjectAda
             mImpl->mSessionAdapter,
             mImpl->mSessionId);
 
-    return TelephonyEventSinkPrx::uncheckedCast(adapter->addWithUUID(mImpl->mTelephonyEventSink));
+    return TelephonyEventSinkPrx::uncheckedCast(adapter->add(mImpl->mTelephonyEventSink, id));
 }
 
 RTPTelephonyEventSinkPtr StreamSinkRTPImpl::getTelephonyEventSink()
diff --git a/src/RTPSink.h b/src/RTPSink.h
index 4c08486..c4d8fa4 100644
--- a/src/RTPSink.h
+++ b/src/RTPSink.h
@@ -42,7 +42,7 @@ public:
     std::string getRemoteAddress(const Ice::Current&);
     Ice::Int getRemotePort(const Ice::Current&);
 
-    AsteriskSCF::SessionCommunications::V1::TelephonyEventSinkPrx createTelephonyEventSink(Ice::ObjectAdapterPtr& adapter);
+    AsteriskSCF::SessionCommunications::V1::TelephonyEventSinkPrx createTelephonyEventSink(Ice::ObjectAdapterPtr& adapter, const Ice::Identity& id);
     RTPTelephonyEventSinkPtr getTelephonyEventSink();
     /**
      * Internal implementation methods.
diff --git a/src/RTPSource.cpp b/src/RTPSource.cpp
index 529bbc4..01597c0 100644
--- a/src/RTPSource.cpp
+++ b/src/RTPSource.cpp
@@ -217,11 +217,11 @@ StreamSourceRTPImpl::StreamSourceRTPImpl(const SessionAdapterPtr& session,
 {
 }
 
-TelephonyEventSourcePrx StreamSourceRTPImpl::createTelephonyEventSource(Ice::ObjectAdapterPtr& adapter)
+TelephonyEventSourcePrx StreamSourceRTPImpl::createTelephonyEventSource(Ice::ObjectAdapterPtr& adapter, const Ice::Identity& id)
 {
     mImpl->mTelephonyEventSource = new RTPTelephonyEventSource(mImpl->mSessionAdapter, mImpl->mSessionId);
 
-    return TelephonyEventSourcePrx::uncheckedCast(adapter->addWithUUID(mImpl->mTelephonyEventSource));
+    return TelephonyEventSourcePrx::uncheckedCast(adapter->add(mImpl->mTelephonyEventSource, id));
 }
 
 RTPTelephonyEventSourcePtr StreamSourceRTPImpl::getTelephonyEventSource()
diff --git a/src/RTPSource.h b/src/RTPSource.h
index aa57773..23383b8 100644
--- a/src/RTPSource.h
+++ b/src/RTPSource.h
@@ -49,7 +49,7 @@ public:
 
     void setSinkImpl(const AsteriskSCF::Media::V1::StreamSinkPrx& proxy);
 
-    AsteriskSCF::SessionCommunications::V1::TelephonyEventSourcePrx createTelephonyEventSource(Ice::ObjectAdapterPtr& adapter);
+    AsteriskSCF::SessionCommunications::V1::TelephonyEventSourcePrx createTelephonyEventSource(Ice::ObjectAdapterPtr& adapter, const Ice::Identity& id);
     RTPTelephonyEventSourcePtr getTelephonyEventSource();
 
     /**
diff --git a/src/RtpStateReplicatorListener.cpp b/src/RtpStateReplicatorListener.cpp
index 5c287ce..e35140d 100644
--- a/src/RtpStateReplicatorListener.cpp
+++ b/src/RtpStateReplicatorListener.cpp
@@ -106,12 +106,10 @@ public:
 		    localitem = newitem;
 		    mImpl->mStateItems.insert(make_pair(item->sessionId, newitem));
 
-            //XXX We need to sort out the options and outputs here.
-            RTPOptionsPtr options(new RTPOptions);
-            RTPAllocationOutputsPtr outputs(new RTPAllocationOutputs);
+            RTPAllocationOutputsPtr outputs;
             localitem->setSession(
                     AsteriskSCF::PJMediaRTP::RTPSession::create(mImpl->mAdapter, mImpl->mEnvironment, item,
-                        mImpl->mConfigurationService, options, outputs));
+                        mImpl->mConfigurationService, item->options, outputs));
 		}
 		else
 		{

commit 6ab655ebd6b1d46172ba735b2541249a88a5092d
Author: Mark Michelson <mmichelson at digium.com>
Date:   Thu Aug 11 09:30:11 2011 -0500

    Be sure to set the sessionId and key of state items for telephony event sources and sinks.

diff --git a/src/RTPSink.cpp b/src/RTPSink.cpp
index 3b37056..e90d63a 100644
--- a/src/RTPSink.cpp
+++ b/src/RTPSink.cpp
@@ -112,7 +112,8 @@ TelephonyEventSinkPrx StreamSinkRTPImpl::createTelephonyEventSink(Ice::ObjectAda
         new RTPTelephonyEventSink(
             &mImpl->mOutgoingSession,
             mImpl->mTransport,
-            mImpl->mSessionAdapter);
+            mImpl->mSessionAdapter,
+            mImpl->mSessionId);
 
     return TelephonyEventSinkPrx::uncheckedCast(adapter->addWithUUID(mImpl->mTelephonyEventSink));
 }
diff --git a/src/RTPSource.cpp b/src/RTPSource.cpp
index 81e4978..529bbc4 100644
--- a/src/RTPSource.cpp
+++ b/src/RTPSource.cpp
@@ -219,7 +219,7 @@ StreamSourceRTPImpl::StreamSourceRTPImpl(const SessionAdapterPtr& session,
 
 TelephonyEventSourcePrx StreamSourceRTPImpl::createTelephonyEventSource(Ice::ObjectAdapterPtr& adapter)
 {
-    mImpl->mTelephonyEventSource = new RTPTelephonyEventSource(mImpl->mSessionAdapter);
+    mImpl->mTelephonyEventSource = new RTPTelephonyEventSource(mImpl->mSessionAdapter, mImpl->mSessionId);
 
     return TelephonyEventSourcePrx::uncheckedCast(adapter->addWithUUID(mImpl->mTelephonyEventSource));
 }
diff --git a/src/RTPTelephonyEventSink.cpp b/src/RTPTelephonyEventSink.cpp
index e2108c3..82cf3a7 100644
--- a/src/RTPTelephonyEventSink.cpp
+++ b/src/RTPTelephonyEventSink.cpp
@@ -17,6 +17,7 @@
 #include "RTPTelephonyEventSink.h"
 
 #include <AsteriskSCF/Media/Formats/OtherFormats.h>
+#include <IceUtil/UUID.h>
 
 namespace
 {
@@ -43,9 +44,12 @@ using namespace AsteriskSCF::Replication::MediaRTPPJMedia::V1;
 RTPTelephonyEventSink::RTPTelephonyEventSink(
         pjmedia_rtp_session *session,
         const PJMediaTransportPtr& transport,
-        const SessionAdapterPtr& sessionAdapter)
+        const SessionAdapterPtr& sessionAdapter,
+        const std::string& sessionId)
     : mSession(session), mTransport(transport), mSessionAdapter(sessionAdapter), mStateItem(new RTPTelephonyEventSinkStateItem)
 {
+    mStateItem->sessionId = sessionId;
+    mStateItem->key = IceUtil::generateUUID();
     mStateItem->segmentno = 0;
 }
 
diff --git a/src/RTPTelephonyEventSink.h b/src/RTPTelephonyEventSink.h
index 49aa3df..407c1b2 100644
--- a/src/RTPTelephonyEventSink.h
+++ b/src/RTPTelephonyEventSink.h
@@ -30,7 +30,8 @@ public:
     RTPTelephonyEventSink(
             pjmedia_rtp_session *session,
             const AsteriskSCF::PJMediaRTP::PJMediaTransportPtr& transport,
-            const AsteriskSCF::PJMediaRTP::SessionAdapterPtr& sessionAdapter);
+            const AsteriskSCF::PJMediaRTP::SessionAdapterPtr& sessionAdapter,
+            const std::string& sessionId);
 
     /**
      * Overrides of TelephonyEventSink interface
diff --git a/src/RTPTelephonyEventSource.cpp b/src/RTPTelephonyEventSource.cpp
index aee56f9..3a6631a 100644
--- a/src/RTPTelephonyEventSource.cpp
+++ b/src/RTPTelephonyEventSource.cpp
@@ -15,8 +15,10 @@
  */
 
 #include "RTPTelephonyEventSource.h"
+
 #include <AsteriskSCF/logger.h>
 #include <pjmedia.h>
+#include <IceUtil/UUID.h>
 
 using namespace AsteriskSCF::SessionCommunications::V1;
 using namespace AsteriskSCF::System::Logging;
@@ -28,9 +30,13 @@ namespace
 Logger lg = getLoggerFactory().getLogger("AsteriskSCF.MediaRTP");
 }
 
-RTPTelephonyEventSource::RTPTelephonyEventSource(const SessionAdapterPtr& sessionAdapter) 
+RTPTelephonyEventSource::RTPTelephonyEventSource(
+        const SessionAdapterPtr& sessionAdapter,
+        const std::string& sessionId)
     : mSessionAdapter(sessionAdapter), mStateItem(new RTPTelephonyEventSourceStateItem())
 {
+    mStateItem->sessionId = sessionId;
+    mStateItem->key = IceUtil::generateUUID();
     mStateItem->onEnd = false;
     mStateItem->timestamp = 0;
 }
diff --git a/src/RTPTelephonyEventSource.h b/src/RTPTelephonyEventSource.h
index c5c4fc2..cd7eb13 100644
--- a/src/RTPTelephonyEventSource.h
+++ b/src/RTPTelephonyEventSource.h
@@ -28,7 +28,9 @@
 class RTPTelephonyEventSource : public AsteriskSCF::SessionCommunications::V1::TelephonyEventSource
 {
 public:
-    RTPTelephonyEventSource(const AsteriskSCF::PJMediaRTP::SessionAdapterPtr& sessionAdapter);
+    RTPTelephonyEventSource(
+            const AsteriskSCF::PJMediaRTP::SessionAdapterPtr& sessionAdapter,
+            const std::string& sessionId);
 
     /**
      * Overrides of TelephonyEventSource interface

commit aa81de303e27c5afb5d1d136581ead995c23d0ed
Author: Mark Michelson <mmichelson at digium.com>
Date:   Wed Aug 10 17:47:32 2011 -0500

    Add returns in places where pjmedia library calls have failed.

diff --git a/src/RTPTelephonyEventSink.cpp b/src/RTPTelephonyEventSink.cpp
index 2974fd3..e2108c3 100644
--- a/src/RTPTelephonyEventSink.cpp
+++ b/src/RTPTelephonyEventSink.cpp
@@ -140,7 +140,7 @@ void RTPTelephonyEventSink::write_async(
 
     if (status != PJ_SUCCESS)
     {
-        //fffffffuuuuuuuuuuuu
+        return;
     }
 
     char packet[1500];
@@ -152,7 +152,7 @@ void RTPTelephonyEventSink::write_async(
 
     if (status != PJ_SUCCESS)
     {
-        //fffffffuuuuuuuuuuuu
+        return;
     }
 
     if (replicationNeeded)

commit 5a88bcca5f0505c3f4176631315c327efe91ba11
Author: Mark Michelson <mmichelson at digium.com>
Date:   Wed Aug 10 17:38:53 2011 -0500

    Add documentation.

diff --git a/slice/AsteriskSCF/Replication/MediaRTPPJMedia/RtpStateReplicationIf.ice b/slice/AsteriskSCF/Replication/MediaRTPPJMedia/RtpStateReplicationIf.ice
index c0783d0..0ddeee5 100644
--- a/slice/AsteriskSCF/Replication/MediaRTPPJMedia/RtpStateReplicationIf.ice
+++ b/slice/AsteriskSCF/Replication/MediaRTPPJMedia/RtpStateReplicationIf.ice
@@ -109,14 +109,49 @@ module V1
 
     class RTPTelephonyEventSinkStateItem extends RtpStateItem
     {
+        /**
+         * RFC 4733 events' duration field is only 16 bits, and the
+         * duration is measured in timestamp units. This means that for
+         * 8000 Hz streams, the duration field is only capable of
+         * representing a duration of ~8 seconds.
+         * 
+         * RFC 4733's answer to this is to separate the event into "segments."
+         * When the duration exceeds 0xFFFF, then a packet is sent with duration
+         * 0xFFFF, first. Then a new packet is sent with the RTP timestamp
+         * increased by 0xFFFF. The duration is then reset and calculated
+         * relative to this new timestamp.
+         *
+         * This member indicates the number of 0xFFFF timestamp unit segments
+         * we have processed for the event we currently are writing.
+         */
         int segmentno;
+
+        /**
+         * Proxy for the source from which we're receiving events
+         */
         AsteriskSCF::SessionCommunications::V1::TelephonyEventSource* source;
     };
 
     class RTPTelephonyEventSourceStateItem extends RtpStateItem
     {
+        /**
+         * When processing incoming RFC 4733 events, typically 3 end packets
+         * will be received. To prevent duplicate end events from being sent
+         * to sinks, we keep track of whether we have already received an
+         * end for the event being read.
+         */
         bool onEnd;
+        /**
+         * The RTP timestamp of the beginning of the event we are processing.
+         * We use this when calculating the duration of events. See the explanation
+         * in RTPTelephonyEventSinkStateItem for how RFC 4733 deals with long
+         * events. We can always send an accurate duration by subtracting the
+         * original RTP timestamp from the RTP timestamp in the current packet.
+         */
         int timestamp;
+        /**
+         * Sinks to write events to
+         */
         AsteriskSCF::SessionCommunications::V1::TelephonyEventSinkSeq sinks;
     };
 
diff --git a/src/RTPTelephonyEventSink.h b/src/RTPTelephonyEventSink.h
index 6768b7f..49aa3df 100644
--- a/src/RTPTelephonyEventSink.h
+++ b/src/RTPTelephonyEventSink.h
@@ -32,6 +32,9 @@ public:
             const AsteriskSCF::PJMediaRTP::PJMediaTransportPtr& transport,
             const AsteriskSCF::PJMediaRTP::SessionAdapterPtr& sessionAdapter);
 
+    /**
+     * Overrides of TelephonyEventSink interface
+     */
     void write_async(
             const AsteriskSCF::SessionCommunications::V1::AMD_TelephonyEventSink_writePtr&,
             const AsteriskSCF::SessionCommunications::V1::TelephonyEventPtr&,
@@ -46,12 +49,25 @@ public:
             const AsteriskSCF::SessionCommunications::V1::AMD_TelephonyEventSink_getSourcePtr&,
             const Ice::Current&);
 
+    /**
+     * Used by the state replicator listener to update the state of replicas
+     */
     void updateState(const AsteriskSCF::Replication::MediaRTPPJMedia::V1::RTPTelephonyEventSinkStateItemPtr& item);
 
+    /**
+     * Accessor used to get the state item. Used during state replication
+     */
     AsteriskSCF::Replication::MediaRTPPJMedia::V1::RTPTelephonyEventSinkStateItemPtr getStateItem();
 private:
 
+    /**
+     * Translate DTMF from ASCII into its RFC 4733-designated payload value
+     */
     pj_uint8_t translateDTMF(Ice::Byte signal);
+
+    /**
+     * Replicate state
+     */
     void replicateState();
 
     pjmedia_rtp_session *mSession;
diff --git a/src/RTPTelephonyEventSource.h b/src/RTPTelephonyEventSource.h
index 3749160..c5c4fc2 100644
--- a/src/RTPTelephonyEventSource.h
+++ b/src/RTPTelephonyEventSource.h
@@ -30,6 +30,9 @@ class RTPTelephonyEventSource : public AsteriskSCF::SessionCommunications::V1::T
 public:
     RTPTelephonyEventSource(const AsteriskSCF::PJMediaRTP::SessionAdapterPtr& sessionAdapter);
 
+    /**
+     * Overrides of TelephonyEventSource interface
+     */
     void addSink_async(
             const AsteriskSCF::SessionCommunications::V1::AMD_TelephonyEventSource_addSinkPtr& cb,
             const AsteriskSCF::SessionCommunications::V1::TelephonyEventSinkPrx& sink,
@@ -39,22 +42,60 @@ public:
             const AsteriskSCF::SessionCommunications::V1::AMD_TelephonyEventSource_getSinksPtr& cb,
             const Ice::Current&);
 
+    /**
+     * Used to read a new RFC 4733 event in.
+     */
     void read(const pjmedia_rtp_hdr *hdr, const Ice::Byte* payload);
 
+    /**
+     * Used by the state replicator listener to update
+     * the state of a replica
+     */
     void updateState(const AsteriskSCF::Replication::MediaRTPPJMedia::V1::RTPTelephonyEventSourceStateItemPtr&);
 
+    /**
+     * Accessor for the state item. Used in state replication
+     */
     AsteriskSCF::Replication::MediaRTPPJMedia::V1::RTPTelephonyEventSourceStateItemPtr getStateItem();
 protected:
     ~RTPTelephonyEventSource();
 
 private:
 
+    /**
+     * Calculate the duration for the event.
+     *
+     * RFC 4733 event duration is expressed in timestamp units (i.e. samples). This function
+     * does two calculations to determine the duration in milliseconds.
+     *
+     * First, the duration expressed in the RFC 4733 event is converted to milliseconds.
+     * 
+     * Second, the RTP timestamp at the beginning of the event is subtracted from the RTP
+     * timestamp of the event being read and converted to milliseconds.
+     *
+     * These values are added together to determine the event duration.
+     */
     int calculateDuration(const pjmedia_rtp_hdr *header, const pjmedia_rtp_dtmf_event *event);
+    /**
+     * The following are methods for sending specific types of events to sinks
+     */
     void sendBeginEvent(const pjmedia_rtp_dtmf_event *event);
     void sendContinuationEvent(const pjmedia_rtp_dtmf_event *event, int duration);
     void sendEndEvent(const pjmedia_rtp_dtmf_event *event, int duration);
+
+    /**
+     * Send the resulting event to all sinks
+     */
     void distributeToSinks(const AsteriskSCF::SessionCommunications::V1::TelephonyEventPtr& event);
+
+    /**
+     * Translate DTMF from its RFC 4733 event payload to ASCII representation
+     */
     Ice::Byte translateDTMF(pj_uint8_t event);
+
+    /**
+     * Replicate state to replicas
+     */
     void replicateState();
 
     AsteriskSCF::PJMediaRTP::SessionAdapterPtr mSessionAdapter;

commit f367da4739906648c80c678121756047a97213e8
Author: Mark Michelson <mmichelson at digium.com>
Date:   Wed Aug 10 16:22:51 2011 -0500

    Add calls to actually replicate state.

diff --git a/src/RTPTelephonyEventSink.cpp b/src/RTPTelephonyEventSink.cpp
index 74e109f..2974fd3 100644
--- a/src/RTPTelephonyEventSink.cpp
+++ b/src/RTPTelephonyEventSink.cpp
@@ -62,6 +62,7 @@ void RTPTelephonyEventSink::write_async(
     pjmedia_rtp_dtmf_event payload;
     bool setMarker = false;
     bool tsRollover = false;
+    bool replicationNeeded = false;
     if ((beginDTMF = BeginDTMFEventPtr::dynamicCast(event)))
     {
         payload.event = translateDTMF(beginDTMF->signal);
@@ -69,6 +70,7 @@ void RTPTelephonyEventSink::write_async(
         payload.duration = Duration;
         setMarker = true;
         mStateItem->segmentno = 0;
+        replicationNeeded = true;
     }
     else if (endDTMF = EndDTMFEventPtr::dynamicCast(event))
     {
@@ -79,6 +81,7 @@ void RTPTelephonyEventSink::write_async(
         if (tsRollover)
         {
             ++mStateItem->segmentno;
+            replicationNeeded = true;
         }
 
         payload.event = translateDTMF(endDTMF->signal);
@@ -94,6 +97,7 @@ void RTPTelephonyEventSink::write_async(
         if (tsRollover)
         {
             ++mStateItem->segmentno;
+            replicationNeeded = true;
         }
 
         payload.event = translateDTMF(continueDTMF->signal);
@@ -107,6 +111,7 @@ void RTPTelephonyEventSink::write_async(
         payload.duration = Duration;
         setMarker = true;
         mStateItem->segmentno = 0;
+        replicationNeeded = true;
     }
 
     const void *header;
@@ -150,6 +155,11 @@ void RTPTelephonyEventSink::write_async(
         //fffffffuuuuuuuuuuuu
     }
 
+    if (replicationNeeded)
+    {
+        mSessionAdapter->replicateState(mStateItem);
+    }
+
     cb->ice_response();
 }
 
@@ -159,6 +169,7 @@ void RTPTelephonyEventSink::setSource_async(
         const Ice::Current&)
 {
     mStateItem->source = source;
+    mSessionAdapter->replicateState(mStateItem);
     cb->ice_response();
 }
 
diff --git a/src/RTPTelephonyEventSource.cpp b/src/RTPTelephonyEventSource.cpp
index b7cb804..aee56f9 100644
--- a/src/RTPTelephonyEventSource.cpp
+++ b/src/RTPTelephonyEventSource.cpp
@@ -49,6 +49,7 @@ void RTPTelephonyEventSource::addSink_async(
     {
         mStateItem->sinks.push_back(sink);
     }
+    mSessionAdapter->replicateState(mStateItem);
     cb->ice_response();
 }
 
@@ -76,6 +77,7 @@ void RTPTelephonyEventSource::read(const pjmedia_rtp_hdr* header, const Ice::Byt
 
     bool isEnd = (event->e_vol & 0x80);
 
+    bool replicationNeeded =false;
     //We'll typically get three end frames. We don't need to send
     //multiple end events though, which is why we check to see if
     //we already are processing end frames.
@@ -84,6 +86,7 @@ void RTPTelephonyEventSource::read(const pjmedia_rtp_hdr* header, const Ice::Byt
         int duration = calculateDuration(header, event);
         sendEndEvent(event, duration);
         mStateItem->onEnd = true;
+        replicationNeeded = true;
     }
     else if (!isEnd)
     {
@@ -97,6 +100,7 @@ void RTPTelephonyEventSource::read(const pjmedia_rtp_hdr* header, const Ice::Byt
             sendBeginEvent(event);
             mStateItem->onEnd = false;
             mStateItem->timestamp = pj_ntohl(header->ts);
+            replicationNeeded = true;
         }
         else
         {
@@ -105,6 +109,11 @@ void RTPTelephonyEventSource::read(const pjmedia_rtp_hdr* header, const Ice::Byt
             sendContinuationEvent(event, duration);
         }
     }
+
+    if (replicationNeeded)
+    {
+        mSessionAdapter->replicateState(mStateItem);
+    }
 }
 
 int RTPTelephonyEventSource::calculateDuration(const pjmedia_rtp_hdr *header, const pjmedia_rtp_dtmf_event *event)

commit 432e98a60d5da408de623badd3cbbdd1b2d9974c
Author: Mark Michelson <mmichelson at digium.com>
Date:   Wed Aug 10 16:16:28 2011 -0500

    Infrastructure for state replication is completely set up.
    
    Now all that's needed is for strategic calls to replicateState()
    to be made.

diff --git a/slice/AsteriskSCF/Replication/MediaRTPPJMedia/RtpStateReplicationIf.ice b/slice/AsteriskSCF/Replication/MediaRTPPJMedia/RtpStateReplicationIf.ice
index 4c44683..c0783d0 100644
--- a/slice/AsteriskSCF/Replication/MediaRTPPJMedia/RtpStateReplicationIf.ice
+++ b/slice/AsteriskSCF/Replication/MediaRTPPJMedia/RtpStateReplicationIf.ice
@@ -107,6 +107,19 @@ module V1
 	AsteriskSCF::Media::V1::StreamSinkSeq sinks;
     };
 
+    class RTPTelephonyEventSinkStateItem extends RtpStateItem
+    {
+        int segmentno;
+        AsteriskSCF::SessionCommunications::V1::TelephonyEventSource* source;
+    };
+
+    class RTPTelephonyEventSourceStateItem extends RtpStateItem
+    {
+        bool onEnd;
+        int timestamp;
+        AsteriskSCF::SessionCommunications::V1::TelephonyEventSinkSeq sinks;
+    };
+
 }; /* module V1 */
 
... 2992 lines suppressed ...


-- 
asterisk-scf/release/media_rtp_pjmedia.git



More information about the asterisk-scf-commits mailing list