[asterisk-scf-commits] asterisk-scf/integration/media_rtp_pjmedia.git branch "alternate_source_cleanup" created.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Thu Jun 28 14:10:25 CDT 2012


branch "alternate_source_cleanup" has been created
        at  d866db0ec2b66194d3f646b7c330959b21b8b3b4 (commit)

- Log -----------------------------------------------------------------
commit d866db0ec2b66194d3f646b7c330959b21b8b3b4
Author: Brent Eagles <beagles at digium.com>
Date:   Thu Jun 28 16:36:40 2012 -0230

    Trying an alternate approach to cleaning up RTPSource. RTPSink is pretty
    straigthforward, but in the source, there are multiple elements vying for the
    same objects. Added a basic label->impldata global lookup table. I think there
    needs to be more there for it to be deployment ready though. Basic smoke tests
    that resulted in the transport shutting down clean without rtp packets and rtp
    packets being received for destroyed RTP sessions completed successfully. More polish is required however.

diff --git a/config/RTPConfigurator.py b/config/RTPConfigurator.py
old mode 100755
new mode 100644
diff --git a/src/Configuration.h b/src/Configuration.h
old mode 100755
new mode 100644
diff --git a/src/PJMEDIATransport.cpp b/src/PJMEDIATransport.cpp
index 2546b8b..c0047b4 100644
--- a/src/PJMEDIATransport.cpp
+++ b/src/PJMEDIATransport.cpp
@@ -65,6 +65,22 @@ AddressPtr PJMEDIATransport::remoteAddress()
     return fromInfo(info);
 }
 
+void PJMEDIATransport::attachLabel(const std::string& label)
+{
+    boost::shared_ptr<char> data(new char[label.size() + 1]);
+    strcpy(data.get(), label.c_str());
+    if (mLabelData)
+    {
+        pjmedia_transport_detach(mTransport, mLabelData.get());
+    }
+    mLabelData = data;
+}
+
+boost::shared_ptr<char> PJMEDIATransport::getLabelData()
+{
+    return mLabelData;
+}
+
 AddressPtr PJMEDIATransport::getLocalAddressImpl() 
 {
     pjmedia_transport_info info;
diff --git a/src/PJMEDIATransport.h b/src/PJMEDIATransport.h
index ca4889f..f4754f5 100644
--- a/src/PJMEDIATransport.h
+++ b/src/PJMEDIATransport.h
@@ -54,9 +54,13 @@ public:
      * down the objects with lots of unnecessary code.
      **/
     virtual void addFacets(const Ice::ObjectAdapterPtr&, const Ice::Identity&) {}
+    
+    void attachLabel(const std::string& label);
+    boost::shared_ptr<char> getLabelData();
 
 protected:
     pjmedia_transport* mTransport;
+    boost::shared_ptr<char> mLabelData;
 
     PJMEDIATransport(pjmedia_transport* t);
 
diff --git a/src/RTPSession.cpp b/src/RTPSession.cpp
index 81eb5e8..2221b5e 100644
--- a/src/RTPSession.cpp
+++ b/src/RTPSession.cpp
@@ -1209,6 +1209,10 @@ RTPSessionPrx RTPSessionImpl::activate(
         }
         RTPSessionPrx result = RTPSessionPrx::uncheckedCast(mAdapter->add(this, id));
         mTransport->addFacets(mAdapter, id);
+        //
+        // The source is an active element and must be explicitly "turned on"
+        //
+        mStreamSource->activate();
         return result;
     }
     catch (...)
@@ -1239,11 +1243,14 @@ void RTPSessionImpl::destroy()
      */
     {
         boost::unique_lock<boost::shared_mutex> lock(mLock);
+        mStreamSource->destroy();
+        mStreamSink->destroy();
         mStreamSource = 0;
         mStreamSink = 0;
 
         mSessionAdapter.reset();
         mTransport.reset();
+        mRtcpSessionInterface = 0;
     }
 
     /* All we have to do is remove ourselves from the ASM, our smart pointerness will cause us to
diff --git a/src/RTPSink.cpp b/src/RTPSink.cpp
index 72c3f5f..1c7be33 100644
--- a/src/RTPSink.cpp
+++ b/src/RTPSink.cpp
@@ -93,6 +93,8 @@ public:
      * The parent session's id.
      */
     const string mSessionId;
