[asterisk-scf-commits] asterisk-scf/integration/media_operations_core.git branch "retry_deux" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Mon Mar 19 11:19:22 CDT 2012


branch "retry_deux" has been updated
       via  f2777b25d2078ab0f82152aa19c7dbb30e7c21c3 (commit)
       via  34aaab0c5db49634234593023f0342a38fb668b5 (commit)
       via  a3bb13300bdc645aabd5c07dcb25bb75ed7ea680 (commit)
      from  667591c3b27242bb2962e8fa80ce49d7474c7358 (commit)

Summary of changes:
 src/InbandTelephonyEvents.cpp |   80 ++++++++++++++++++++++++++++++----------
 src/InbandTelephonyEvents.h   |    2 +
 2 files changed, 62 insertions(+), 20 deletions(-)


- Log -----------------------------------------------------------------
commit f2777b25d2078ab0f82152aa19c7dbb30e7c21c3
Author: David M. Lee <dlee at digium.com>
Date:   Mon Mar 19 11:17:43 2012 -0500

    Fixed thread safety issues in InbandTelephonyEventSource

diff --git a/src/InbandTelephonyEvents.cpp b/src/InbandTelephonyEvents.cpp
index b1d07dc..c02097f 100644
--- a/src/InbandTelephonyEvents.cpp
+++ b/src/InbandTelephonyEvents.cpp
@@ -20,6 +20,7 @@
 #include <AsteriskSCF/Replication/MediaOperationsCore/MediaOperationsCoreIf.h>
 #include <AsteriskSCF/Operations/OperationContext.h>
 
+#include <boost/thread.hpp>
 #include <IceUtil/UUID.h>
 
 #include "TranslatorOperation.h"
@@ -78,6 +79,7 @@ class InbandTelephonyEventOperation : public TranslatorOperation
             const TelephonyEventSinkSeq& sinks,
             const Ice::Current&)
         {
+            boost::unique_lock<boost::shared_mutex> lock(mMutex);
             // naturally idempotent - multiple adds are harmless
             for (TelephonyEventSinkSeq::const_iterator i = sinks.begin(); i != sinks.end(); ++i)
             {
@@ -94,6 +96,7 @@ class InbandTelephonyEventOperation : public TranslatorOperation
             const TelephonyEventSinkSeq& sinks,
             const Ice::Current&)
         {
+            boost::unique_lock<boost::shared_mutex> lock(mMutex);
             // naturally idempotent - multiple removes are harmless
             for (TelephonyEventSinkSeq::const_iterator i = sinks.begin(); i != sinks.end(); ++i)
             {
@@ -105,12 +108,21 @@ class InbandTelephonyEventOperation : public TranslatorOperation
         void getSinks_async(const AMD_TelephonyEventSource_getSinksPtr& cb,
                 const Ice::Current&)
         {
+            boost::shared_lock<boost::shared_mutex> lock(mMutex);
             cb->ice_response(mSinks);
         }
 
         void distributeToSinks(const TelephonyEventPtr& event)
         {
-            for (TelephonyEventSinkSeq::iterator iter = mSinks.begin(); iter != mSinks.end(); ++iter)
+            // it's unwise to invoke remote operations while holding a mutex.
+            // copy the sink list while holding the mutex, then use the copy.
+            TelephonyEventSinkSeq sinks;
+            {
+                boost::shared_lock<boost::shared_mutex> lock(mMutex);
+                sinks = mSinks;
+            }
+
+            for (TelephonyEventSinkSeq::iterator iter = sinks.begin(); iter != sinks.end(); ++iter)
             {
                 try
                 {
@@ -123,6 +135,7 @@ class InbandTelephonyEventOperation : public TranslatorOperation
             }
         }
     private:
+        boost::shared_mutex mMutex;
         TelephonyEventSinkSeq mSinks;
         Logger mLogger;
     };

commit 34aaab0c5db49634234593023f0342a38fb668b5
Author: David M. Lee <dlee at digium.com>
Date:   Mon Mar 19 10:47:25 2012 -0500

    Moved cookie to ValueOperationContextCookie template

diff --git a/src/InbandTelephonyEvents.cpp b/src/InbandTelephonyEvents.cpp
index 9d4e6ab..b1d07dc 100644
--- a/src/InbandTelephonyEvents.cpp
+++ b/src/InbandTelephonyEvents.cpp
@@ -53,32 +53,7 @@ private:
     const std::string mFormatName;
 };
 
-class CreateMediaOperationCookie : public OperationContextCookie
-{
-public:
-    void set(const MediaOperationPrx& v)
-    {
-        val = v;
-    }
-
-    void set(const IceUtil::Exception& e)
-    {
-        ex.reset(e.ice_clone());
-    }
-
-    MediaOperationPrx get() const
-    {
-        if (ex)
-        {
-            ex->ice_throw();
-        }
-        return val;
-    }
-
-private:
-    boost::shared_ptr<IceUtil::Exception> ex;
-    MediaOperationPrx val;
-};
+typedef ValueOperationContextCookie<MediaOperationPrx> CreateMediaOperationCookie;
 typedef boost::shared_ptr<CreateMediaOperationCookie> CreateMediaOperationCookiePtr;
 
 } // anonymous namespace
@@ -414,7 +389,7 @@ MediaOperationPrx InbandTelephonyEventOperationFactory::createMediaOperation(
     }
     catch (const IceUtil::Exception& e)
     {
-        cookie->set(e);
+        cookie->setException(e);
         throw;
     }
 }

