[asterisk-scf-commits] asterisk-scf/integration/media_operations_core.git branch "retry_deux" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Tue Apr 17 17:29:27 CDT 2012


branch "retry_deux" has been updated
       via  6a47e2804b1911f5a6a796fe9765b8e579f07730 (commit)
      from  9dfe52713c4189d6f797e693dc7915538f20f403 (commit)

Summary of changes:
 src/InbandTelephonyEvents.cpp |   32 +++++++++++++++++++++++++-------
 src/TranslatorSink.cpp        |   14 ++++++++++----
 src/TranslatorSink.h          |    4 +++-
 src/TranslatorSource.cpp      |   27 ++++++++++++++++++++-------
 src/TranslatorSource.h        |    4 +++-
 5 files changed, 61 insertions(+), 20 deletions(-)


- Log -----------------------------------------------------------------
commit 6a47e2804b1911f5a6a796fe9765b8e579f07730
Author: David M. Lee <dlee at digium.com>
Date:   Tue Apr 17 17:28:21 2012 -0500

    Retry logic where I thought it wasn't needed.
    
    Old requests can potentially get 'hung up' and come in much later, after
    a retry has been processed. *sigh*

diff --git a/src/InbandTelephonyEvents.cpp b/src/InbandTelephonyEvents.cpp
index 0d8be0d..7dc02cf 100644
--- a/src/InbandTelephonyEvents.cpp
+++ b/src/InbandTelephonyEvents.cpp
@@ -67,15 +67,24 @@ class InbandTelephonyEventOperation : public TranslatorOperation
     class InbandTelephonyEventSource : public TelephonyEventSource
     {
     public:
-        InbandTelephonyEventSource(const Logger& logger) : mLogger(logger) { }
+        InbandTelephonyEventSource(const Logger& logger) :
+            mLogger(logger), mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)) { }
 
         void addSinks_async(const AMD_TelephonyEventSource_addSinksPtr& cb,
-            const AsteriskSCF::System::V1::OperationContextPtr&,
+            const AsteriskSCF::System::V1::OperationContextPtr& context,
             const TelephonyEventSinkSeq& sinks,
             const Ice::Current&)
         {
+            typedef AMDContextData<AMD_TelephonyEventSource_addSinksPtr> Data;
+            typedef Data::ptr_type DataPtr;
+            DataPtr data = getContext<Data>(mOperationContextCache, context, cb);
+            if (!data)
+            {
+                // retry detected
+                return;
+            }
+
             boost::unique_lock<boost::shared_mutex> lock(mMutex);
-            // naturally idempotent - multiple adds are harmless
             for (TelephonyEventSinkSeq::const_iterator i = sinks.begin(); i != sinks.end(); ++i)
             {
                 if (std::find_if(mSinks.begin(), mSinks.end(), IdentityComparePredicate<TelephonyEventSinkPrx>(*i)) == mSinks.end())
@@ -83,21 +92,29 @@ class InbandTelephonyEventOperation : public TranslatorOperation
                     mSinks.push_back(*i);
                 }
             }
-            cb->ice_response();
+            data->getProxy()->ice_response();
         }
 
         void removeSinks_async(const AMD_TelephonyEventSource_removeSinksPtr& cb,
-            const AsteriskSCF::System::V1::OperationContextPtr&,
+            const AsteriskSCF::System::V1::OperationContextPtr& context,
             const TelephonyEventSinkSeq& sinks,
             const Ice::Current&)
         {
+            typedef AMDContextData<AMD_TelephonyEventSource_removeSinksPtr> Data;
+            typedef Data::ptr_type DataPtr;
+            DataPtr data = getContext<Data>(mOperationContextCache, context, cb);
+            if (!data)
+            {
+                // retry detected
+                return;
+            }
+
             boost::unique_lock<boost::shared_mutex> lock(mMutex);
-            // naturally idempotent - multiple removes are harmless
             for (TelephonyEventSinkSeq::const_iterator i = sinks.begin(); i != sinks.end(); ++i)
             {
                 mSinks.erase(remove_if(mSinks.begin(), mSinks.end(), IdentityComparePredicate<TelephonyEventSinkPrx>(*i)), mSinks.end());
             }
-            cb->ice_response();
+            data->getProxy()->ice_response();
         }
 
         void getSinks_async(const AMD_TelephonyEventSource_getSinksPtr& cb,
@@ -133,6 +150,7 @@ class InbandTelephonyEventOperation : public TranslatorOperation
         boost::shared_mutex mMutex;
         TelephonyEventSinkSeq mSinks;
         Logger mLogger;
+        OperationContextCachePtr mOperationContextCache;
     };
 
     typedef IceUtil::Handle<InbandTelephonyEventSource> InbandTelephonyEventSourcePtr;