+
+    bool mDestroyed;
 };
 
 /**
@@ -105,7 +107,8 @@ StreamSinkRTPImplPriv::StreamSinkRTPImplPriv(
     mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
     mSessionAdapter(session), mTransport(transport),
     mSinkStateItem(new RTPStreamSinkStateItem),
-    mSessionId(sessionId)
+    mSessionId(sessionId),
+    mDestroyed(false)
 {
     pjmedia_rtp_session_init(&mOutgoingSession, 0, pj_rand());
     mSinkStateItem->sessionId = sessionId;
@@ -127,6 +130,11 @@ StreamSinkRTPImpl::StreamSinkRTPImpl(
 TelephonyEventSinkPrx StreamSinkRTPImpl::createTelephonyEventSink(Ice::ObjectAdapterPtr& adapter, const Ice::Identity& id)
 {
     boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
+    if (mImpl->mDestroyed)
+    {
+        throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+    }
+
     mImpl->mTelephonyEventSink =
         new RTPTelephonyEventSink(
             &mImpl->mOutgoingSession,
@@ -148,8 +156,13 @@ RTPTelephonyEventSinkPtr StreamSinkRTPImpl::getTelephonyEventSink()
  */
 void StreamSinkRTPImpl::write(const AsteriskSCF::Media::V1::FrameSeq& frames, const Ice::Current&)
 {
-//    boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
     // Don't even bother if no remote address information is present
+
+    boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
+    if (mImpl->mDestroyed)
+    {
+        throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+    }
     if (mImpl->mSinkStateItem->remoteAddress.empty() || !mImpl->mSinkStateItem->remotePort)
     {
         return;
@@ -235,6 +248,10 @@ void StreamSinkRTPImpl::setSource(
         return;
     }
     boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
+    if (mImpl->mDestroyed)
+    {
+        throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+    }
     mImpl->mSinkStateItem->source = source;
 
     mImpl->mSessionAdapter->replicateState(mImpl->mSinkStateItem);
@@ -246,6 +263,10 @@ void StreamSinkRTPImpl::setSource(
 AsteriskSCF::Media::V1::StreamSourcePrx StreamSinkRTPImpl::getSource(const Ice::Current&)
 {
     boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
+    if (mImpl->mDestroyed)
+    {
+        throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+    }
     return mImpl->mSinkStateItem->source;
 }
 
@@ -255,6 +276,10 @@ AsteriskSCF::Media::V1::StreamSourcePrx StreamSinkRTPImpl::getSource(const Ice::
 AsteriskSCF::Media::V1::FormatSeq StreamSinkRTPImpl::getFormats(const Ice::Current&)
 {
     boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
+    if (mImpl->mDestroyed)
+    {
+        throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+    }
     return mImpl->mSessionAdapter->getFormats();
 }
 
@@ -272,7 +297,7 @@ std::string StreamSinkRTPImpl::getId(const Ice::Current&)
  * Implementation of the setRemoteDetails method as defined in MediaRTPIf.ice
  */
 void StreamSinkRTPImpl::setRemoteDetails(const AsteriskSCF::System::V1::OperationContextPtr& context,
-    const string& address, Ice::Int port, const Ice::Current&)
+        const string& address, Ice::Int port, const Ice::Current&)
 {
     ContextDataPtr data = checkAndThrow(mImpl->mOperationContextCache, context);
 
@@ -289,6 +314,10 @@ void StreamSinkRTPImpl::setRemoteDetails(const AsteriskSCF::System::V1::Operatio
         /* This method is essentially a passthru to the RTPSourceImpl. It takes care of
          * actually attaching the transport.
          */
+        if (mImpl->mDestroyed)
+        {
+            throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+        }
         mImpl->mSessionAdapter->setRemoteDetails(address, port);
 
         /* We do store it though in case we have not yet received a packet from the remote side but
@@ -318,6 +347,10 @@ void StreamSinkRTPImpl::setRemoteDetails(const AsteriskSCF::System::V1::Operatio
 std::string StreamSinkRTPImpl::getRemoteAddress(const Ice::Current&)
 {
     boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
+    if (mImpl->mDestroyed)
+    {
+        throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+    }
     if (mImpl->mTransport && mImpl->mTransport->remoteAddress())
     {
         string address = mImpl->mTransport->remoteAddress()->hostname();
@@ -332,6 +365,10 @@ std::string StreamSinkRTPImpl::getRemoteAddress(const Ice::Current&)
 Ice::Int StreamSinkRTPImpl::getRemotePort(const Ice::Current&)
 {
     boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
+    if (mImpl->mDestroyed)
+    {
+        throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+    }
     if (mImpl->mTransport && mImpl->mTransport->remoteAddress())
     {
         int port = mImpl->mTransport->remoteAddress()->port();
@@ -352,7 +389,7 @@ RTPStreamSinkStateItemPtr StreamSinkRTPImpl::getStateItem()
 RTPTelephonyEventSinkStateItemPtr StreamSinkRTPImpl::getTelephonyEventSinkStateItem()
 {
     boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
-    if (mImpl->mTelephonyEventSink)
+    if (!mImpl->mDestroyed && mImpl->mTelephonyEventSink)
     {
         return mImpl->mTelephonyEventSink->getStateItem();
     }
@@ -362,14 +399,20 @@ RTPTelephonyEventSinkStateItemPtr StreamSinkRTPImpl::getTelephonyEventSinkStateI
 void StreamSinkRTPImpl::setRemoteDetailsImpl(const std::string& host, Ice::Int port)
 {
     boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
-    mImpl->mSinkStateItem->remoteAddress = host;
-    mImpl->mSinkStateItem->remotePort = port;
+    if (!mImpl->mDestroyed)
+    {
+        mImpl->mSinkStateItem->remoteAddress = host;
+        mImpl->mSinkStateItem->remotePort = port;
+    }
 }
 
 void StreamSinkRTPImpl::setSourceImpl(const AsteriskSCF::Media::V1::StreamSourcePrx& proxy)
 {
     boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
-    mImpl->mSinkStateItem->source = proxy;
+    if (!mImpl->mDestroyed)
+    {
+        mImpl->mSinkStateItem->source = proxy;
+    }
 }
 
 Ice::ByteSeq StreamSinkRTPImpl::encodeAudioPayload(const FramePayloadPtr& toEncode, const AudioFormatPtr& audioFormat)
@@ -416,3 +459,13 @@ Ice::ByteSeq StreamSinkRTPImpl::encodeVideoPayload(const FramePayloadPtr& toEnco
     }
     return bytePayload->payload;
 }
+
+void StreamSinkRTPImpl::destroy()
+{
+    boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
+    mImpl->mDestroyed = true;
+    mImpl->mSessionAdapter.reset();
+    mImpl->mTransport.reset();
+    mImpl->mSinkStateItem = 0;
+    mImpl->mTelephonyEventSink = 0;
+}
diff --git a/src/RTPSink.h b/src/RTPSink.h
index 51ad583..721324f 100644
--- a/src/RTPSink.h
+++ b/src/RTPSink.h
@@ -61,6 +61,12 @@ public:
             const AsteriskSCF::Media::V1::FramePayloadPtr& framePayload,
             const AsteriskSCF::Media::V1::VideoFormatPtr&);
 
+    /**
+     * Invoked by RTPSession when it is going away. Should release
+     * transports, cleanup mImpl, etc.
+     */
+    void destroy();
+
 private:
     /**
      * Private implementation data for StreamSinkRTPImpl.
diff --git a/src/RTPSource.cpp b/src/RTPSource.cpp
index a1cd040..a53022b 100644
--- a/src/RTPSource.cpp
+++ b/src/RTPSource.cpp
@@ -55,9 +55,16 @@ using namespace AsteriskSCF::PJMEDIARTP;
 using namespace AsteriskSCF::Media::Formats::Other::V1;
 using namespace AsteriskSCF::SessionCommunications::V1;
 
+//
+// TODO: It is very difficult to write a clean shutdown with the way this
+// code is structured. It should really be refactored with an eye to clean
+// separation of object ownership and locking.
+//
+
 namespace
 {
 Logger lg = getLoggerFactory().getLogger("AsteriskSCF.MediaRTP");
+
 }
 
 /**
@@ -91,7 +98,8 @@ public:
                      : mSessionAdapter(sessionAdapter),
                        mSink(sink),
                        mTransport(transport),
-                       mThreadDescriptor(threadDesciptor)
+                       mThreadDescriptor(threadDesciptor),
+                       mDestroyed(false)
     {
     }
 
@@ -107,6 +115,13 @@ public:
         void *packet;
         int packet_size;
 
+        if (isShutdown())
+        {
+            mSessionAdapter.reset();
+            mTransport.reset();
+            return;
+        }
+
         pjmedia_rtcp_build_rtcp(mSessionAdapter->getRtcpSession(), &packet, &packet_size);
         pjmedia_transport_send_rtcp(mTransport->getTransport(), packet, packet_size);
 
@@ -128,6 +143,18 @@ public:
         }
     }
 
+    void shutdown()
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+        mDestroyed = true;
+    }
+
+    bool isShutdown()
+    {
+        boost::shared_lock<boost::shared_mutex> lock(mLock);
+        return mDestroyed;
+    }
+
 private:
     /**
      * A pointer to the session adapter.
@@ -145,6 +172,9 @@ private:
     PJMEDIATransportPtr mTransport;
 
     ThreadDescWrapperPtr mThreadDescriptor;
+
+    boost::shared_mutex mLock;
+    bool mDestroyed;
 };
 
 /**
@@ -220,7 +250,60 @@ public:
     boost::shared_mutex mLock;
 
     ThreadDescWrapperPtr mThreadDescriptor;
+    bool mDestroyed;
+
+    AsteriskSCF::Media::V1::FramePayloadPtr decodeAudioPayload(
+        const Ice::ByteSeq&,
+        const AsteriskSCF::Media::V1::AudioFormatPtr&);
+
+    AsteriskSCF::Media::V1::FramePayloadPtr decodeVideoPayload(
+        const Ice::ByteSeq&,
+        const AsteriskSCF::Media::V1::VideoFormatPtr&);
 };
+typedef boost::shared_ptr<StreamSourceRTPImplPriv> StreamSourceRTPImplPrivPtr;
+
+/**
+ * A helper class to help keep track of classes that are associated with a
+ * media session without having memory management headaches of associating 
+ * pointers to live objects with pjmedia transports.
+ */
+class StreamTable
+{
+public:
+    void add(const std::string& lbl, const StreamSourceRTPImplPrivPtr& info)
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+        InfoMap::const_iterator iter = mMap.find(lbl);
+        if (iter == mMap.end())
+        {
+            mMap[lbl] = info; 
+        }
+    }
+
+    void remove(const std::string& lbl)
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+        mMap.erase(lbl);
+    }
+
+    StreamSourceRTPImplPrivPtr get(const std::string& lbl)
+    {
+        boost::shared_lock<boost::shared_mutex> lock(mLock);
+        InfoMap::const_iterator iter = mMap.find(lbl);
+        if (iter != mMap.end())
+        {
+            return iter->second;
+        }
+        return StreamSourceRTPImplPrivPtr(); 
+    } 
+public:
+    boost::shared_mutex mLock;
+
+    typedef std::map<std::string, StreamSourceRTPImplPrivPtr> InfoMap;
+    InfoMap mMap;
+};
+
+StreamTable streamTable;
 
 /**
  * Constructor for the StreamSourceRTPImplPriv class.
@@ -235,7 +318,8 @@ StreamSourceRTPImplPriv::StreamSourceRTPImplPriv(const SessionAdapterPtr& sessio
     mSessionId(sessionId),
     mSource(source),
     mSink(sink),
-    mThreadDescriptor(new ThreadDescWrapper)
+    mThreadDescriptor(new ThreadDescWrapper),
+    mDestroyed(false)
 {
     pjmedia_rtp_session_init(&mIncomingSession, 0, 0);
     mSourceStateItem->sessionId = sessionId;
@@ -247,7 +331,6 @@ StreamSourceRTPImplPriv::StreamSourceRTPImplPriv(const SessionAdapterPtr& sessio
  */
 StreamSourceRTPImplPriv::~StreamSourceRTPImplPriv()
 {
-    // Destroy the RTCP transmission timer if it exists
     if (mTimer)
     {
         mTimer->destroy();
@@ -261,7 +344,8 @@ StreamSourceRTPImpl::StreamSourceRTPImpl(const SessionAdapterPtr& session,
                                          const PJMEDIATransportPtr& transport, const string& sessionId,
                                          const StreamSourceRTPPrx& source,
                                          const StreamSinkRTPPrx& sink) :
-    mImpl(new StreamSourceRTPImplPriv(session, transport, sessionId, source, sink))
+    mImpl(new StreamSourceRTPImplPriv(session, transport, sessionId, source, sink)),
+    mLabelBuffer(0)
 {
 }
 
@@ -283,9 +367,15 @@ RTPTelephonyEventSourcePtr StreamSourceRTPImpl::getTelephonyEventSource()
 void StreamSourceRTPImpl::addSink(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&)
 {
     // naturally idempotent; no retry logic needed
+    // TODO: NOT actually correct, but that's a different story!
     boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+    if (mImpl->mDestroyed)
+    {
+        throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+    }
 
     // Do not allow the same sink to be added multiple times
+    // TODO: find is bogus.. it should be on id only, not endpoints.
     if (std::find(mImpl->mSourceStateItem->sinks.begin(),
                   mImpl->mSourceStateItem->sinks.end(),
                   sink) !=
@@ -306,7 +396,12 @@ void StreamSourceRTPImpl::removeSink(const AsteriskSCF::System::V1::OperationCon
     const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&)
 {
     // naturally idempotent; no retry logic needed
+    // TODO: NOT actually correct, but that's a different story!
     boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+    if (mImpl->mDestroyed)
+    {
+        throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+    }
 
     mImpl->mSourceStateItem->sinks.erase(std::remove(mImpl->mSourceStateItem->sinks.begin(),
 	    mImpl->mSourceStateItem->sinks.end(),
@@ -320,6 +415,11 @@ void StreamSourceRTPImpl::removeSink(const AsteriskSCF::System::V1::OperationCon
  */
 AsteriskSCF::Media::V1::StreamSinkSeq StreamSourceRTPImpl::getSinks(const Ice::Current&)
 {
+    boost::shared_lock<boost::shared_mutex> lock(mImpl->mLock);
+    if (mImpl->mDestroyed)
+    {
+        throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+    }
     return mImpl->mSourceStateItem->sinks;
 }
 
@@ -328,6 +428,11 @@ AsteriskSCF::Media::V1::StreamSinkSeq StreamSourceRTPImpl::getSinks(const Ice::C
  */
 AsteriskSCF::Media::V1::FormatSeq StreamSourceRTPImpl::getFormats(const Ice::Current&)
 {
+    boost::shared_lock<boost::shared_mutex> lock(mImpl->mLock);
+    if (mImpl->mDestroyed)
+    {
+        throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+    }
     return mImpl->mSessionAdapter->getFormats();
 }
 
@@ -336,6 +441,11 @@ AsteriskSCF::Media::V1::FormatSeq StreamSourceRTPImpl::getFormats(const Ice::Cur
  */
 std::string StreamSourceRTPImpl::getId(const Ice::Current&)
 {
+    boost::shared_lock<boost::shared_mutex> lock(mImpl->mLock);
+    if (mImpl->mDestroyed)
+    {
+        throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+    }
     /* For now utilize the id of the session */
     return mImpl->mSessionId;
 }
@@ -347,6 +457,11 @@ void StreamSourceRTPImpl::requestFormat(
     const AsteriskSCF::System::V1::OperationContextPtr&,
     const AsteriskSCF::Media::V1::FormatPtr&, const Ice::Current&)
 {
+    boost::shared_lock<boost::shared_mutex> lock(mImpl->mLock);
+    if (mImpl->mDestroyed)
+    {
+        throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+    }
     // We do not currently support switching formats.
     throw MediaFormatSwitchException();
 }
@@ -356,6 +471,11 @@ void StreamSourceRTPImpl::requestFormat(
  */
 std::string StreamSourceRTPImpl::getLocalAddress(const Ice::Current&)
 {
+    boost::shared_lock<boost::shared_mutex> lock(mImpl->mLock);
+    if (mImpl->mDestroyed)
+    {
+        throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+    }
     AsteriskSCF::Helpers::AddressPtr addr = mImpl->mTransport->localAddress();
     if (addr)
     {
@@ -369,6 +489,11 @@ std::string StreamSourceRTPImpl::getLocalAddress(const Ice::Current&)
  */
 Ice::Int StreamSourceRTPImpl::getLocalPort(const Ice::Current&)
 {
+    boost::shared_lock<boost::shared_mutex> lock(mImpl->mLock);
+    if (mImpl->mDestroyed)
+    {
+        throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+    }
     AsteriskSCF::Helpers::AddressPtr addr = mImpl->mTransport->localAddress();
     if (addr)
     {
@@ -382,8 +507,6 @@ Ice::Int StreamSourceRTPImpl::getLocalPort(const Ice::Current&)
  */
 static void receiveRTP(void *userdata, void *packet, pj_ssize_t size)
 {
-    StreamSourceRTPImpl* source = static_cast<StreamSourceRTPImpl*>(userdata);
-
     /* Ensure that no errors occurred when reading this packet in */
     if (size < 0)
     {
@@ -391,14 +514,38 @@ static void receiveRTP(void *userdata, void *packet, pj_ssize_t size)
         return;
     }
 
+    const char* infoLabel = static_cast<const char*>(userdata);
+    if (!infoLabel)
+    {
+        lg(Error) << "Attempted to process an RTP packet with no associated session information."; 
+        return;
+    }
+    StreamSourceRTPImplPrivPtr impl = streamTable.get(infoLabel);
+    if (!impl)
+    {
+        return;
+    }
+    //////////////////////////////////////////////////////////////////////////////////////////////////
+    // WARNING!
+    // There is a lot of stuff happening from here down that is done with
+    // shared data that was NOT locked. Going back to RTPSession here is a
+    // mistake as well, info that is needed from there should be pushed to
+    // the shared structure. Moving the lock back to avoid shutdown issues.
+    //
     const pjmedia_rtp_hdr* header;
     Ice::Byte* payload;
     unsigned int payload_size;
 
+    boost::shared_lock<boost::shared_mutex> lock(impl->mLock);
+    if (impl->mDestroyed)
+    {
+        return;
+    }
+
     /* We have to cast 'size' to an 'int' here so the compiler won't warn about
      * doing it implicitly.
      */
-    pj_status_t status = pjmedia_rtp_decode_rtp(&source->mImpl->mIncomingSession, packet, (int) size, &header,
+    pj_status_t status = pjmedia_rtp_decode_rtp(&impl->mIncomingSession, packet, (int) size, &header,
             (const void**)&payload, &payload_size);
 
     if (status != PJ_SUCCESS)
@@ -409,10 +556,10 @@ static void receiveRTP(void *userdata, void *packet, pj_ssize_t size)
 
     // Update RTP stack information before writing to sinks, it's fine to do it
     pjmedia_rtp_status rtpStatus;
-    pjmedia_rtp_session_update2(&source->mImpl->mIncomingSession, header, &rtpStatus, PJ_FALSE);
+    pjmedia_rtp_session_update2(&impl->mIncomingSession, header, &rtpStatus, PJ_FALSE);
 
     // Update RTCP information
-    pjmedia_rtcp_rx_rtp2(source->mImpl->mSessionAdapter->getRtcpSession(), pj_ntohs(header->seq), pj_ntohl(header->ts),
+    pjmedia_rtcp_rx_rtp2(impl->mSessionAdapter->getRtcpSession(), pj_ntohs(header->seq), pj_ntohl(header->ts),
                          payload_size, ((rtpStatus.status.value && rtpStatus.status.flag.bad) || !payload_size)
                          ? PJ_TRUE : PJ_FALSE);
 
@@ -420,23 +567,24 @@ static void receiveRTP(void *userdata, void *packet, pj_ssize_t size)
     if (rtpStatus.status.value && rtpStatus.status.flag.badssrc)
     {
         std::vector<AsteriskSCF::Media::RTCP::V1::InformationListenerPrx> listeners =
-            source->mImpl->mSessionAdapter->getReceiverReportListeners();
+            impl->mSessionAdapter->getReceiverReportListeners();
 
         for (std::vector<AsteriskSCF::Media::RTCP::V1::InformationListenerPrx>::const_iterator listener = listeners.begin();
              listener != listeners.end();
              ++listener)
         {
-            (*listener)->sourceSsrcChanged(AsteriskSCF::Operations::createContext(), source->mImpl->mSource, source->mImpl->mIncomingSession.peer_ssrc);
+            (*listener)->sourceSsrcChanged(AsteriskSCF::Operations::createContext(), impl->mSource, 
+                    impl->mIncomingSession.peer_ssrc);
         }
     }
 
-    if (source->mImpl->mSourceStateItem->sinks.empty())
+    if (impl->mSourceStateItem->sinks.empty())
     {
 	// No sinks present so frames can not go anywhere
 	return;
     }
 
-    FormatPtr mediaformat = source->mImpl->mSessionAdapter->getFormat(header->pt);
+    FormatPtr mediaformat = impl->mSessionAdapter->getFormat(header->pt);
 
     if (!mediaformat)
     {
@@ -455,7 +603,7 @@ static void receiveRTP(void *userdata, void *packet, pj_ssize_t size)
         AudioFramePtr frame = new AudioFrame();
 
         Ice::ByteSeq bytePayload(payload, payload + payload_size);
-        frame->payload = source->decodeAudioPayload(bytePayload, audioformat);
+        frame->payload = impl->decodeAudioPayload(bytePayload, audioformat);
 
         // Populate the common data
         frame->mediaFormat = mediaformat;
@@ -470,7 +618,7 @@ static void receiveRTP(void *userdata, void *packet, pj_ssize_t size)
         VideoFramePtr frame = new VideoFrame();
 
         Ice::ByteSeq bytePayload(payload, payload + payload_size);
-        frame->payload = source->decodeVideoPayload(bytePayload, videoformat);
+        frame->payload = impl->decodeVideoPayload(bytePayload, videoformat);
 
         frame->mediaFormat = mediaformat;
         frame->timestamp = header->ts;
@@ -480,7 +628,7 @@ static void receiveRTP(void *userdata, void *packet, pj_ssize_t size)
     }
     else if ((rfc4733 = RFC4733Ptr::dynamicCast(mediaformat)))
     {
-        source->mImpl->mTelephonyEventSource->read(header, payload);
+        impl->mTelephonyEventSource->read(header, payload);
     }
 
     if (frames.empty())
@@ -489,10 +637,8 @@ static void receiveRTP(void *userdata, void *packet, pj_ssize_t size)
         return;
     }
 
-    boost::shared_lock<boost::shared_mutex> lock(source->mImpl->mLock);
-
-    for (StreamSinkSeq::iterator sink = source->mImpl->mSourceStateItem->sinks.begin();
-         sink != source->mImpl->mSourceStateItem->sinks.end();
+    for (StreamSinkSeq::iterator sink = impl->mSourceStateItem->sinks.begin();
+         sink != impl->mSourceStateItem->sinks.end();
          ++sink)
     {
         try
@@ -504,7 +650,6 @@ static void receiveRTP(void *userdata, void *packet, pj_ssize_t size)
             lg(Error) << "Exception caught while attempting to write media to RTP sink " << (*sink);
         }
     }
-
 }
 
 /**
@@ -512,8 +657,6 @@ static void receiveRTP(void *userdata, void *packet, pj_ssize_t size)
  */
 static void receiveRTCP(void *userdata, void *packet, pj_ssize_t size)
 {
-    StreamSourceRTPImpl* source = static_cast<StreamSourceRTPImpl*>(userdata);
-
     /* Ensure that no errors occurred when reading this packet in */
     if (size < 0)
     {
@@ -521,23 +664,41 @@ static void receiveRTCP(void *userdata, void *packet, pj_ssize_t size)
         return;
     }
 
-    pjmedia_rtcp_rx_rtcp(source->mImpl->mSessionAdapter->getRtcpSession(), packet, size);
+    const char* infoLabel = static_cast<const char*>(userdata);
+    if (!infoLabel)
+    {
+        lg(Error) << "Attempted to process an RTP packet with no associated session information."; 
+        return;
+    }
+    StreamSourceRTPImplPrivPtr impl = streamTable.get(infoLabel);
+    if (!impl)
+    {
+        return;
+    }
+
+    boost::shared_lock<boost::shared_mutex> lock(impl->mLock);
+    if (impl->mDestroyed)
+    {
+        return;
+    }
+
+    pjmedia_rtcp_rx_rtcp(impl->mSessionAdapter->getRtcpSession(), packet, size);
 
     std::vector<AsteriskSCF::Media::RTCP::V1::InformationListenerPrx> listeners =
-        source->mImpl->mSessionAdapter->getReceiverReportListeners();
+        impl->mSessionAdapter->getReceiverReportListeners();
 
     if (listeners.empty())
     {
         return;
     }
 
-    AsteriskSCF::Media::RTCP::V1::StatisticsPtr statistics = source->mImpl->mSessionAdapter->getReceiverReportStatistics();
+    AsteriskSCF::Media::RTCP::V1::StatisticsPtr statistics = impl->mSessionAdapter->getReceiverReportStatistics();
 
     for (std::vector<AsteriskSCF::Media::RTCP::V1::InformationListenerPrx>::const_iterator listener = listeners.begin();
          listener != listeners.end();
          ++listener)
     {
-        (*listener)->sourceStatisticsUpdated(AsteriskSCF::Operations::createContext(), source->mImpl->mSource, statistics);
+        (*listener)->sourceStatisticsUpdated(AsteriskSCF::Operations::createContext(), impl->mSource, statistics);
     }
 }
 
@@ -574,11 +735,12 @@ void StreamSourceRTPImpl::setRemoteDetails(const string& address, Ice::Int port)
     pj_sockaddr_set_port(&addr, static_cast<pj_uint16_t>(port));
 
     /* In case we were already attached go ahead and detach */
-    pjmedia_transport_detach(mImpl->mTransport->getTransport(), this);
+    mImpl->mTransport->attachLabel(mImpl->mSessionId);
 
     /* All ready... actually do it! */
-    status = pjmedia_transport_attach(mImpl->mTransport->getTransport(), this, &addr, &transportInfo.src_rtcp_name,
-                                      pj_sockaddr_get_len(&addr), &receiveRTP, &receiveRTCP);
+    status = pjmedia_transport_attach(mImpl->mTransport->getTransport(), 
+            mImpl->mTransport->getLabelData().get(), &addr, &transportInfo.src_rtcp_name, 
+            pj_sockaddr_get_len(&addr), &receiveRTP, &receiveRTCP);
 
     if (status != PJ_SUCCESS)
     {
@@ -619,10 +781,10 @@ void StreamSourceRTPImpl::setRemoteRtcpDetails(const std::string& address, Ice::
     pj_sockaddr_set_port(&addr, static_cast<pj_uint16_t>(port));
 
     /* In case we were already attached go ahead and detach */
-    pjmedia_transport_detach(mImpl->mTransport->getTransport(), this);
+    mImpl->mTransport->attachLabel(mImpl->mSessionId);
 
     /* All ready... actually do it! */
-    status = pjmedia_transport_attach(mImpl->mTransport->getTransport(), this, &transportInfo.src_rtp_name, &addr,
+    status = pjmedia_transport_attach(mImpl->mTransport->getTransport(), mImpl->mTransport->getLabelData().get(), &transportInfo.src_rtp_name, &addr,
                                       pj_sockaddr_get_len(&addr), &receiveRTP, &receiveRTCP);
 
     if (status != PJ_SUCCESS)
@@ -647,12 +809,14 @@ void StreamSourceRTPImpl::setRemoteRtcpDetails(const std::string& address, Ice::
  */
 RTPStreamSourceStateItemPtr StreamSourceRTPImpl::getStateItem()
 {
+    boost::shared_lock<boost::shared_mutex> lock(mImpl->mLock);
     return mImpl->mSourceStateItem;
 }
 
 RTPTelephonyEventSourceStateItemPtr StreamSourceRTPImpl::getTelephonyEventSourceStateItem()
 {
-    if (mImpl->mTelephonyEventSource)
+    boost::shared_lock<boost::shared_mutex> lock(mImpl->mLock);
+    if (mImpl->mDestroyed && mImpl->mTelephonyEventSource)
     {
         return mImpl->mTelephonyEventSource->getStateItem();
     }
@@ -662,10 +826,13 @@ RTPTelephonyEventSourceStateItemPtr StreamSourceRTPImpl::getTelephonyEventSource
 void StreamSourceRTPImpl::setSinksImpl(const AsteriskSCF::Media::V1::StreamSinkSeq& sinks)
 {
     boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
-    mImpl->mSourceStateItem->sinks = sinks;
+    if (!mImpl->mDestroyed)
+    {
+        mImpl->mSourceStateItem->sinks = sinks;
+    }
 }
 
-FramePayloadPtr StreamSourceRTPImpl::decodeAudioPayload(const Ice::ByteSeq& toDecode, const AudioFormatPtr& audioFormat)
+FramePayloadPtr StreamSourceRTPImplPriv::decodeAudioPayload(const Ice::ByteSeq& toDecode, const AudioFormatPtr& audioFormat)
 {
     if (audioFormat->sampleSize == 8)
     {
@@ -684,8 +851,48 @@ FramePayloadPtr StreamSourceRTPImpl::decodeAudioPayload(const Ice::ByteSeq& toDe
     return 0;
 }
 
-FramePayloadPtr StreamSourceRTPImpl::decodeVideoPayload(const Ice::ByteSeq& toDecode, const VideoFormatPtr&)
+FramePayloadPtr StreamSourceRTPImplPriv::decodeVideoPayload(const Ice::ByteSeq& toDecode, const VideoFormatPtr&)
 {
     //Assume for now video payloads use 8-bit samples...
     return new ByteSeqPayload(toDecode);
 }
+
+void StreamSourceRTPImpl::destroy()
+{
+    string lbl;
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+        if (mImpl->mDestroyed)
+        {
+            return;
+        }
+        lbl = mImpl->mSessionId;
+        mImpl->mDestroyed = true;
+        mImpl->mSourceStateItem = 0;
+        mImpl->mTelephonyEventSource = 0;
+        mImpl->mSessionAdapter.reset();
+        // Destroy the RTCP transmission timer if it exists
+        if (mImpl->mTimer)
+        {
+            mImpl->mTimer->destroy();
+            mImpl->mTimer = 0;
+        }
+    }
+    streamTable.remove(lbl);
+}
+
+boost::shared_ptr<StreamSourceRTPImplPriv> StreamSourceRTPImpl::getImpl()
+{
+    return mImpl;
+}
+
+void StreamSourceRTPImpl::activate()
+{
+    boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+    if (!mLabelBuffer)
+    {
+        mLabelBuffer = new char[mImpl->mSessionId.size() + 1];
+        strcpy(mLabelBuffer, mImpl->mSessionId.c_str());
+        streamTable.add(mImpl->mSessionId, mImpl);
+    }
+}
diff --git a/src/RTPSource.h b/src/RTPSource.h
index b07f744..4f6338b 100644
--- a/src/RTPSource.h
+++ b/src/RTPSource.h
@@ -49,18 +49,24 @@ public:
     AsteriskSCF::Replication::MediaRTPPJMEDIA::V1::RTPTelephonyEventSourceStateItemPtr getTelephonyEventSourceStateItem();
 
     void setSinkImpl(const AsteriskSCF::Media::V1::StreamSinkPrx& proxy);
-    AsteriskSCF::Media::V1::FramePayloadPtr decodeAudioPayload(
-            const Ice::ByteSeq&,
-            const AsteriskSCF::Media::V1::AudioFormatPtr&);
-
-    AsteriskSCF::Media::V1::FramePayloadPtr decodeVideoPayload(
-            const Ice::ByteSeq&,
-            const AsteriskSCF::Media::V1::VideoFormatPtr&);
 
     AsteriskSCF::SessionCommunications::V1::TelephonyEventSourcePrx createTelephonyEventSource(Ice::ObjectAdapterPtr& adapter, const Ice::Identity& id);
     RTPTelephonyEventSourcePtr getTelephonyEventSource();
 
     /**
+     * Invoked by RTPSession when it is going away. Should release
+     * transports, cleanup mImpl, etc.
+     */
+    void destroy();
+
+    void activate();
+
+    boost::shared_ptr<StreamSourceRTPImplPriv> getImpl();
+
+private:
+    char* mLabelBuffer;
+
+    /**
      * Private implementation data for StreamSourceRTPImpl.
      * Note: This is public on purpose so that our RTP callback can access it.
      */

commit 9bc18b0c49e8f0144fa1256aa4aba89ac7b932a4
Author: Brent Eagles <beagles at digium.com>
Date:   Wed Jun 20 17:14:38 2012 -0230

    Fix a servant cleanup issue!

diff --git a/src/RTPSession.cpp b/src/RTPSession.cpp
index 72ecc51..81eb5e8 100644
--- a/src/RTPSession.cpp
+++ b/src/RTPSession.cpp
@@ -1222,8 +1222,8 @@ RTPSessionPrx RTPSessionImpl::activate(
 void RTPSessionImpl::destroy()
 {
     /* Drop the source and sink from the ASM */
-    mAdapter->remove(mStreamSourceProxy->ice_getIdentity());
-    mAdapter->remove(mStreamSinkProxy->ice_getIdentity());
+    mAdapter->removeAllFacets(mStreamSourceProxy->ice_getIdentity());
+    mAdapter->removeAllFacets(mStreamSinkProxy->ice_getIdentity());
 
     if (mTelephonyEventSourcePrx)
     {
@@ -1249,7 +1249,7 @@ void RTPSessionImpl::destroy()
     /* All we have to do is remove ourselves from the ASM, our smart pointerness will cause us to
      * destruct and then cleanup will occur.
      */
-    mAdapter->remove(mAdapter->getCommunicator()->stringToIdentity(mId));
+    mAdapter->removeAllFacets(mAdapter->getCommunicator()->stringToIdentity(mId));
 }
 
 class ReplicationAdapterImpl : public ReplicationAdapter
diff --git a/test/TestRTPICE.cpp b/test/TestRTPICE.cpp
index 45aae7b..a8f0b96 100644
--- a/test/TestRTPICE.cpp
+++ b/test/TestRTPICE.cpp
@@ -89,9 +89,15 @@ public:
 
     ~IceEnvironment()
     {
-        if (mCommunicator)
+        try
+        {
+            if (mCommunicator)
+            {
+                mCommunicator->destroy();
+            }
+        }
+        catch (...)
         {
-            mCommunicator->shutdown();
         }
     }
 

commit 933ffdf5bcb4b74c106c5ac2275b2b1f847a2248
Author: Brent Eagles <beagles at digium.com>
Date:   Mon Jun 18 14:14:06 2012 -0230

    Fixed a stray reference count bug in RTPSession. The bug did not exist in
    the internal "destroy()" method that is used by replicas so merged the two
    and implemented the release() external method in terms of the internal
    destroy() method.

diff --git a/src/RTPSession.cpp b/src/RTPSession.cpp
index d3fe04c..72ecc51 100644
--- a/src/RTPSession.cpp
+++ b/src/RTPSession.cpp
@@ -781,29 +781,8 @@ void RTPSessionImpl::release(const Ice::Current&)
     // Remove everything from the state replicator if present
     removeState(mSessionStateItem, mStreamSink->getStateItem(), mStreamSource->getStateItem());
 
-    /* Drop the source and sink from the ASM */
-    mAdapter->remove(mStreamSourceProxy->ice_getIdentity());
-    mAdapter->remove(mStreamSinkProxy->ice_getIdentity());
-
-    if (mTelephonyEventSourcePrx)
-    {
-        mAdapter->remove(mTelephonyEventSourcePrx->ice_getIdentity());
-    }
-    if (mTelephonyEventSinkPrx)
-    {
-        mAdapter->remove(mTelephonyEventSinkPrx->ice_getIdentity());
-    }
-
-    /* Since the source and sink have a pointer back to the session we need to get rid of them,
-     * which will in turn get rid of ourselves once we are removed from the ASM.
-     */
-    mStreamSource = 0;
-    mStreamSink = 0;
 
-    /* All we have to do is remove ourselves from the ASM, our smart pointerness will cause us to
-     * destruct and then cleanup will occur.
-     */
-    mAdapter->remove(mAdapter->getCommunicator()->stringToIdentity(mId));
+    destroy();
 }
 
 /**
@@ -1246,6 +1225,15 @@ void RTPSessionImpl::destroy()
     mAdapter->remove(mStreamSourceProxy->ice_getIdentity());
     mAdapter->remove(mStreamSinkProxy->ice_getIdentity());
 
+    if (mTelephonyEventSourcePrx)
+    {
+        mAdapter->remove(mTelephonyEventSourcePrx->ice_getIdentity());
+    }
+    if (mTelephonyEventSinkPrx)
+    {
+        mAdapter->remove(mTelephonyEventSinkPrx->ice_getIdentity());
+    }
+
     /* Since both the source and sink have a pointer back to the session we need to get rid of them,
      * which will in turn get rid of ourselves once we are removed from the ASM.
      */
@@ -1253,7 +1241,9 @@ void RTPSessionImpl::destroy()
         boost::unique_lock<boost::shared_mutex> lock(mLock);
         mStreamSource = 0;
         mStreamSink = 0;
+
         mSessionAdapter.reset();
+        mTransport.reset();
     }
 
     /* All we have to do is remove ourselves from the ASM, our smart pointerness will cause us to

commit 52e9445fc7ab2a592ecc86e3861a05e0816407ed
Author: Darren Sessions <dsessions at digium.com>
Date:   Tue May 22 11:21:01 2012 -0500

    Updated the RTP configurator python script to reflect some minor changes made to the SIP configurator that make life easier when running configurator scripts. These updates include using the ASTSCF_HOME env variable to construct a path for the configurator component and then appending that path along with the Ice Python lib path to the system path.

diff --git a/config/RTPConfigurator.py b/config/RTPConfigurator.py
index 5d7e4b9..4b336b8 100755
--- a/config/RTPConfigurator.py
+++ b/config/RTPConfigurator.py
@@ -19,10 +19,16 @@
 # RTP configurator
 
 # Bring in the common configuration infrastructure
-import Ice, Configurator, sys, os
+import os, sys
+
+sys.path.append(os.environ["ASTSCF_HOME"] + "/configurator")
+sys.path.append("/opt/Ice-3.4/python")
+
+import Ice, Configurator
 
 # Load our component specific configuration definitions
 Configurator.astscfLoadSlice("media_rtp_pjmedia", "AsteriskSCF/Configuration/MediaRTPPJMEDIA/RTPConfigurationIf.ice")
+
 import AsteriskSCF.Configuration.MediaRTPPJMEDIA.V1
 
 # Add our own visitor implementations for the sections we support

commit 29b69ef13c04cb6aae14e39911edae18c3f54f8f
Author: Ken Hunt <ken.hunt at digium.com>
Date:   Tue May 8 11:40:51 2012 -0500

    Changes for new retry logic.

diff --git a/config/RTP.config b/config/RTP.config
index 47b975d..30e0289 100644
--- a/config/RTP.config
+++ b/config/RTP.config
@@ -12,7 +12,7 @@ endport=20000
 workerthreadcount=4
 
 # IPv4 address we should bind sessions to
-#ipv4bind=
+ipv4bind=127.0.0.1
 
 # IPv6 address we should bind sessions to
 #ipv6bind=
diff --git a/config/test_component.conf b/config/test_component.conf
index 8b7b63b..377d264 100644
--- a/config/test_component.conf
+++ b/config/test_component.conf
@@ -3,6 +3,7 @@
 #
 # Icebox Configuration
 #
+Ice.ThreadPool.Client.Size=4
 
 IceBox.InheritProperties=1
 IceBox.LoadOrder=ServiceDiscovery,RTPStateReplicator,MediaServiceRTP,MediaRTPpjmediaTest
@@ -15,6 +16,9 @@ Ice.Override.Timeout=5000
 
 IceBox.Service.ServiceDiscovery=ServiceLocator:create
 
+# For unit test we run without state replication.
+ServiceDiscovery.Standalone = true
+
 ServiceDiscovery.IceStorm.InstanceName=ServiceDiscovery
 
 ServiceDiscovery.IceStorm.TopicManager.Endpoints=tcp -h 127.0.0.1 -p 4421
diff --git a/config/test_component_v6.conf b/config/test_component_v6.conf
index 6c991c6..8daa140 100644
--- a/config/test_component_v6.conf
+++ b/config/test_component_v6.conf
@@ -4,6 +4,8 @@
 # Icebox Configuration
 #
 
+Ice.ThreadPool.Client.Size=4
+
 IceBox.InheritProperties=1
 IceBox.LoadOrder=ServiceDiscovery,RTPStateReplicator,MediaServiceRTP,MediaRTPpjmediaTest
 
@@ -15,6 +17,9 @@ Ice.Override.Timeout=5000
 
 IceBox.Service.ServiceDiscovery=ServiceLocator:create
 
+# For unit test we run without state replication.
+ServiceDiscovery.Standalone=true
+
 ServiceDiscovery.IceStorm.InstanceName=ServiceDiscovery
 
 ServiceDiscovery.IceStorm.TopicManager.Endpoints=tcp -h 127.0.0.1 -p 4421
diff --git a/config/test_rtp_ice.conf b/config/test_rtp_ice.conf
index 4edbeca..68fbee7 100644
--- a/config/test_rtp_ice.conf
+++ b/config/test_rtp_ice.conf
@@ -4,6 +4,7 @@
 # Icebox Configuration
 #
 RTPConfiguration.Name=rtpoice
+Ice.ThreadPool.Client.Size=4
 IceBox.InheritProperties=1
 IceBox.LoadOrder=ServiceDiscovery,MediaRTPpjmedia,MediaRTPpjmediaTest
 
@@ -24,10 +25,11 @@ LocatorService.Proxy=LocatorService:tcp -h 127.0.0.1 -p 4411
 
 IceBox.Service.MediaRTPpjmedia=MediaRTPPJMEDIA:create
 
+MediaRTPpjmedia.ServiceName=test
+
 # Adapter parameters for this component
-MediaRTPpjmediaAdapter.Endpoints=default -h 127.0.0.1
-MediaRTPpjmediaAdapterLocal.Endpoints=default -h 127.0.0.1
-MediaRTPpjmediaAdapterLogger.Endpoints=default -h 127.0.0.1
+MediaRTPpjmedia.ServiceAdapter.Endpoints=default -h 127.0.0.1 -p 4471
+MediaRTPpjmedia.BackplaneAdapter.Endpoints=default -h 127.0.0.1 -p 4472
 
 # A proxy to the service locator management service
 ServiceLocatorManagementProxy=LocatorServiceManagement:tcp -h 127.0.0.1 -p 4422
@@ -47,6 +49,9 @@ IceBox.Service.MediaRTPpjmediaTest=MediaRTPPJMEDIAIceTest:create
 
 IceBox.Service.ServiceDiscovery=ServiceLocator:create
 
+# For unit test we run without state replication.
+ServiceDiscovery.Standalone = true
+
 ServiceDiscovery.IceStorm.InstanceName=ServiceDiscovery
 
 ServiceDiscovery.IceStorm.TopicManager.Endpoints=tcp -h 127.0.0.1 -p 4421
diff --git a/slice/AsteriskSCF/Replication/MediaRTPPJMEDIA/RTPStateReplicationIf.ice b/slice/AsteriskSCF/Replication/MediaRTPPJMEDIA/RTPStateReplicationIf.ice
index 1bdd788..4d7099f 100644
--- a/slice/AsteriskSCF/Replication/MediaRTPPJMEDIA/RTPStateReplicationIf.ice
+++ b/slice/AsteriskSCF/Replication/MediaRTPPJMEDIA/RTPStateReplicationIf.ice
@@ -23,6 +23,7 @@
 #include <AsteriskSCF/Media/RTP/MediaRTPIf.ice>
 #include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.ice>
 #include <AsteriskSCF/System/Component/ConfigurationIf.ice>
+#include <AsteriskSCF/System/OperationsIf.ice>
 
 module AsteriskSCF
 {
@@ -53,16 +54,16 @@ module V1
 
     interface RTPStateReplicatorListener
     {
-	void stateRemoved(Ice::StringSeq itemKeys);
-	void stateSet(RTPStateItemSeq items);
+	void stateRemoved(AsteriskSCF::System::V1::OperationContext operationContext, Ice::StringSeq itemKeys);
+	void stateSet(AsteriskSCF::System::V1::OperationContext operationContext, RTPStateItemSeq items);
     };
 
     interface RTPStateReplicator
     {
-	void addListener(RTPStateReplicatorListener *listener);
-	void removeListener(RTPStateReplicatorListener *listener);
-	void setState (RTPStateItemSeq items);
-	void removeState(Ice::StringSeq items);
+	void addListener(AsteriskSCF::System::V1::OperationContext operationContext, RTPStateReplicatorListener *listener);
+	void removeListener(AsteriskSCF::System::V1::OperationContext operationContext, RTPStateReplicatorListener *listener);
+	void setState (AsteriskSCF::System::V1::OperationContext operationContext, RTPStateItemSeq items);
+	void removeState(AsteriskSCF::System::V1::OperationContext operationContext, Ice::StringSeq items);
 	idempotent RTPStateItemSeq getState(Ice::StringSeq itemKeys);
 	idempotent RTPStateItemSeq getAllState();
     };
diff --git a/src/Component.cpp b/src/Component.cpp
index 43d1166..e1956dc 100644
--- a/src/Component.cpp
+++ b/src/Component.cpp
@@ -1,7 +1,7 @@
 /*
  * Asterisk SCF -- An open-source communications framework.
  *
- * Copyright (C) 2010, Digium, Inc.
+ * Copyright (C) 2010-2012, Digium, Inc.
  *
  * See http://www.asterisk.org for more information about
  * the Asterisk SCF project. Please do not directly contact
@@ -22,40 +22,47 @@
 #include <IceUtil/UUID.h>
 
 #include <boost/shared_ptr.hpp>
+#include <AsteriskSCF/Operations/OperationContext.h>
 
+#include <AsteriskSCF/Component/Component.h>
 #include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.h>
+#include <AsteriskSCF/Discovery/SmartProxy.h>
+#include <AsteriskSCF/Logger.h>
+#include <AsteriskSCF/Logger/IceLogger.h>
 #include <AsteriskSCF/Media/MediaIf.h>
 #include <AsteriskSCF/Media/RTP/MediaRTPIf.h>
-#include <AsteriskSCF/System/Component/ConfigurationIf.h>
-#include <AsteriskSCF/Logger/IceLogger.h>
-#include <AsteriskSCF/Logger.h>
-#include <AsteriskSCF/Discovery/SmartProxy.h>
-#include <AsteriskSCF/Component/Component.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
 #include <AsteriskSCF/PJLIB/ThreadHook.h>
+#include <AsteriskSCF/System/Component/ConfigurationIf.h>
 
+#include "PJMEDIAEnvironment.h"
+#include "RTPConfiguration.h"
+#include "RTPConfigurationIf.h"
 #include "RTPReplicationContext.h"
 #include "RTPSession.h"
 #include "RTPStateReplicator.h"
-#include "RTPConfiguration.h"
-#include "RTPConfigurationIf.h"
-#include "PJMEDIAEnvironment.h"
 
-using namespace std;
+using namespace AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1;
 using namespace AsteriskSCF::Core::Discovery::V1;
-using namespace AsteriskSCF::Media::V1;
+using namespace AsteriskSCF::Discovery;
 using namespace AsteriskSCF::Media::RTP::V1;
+using namespace AsteriskSCF::Media::V1;
+using namespace AsteriskSCF::Operations;
+using namespace AsteriskSCF::PJMEDIARTP;
 using namespace AsteriskSCF::Replication::MediaRTPPJMEDIA::V1;
-using namespace AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1;
-using namespace AsteriskSCF::System::Configuration::V1;
+using namespace AsteriskSCF::Replication;
 using namespace AsteriskSCF::System::Component::V1;
+using namespace AsteriskSCF::System::Configuration::V1;
 using namespace AsteriskSCF::System::Logging;
-using namespace AsteriskSCF::Discovery;
-using namespace AsteriskSCF::Replication;
-using namespace AsteriskSCF::PJMEDIARTP;
+using namespace std;
 
 namespace
 {
 Logger lg = getLoggerFactory().getLogger("AsteriskSCF.MediaRTP");
+
+typedef ContextResultData<RTPSessionPrx> AllocateResultData;
+typedef boost::shared_ptr<AllocateResultData> AllocateResultDataPtr;
+
 }
 
 static const string ReplicaServiceId("MediaRTPReplica");
@@ -73,10 +80,11 @@ public:
       const ConfigurationServiceImplPtr&);
 
     RTPSessionPrx allocate(
-            const RTPServiceLocatorParamsPtr&,
-            const RTPOptionsPtr&,
-            RTPAllocationOutputsPtr&,
-            const Ice::Current&);
+        const AsteriskSCF::System::V1::OperationContextPtr&,
+        const RTPServiceLocatorParamsPtr&,
+        const RTPOptionsPtr&,
+        RTPAllocationOutputsPtr&,
+        const Ice::Current&);
 
     pj_pool_factory *getPoolFactory() { return mEnvironment->poolFactory(); };
 
@@ -86,6 +94,7 @@ public:
     }
 
 private:
+    OperationContextCachePtr mOperationContextCache;
     Ice::ObjectAdapterPtr mAdapter;
     PJMEDIAEnvironmentPtr mEnvironment;
     RTPReplicationContextPtr mReplicationContext;
@@ -202,6 +211,7 @@ private:
     virtual void onPreInitialize();
     virtual void onStop();
     virtual void onStart();
+    virtual void onActivated();
 
     // Other base Component overrides
     virtual void prepareBackplaneServicesForDiscovery();
@@ -236,12 +246,12 @@ private:
 
 void Component::onSuspend()
 {
-    mGeneralState->serviceManagement->suspend();
+    mGeneralState->serviceManagement->suspend(AsteriskSCF::Operations::createContext());
 }
 
 void Component::onResume()
 {
-    mGeneralState->serviceManagement->unsuspend();
+    mGeneralState->serviceManagement->unsuspend(AsteriskSCF::Operations::createContext());
 }
 
 /**
@@ -250,6 +260,7 @@ void Component::onResume()
 RTPMediaServiceImpl::RTPMediaServiceImpl(const Ice::ObjectAdapterPtr& adapter,
     const RTPReplicationContextPtr& replicationContext,
     const ConfigurationServiceImplPtr& configurationService) :
+    mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
     mAdapter(adapter),
     mEnvironment(PJMEDIAEnvironment::create(adapter->getCommunicator()->getProperties(), configurationService)),
     mReplicationContext(replicationContext),
@@ -261,12 +272,24 @@ RTPMediaServiceImpl::RTPMediaServiceImpl(const Ice::ObjectAdapterPtr& adapter,
  * Implementation of the allocate method as defined in MediaRTPIf.ice
  */
 RTPSessionPrx RTPMediaServiceImpl::allocate(
-        const RTPServiceLocatorParamsPtr& params,
-        const RTPOptionsPtr& options,
-        RTPAllocationOutputsPtr& outputs,
-        const Ice::Current&)
+    const AsteriskSCF::System::V1::OperationContextPtr& context,
+    const RTPServiceLocatorParamsPtr& params,
+    const RTPOptionsPtr& options,
+    RTPAllocationOutputsPtr& outputs,
+    const Ice::Current&)
 {
-    return AsteriskSCF::PJMEDIARTP::RTPSession::create(
+    std::pair<bool, AllocateResultDataPtr> data =
+        getContextSync<AllocateResultDataPtr>(mOperationContextCache, context);
+
+    if (data.first)
+    {
+        // retry detected
+        return data.second->getResult();
+    }
+
+    try
+    {
+        RTPSessionPrx r = AsteriskSCF::PJMEDIARTP::RTPSession::create(
             mAdapter,
             IceUtil::generateUUID(),
             params,
@@ -275,6 +298,19 @@ RTPSessionPrx RTPMediaServiceImpl::allocate(
             mConfigurationService,
             options,
             outputs);
+        data.second->setResult(r);
+        return r;
+    }
+    catch (const std::exception& e)
+    {
+        data.second->setException(e);
+        throw;
+    }
+    catch (...)
+    {
+        data.second->setException();
+        throw;
+    }
 }
 
 void Component::onPreInitialize()
@@ -356,7 +392,7 @@ void Component::createPrimaryServices()
 
         if (rtpReplicationContext->isActive() == true)
         {
-            getServiceLocatorManagement()->addCompare(getName() + ".RTP.Comparator", mRTPMediaComparatorServicePrx);
+            getServiceLocatorManagement()->addCompare(AsteriskSCF::Operations::createContext(), getName() + ".RTP.Comparator", mRTPMediaComparatorServicePrx);
         }
 
     }
@@ -413,7 +449,7 @@ void Component::findRemoteServices()
         // replicator service.
         ConfigurationReplicatorPrx configurationReplicator = ConfigurationReplicatorPrx::checkedCast(
             rtpReplicationContext->getReplicator().initialize(), ReplicatorFacet);
-        configurationReplicator->registerConfigurationService(mConfigurationServicePrx);
+        configurationReplicator->registerConfigurationService(AsteriskSCF::Operations::createContext(), mConfigurationServicePrx);
 
     }
     catch (const std::exception& e)
@@ -466,7 +502,7 @@ void Component::listenToStateReplicators()
         // Are we in standby mode?
         if (rtpReplicationContext->getState() == STANDBY_IN_REPLICA_GROUP)
         {
-            rtpReplicationContext->getReplicator()->addListener(mReplicatorListenerProxy);
+            rtpReplicationContext->getReplicator()->addListener(AsteriskSCF::Operations::createContext(), mReplicatorListenerProxy);
             mListeningToReplicator = true;
         }
     }
@@ -494,7 +530,7 @@ void Component::stopListeningToStateReplicators()
 
     try
     {
-        rtpReplicationContext->getReplicator()->removeListener(mReplicatorListenerProxy);
+        rtpReplicationContext->getReplicator()->removeListener(AsteriskSCF::Operations::createContext(), mReplicatorListenerProxy);
         mListeningToReplicator = false;
     }
     catch (const Ice::Exception& e)
@@ -532,7 +568,19 @@ void Component::onRegisterPrimaryServices()
     }
 
     mGeneralState->serviceManagement = mRTPMediaServiceRegistration->getServiceManagement();
-    mGeneralState->serviceManagement->addLocatorParams(mRTPOverIceLocatorParams, getName() + ".RTP.Comparator");
+    mGeneralState->serviceManagement->addLocatorParams(AsteriskSCF::Operations::createContext(),
+        mRTPOverIceLocatorParams, getName() + ".RTP.Comparator");
+}
+
+void Component::onActivated()
+{
+    RTPReplicationContextPtr rtpReplicationContext =
+        boost::static_pointer_cast<RTPReplicationContext>(getReplicationContext());
+
+    RTPStateItemSeq items;
+    items.push_back(mGeneralState);
+    RTPStateReplicatorPrx oneway = RTPStateReplicatorPrx::uncheckedCast(rtpReplicationContext->getReplicator()->ice_oneway());
+    oneway->setState(AsteriskSCF::Operations::createContext(), items);
 }
 
 void Component::onStart()
@@ -545,7 +593,7 @@ void Component::onStart()
         RTPStateItemSeq items;
         items.push_back(mGeneralState);
         RTPStateReplicatorPrx oneway = RTPStateReplicatorPrx::uncheckedCast(rtpReplicationContext->getReplicator()->ice_oneway());
-        oneway->setState(items);
+        oneway->setState(AsteriskSCF::Operations::createContext(), items);
     }
 }
 
@@ -553,10 +601,10 @@ void Component::onStop()
 {
     if (getReplicationContext()->isActive() == true)
     {
-       mGeneralState->serviceManagement->unregister();
+       mGeneralState->serviceManagement->unregister(AsteriskSCF::Operations::createContext());
     }
 
-    getServiceLocatorManagement()->removeCompare(getName() + ".RTP.Comparator");
+    getServiceLocatorManagement()->removeCompare(AsteriskSCF::Operations::createContext(), getName() + ".RTP.Comparator");
 }
 
 extern "C"
diff --git a/src/ICETransport.cpp b/src/ICETransport.cpp
index 994b1ad..b36c996 100644
--- a/src/ICETransport.cpp
+++ b/src/ICETransport.cpp
@@ -14,39 +14,38 @@
  * at the top of the source tree.
  */
 
-#include "ICETransport.h"
-#include "PJUtil.h"
-
-#include <pjmedia.h>
 #include <pjlib.h>
+#include <pjmedia.h>
 #include <pjnath.h>
 
-#include <AsteriskSCF/System/ExceptionsIf.h>
 #include <map>
+#include <sstream>
+
 #include <boost/thread.hpp>
-#include <boost/thread/shared_mutex.hpp>
 
-#include <AsteriskSCF/System/NAT/NATTraversalIf.h>
 #include <Ice/Ice.h>
-#include <sstream>
-#include <AsteriskSCF/Logger.h>
 #include <IceUtil/UUID.h>
 
+#include <AsteriskSCF/Logger.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
+#include <AsteriskSCF/System/ExceptionsIf.h>
+#include <AsteriskSCF/System/NAT/NATTraversalIf.h>
+
+#include "ICETransport.h"
+#include "PJUtil.h"
+
+using namespace AsteriskSCF::Helpers;
+using namespace AsteriskSCF::Operations;
 using namespace AsteriskSCF::PJMEDIARTP;
-using namespace AsteriskSCF::System::V1;
 using namespace AsteriskSCF::PJUtil;
-using namespace std;
-using namespace AsteriskSCF::Helpers;
 using namespace AsteriskSCF::System::Logging;
 using namespace AsteriskSCF::System::NAT::V1;
+using namespace AsteriskSCF::System::V1;
+using namespace std;
 
 namespace
 {
 Logger logger = getLoggerFactory().getLogger("AsteriskSCF.MediaRTP");
-}
-
-namespace 
-{
 
 class ICEAgentImpl : public InteractiveConnectionAgent
 {
@@ -54,6 +53,7 @@ public:
 
     ICEAgentImpl(const Ice::ObjectAdapterPtr& adapter, const Ice::Identity& id, const PJMEDIAEnvironmentPtr& env,
         const PJMEDIAEndpointPtr& ep) :
+        mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
         mAdapter(adapter),
         mId(id),
         mShuttingDown(false),
@@ -84,10 +84,22 @@ public:
         return mRole;
     }
 
+    typedef AMDContextResultData<CandidatePtr, AMD_InteractiveConnectionAgent_negotiatePtr> NegotiateContextData;
+    typedef boost::shared_ptr<NegotiateContextData> NegotiateContextDataPtr;
+
     void negotiate_async(const AMD_InteractiveConnectionAgent_negotiatePtr& callback,
-        const string& hostname, Ice::Int port, const CandidateSeq& candidates,
-        const Ice::Current&)
+            const AsteriskSCF::System::V1::OperationContextPtr& context,
+            const string& hostname, Ice::Int port, const CandidateSeq& candidates,
+            const Ice::Current&)
     {
+        NegotiateContextDataPtr data =
+            getContext<NegotiateContextData>(mOperationContextCache, context, callback);
+
+        if (!data)
+        {
+            // retry detected
+            return;
+        }
 
         boost::unique_lock<boost::shared_mutex> lock(mLock);
         stateCheck();
@@ -99,12 +111,12 @@ public:
             // TODO: are we going to support cancellable negotiations.
             //
         }
-        mCurrentNegotiation = callback;
+        mCurrentNegotiation = data->getProxy();
 
         //
-        // So how this works is we create a remote SDP and call pjmedia_transport_start() easy peasy. (Same deal 
+        // So how this works is we create a remote SDP and call pjmedia_transport_start() easy peasy. (Same deal
         //
-        pjmedia_sdp_session* remoteSDPSession = 
+        pjmedia_sdp_session* remoteSDPSession =
             static_cast<pjmedia_sdp_session*>(pj_pool_zalloc(mEnv->memoryPool(), sizeof(pjmedia_sdp_session)));
 
 
@@ -178,7 +190,7 @@ public:
 
         //
         // I was concerned about the fact that for a given SIP session, there might be multiple media
-        // streams and multiple candidates. I'm not sure that its actually too much of an issue even 
+        // streams and multiple candidates. I'm not sure that its actually too much of an issue even
         // if multiple media types are muxed on a single ICE negotiated flow, but there will need to be
         // some redesign to pull in the multiple media streams associated with the session. For the moment
         // we will operation under the premise that we are dealing with a single media stream.
@@ -191,7 +203,7 @@ public:
         {
             CandidatePtr candidate = *i;
             ostringstream os;
-            os << "candidate:" << candidate->foundation << ' ' << candidate->componentId <<  " UDP " << 
+            os << "candidate:" << candidate->foundation << ' ' << candidate->componentId <<  " UDP " <<
                 candidate->priority << ' ' << candidate->mappedAddress << ' ' << candidate->mappedPort << " typ ";
             string hostType;
             switch (candidate->type)
@@ -216,7 +228,7 @@ public:
             }
             string t = os.str();
             pj_str_t candidateStr = pj_str(const_cast<char*>(t.c_str()));
-            pjmedia_sdp_attr* newAttribute = pjmedia_sdp_attr_create(mEnv->memoryPool(), 
+            pjmedia_sdp_attr* newAttribute = pjmedia_sdp_attr_create(mEnv->memoryPool(),
                 "candidate", &candidateStr);
             pjmedia_sdp_attr_add(&currentMedia->attr_count, currentMedia->attr, newAttribute);
         }
@@ -425,7 +437,7 @@ public:
                             candidateObj->mappedAddress = candidateObj->baseAddress;
                             candidateObj->mappedPort = candidateObj->basePort;
                             candidateObj->baseAddress = baseAddress;
-                            candidateObj->basePort = basePort; 
+                            candidateObj->basePort = basePort;
                         }
                         else
                         {
@@ -493,6 +505,7 @@ public:
 
 private:
     boost::shared_mutex mLock;
+    OperationContextCachePtr mOperationContextCache;
     Ice::ObjectAdapterPtr mAdapter;
     Ice::Identity mId;
     bool mShuttingDown;
@@ -553,7 +566,7 @@ boost::shared_mutex ICECallbackAdapter::mLock;
 // invoked before we get a chance to add the transport.  The solution to that is to allow an entry to be created when
 // the ICE completion callback arrives and there isn't a table entry. When the addEntry runs, it will see the entry and
 // simply update the appropriate field.
-// 
+//
 void ICECallbackAdapter::addEntry(pjmedia_transport* transport, const ICETransportPtr& callback)
 {
     bool alreadyKnown = false;
@@ -626,7 +639,7 @@ void ICECallbackAdapter::onICEComplete(pjmedia_transport* transport, pj_ice_stra
 {
     //
     // AFAICT, only PJ_ICE_STRANS_OP_NEGOTIATION should get here.
-    // 
+    //
     switch (operation)
     {
         case PJ_ICE_STRANS_OP_INIT:
@@ -684,18 +697,18 @@ void ICECallbackAdapter::onICEComplete(pjmedia_transport* transport, pj_ice_stra
         case PJ_ICE_STRANS_OP_KEEP_ALIVE:
             //
             // Keep alive has successfully completed. FWICT this should not get here.
-            // 
+            //
             break;
-    };
+    }
 }
 
-}
+} // anonymous namespace
 
 ICETransport::~ICETransport()
 {
     //
     // TODO : cleanup ICE transport, the transport itself is closed by the parent class.
-    // 
+    //
     ICECallbackAdapter::removeEntry(mTransport);
 }
 
@@ -728,7 +741,7 @@ void ICETransport::onSetupComplete(pjmedia_transport* transport, int status)
         {
             //
             // Address has changed! We need to let Session listeners know!
-            // TODO! 
+            // TODO!
             //
             pj_memcpy(mLastKnownAddr.get(), &info.sock_info.rtp_addr_name, sizeof(pj_sockaddr));
         }
@@ -738,21 +751,33 @@ void ICETransport::onSetupComplete(pjmedia_transport* transport, int status)
     mMonitor.notify_one();
 }
 
-AddressPtr ICETransport::localAddress() 
+AddressPtr ICETransport::localAddress()
 {
     boost::unique_lock<boost::mutex> lock(mLock);
     if (mLocalAddress)
     {
         return mLocalAddress;
     }
+    //
+    // Retry check for local address for max of 2.5 seconds, then proceed as if it was unknown.
+    //
     for (size_t i = 0; i < 5 && !mLocalAddress; ++i)
     {
-        mMonitor.wait(lock);
+        try
+        {
+            mMonitor.timed_wait(lock, boost::posix_time::milliseconds(500));
+        }
+        catch (const boost::thread_interrupted&)
+        {
+        }
+        catch (const boost::thread_resource_error&)
+        {
+        }
     }
     return mLocalAddress;
 }
 
-AddressPtr ICETransport::remoteAddress() 
+AddressPtr ICETransport::remoteAddress()
 {
     boost::unique_lock<boost::mutex> lock(mLock);
     return mRemoteAddress;
@@ -791,7 +816,7 @@ void ICETransport::start()
     PJICECallbackPtr callback(new pjmedia_ice_cb);
     callback->on_ice_complete = &ICECallbackAdapter::onICEComplete;
     NATModulePtr natModule = NATModule::create(mConfig, mEndpoint);
-    pj_status_t result = pjmedia_ice_create(mEndpoint->endpoint(), "ASCF_ICE_MEDIA", (mEnableRTCP ? 2 : 1), 
+    pj_status_t result = pjmedia_ice_create(mEndpoint->endpoint(), "ASCF_ICE_MEDIA", (mEnableRTCP ? 2 : 1),
         natModule->configuration(), callback.get(), &t);
     if (fail(result))
     {
diff --git a/src/RTPConfiguration.cpp b/src/RTPConfiguration.cpp
index 3c5650a..178bf22 100644
--- a/src/RTPConfiguration.cpp
+++ b/src/RTPConfiguration.cpp
@@ -1,7 +1,7 @@
 /*
  * Asterisk SCF -- An open-source communications framework.
  *
- * Copyright (C) 2011, Digium, Inc.
+ * Copyright (C) 2011-2012, Digium, Inc.
  *
  * See http://www.asterisk.org for more information about
  * the Asterisk SCF project. Please do not directly contact
@@ -14,19 +14,22 @@
  * at the top of the source tree.
  */
 
-#include "RTPConfigurationIf.h"
-#include "RTPConfiguration.h"
 
 #include <IceUtil/UUID.h>
-#include <AsteriskSCF/System/Component/ConfigurationIf.h>
 
 #include <boost/thread.hpp>
-#include <boost/thread/shared_mutex.hpp>
 
-using namespace AsteriskSCF::System::Configuration::V1;
+#include <AsteriskSCF/System/Component/ConfigurationIf.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
+
+#include "RTPConfigurationIf.h"
+#include "RTPConfiguration.h"
+
+using namespace AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1;
+using namespace AsteriskSCF::Operations;
 using namespace AsteriskSCF::PJMEDIARTP;
+using namespace AsteriskSCF::System::Configuration::V1;
 using namespace std;
-using namespace AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1;
 
 /**
  * Implementation of the configuration service.
@@ -34,24 +37,25 @@ using namespace AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1;
 class ConfigurationServiceServant : virtual public ConfigurationServiceImpl
 {
 public:
+    ConfigurationServiceServant() : mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)) {}
 
     /**
      * AsteriskSCF::System::Configuration::V1 interface. Slice to C++ mapping.
      */
     AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq
     getConfiguration(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&, const Ice::Current&);
-    
+
     AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq
     getConfigurationAll(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&, const Ice::Current&);
-    
+
     AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq
     getConfigurationGroups(const Ice::Current&);
 
-    void setConfiguration(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&, const Ice::Current&);
+    void setConfiguration(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&, const Ice::Current&);
 
-    void removeConfigurationItems(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&,
+    void removeConfigurationItems(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&,
             const Ice::Current&);
-    void removeConfigurationGroups(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&,
+    void removeConfigurationGroups(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&,
             const Ice::Current&);
 
     /**
@@ -90,12 +94,12 @@ public:
     {
         mICEConfig = newConfig;
     }
-    
+
     void replaceConfig(const NATConfigPtr& newConfig)
     {
         mNATConfig = newConfig;
     }
-    
+
     void replaceConfig(const SRTPConfigurationPtr& newConfig)
     {
         mSRTPConfig = newConfig;
@@ -121,7 +125,7 @@ public:
         //
         if (mGeneralGroup)
         {
-            ConfigurationItemDict::const_iterator item = 
+            ConfigurationItemDict::const_iterator item =
                 mGeneralGroup->configurationItems.find(EnableSRTPItemName);
             if (item != mGeneralGroup->configurationItems.end())
             {
@@ -150,6 +154,7 @@ public:
     }
 
 private:
+    OperationContextCachePtr mOperationContextCache;
     /**
      * General RTP configuration
      */
@@ -223,7 +228,7 @@ ConfigurationGroupSeq ConfigurationServiceServant::getConfiguration(
         void visitRTPICEConfigurationGroup(const RTPICEConfigurationGroupPtr& group)
         {
             RTPICEConfigurationGroupPtr currentGroup = mImpl->getICEConfigurationGroup();
-            
+
             if (!currentGroup)
             {
                 return;
@@ -233,19 +238,19 @@ ConfigurationGroupSeq ConfigurationServiceServant::getConfiguration(
                     returnedGroup->configurationItems);
             mGroups.push_back(returnedGroup);
         }
-        
+
         ConfigurationServiceServantPtr mImpl;
         ConfigurationGroupSeq& mGroups;
     };
-    
+
     ConfigurationGroupSeq newGroups;
     RTPConfigurationGroupVisitorPtr v = new GroupVisitor(this, newGroups);
-    
+
     for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
     {
         (*group)->visit(v);
     }
-    
+
     return newGroups;
 }
 
@@ -259,7 +264,7 @@ ConfigurationGroupSeq ConfigurationServiceServant::getConfigurationAll(
             mImpl(impl), mGroups(visitorGroups)
         {
         }
- 
+
     private:
         void visitRTPGeneralGroup(const ::AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1::RTPGeneralGroupPtr&)
         {
@@ -278,21 +283,21 @@ ConfigurationGroupSeq ConfigurationServiceServant::getConfigurationAll(
                 mGroups.push_back(g);
             }
         }
-        
+
         ConfigurationServiceServantPtr mImpl;
         ConfigurationGroupSeq& mGroups;
     };
-    
+
     ConfigurationGroupSeq newGroups;
     RTPConfigurationGroupVisitorPtr v = new GroupVisitor(this, newGroups);
 
     boost::shared_lock<boost::shared_mutex> lock(mLock);
-    
+
     for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
     {
         (*group)->visit(v);
     }
-    
+
     return newGroups;
 }
 
@@ -300,7 +305,7 @@ ConfigurationGroupSeq ConfigurationServiceServant::getConfigurationGroups(const
 {
     ConfigurationGroupSeq groups;
     boost::shared_lock<boost::shared_mutex> lock(mLock);
-   
+
     if (mGeneralGroup)
     {
         groups.push_back(new RTPGeneralGroup);
@@ -309,21 +314,21 @@ ConfigurationGroupSeq ConfigurationServiceServant::getConfigurationGroups(const
     {
         groups.push_back(new RTPICEConfigurationGroup);
     }
-    
+
     return groups;
 }
 
-void ConfigurationServiceServant::setConfiguration(const ConfigurationGroupSeq& groups,
+void ConfigurationServiceServant::setConfiguration(const AsteriskSCF::System::V1::OperationContextPtr& context, const ConfigurationGroupSeq& groups,
     const Ice::Current&)
 {
     class GroupVisitor : public RTPConfigurationGroupVisitor
     {
     public:
-        GroupVisitor(const ConfigurationServiceServantPtr& impl) : 
+        GroupVisitor(const ConfigurationServiceServantPtr& impl) :
             mImpl(impl)
         {
         }
- 
+
     private:
         /**
          * Helper function which performs serial number checking of items
@@ -340,22 +345,22 @@ void ConfigurationServiceServant::setConfiguration(const ConfigurationGroupSeq&
                 {
                     continue;
                 }
-  
+
                 ConfigurationItemDict::const_iterator localItem = localItems.find(item->first);
-  
+
                 if (localItem == localItems.end())
                 {
                     // This is a new item so serial checking does not apply
                     continue;
                 }
-  
+
                 if (item->second->serialNumber < localItem->second->serialNumber)
                 {
                     throw SerialConflict(group, item->second);
                 }
             }
         }
- 
+
         void visitRTPGeneralGroup(const ::AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1::RTPGeneralGroupPtr& group)
         {
             RTPGeneralGroupPtr g = mImpl->getGeneralGroup();
@@ -367,7 +372,7 @@ void ConfigurationServiceServant::setConfiguration(const ConfigurationGroupSeq&
             {
                 performSerialCheck(group->configurationItems, g->configurationItems, group);
             }
-     
+
             for (ConfigurationItemDict::const_iterator item = group->configurationItems.begin();
                  item != group->configurationItems.end();
                  ++item)
@@ -415,7 +420,7 @@ void ConfigurationServiceServant::setConfiguration(const ConfigurationGroupSeq&
                         mImpl->replaceConfig(ICEConfiguration::create(mMaxCandidates, mMaxCalls));
                     }
                 }
-                
+
 
                 void visitSTUNServerItem(const STUNServerItemPtr& item)
                 {
@@ -454,11 +459,11 @@ void ConfigurationServiceServant::setConfiguration(const ConfigurationGroupSeq&
                         mMaxCandidates = item->maxCandidates;
                     }
                 }
-                
+
             private:
 
                 ConfigurationServiceServantPtr mImpl;
-                
+
                 bool mCreateRTPICEConfig;
                 bool mCreateNATConfig;
 
@@ -481,7 +486,7 @@ void ConfigurationServiceServant::setConfiguration(const ConfigurationGroupSeq&
             }
 
             RTPConfigurationItemVisitorPtr v(new Visitor(mImpl));
-     
+
             for (ConfigurationItemDict::const_iterator item = group->configurationItems.begin();
                  item != group->configurationItems.end();
                  ++item)
@@ -495,7 +500,13 @@ void ConfigurationServiceServant::setConfiguration(const ConfigurationGroupSeq&
 
         ConfigurationServiceServantPtr mImpl;
     };
-    
+
+    if (!mOperationContextCache->addOperationContext(context))
+    {
+        // retry detected
+        return;
+    }
+
     RTPConfigurationGroupVisitorPtr v = new GroupVisitor(this);
 
     boost::unique_lock<boost::shared_mutex> lock(mLock);
@@ -506,12 +517,13 @@ void ConfigurationServiceServant::setConfiguration(const ConfigurationGroupSeq&
 }
 
 void ConfigurationServiceServant::removeConfigurationItems(
+    const AsteriskSCF::System::V1::OperationContextPtr& context,
     const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq& groups, const Ice::Current&)
 {
     class GroupVisitor : public RTPConfigurationGroupVisitor
     {
     public:
-        GroupVisitor(const ConfigurationServiceServantPtr& impl) : 
+        GroupVisitor(const ConfigurationServiceServantPtr& impl) :
             mImpl(impl)
         {
         }
@@ -535,7 +547,7 @@ void ConfigurationServiceServant::removeConfigurationItems(
                 localItems.erase(localItem);
             }
         }
- 
+
         void visitRTPGeneralGroup(const ::AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1::RTPGeneralGroupPtr& group)
... 3564 lines suppressed ...


-- 
asterisk-scf/integration/media_rtp_pjmedia.git



More information about the asterisk-scf-commits mailing list