commit a3bb13300bdc645aabd5c07dcb25bb75ed7ea680
Author: David M. Lee <dlee at digium.com>
Date:   Fri Mar 16 13:21:22 2012 -0500

    Added retry correctness

diff --git a/src/InbandTelephonyEvents.cpp b/src/InbandTelephonyEvents.cpp
index 57a8ec9..9d4e6ab 100644
--- a/src/InbandTelephonyEvents.cpp
+++ b/src/InbandTelephonyEvents.cpp
@@ -1,7 +1,7 @@
 /*
  * Asterisk SCF -- An open-source communications framework.
  *
- * Copyright (C) 2011, Digium, Inc.
+ * Copyright (C) 2011-2012, Digium, Inc.
  *
  * See http://www.asterisk.org for more information about
  * the Asterisk SCF project. Please do not directly contact
@@ -31,6 +31,7 @@ using namespace AsteriskSCF::Media::V1;
 using namespace AsteriskSCF::SessionCommunications::V1;
 using namespace AsteriskSCF::System::Logging;
 using namespace AsteriskSCF::Media::Formats::Audio::V1;
+using namespace AsteriskSCF::Operations;
 using namespace AsteriskSCF::Replication::MediaOperationsCore::V1;
 
 namespace
@@ -52,7 +53,35 @@ private:
     const std::string mFormatName;
 };
 
+class CreateMediaOperationCookie : public OperationContextCookie
+{
+public:
+    void set(const MediaOperationPrx& v)
+    {
+        val = v;
+    }
+
+    void set(const IceUtil::Exception& e)
+    {
+        ex.reset(e.ice_clone());
+    }
+
+    MediaOperationPrx get() const
+    {
+        if (ex)
+        {
+            ex->ice_throw();
+        }
+        return val;
+    }
+
+private:
+    boost::shared_ptr<IceUtil::Exception> ex;
+    MediaOperationPrx val;
 };
+typedef boost::shared_ptr<CreateMediaOperationCookie> CreateMediaOperationCookiePtr;
+
+} // anonymous namespace
 
 namespace AsteriskSCF
 {
@@ -60,19 +89,21 @@ namespace AsteriskSCF
 namespace MediaOperationsCore
 {
 
+const int TTL_SECONDS = 180;
+
 class InbandTelephonyEventOperation : public TranslatorOperation
 {
     class InbandTelephonyEventSource : public TelephonyEventSource
     {
     public:
-        InbandTelephonyEventSource(const Logger& logger)
-            : mLogger(logger) { }
+        InbandTelephonyEventSource(const Logger& logger) : mLogger(logger) { }
 
         void addSinks_async(const AMD_TelephonyEventSource_addSinksPtr& cb,
             const AsteriskSCF::System::V1::OperationContextPtr&,
             const TelephonyEventSinkSeq& sinks,
             const Ice::Current&)
         {
+            // naturally idempotent - multiple adds are harmless
             for (TelephonyEventSinkSeq::const_iterator i = sinks.begin(); i != sinks.end(); ++i)
             {
                 if (std::find_if(mSinks.begin(), mSinks.end(), IdentityComparePredicate<TelephonyEventSinkPrx>(*i)) == mSinks.end())
@@ -88,6 +119,7 @@ class InbandTelephonyEventOperation : public TranslatorOperation
             const TelephonyEventSinkSeq& sinks,
             const Ice::Current&)
         {
+            // naturally idempotent - multiple removes are harmless
             for (TelephonyEventSinkSeq::const_iterator i = sinks.begin(); i != sinks.end(); ++i)
             {
                 mSinks.erase(remove_if(mSinks.begin(), mSinks.end(), IdentityComparePredicate<TelephonyEventSinkPrx>(*i)), mSinks.end());
@@ -332,39 +364,59 @@ InbandTelephonyEventOperationFactory::InbandTelephonyEventOperationFactory(
     : MediaOperationFactoryImpl(adapter,
             logger,
             replicationContext,
-            "InbandTelephonyEventFactory")
+            "InbandTelephonyEventFactory"),
+      mOperationContextCache(Operations::OperationContextCache::create(TTL_SECONDS, logger, ""))
 {
     mLocatorParams->category = MediaOperationDiscoveryCategory;
     mLocatorParams->service = InbandTelephonyEventsDiscoveryCategory;
 };
 
 MediaOperationPrx InbandTelephonyEventOperationFactory::createMediaOperation(
-            const AsteriskSCF::System::V1::OperationContextPtr&,
+            const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
             const StreamSourcePrx& source,
             const StreamSinkPrx& sink,
             const Ice::Current&)
 {
-    if (sink == 0 || source == 0)
+    const CreateMediaOperationCookiePtr cookie(new CreateMediaOperationCookie);
+    OperationContextCookiePtr existingCookie;
+    mOperationContextCache->addOperationContext(operationContext, cookie, existingCookie);
+
+    if (existingCookie)
     {
-        throw UnsupportedMediaFormatException();
+        CreateMediaOperationCookiePtr c = boost::dynamic_pointer_cast<CreateMediaOperationCookie>(existingCookie);
+        return c->get();
     }
 
-    FormatSeq sourceFormats = source->getFormats();
-    FormatSeq sinkFormats = sink->getFormats();
+    try
+    {
+        if (sink == 0 || source == 0)
+        {
+            throw UnsupportedMediaFormatException();
+        }
 
-    MediaOperationPrx proxy = tryFormat(sourceFormats, sinkFormats, SignedLinear8Name);
+        FormatSeq sourceFormats = source->getFormats();
+        FormatSeq sinkFormats = sink->getFormats();
 
-    if (!proxy)
-    {
-        proxy = tryFormat(sourceFormats, sinkFormats, SignedLinear16Name);
-    }
+        MediaOperationPrx proxy = tryFormat(sourceFormats, sinkFormats, SignedLinear8Name);
 
-    if (!proxy)
+        if (!proxy)
+        {
+            proxy = tryFormat(sourceFormats, sinkFormats, SignedLinear16Name);
+        }
+
+        if (!proxy)
+        {
+            throw UnsupportedMediaFormatException();
+        }
+
+        cookie->set(proxy);
+        return proxy;
+    }
+    catch (const IceUtil::Exception& e)
     {
-        throw UnsupportedMediaFormatException();
+        cookie->set(e);
+        throw;
     }
-
-    return proxy;
 }
 
 MediaOperationPrx InbandTelephonyEventOperationFactory::tryFormat(const FormatSeq& sourceFormats,
diff --git a/src/InbandTelephonyEvents.h b/src/InbandTelephonyEvents.h
index db1919a..e955424 100644
--- a/src/InbandTelephonyEvents.h
+++ b/src/InbandTelephonyEvents.h
@@ -20,6 +20,7 @@
 
 #include <AsteriskSCF/Media/MediaOperationIf.h>
 #include <AsteriskSCF/Media/MediaIf.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
 
 namespace AsteriskSCF
 {
@@ -50,6 +51,7 @@ public:
      */
     void createOrUpdateMediaOperation(const AsteriskSCF::Replication::MediaOperationsCore::V1::InbandTelephonyEventOperationStateItemPtr& item);
 private:
+    Operations::OperationContextCachePtr mOperationContextCache;
 
     /**
      * Internal method to create an operation using a specific format

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/media_operations_core.git



More information about the asterisk-scf-commits mailing list