diff --git a/src/TranslatorSink.cpp b/src/TranslatorSink.cpp
index 8b56b5b..441ab8d 100644
--- a/src/TranslatorSink.cpp
+++ b/src/TranslatorSink.cpp
@@ -1,7 +1,7 @@
 /*
  * Asterisk SCF -- An open-source communications framework.
  *
- * Copyright (C) 2011, Digium, Inc.
+ * Copyright (C) 2011-2012, Digium, Inc.
  *
  * See http://www.asterisk.org for more information about
  * the Asterisk SCF project. Please do not directly contact
@@ -25,12 +25,14 @@ namespace MediaOperationsCore
 {
 
 using namespace AsteriskSCF::Media::V1;
+using namespace AsteriskSCF::Operations;
 using namespace AsteriskSCF::System::Logging;
 
 TranslatorSink::TranslatorSink(const Logger& logger,
         const FormatPtr& supportedFormat)
     : mLogger(logger),
-    mId(IceUtil::generateUUID())
+      mId(IceUtil::generateUUID()),
+      mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS))
 {
     mSupportedFormats.push_back(supportedFormat);
 }
@@ -52,11 +54,15 @@ void TranslatorSink::setTranslator(const TranslatorPtr& translator)
     mTranslator = translator;
 }
 
-void TranslatorSink::setSource(const AsteriskSCF::System::V1::OperationContextPtr&,
+void TranslatorSink::setSource(const AsteriskSCF::System::V1::OperationContextPtr& context,
     const StreamSourcePrx& source, const Ice::Current&)
 {
+    if (!mOperationContextCache->addOperationContext(context))
+    {
+        // retry detected
+        return;
+    }
     boost::unique_lock<boost::shared_mutex> lock(mMutex);
-    // naturally idempotent - multiple sets are harmless
     mSource = source;
 }
 
diff --git a/src/TranslatorSink.h b/src/TranslatorSink.h
index 7b16868..bffc1ce 100644
--- a/src/TranslatorSink.h
+++ b/src/TranslatorSink.h
@@ -1,7 +1,7 @@
 /*
  * Asterisk SCF -- An open-source communications framework.
  *
- * Copyright (C) 2011, Digium, Inc.
+ * Copyright (C) 2011-2012, Digium, Inc.
  *
  * See http://www.asterisk.org for more information about
  * the Asterisk SCF project. Please do not directly contact
@@ -20,6 +20,7 @@
 
 #include <AsteriskSCF/Media/MediaIf.h>
 #include <AsteriskSCF/Logger.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
 
 #include "Translator.h"
 
@@ -53,6 +54,7 @@ public:
 
 private:
     boost::shared_mutex mMutex;
+    AsteriskSCF::Operations::OperationContextCachePtr mOperationContextCache;
     AsteriskSCF::System::Logging::Logger mLogger;
     AsteriskSCF::Media::V1::FormatSeq mSupportedFormats;
     AsteriskSCF::Media::V1::StreamSourcePrx mSource;
diff --git a/src/TranslatorSource.cpp b/src/TranslatorSource.cpp
index 08c6053..9d00b6f 100644
--- a/src/TranslatorSource.cpp
+++ b/src/TranslatorSource.cpp
@@ -13,7 +13,10 @@
  * the GNU General Public License Version 2. See the LICENSE.txt file
  * at the top of the source tree.
  */
-#include "IceUtil/UUID.h"
+
+#include <IceUtil/UUID.h>
+
+#include <AsteriskSCF/Operations/OperationMonitor.h>
 
 #include "TranslatorSource.h"
 
@@ -24,12 +27,14 @@ namespace MediaOperationsCore
 {
 
 using namespace AsteriskSCF::Media::V1;
+using namespace AsteriskSCF::Operations;
 using namespace AsteriskSCF::System::Logging;
 
 TranslatorSource::TranslatorSource(const Logger& logger,
             const FormatPtr& format)
-        : mLogger(logger),
-        mId(IceUtil::generateUUID())
+    : mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
+      mLogger(logger),
+      mId(IceUtil::generateUUID())
 {
     mSupportedFormats.push_back(format);
 }
@@ -39,22 +44,30 @@ TranslatorSource::~TranslatorSource()
     mLogger(Trace) << "TranslatorSource destructor called";
 }
 
-void TranslatorSource::addSink(const AsteriskSCF::System::V1::OperationContextPtr&,
+void TranslatorSource::addSink(const AsteriskSCF::System::V1::OperationContextPtr& context,
     const StreamSinkPrx& sink, const Ice::Current&)
 {
+    if (!mOperationContextCache->addOperationContext(context))
+    {
+        // retry detected
+        return;
+    }
     boost::unique_lock<boost::shared_mutex> lock(mMutex);
-    // naturally idempotent - multiple adds are harmless
     if (std::find(mSinks.begin(), mSinks.end(), sink) == mSinks.end())
     {
         mSinks.push_back(sink);
     }
 }
 
-void TranslatorSource::removeSink(const AsteriskSCF::System::V1::OperationContextPtr&,
+void TranslatorSource::removeSink(const AsteriskSCF::System::V1::OperationContextPtr& context,
     const StreamSinkPrx& sink, const Ice::Current&)
 {
+    if (!mOperationContextCache->addOperationContext(context))
+    {
+        // retry detected
+        return;
+    }
     boost::unique_lock<boost::shared_mutex> lock(mMutex);
-    // naturally idempotent - multiple removes are harmless
     mSinks.erase(std::remove(mSinks.begin(), mSinks.end(), sink), mSinks.end());
 }
 
diff --git a/src/TranslatorSource.h b/src/TranslatorSource.h
index f9ae1c5..e70db26 100644
--- a/src/TranslatorSource.h
+++ b/src/TranslatorSource.h
@@ -18,8 +18,9 @@
 
 #include <boost/thread.hpp>
 
-#include <AsteriskSCF/Media/MediaIf.h>
 #include <AsteriskSCF/Logger.h>
+#include <AsteriskSCF/Media/MediaIf.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
 
 namespace AsteriskSCF
 {
@@ -54,6 +55,7 @@ public:
 
 private:
     boost::shared_mutex mMutex;
+    AsteriskSCF::Operations::OperationContextCachePtr mOperationContextCache;
     AsteriskSCF::System::Logging::Logger mLogger;
     AsteriskSCF::Media::V1::StreamSinkSeq mSinks;
     AsteriskSCF::Media::V1::FormatSeq mSupportedFormats;

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/media_operations_core.git



More information about the asterisk-scf-commits mailing list