[asterisk-scf-commits] asterisk-scf/release/ice-util-cpp.git branch "master" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Tue May 8 17:27:21 CDT 2012
branch "master" has been updated
via 009bfb4fb947845756d7eed4492fe1aa3eb8db65 (commit)
from ce0acf68279442b649f27b4be6ef1ce537f02b54 (commit)
Summary of changes:
.../Discovery/LocatorRegistrationWrapper.h | 25 +-
include/AsteriskSCF/Helpers/Retry.h | 21 +-
include/AsteriskSCF/Operations/ExceptionWrapper.h | 120 ++++
include/AsteriskSCF/Operations/OperationContext.h | 46 ++
.../AsteriskSCF/Operations/OperationContextCache.h | 158 ++++++
include/AsteriskSCF/Operations/OperationMonitor.h | 594 ++++++++++++++++++++
include/AsteriskSCF/Replication/StateReplicator.h | 61 ++-
src/CMakeLists.txt | 1 +
src/Component/Component.cpp | 81 ++-
src/Component/ComponentStateReplicator.cpp | 22 +-
src/Helpers/CMakeLists.txt | 1 +
src/NAT/Candidates.cpp | 2 +-
src/Operations/CMakeLists.txt | 3 +
src/Operations/OperationContext.cpp | 91 +++
src/Operations/OperationContextCache.cpp | 340 +++++++++++
src/Operations/OperationMonitor.cpp | 86 +++
test/CMakeLists.txt | 5 +
test/Component/ComponentTest.cpp | 9 +-
.../LocatorRegistrationTest.cpp | 47 +-
.../OperationContextCacheTest.cpp | 102 ++++
.../OperationContextCacheTest.h} | 13 +-
test/OperationMonitor/OperationMonitorTest.cpp | 162 ++++++
.../OperationMonitorTest.h} | 13 +-
test/Replication/CMakeLists.txt | 1 +
test/Replication/MockStateReplicatorListener.h | 6 +-
test/Replication/TestStateReplicator.cpp | 34 +-
test/Replication/slice/StateReplicatorTestIf.ice | 28 +-
test/ThreadPool/TestThreadPool.cpp | 2 +-
test/UtilityTests.cpp | 38 +-
29 files changed, 1953 insertions(+), 159 deletions(-)
create mode 100644 include/AsteriskSCF/Operations/ExceptionWrapper.h
create mode 100644 include/AsteriskSCF/Operations/OperationContext.h
create mode 100644 include/AsteriskSCF/Operations/OperationContextCache.h
create mode 100644 include/AsteriskSCF/Operations/OperationMonitor.h
create mode 100644 src/Operations/CMakeLists.txt
create mode 100644 src/Operations/OperationContext.cpp
create mode 100644 src/Operations/OperationContextCache.cpp
create mode 100644 src/Operations/OperationMonitor.cpp
create mode 100644 test/OperationContextCache/OperationContextCacheTest.cpp
copy test/{PropertyHelper/PropertyHelperTest.h => OperationContextCache/OperationContextCacheTest.h} (67%)
create mode 100644 test/OperationMonitor/OperationMonitorTest.cpp
copy test/{PropertyHelper/PropertyHelperTest.h => OperationMonitor/OperationMonitorTest.h} (67%)
- Log -----------------------------------------------------------------
commit 009bfb4fb947845756d7eed4492fe1aa3eb8db65
Author: Ken Hunt <ken.hunt at digium.com>
Date: Tue May 8 17:02:19 2012 -0500
Changes for new retry logic.
diff --git a/include/AsteriskSCF/Discovery/LocatorRegistrationWrapper.h b/include/AsteriskSCF/Discovery/LocatorRegistrationWrapper.h
index 0d3b233..3791f3e 100644
--- a/include/AsteriskSCF/Discovery/LocatorRegistrationWrapper.h
+++ b/include/AsteriskSCF/Discovery/LocatorRegistrationWrapper.h
@@ -17,9 +17,13 @@
#include <Ice/Ice.h>
#include <IceUtil/Thread.h>
-#include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.h>
+#include <IceUtil/UUID.h>
#include <string>
+#include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.h>
+#include <AsteriskSCF/Helpers/Retry.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
+
namespace AsteriskSCF
{
namespace Discovery
@@ -57,23 +61,25 @@ public:
}
/**
- * The main registration function. There is *no* exception handling, so the caller must be prepared
+ * The main registration function. The only exceptions handled are those related
+ * to retries (for failover scenarios) so the caller must be prepared
* to handle whatever might be thrown.
**/
bool registerService()
{
AsteriskSCF::Core::Discovery::V1::ServiceLocatorManagementPrx management =
- AsteriskSCF::Core::Discovery::V1::ServiceLocatorManagementPrx::checkedCast(
- mCommunicator->stringToProxy(mProxyString));
+ AsteriskSCF::Core::Discovery::V1::ServiceLocatorManagementPrx::checkedCast(
+ mCommunicator->stringToProxy(mProxyString));
+
if (management)
{
IceUtil::Mutex::Lock lock(mLock);
mServiceManagement =
AsteriskSCF::Core::Discovery::V1::ServiceManagementPrx::uncheckedCast(
- management->addService(mService, mName));
+ management->addService(AsteriskSCF::Operations::createContext(), mService, mName));
if (mServiceManagement)
{
- mServiceManagement->addLocatorParams(mParameters, mComparatorGUID);
+ mServiceManagement->addLocatorParams(AsteriskSCF::Operations::createContext(), mParameters, mComparatorGUID);
return true;
}
}
@@ -95,7 +101,7 @@ public:
return;
}
}
- mServiceManagement->unregister();
+ mServiceManagement->unregister(AsteriskSCF::Operations::createContext());
}
void suspend()
@@ -107,7 +113,7 @@ public:
return;
}
}
- mServiceManagement->suspend();
+ mServiceManagement->suspend(AsteriskSCF::Operations::createContext());
}
void unsuspend()
@@ -119,7 +125,7 @@ public:
return;
}
}
- mServiceManagement->unsuspend();
+ mServiceManagement->unsuspend(AsteriskSCF::Operations::createContext());
}
AsteriskSCF::Core::Discovery::V1::ServiceManagementPrx getServiceManagement()
@@ -133,7 +139,6 @@ public:
}
private:
-
//
// This template doesn't use boost locking simply because it already has a physical dependency
// to Ice runtime, so avoiding adding a second seemed reasonable.
diff --git a/include/AsteriskSCF/Helpers/Retry.h b/include/AsteriskSCF/Helpers/Retry.h
index 3d89bfb..bcaa2f7 100644
--- a/include/AsteriskSCF/Helpers/Retry.h
+++ b/include/AsteriskSCF/Helpers/Retry.h
@@ -55,7 +55,7 @@ public:
*
* @returns true if there are attempts left.
**/
- bool canRetry()
+ bool canRetry() const
{
return mCounter < mMaxRetries;
}
@@ -63,18 +63,29 @@ public:
/**
* Sleep for the configured interval.
*
- * @returns true if there are attempts left.
+ * @returns true if this operation performed a sleep.
+ * Indicates the caller should attempt a retry.
**/
bool retry()
{
+ if (canRetry() == false)
+ {
+ return false;
+ }
+
IceUtil::ThreadControl::sleep(mRetryInterval);
++mCounter;
- return canRetry();
+ return true;
+ }
+
+ size_t getMaxRetries() const
+ {
+ return mMaxRetries;
}
private:
- size_t mMaxRetries;
- IceUtil::Time mRetryInterval;
+ const size_t mMaxRetries;
+ const IceUtil::Time mRetryInterval;
size_t mCounter;
};
diff --git a/include/AsteriskSCF/Operations/ExceptionWrapper.h b/include/AsteriskSCF/Operations/ExceptionWrapper.h
new file mode 100644
index 0000000..76a3a10
--- /dev/null
+++ b/include/AsteriskSCF/Operations/ExceptionWrapper.h
@@ -0,0 +1,120 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2012, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#pragma once
+
+#include <Ice/Exception.h>
+#include <Ice/LocalException.h>
+#include <exception>
+#include <string>
+#include <boost/shared_ptr.hpp>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
+
+namespace AsteriskSCF
+{
+
+namespace Operations
+{
+
+typedef boost::shared_ptr<IceUtil::Exception> ExceptionPtr;
+class ExceptionWrapper;
+typedef boost::shared_ptr<ExceptionWrapper> ExceptionWrapperPtr;
+
+/**
+ * ExceptionWrapper is OperationContextCookie derived class for
+ * storing exceptional results for use later on.
+ */
+class ExceptionWrapper
+{
+
+public:
+ /**
+ * Several conversion constructors are provided for
+ * convenience. While conversion constructors can be problematic,
+ * the nature of this class is "to wrap"... so there!
+ */
+ explicit
+ ExceptionWrapper(const IceUtil::Exception& ex) :
+ mException(ex.ice_clone())
+ {
+ }
+
+ /**
+ * Unfortunately std::exception does not provide a way to clone
+ * the most-derived type at runtime. As the ultimate goal is to
+ * send this exception to an Ice client, we need only do as well
+ * as the Ice runtime can -- create an
+ * Ice::UnknownException(). The __FILE__ and __LINE__ will not be
+ * related to the original exception, but do have value in that
+ * it shows where the std::exception->Ice exception translation
+ * occurs.
+ */
+ explicit
+ ExceptionWrapper(const std::exception& ex)
+ {
+ // properly handle the case where ex is actually an IceUtil::Exception
+ const IceUtil::Exception* iceEx = dynamic_cast<const IceUtil::Exception*>(&ex);
+
+ if (iceEx)
+ {
+ mException.reset(iceEx->ice_clone());
+ }
+ else
+ {
+ mException.reset(new Ice::UnknownException(__FILE__, __LINE__, ex.what()));
+ }
+ }
+
+ /**
+ * Intended for those catch (...) cases. Something can be passed back,
+ * but like std::exceptions, Ice will end up sending an
+ * UnknownException.
+ */
+ explicit
+ ExceptionWrapper(const std::string& msg) :
+ mException(new Ice::UnknownException(__FILE__, __LINE__, msg))
+ {
+ }
+
+ /**
+ * Accessor to get the wrapped exception.
+ */
+ ExceptionPtr exception() const
+ {
+ return mException;
+ }
+
+ static ExceptionWrapperPtr create(const Ice::Exception& x)
+ {
+ return ExceptionWrapperPtr(new ExceptionWrapper(x));
+ }
+
+ static ExceptionWrapperPtr create(const std::exception& x)
+ {
+ return ExceptionWrapperPtr(new ExceptionWrapper(x));
+ }
+
+ static ExceptionWrapperPtr create(const std::string& msg)
+ {
+ return ExceptionWrapperPtr(new ExceptionWrapper(msg));
+ }
+
+private:
+ ExceptionPtr mException;
+};
+
+} /* end of namespace Operations */
+} /* end of namespace AsteriskSCF */
diff --git a/include/AsteriskSCF/Operations/OperationContext.h b/include/AsteriskSCF/Operations/OperationContext.h
new file mode 100644
index 0000000..fef2d01
--- /dev/null
+++ b/include/AsteriskSCF/Operations/OperationContext.h
@@ -0,0 +1,46 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#pragma once
+
+#include <IceUtil/UUID.h>
+#include <AsteriskSCF/System/OperationsIf.h>
+
+namespace AsteriskSCF
+{
+namespace Operations
+{
+
+/**
+ * Create a new OperationContext with a new transaction id.
+ */
+ASTSCF_DLL_EXPORT AsteriskSCF::System::V1::OperationContextPtr createContext();
+
+/**
+ * Create a new OperationContext that has the same transaction id as the input argument.
+ * @param sourceContext The source OperationContext that contains the transaction id to use.
+ */
+ASTSCF_DLL_EXPORT AsteriskSCF::System::V1::OperationContextPtr createContext(const AsteriskSCF::System::V1::OperationContextPtr& sourceContext);
+
+/**
+ * Create a new OperationContext using a name calculated from the sourceContext,
+ * and having the same transactionId as the source context.
+ */
+ASTSCF_DLL_EXPORT AsteriskSCF::System::V1::OperationContextPtr calculateOperationContext(
+ const AsteriskSCF::System::V1::OperationContextPtr& sourceContext,
+ const std::string& modifier);
+
+} // End namespace Operations
+} // End namespace AsteriskSCF
diff --git a/include/AsteriskSCF/Operations/OperationContextCache.h b/include/AsteriskSCF/Operations/OperationContextCache.h
new file mode 100644
index 0000000..5842caf
--- /dev/null
+++ b/include/AsteriskSCF/Operations/OperationContextCache.h
@@ -0,0 +1,158 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2012, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#pragma once
+
+#include <boost/thread.hpp>
+#include <boost/enable_shared_from_this.hpp>
+#include <boost/shared_ptr.hpp>
+#include <IceUtil/Timer.h>
+
+#include <AsteriskSCF/System/OperationsIf.h>
+#include <AsteriskSCF/Logger.h>
+
+namespace AsteriskSCF
+{
+namespace Operations
+{
+
+class OperationContextCacheEntry;
+typedef ASTSCF_DLL_EXPORT boost::shared_ptr<OperationContextCacheEntry> OperationContextCacheEntryPtr;
+
+class OperationContextPruner;
+typedef IceUtil::Handle<OperationContextPruner> OperationContextPrunerPtr;
+
+class ASTSCF_DLL_EXPORT OperationContextCookie
+{
+public:
+ virtual ~OperationContextCookie() {}
+};
+typedef ASTSCF_DLL_EXPORT boost::shared_ptr<OperationContextCookie> OperationContextCookiePtr;
+
+class OperationContextCache;
+typedef boost::shared_ptr<OperationContextCache> OperationContextCachePtr;
+
+/**
+ * We haven't decided on a good TTL for the cache yet, so we might as well pick something and be consistent.
+ */
+const int DEFAULT_TTL_SECONDS = 180;
+
+/**
+ * Utility class that provides a queryable cache of OperationContext objects.
+ */
+class ASTSCF_DLL_EXPORT OperationContextCache : public IceUtil::Shared
+{
+public:
+ /**
+ * Factory method for non-logging cache.
+ * @param ttlSeconds The time-to-live for the OperationContexts being cached.
+ * Entries will remain in the cache for at least the provided value, but can
+ * remain in cache longer.
+ */
+ static OperationContextCachePtr create(int ttlSeconds);
+
+ /**
+ * Factory method for logging cache.
+ * @param ttlSeconds The time-to-live for the OperationContexts being cached.
+ * Entries will remain in the cache for at least the provided value, but can
+ * remain in cache longer.
+ * @param logger The logger to log to.
+ * @param label Label to apply when logging to identify this cache.
+ */
+ static OperationContextCachePtr create(int ttlSeconds,
+ const AsteriskSCF::System::Logging::Logger& logger,
+ const std::string& label);
+
+ ~OperationContextCache();
+
+ /**
+ * Caches the specified context if it isn't already in the cache.
+ *
+ * @param operationContext The context to add to the cache.
+ * @return true The context was added, which means it wasn't already in the cache.
+ *
+ * @note Make sure you don't confuse the return value of this operation with the return
+ * value of the 'contains' operation.
+ */
+ bool addOperationContext(const AsteriskSCF::System::V1::OperationContextPtr& operationContext);
+
+ /**
+ * Caches the specified context if it isn't already in the cache, and associate a cookie with it.
+ *
+ * @param operationContext The context to add to the cache.
+ * @param inCookie A cookie object to associate with this entry in the cache.
+ * @param existingCookie This value will be set by this method to the cookie of an existing
+ * operationContext if there was already an entry in the cache with the same identity.
+ * @return true The context was added, which means it wasn't already in the cache.
+ *
+ * @note Make sure you don't confuse the return value of this operation with the return
+ * value of the 'contains' operation.
+ */
+ bool addOperationContext(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const OperationContextCookiePtr& inCookie,
+ OperationContextCookiePtr& existingCookie);
+
+ /**
+ * Tests if the specified context is in the cache.
+ */
+ bool contains(const AsteriskSCF::System::V1::OperationContextPtr& operationContext);
+
+ /**
+ * This will remove an OperationContext from the cache if one exists with the given id.
+ * Removal is typically done automatically within the cache based on an internal timer.
+ * This operation exists to support clients that wish to force an immediate removal of a
+ * context themselves.
+ */
+ void removeOperationContext(const AsteriskSCF::System::V1::OperationContextPtr& operationContext);
+
+ /**
+ * Drop entries that are older than the TTL.
+ * @note This method is called by an internal timer task, so clients
+ * of this class don't need to call it. (There's no harm if a client does call it.)
+ */
+ void prune();
+
+ /**
+ * Retrieve the number of entries currently in the cache.
+ */
+ std::size_t size();
+
+private:
+
+ //
+ // TODO: why not make these protected and permit derivation?
+ //
+ OperationContextCache(int ttlSeconds);
+
+ OperationContextCache(int ttlSeconds,
+ const AsteriskSCF::System::Logging::Logger& logger,
+ const std::string& label);
+
+ OperationContextCacheEntryPtr get(const AsteriskSCF::System::V1::OperationContextPtr& operationContext);
+ void logStaleList(std::vector<std::string>& staleList);
+ void setPruner(const OperationContextPrunerPtr& pruner);
+
+ AsteriskSCF::System::Logging::Logger mLogger;
+ bool mLoggingEnabled;
+ std::string mLoggingLabel;
+ boost::shared_mutex mLock;
+ OperationContextPrunerPtr mPruner;
+ IceUtil::Time mTTL;
+ std::map<std::string, OperationContextCacheEntryPtr> mCache;
+};
+
+} // Operations
+} // AsteriskSCF
diff --git a/include/AsteriskSCF/Operations/OperationMonitor.h b/include/AsteriskSCF/Operations/OperationMonitor.h
new file mode 100644
index 0000000..968f910
--- /dev/null
+++ b/include/AsteriskSCF/Operations/OperationMonitor.h
@@ -0,0 +1,594 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2012, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#pragma once
+
+#include <Ice/Exception.h>
+#include <exception>
+#include <string>
+#include <boost/shared_ptr.hpp>
+#include <boost/enable_shared_from_this.hpp>
+#include <AsteriskSCF/Operations/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
+#include <AsteriskSCF/Operations/ExceptionWrapper.h>
+
+namespace AsteriskSCF
+{
+namespace Operations
+{
+
+/**
+ * The ContextMonitor provides a mechanism for waiting for a result when multiple
+ * upcalls occur for the same OperationContext. Of course this is only relevant for
+ * non-AMD implementations.
+ */
+class ASTSCF_DLL_EXPORT ContextMonitor : public IceUtil::Shared
+{
+public:
+ ContextMonitor() :
+ mCompleted(false),
+ mCancelled(false)
+ {
+ }
+
+ bool isCompleted()
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mMonitor);
+ return mCompleted;
+ }
+
+ bool waitForResults()
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mMonitor);
+ while (!mCompleted && !mCancelled)
+ {
+ mMonitor.wait();
+ }
+ return mCompleted;
+ }
+
+ void setCompleted()
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mMonitor);
+ mCompleted = true;
+ mMonitor.notify();
+ }
+
+ /**
+ * Revisit: this was added before there was a specific use for it. If it is not referenced,
+ * it probably should be removed (along with the related member variable of course).
+ */
+ void cancel()
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mMonitor);
+ mCancelled = true;
+ mMonitor.notify();
+ }
+
+private:
+ bool mCompleted;
+ bool mCancelled;
+ IceUtil::Monitor<IceUtil::Mutex> mMonitor;
+};
+typedef ASTSCF_DLL_EXPORT IceUtil::Handle<ContextMonitor> ContextMonitorPtr;
+
+class ASTSCF_DLL_EXPORT ContextData : virtual public AsteriskSCF::Operations::OperationContextCookie,
+ virtual public boost::enable_shared_from_this<ContextData>
+{
+public:
+ ContextData() :
+ mMonitor(new ContextMonitor)
+ {
+ }
+
+ /**
+ * Get a results monitor object for this context's data.
+ */
+ ContextMonitorPtr getMonitor();
+
+ /**
+ * If the operations performed for this operation context result
+ * in an exception that should be reported back to the client,
+ * then this should be non-null and contain a wrapper to the
+ * exception to be passed back.
+ */
+ ExceptionWrapperPtr getException();
+
+ void setException(const ExceptionWrapperPtr& exception);
+
+ void setException(const std::exception& exception)
+ {
+ setException(ExceptionWrapper::create(exception));
+ }
+
+ void setException()
+ {
+ // All exceptions _should_ derive from std::exception, so we really
+ // should never get here. But not everyone does what they should,
+ // do they.
+ setException(ExceptionWrapper::create("Unknown unexpected exception"));
+ }
+
+ /**
+ * A quick, single point accessor to determine the completion
+ * status of the related operation. The alternative would be to
+ * get the monitor and ask it.
+ */
+ bool isCompleted();
+
+ virtual void setCompleted()
+ {
+ mMonitor->setCompleted();
+ onSetCompleted();
+ }
+
+ /**
+ * Shorthand for getMonitor()->waitForResults(). TODO: see how much
+ * effort it would take to hide the monitor altogether.
+ */
+ bool waitForResults()
+ {
+ return mMonitor->waitForResults();
+ }
+
+protected:
+ IceUtil::Mutex mLock;
+
+ ContextMonitorPtr mMonitor;
+ ExceptionWrapperPtr mExceptionResult;
+
+ virtual void onSetException()
+ {
+ }
+
+ virtual void onSetCompleted()
+ {
+ }
+};
+typedef ASTSCF_DLL_EXPORT boost::shared_ptr<ContextData> ContextDataPtr;
+
+class ASTSCF_DLL_EXPORT CancelledOperationException : public std::exception {
+public:
+ const char* what() const throw() { return "Cancelled operation"; }
+};
+
+/**
+ * Template derived class of ContextData.
+ * TODO: comment on rationale. The general idea is that the casting
+ * can occur on the context result data, not on what is inside it.
+ * Derived classes are also a bit more interesting because they will
+ * be specific to a certain result type.. but this whole notion needs
+ * to be worded better. Hence the TODO.
+ */
+
+template <typename RT>
+class ASTSCF_DLL_EXPORT ContextResultData : virtual public ContextData
+{
+public:
+ void setResult(const RT& val);
+
+ /**
+ * Can't get much simpler for synchronous calls than this:
+ * - blocks until results are computed
+ * - throws an exception if such a thing occurred.
+ * - otherwise returns result.
+ */
+ RT getResult()
+ {
+ if (!mMonitor->waitForResults())
+ {
+ //
+ // TODO: Unsure what the best course of action is here.. this
+ // implies that the operation was cancelled and results should
+ // not be expected.
+ //
+ throw CancelledOperationException();
+ }
+
+ if (mExceptionResult)
+ {
+ mExceptionResult->exception()->ice_throw();
+ }
+ return mResult;
+ }
+
+ // A smart pointer for this type.
+ typedef boost::shared_ptr<ContextResultData<RT> > ptr_type;
+
+protected:
+ virtual void onSetResult() {}
+ RT mResult;
+};
+
+template <typename RT>
+ASTSCF_DLL_EXPORT void ContextResultData<RT>::setResult(const RT& val)
+{
+ if (isCompleted())
+ {
+ return;
+ }
+
+ IceUtil::LockT<IceUtil::Mutex> lock(mLock);
+ mResult = val;
+ setCompleted();
+ onSetResult();
+}
+
+/**
+ * Not every AMD type operation has a result value.
+ */
+template <class CB>
+class ASTSCF_DLL_EXPORT AMDContextData : virtual public ContextData
+{
+public:
+ template<class T >
+ class AMDProxy : virtual public T::element_type, virtual public IceUtil::Shared
+ {
+ public:
+ void ice_response()
+ {
+ if (mParent)
+ {
+ mParent->setCompleted();
+ //
+ // The reset breaks the mutual reference count.
+ //
+ mParent.reset();
+ }
+ }
+
+ void ice_exception(const std::exception& ex)
+ {
+ if (mParent)
+ {
+ const Ice::Exception* ix = dynamic_cast<const Ice::Exception*>(&ex);
+ if (ix)
+ {
+ mParent->setException(ExceptionWrapper::create(*ix));
+ return;
+ }
+ mParent->setException(ExceptionWrapper::create(ex));
+ //
+ // The reset breaks the mutual reference count.
+ //
+ mParent.reset();
+ }
+ }
+
+ void ice_exception()
+ {
+ if (mParent)
+ {
+ mParent->setException(ExceptionWrapper::create("Unexpected unknown exception"));
+ //
+ // The reset breaks the mutual reference count.
+ //
+ mParent.reset();
+ }
+ }
+
+ AMDProxy(const boost::shared_ptr<ContextData>& d) :
+ mParent(d)
+ {
+ }
+
+ private:
+ boost::shared_ptr<ContextData> mParent;
+ };
+
+ AMDContextData() {}
+ AMDContextData(const CB& cb)
+ {
+ mCallbacks.push_back(cb);
+ }
+
+ void addCB(const CB& cbPtr);
+
+ CB getProxy()
+ {
+ {
+ //
+ // We need to do this lazy initialization because we need to setup the relationship
+ // between the AMD proxy and this object instance *after* this object instance
+ // has been fully constructed. Otherwise we can get into some nasty stuff.
+ //
+ IceUtil::LockT<IceUtil::Mutex> lock(mLock);
+ if (!mAMDProxy)
+ {
+ mAMDProxy = new AMDProxy<CB>(shared_from_this());
+ }
+ }
+ return mAMDProxy;
+ }
+
+ // A smart pointer for this type.
+ typedef boost::shared_ptr<AMDContextData<CB> > ptr_type;
+
+ typedef CB callback_type;
+
+private:
+ void onSetException();
+ void onSetCompleted();
+ CB mAMDProxy;
+ std::vector<CB> mCallbacks;
+};
+
+template <class CB>
+ASTSCF_DLL_EXPORT void AMDContextData<CB>::addCB(const CB& cbPtr)
+{
+ IceUtil::LockT<IceUtil::Mutex> lock(mLock);
+ if (isCompleted())
+ {
+ if (mExceptionResult)
+ {
+ cbPtr->ice_exception(*(mExceptionResult->exception()));
+ }
+ else
+ {
+ cbPtr->ice_response();
+ }
+ return;
+ }
+ mCallbacks.push_back(cbPtr);
+}
+
+template <class CB>
+ASTSCF_DLL_EXPORT void AMDContextData<CB>::onSetException()
+{
+ for (typename std::vector<CB>::const_iterator iter= mCallbacks.begin();
+ iter != mCallbacks.end(); ++iter)
+ {
+ (*iter)->ice_exception(*(mExceptionResult->exception()));
+ }
+}
+
+template <class CB>
+ASTSCF_DLL_EXPORT void AMDContextData<CB>::onSetCompleted()
+{
+ for (typename std::vector<CB>::const_iterator iter= mCallbacks.begin();
+ iter != mCallbacks.end(); ++iter)
+ {
+ (*iter)->ice_response();
+ }
+}
+
+/**
+ *
+ * AMDContextResultData has the added feature that it can be passed as
+ * an AMD callback object for CB's element type. The standard AMD
+ * callback methods are overridden and behave in the same manner as
+ * the synchronous result's setResult/setException methods. The AMD
+ * ContextResultData also stores all AMD callback objects that are
+ * part of asynchronous upcalls for the same operation context.
+ *
+ **/
+template <typename RT, class CB>
+class ASTSCF_DLL_EXPORT AMDContextResultData : virtual public ContextResultData<RT>, virtual public ContextData
+{
+public:
+ template< typename RTi, class CBi >
+ class ASTSCF_DLL_EXPORT AMDProxy : virtual public CBi::element_type, virtual public IceUtil::Shared
+ {
+ public:
+ void ice_response(const RTi& result)
+ {
+ if (mParent)
+ {
+ mParent->setResult(result);
+ //
+ // The reset breaks the mutual reference count.
+ //
+ mParent.reset();
+ }
+ }
+
+ void ice_exception(const std::exception& ex)
+ {
+ if (mParent)
+ {
+ const Ice::Exception* ix = dynamic_cast<const Ice::Exception*>(&ex);
+ if (ix)
+ {
+ mParent->setException(ExceptionWrapper::create(*ix));
+ return;
+ }
+ mParent->setException(ExceptionWrapper::create(ex));
+ //
+ // The reset breaks the mutual reference count.
+ //
+ mParent.reset();
+ }
+ }
+
+ void ice_exception()
+ {
+ if (mParent)
+ {
+ mParent->setException(ExceptionWrapper::create("Unexpected unknown exception"));
+ //
+ // The reset breaks the mutual reference count.
+ //
+ mParent.reset();
+ }
+ }
+
+ AMDProxy(const boost::shared_ptr<AMDContextResultData<RTi, CBi> >& d) :
+ mParent(d)
+ {
+ }
+
+ private:
+ boost::shared_ptr<AMDContextResultData<RTi, CBi> > mParent;
+ };
+
+ AMDContextResultData()
+ {
+ }
+
+ AMDContextResultData(const CB& cb)
+ {
+ mCallbacks.push_back(cb);
+ }
+
+ CB getProxy()
+ {
+ {
+ //
+ // We need to do this lazy initialization because we need to setup the relationship
+ // between the AMD proxy and this object instance *after* this object instance
+ // has been fully constructed. Otherwise we can get into some nasty stuff.
+ //
+ IceUtil::Mutex::Lock lock(mLock);
+ if (!mAMDProxy)
+ {
+ mAMDProxy = new AMDProxy<RT, CB>(
+ boost::dynamic_pointer_cast<AMDContextResultData<RT, CB> >(shared_from_this()));
+ }
+ }
+ return mAMDProxy;
+ }
+
+ bool addCB(const CB& cbPtr)
+ {
+ IceUtil::LockT<IceUtil::Mutex> lock(mLock);
+ if (ContextData::isCompleted())
+ {
+ if (mExceptionResult)
+ {
+ cbPtr->ice_exception(*(mExceptionResult->exception()));
+ }
+ else
+ {
+ cbPtr->ice_response(ContextResultData<RT>::mResult);
+ }
+ return true;
+ }
+ mCallbacks.push_back(cbPtr);
+ return false;
+ }
+
+ size_t callbackCount()
+ {
+ return mCallbacks.size();
+ }
+
+ // A smart pointer for this type.
+ typedef boost::shared_ptr<AMDContextResultData<RT,CB> > ptr_type;
+
+ typedef CB callback_type;
+
+private:
+ void onSetResult()
+ {
+ for (typename std::vector<CB>::const_iterator iter = mCallbacks.begin();
+ iter != mCallbacks.end(); ++iter)
+ {
+ (*iter)->ice_response(ContextResultData<RT>::mResult);
+ }
+ }
+
+ void onSetException()
+ {
+ for (typename std::vector<CB>::const_iterator iter= mCallbacks.begin();
+ iter != mCallbacks.end(); ++iter)
+ {
+ (*iter)->ice_exception(*(mExceptionResult->exception()));
+ }
+ }
+ CB mAMDProxy;
+ std::vector<CB> mCallbacks;
+};
+
+/**
+ * Simple file scope helper for the "add" methods. The signatures for these
+ * methods do not include any kind of results, so we only need to consider
+ * exceptions and one method will do for all of that. This function is a
+ * kind of do-it-all (or most of it at least). It returns a new ContextData
+ * object if this is the first upcall for the provided context, otherwise
+ * it returns a nil reference. But before it returns a nil reference, it
+ * waits for the first caller to set the completion status either by
+ * setting it to complete or setting an exception. If an exception is set,
+ * that exception is thrown instead of returning the nil reference.
+ *
+ * TODO: the existing function name is poor to the extreme so some thought
+ * needs to go into picking a new one. Also, while I wrote this, I really
+ * dislike the "candy machine" kind of interface. I would prefer that we
+ * find some way to treat the cache atomically from the caller as it may
+ * lead the way for some clearer idioms.
+ */
+ASTSCF_DLL_EXPORT ContextDataPtr checkAndThrow(const AsteriskSCF::Operations::OperationContextCachePtr& cache,
+ const AsteriskSCF::System::V1::OperationContextPtr& context);
+
+/**
+ * Gets an OperationContextCookie subclass for a given context, creating it if necessary. This is useful for
+ * non-AMD invocations that return a value.
+ *
+ * @param DT a boost::shared_ptr to the OperationContextCookie subclass
+ * @param cache Cache to get cookie from
+ * @param context Context to look up in cache
+ * @return A pair indicating true if it was a cache hit, and the cookie that's now in the cache for the given
+ * context
+ */
+template <class DT>
+ASTSCF_DLL_EXPORT std::pair<bool, DT> getContextSync(const AsteriskSCF::Operations::OperationContextCachePtr& cache,
+ const AsteriskSCF::System::V1::OperationContextPtr& context)
+{
+ bool cacheHit = false;
+ DT c(new typename DT::element_type);
+ AsteriskSCF::Operations::OperationContextCookiePtr o;
+
+ if (!cache->addOperationContext(context, c, o))
+ {
+ c = boost::dynamic_pointer_cast<typename DT::element_type>(o);
+ assert(c);
+ cacheHit = true;
+ }
+ return std::make_pair(cacheHit, c);
+}
+
+/**
+ * Gets an OperationContextCookie subclass for a given context, creating it if necessary.
+ *
+ * If the context is already in the cache, the amdCallback is added to the existing cookie and null is
+ * returned; nothing else need be done.
+ *
+ * If the context is not in the cache, a new cookie is created, the amdCallback is added to it, and the
+ * new cookie is returned. The caller then needs to complete normal execution, and either setException() or
+ * setResult() on the cookie when finished.
+ *
+ * NOTE: This version assumes the cookie type T is either an instance of AMDContextResultData or AMDContextData.
+ * It uses the exposed callback_type and ptr_type. It needs a test.
+ *
+ * @param T AMDContextResultData or AMDContextData instance
+ */
+template <class T>
+ASTSCF_DLL_EXPORT typename T::ptr_type getContext(const AsteriskSCF::Operations::OperationContextCachePtr& cache,
+ const AsteriskSCF::System::V1::OperationContextPtr& context,
+ const typename T::callback_type& amdCallback)
+{
+ std::pair<bool, typename T::ptr_type> cacheHit = getContextSync<typename T::ptr_type>(cache, context);
+
+ cacheHit.second->addCB(amdCallback);
+
+ if (cacheHit.first)
+ {
+ return typename T::ptr_type();
+ }
+ return cacheHit.second;
+}
+
+} /* End of namespace Operations */
+} /* End of namespace AsteriskSCF */
diff --git a/include/AsteriskSCF/Replication/StateReplicator.h b/include/AsteriskSCF/Replication/StateReplicator.h
index e75e4ea..d9a4424 100644
--- a/include/AsteriskSCF/Replication/StateReplicator.h
+++ b/include/AsteriskSCF/Replication/StateReplicator.h
@@ -19,6 +19,8 @@
#include <Ice/Current.h>
#include <boost/thread/thread.hpp>
#include <boost/thread/shared_mutex.hpp>
+#include <AsteriskSCF/System/OperationsIf.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
namespace AsteriskSCF
{
@@ -68,13 +70,20 @@ public:
{
// Types: T - Listener type, U - State Item seq.
public:
- SetStateNotice(const U& stateSeq) : mStateSeq(stateSeq) {}
+ SetStateNotice(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const U& stateSeq)
+ : mOperationContext(operationContext),
+ mStateSeq(stateSeq)
+ {
+ }
+
~SetStateNotice() {}
void operator() (const T& x)
{
try
{
- x->stateSet(mStateSeq);
+ x->stateSet(mOperationContext, mStateSeq);
}
catch(...)
{
@@ -84,6 +93,7 @@ public:
// should at the very least log these exceptions.
}
}
+ AsteriskSCF::System::V1::OperationContextPtr mOperationContext;
U mStateSeq;
};
@@ -94,13 +104,20 @@ public:
{
// Types: T - Listener type, V - Key Item seq.
public:
- RemoveStateByKeyNotice(const V& keys) : mKeys(keys) {}
+ RemoveStateByKeyNotice(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const V& keys)
+ : mOperationContext(operationContext),
+ mKeys(keys)
+ {
+ }
+
~RemoveStateByKeyNotice() {}
void operator() (const T& x)
{
try
{
- x->stateRemoved(mKeys);
+ x->stateRemoved(mOperationContext, mKeys);
}
catch(...)
{
@@ -110,6 +127,7 @@ public:
// should at the very least log these exceptions.
}
}
+ AsteriskSCF::System::V1::OperationContextPtr mOperationContext;
V mKeys;
};
@@ -120,13 +138,20 @@ public:
{
// Types: T - Listener type, U - State Item seq.
public:
- RemoveStateNotice(const U& stateSeq) : mStateSeq(stateSeq) {}
+ RemoveStateNotice(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const U& stateSeq)
+ : mOperationContext(operationContext),
+ mStateSeq(stateSeq)
+ {
+ }
+
~RemoveStateNotice() {}
void operator() (T x)
{
try
{
- x->stateRemovedForItems(mStateSeq);
+ x->stateRemovedForItems(mOperationContext, mStateSeq);
}
catch(...)
{
@@ -136,6 +161,7 @@ public:
// should at the very least log these exceptions.
}
}
+ AsteriskSCF::System::V1::OperationContextPtr mOperationContext;
U mStateSeq;
};
@@ -168,7 +194,8 @@ public:
* Adds a listener of state update notices.
* @Override
*/
- void addListener(const L& listener, const Ice::Current& = ::Ice::Current())
+ void addListener(const AsteriskSCF::System::V1::OperationContextPtr&,
+ const L& listener, const Ice::Current& = ::Ice::Current())
{
{ // critical scope
boost::unique_lock<boost::shared_mutex> lock(mListenerLock);
@@ -177,14 +204,15 @@ public:
// Give this new listener the current state.
boost::shared_lock<boost::shared_mutex> lock(mStateLock);
- listener->stateSet(mStateItems);
+ listener->stateSet(AsteriskSCF::Operations::createContext(), mStateItems);
}
/**
* Removes a listener of state update notices.
* @Override
*/
- void removeListener(const L& listener, const Ice::Current& = ::Ice::Current())
+ void removeListener(const AsteriskSCF::System::V1::OperationContextPtr&,
+ const L& listener, const Ice::Current& = ::Ice::Current())
{
boost::unique_lock<boost::shared_mutex> lock(mListenerLock);
typename std::vector<L>::iterator it = std::find_if(mListeners.begin(), mListeners.end(), IdentifyListener<L>(listener));
@@ -209,6 +237,7 @@ public:
*/
void clearState()
{
+ AsteriskSCF::System::V1::OperationContextPtr op = AsteriskSCF::Operations::createContext();
std::vector<S> items;
{ // critical scope
@@ -218,14 +247,14 @@ public:
}
boost::shared_lock<boost::shared_mutex> lock(mListenerLock);
- for_each(mListeners.begin(), mListeners.end(), RemoveStateNotice<L,std::vector<S> >(items));
+ for_each(mListeners.begin(), mListeners.end(), RemoveStateNotice<L,std::vector<S> >(op, items));
}
/**
* Add or update the specified state variables, and notify listeners.
* @Override
*/
- void setState(const std::vector<S>& items, const Ice::Current& = ::Ice::Current())
+ void setState(const AsteriskSCF::System::V1::OperationContextPtr& operationContext, const std::vector<S>& items, const Ice::Current& = ::Ice::Current())
{
{ // critical scope
boost::unique_lock<boost::shared_mutex> lock(mStateLock);
@@ -247,14 +276,14 @@ public:
}
boost::shared_lock<boost::shared_mutex> lock(mListenerLock);
- for_each( mListeners.begin(), mListeners.end(), SetStateNotice<L,std::vector<S> >(items) );
+ for_each( mListeners.begin(), mListeners.end(), SetStateNotice<L,std::vector<S> >(operationContext, items) );
}
/**
* Remove specified state variables identified by the specified keys, and notify listeners.
* @Override
*/
- void removeState(const std::vector<K>& ids, const Ice::Current& = ::Ice::Current())
+ void removeState(const AsteriskSCF::System::V1::OperationContextPtr& operationContext, const std::vector<K>& ids, const Ice::Current& = ::Ice::Current())
{
{ // critical scope
boost::unique_lock<boost::shared_mutex> lock(mStateLock);
@@ -271,7 +300,7 @@ public:
}
boost::shared_lock<boost::shared_mutex> lock(mListenerLock);
- for_each(mListeners.begin(), mListeners.end(), RemoveStateByKeyNotice<L,std::vector<K> >(ids));
+ for_each(mListeners.begin(), mListeners.end(), RemoveStateByKeyNotice<L,std::vector<K> >(operationContext, ids));
}
/**
@@ -279,7 +308,7 @@ public:
* The item's key is used to identify the item to remove. Any other field is ignored.
* @Override
*/
- void removeStateForItems(const std::vector<S>& items, const Ice::Current& = ::Ice::Current())
+ void removeStateForItems(const AsteriskSCF::System::V1::OperationContextPtr& operationContext, const std::vector<S>& items, const Ice::Current& = ::Ice::Current())
{
{ // critical scope
boost::unique_lock<boost::shared_mutex> lock(mStateLock);
@@ -296,7 +325,7 @@ public:
}
boost::shared_lock<boost::shared_mutex> lock(mListenerLock);
- for_each(mListeners.begin(), mListeners.end(), RemoveStateNotice<L,std::vector<S> >(items));
+ for_each(mListeners.begin(), mListeners.end(), RemoveStateNotice<L,std::vector<S> >(operationContext, items));
}
/**
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index c8aa4c7..9b8897c 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -3,6 +3,7 @@ file(GLOB_RECURSE project_headers ../include/*.h)
astscf_component_add_files(ASTSCFIceUtilCpp ${project_headers})
add_subdirectory(CollocatedIceStorm)
add_subdirectory(Component)
+add_subdirectory(Operations)
add_subdirectory(Replication)
add_subdirectory(WorkQueue)
add_subdirectory(ThreadPool)
diff --git a/src/Component/Component.cpp b/src/Component/Component.cpp
index 5511ca0..1fa087e 100644
--- a/src/Component/Component.cpp
+++ b/src/Component/Component.cpp
@@ -28,6 +28,7 @@
#include <AsteriskSCF/System/Component/ComponentServiceIf.h>
#include <AsteriskSCF/System/Component/ReplicaIf.h>
#include <AsteriskSCF/Discovery/LocatorRegistrationWrapper.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
using namespace std;
using namespace AsteriskSCF::System::Component::V1;
@@ -35,6 +36,7 @@ using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::Core::Discovery::V1;
using namespace AsteriskSCF::Discovery;
using namespace AsteriskSCF::Replication;
+using namespace AsteriskSCF::System::V1;
namespace AsteriskSCF
{
@@ -59,17 +61,17 @@ public:
}
public: // Overrides of the ComponentService interface.
- void suspend(const ::Ice::Current&)
+ void suspend(const OperationContextPtr&, const ::Ice::Current&)
{
mComponent.suspended();
}
- void resume(const ::Ice::Current&)
+ void resume(const OperationContextPtr&, const ::Ice::Current&)
{
mComponent.resumed();
}
- void shutdown(const ::Ice::Current&)
+ void shutdown(const OperationContextPtr&, const ::Ice::Current&)
{
// This should probably get the icebox admin interface and call
// that shutdown() operartion.
@@ -95,24 +97,28 @@ public:
{
}
- void setTestMode(const std::string& mode, const ::Ice::Current&)
+ void setTestMode(const OperationContextPtr&,
+ const std::string& mode, const ::Ice::Current&)
{
mComponent.setTestMode(mode);
}
- void setTestModeWithArgs(const std::string& mode,
+ void setTestModeWithArgs(const OperationContextPtr&,
+ const std::string& mode,
const ComponentTestParamSeq& args,
const ::Ice::Current&)
{
mComponent.setTestModeWithArgs(mode, args);
}
- void clearTestMode(const string& mode, const ::Ice::Current&)
+ void clearTestMode(const OperationContextPtr&,
+ const string& mode, const ::Ice::Current&)
{
mComponent.clearTestMode(mode);
}
- void clearTestModes(const ::Ice::Current&)
+ void clearTestModes(const OperationContextPtr&,
+ const ::Ice::Current&)
{
mComponent.clearTestModes();
}
@@ -156,8 +162,14 @@ public:
mProxyInitialized = true;
}
- bool activate(const ::Ice::Current&)
+ bool activate(const OperationContextPtr& operationContext,
+ const ::Ice::Current&)
{
+ if (mComponent.isActive())
+ {
+ return true;
+ }
+
mComponent.activated();
if (!mProxyInitialized)
@@ -168,13 +180,14 @@ public:
for (vector<ReplicaListenerPrx>::const_iterator listener = mListeners.begin();
listener != mListeners.end(); ++listener)
{
- (*listener)->begin_activated(mReplicaPrx);
+ (*listener)->begin_activated(operationContext, mReplicaPrx);
}
return true;
}
- void standby(const ::Ice::Current&)
+ void standby(const OperationContextPtr& operationContext,
+ const ::Ice::Current&)
{
mComponent.standby();
@@ -186,19 +199,33 @@ public:
for (vector<ReplicaListenerPrx>::const_iterator listener = mListeners.begin();
listener != mListeners.end(); ++listener)
{
- (*listener)->begin_onStandby(mReplicaPrx);
+ (*listener)->begin_onStandby(operationContext, mReplicaPrx);
}
}
- void addListener(const AsteriskSCF::System::Component::V1::ReplicaListenerPrx& listener,
+ /**
+ * Registers a listener for replica state change notifications.
+ * Registering a listener that is already present is a no-op, so a
+ * repeated call does not create a duplicate entry.
+ * The operation context argument is not used.
+ * @param listener The listener proxy to register.
+ */
+ void addListener(const OperationContextPtr&,
+ const AsteriskSCF::System::Component::V1::ReplicaListenerPrx& listener,
const ::Ice::Current&)
{
+ vector<ReplicaListenerPrx>::iterator it = find(mListeners.begin(), mListeners.end(), listener);
+ if (it != mListeners.end())
+ {
+ // Already registered; do not add it a second time.
+ return;
+ }
+
mListeners.push_back(listener);
}
- void removeListener(const AsteriskSCF::System::Component::V1::ReplicaListenerPrx& listener,
+ void removeListener(const OperationContextPtr&,
+ const AsteriskSCF::System::Component::V1::ReplicaListenerPrx& listener,
const ::Ice::Current&)
{
+ vector<ReplicaListenerPrx>::iterator it = find(mListeners.begin(), mListeners.end(), listener);
+ if (it == mListeners.end())
+ {
+ return;
+ }
+
mListeners.erase(std::remove(mListeners.begin(), mListeners.end(), listener),
mListeners.end());
}
@@ -217,7 +244,7 @@ private:
/**
* Listeners that we need to push state change notifications out to.
*/
- vector<AsteriskSCF::System::Component::V1::ReplicaListenerPrx> mListeners;
+ vector<ReplicaListenerPrx> mListeners;
}; // class ReplicaImpl
@@ -274,6 +301,8 @@ void Component::activated()
// Notify subclasses
onActivated();
+
+ mLogger(Info) << mName << " activated.";
}
catch(const Ice::Exception& e)
{
@@ -292,6 +321,8 @@ void Component::standby()
// Notify subclasses
onStandby();
+
+ mLogger(Info) << mName << " placed on standby.";
}
catch(const Ice::Exception& e)
{
@@ -643,16 +674,11 @@ void Component::initReplicationContext()
}
else
{
- // NOTE: In the near future, Standalone instances are the only
- // instances that will default to being active. When that is in
- // place, non-standalone instances will need to be made active via
- // the Replica interface. But for now, we default to active unless
- // the soon-to-be obsolete Standby property is set.
- if (AsteriskSCF::getBooleanPropertyValueWithDefault(mCommunicator->getProperties(),
- mName + ".Standby", false) == false)
- {
- state = ACTIVE_IN_REPLICA_GROUP;
- }
+ // NOTE: In the past, when Standalone was false, we would default
+ // to ACTIVE_IN_REPLICA_GROUP unless the now obsolete *.Standby property
+ // was set. Now, all components in a replica group default to
+ // STANDBY_IN_REPLICA_GROUP. Deployments must activate one via the
+ // Replica interface.
}
// Create the replication context.
@@ -965,6 +991,7 @@ void Component::start(const string& name,
*/
void Component::resumed()
{
+ mLogger(Info) << "Resuming " << mName << " ...";
if (!mRunning)
{
// Standby mode?
@@ -1002,6 +1029,8 @@ void Component::resumed()
// Notify subclasses
onResume();
+ mLogger(Info) << mName << " resumed.";
+
mRunning = true;
}
@@ -1058,14 +1087,14 @@ void Component::suspendService(bool shuttingDown)
*/
void Component::suspended()
{
- mLogger(Info) << "Suspending...";
+ mLogger(Info) << "Suspending " << mName << " ...";
suspendService(false);
// Notify subclasses
onSuspend();
- mLogger(Info) << "Suspended.";
+ mLogger(Info) << mName << " suspended.";
}
/**
diff --git a/src/Component/ComponentStateReplicator.cpp b/src/Component/ComponentStateReplicator.cpp
index 2b8686e..3b5f7ff 100644
--- a/src/Component/ComponentStateReplicator.cpp
+++ b/src/Component/ComponentStateReplicator.cpp
@@ -55,12 +55,16 @@ class ConfigurationReplicatorImpl : public ConfigurationReplicator
{
public:
ConfigurationReplicatorImpl(const IceStorm::TopicPrx& topic) : mConfigurationReplicationTopic(topic) { };
- void registerConfigurationService(const AsteriskSCF::System::Configuration::V1::ConfigurationServicePrx&, const Ice::Current&);
+ void registerConfigurationService(
+ const AsteriskSCF::System::V1::OperationContextPtr&,
+ const AsteriskSCF::System::Configuration::V1::ConfigurationServicePrx&, const Ice::Current&);
private:
IceStorm::TopicPrx mConfigurationReplicationTopic;
};
-void ConfigurationReplicatorImpl::registerConfigurationService(const AsteriskSCF::System::Configuration::V1::ConfigurationServicePrx& service, const Ice::Current&)
+void ConfigurationReplicatorImpl::registerConfigurationService(
+ const AsteriskSCF::System::V1::OperationContextPtr&,
+ const AsteriskSCF::System::Configuration::V1::ConfigurationServicePrx& service, const Ice::Current&)
{
if (mConfigurationReplicationTopic)
{
@@ -195,14 +199,22 @@ void ComponentStateReplicator::onPreServiceCreation()
// Publish the configuration service IceStorm topic so everybody gets configuration
mConfigurationManagement = ServiceManagementPrx::uncheckedCast(
- getServiceLocatorManagement()->addService(mConfigurationPublisher, ""));
+ getServiceLocatorManagement()->addService(AsteriskSCF::Operations::createContext(),
+ mConfigurationPublisher, ""));
ServiceLocatorParamsPtr configurationParams = new ServiceLocatorParams();
configurationParams->category = mConfigurationDiscoveryCategory;
configurationParams->service = getCommunicator()->getProperties()->getPropertyWithDefault(getName() + ".ServiceName",
"default");
- configurationParams->id = getCommunicator()->getProperties()->getPropertyWithDefault(getName() + ".Name", "");
- mConfigurationManagement->addLocatorParams(configurationParams, "");
+ configurationParams->id = getName();
+
+ mLogger(Info) << "Configuration servant for replica group published using locator params:";
+ mLogger(Info) << " category = " << configurationParams->category;
+ mLogger(Info) << " service = " << configurationParams->service;
+ mLogger(Info) << " id = " << configurationParams->id;
+
+ mConfigurationManagement->addLocatorParams(AsteriskSCF::Operations::createContext(),
+ configurationParams, "");
}
}
diff --git a/src/Helpers/CMakeLists.txt b/src/Helpers/CMakeLists.txt
index 61dfeb4..ff99cca 100644
--- a/src/Helpers/CMakeLists.txt
+++ b/src/Helpers/CMakeLists.txt
@@ -1 +1,2 @@
astscf_component_add_files(ASTSCFIceUtilCpp Network.cpp)
+
diff --git a/src/NAT/Candidates.cpp b/src/NAT/Candidates.cpp
index 4c8174e..bb84fb9 100644
--- a/src/NAT/Candidates.cpp
+++ b/src/NAT/Candidates.cpp
@@ -14,9 +14,9 @@
* at the top of the source tree.
*/
+#include <AsteriskSCF/Helpers/Network.h>
#include <boost/asio/ip/address.hpp>
#include <AsteriskSCF/NAT/Candidates.h>
-#include <AsteriskSCF/Helpers/Network.h>
#include <sstream>
#include <boost/functional/hash.hpp>
diff --git a/src/Operations/CMakeLists.txt b/src/Operations/CMakeLists.txt
new file mode 100644
index 0000000..900941f
--- /dev/null
+++ b/src/Operations/CMakeLists.txt
@@ -0,0 +1,3 @@
+astscf_component_add_files(ASTSCFIceUtilCpp OperationContextCache.cpp)
+astscf_component_add_files(ASTSCFIceUtilCpp OperationContext.cpp)
+astscf_component_add_files(ASTSCFIceUtilCpp OperationMonitor.cpp)
diff --git a/src/Operations/OperationContext.cpp b/src/Operations/OperationContext.cpp
new file mode 100644
index 0000000..4c4ce8a
--- /dev/null
+++ b/src/Operations/OperationContext.cpp
@@ -0,0 +1,91 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#include <boost/thread.hpp>
+#include <boost/version.hpp>
+#include <boost/lexical_cast.hpp>
+#include <boost/uuid/uuid.hpp>
+#include <boost/uuid/uuid_generators.hpp>
+#include <boost/uuid/uuid_io.hpp>
+
+#include <AsteriskSCF/System/OperationsIf.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
+
+using namespace AsteriskSCF::System::V1;
+
+namespace AsteriskSCF
+{
+namespace Operations
+{
+
+// Thread-specific slot for the UUID generator; filled on demand by getUuid().
+boost::thread_specific_ptr<boost::uuids::random_generator> uuidGenerator;
+
+/**
+ * Generates a new random UUID and returns it in string form.
+ */
+std::string getUuid()
+{
+ // Nothing stored for the calling thread yet: create its generator now.
+ if (uuidGenerator.get() == 0)
+ {
+ uuidGenerator.reset(new boost::uuids::random_generator);
+ }
+ boost::uuids::uuid freshId = (*uuidGenerator)();
+
+// BOOST 1.44 added a to_string() function which is faster than lexical_cast.
+#if ((BOOST_VERSION / 100) >= 1044)
+ return boost::uuids::to_string(freshId);
+#else
+ return boost::lexical_cast<std::string>(freshId);
+#endif
+}
+
+/**
+ * Builds an OperationContext that starts a new transaction.
+ * The id is a newly generated UUID, and the transaction id is set to that same value.
+ */
+OperationContextPtr createContext()
+{
+ OperationContextPtr freshContext(new OperationContext);
+
+ freshContext->id = getUuid();
+
+ // Using the id as the transaction id makes the "source" operation of a
+ // transaction easy to identify.
+ freshContext->transactionId = freshContext->id;
+ return freshContext;
+}
+
+/**
+ * Builds an OperationContext with a newly generated id that shares the transaction id of
+ * an existing context.
+ * @param operationContext The source OperationContext whose transaction id is copied.
+ */
+OperationContextPtr createContext(const OperationContextPtr& operationContext)
+{
+ OperationContextPtr derivedContext(new OperationContext);
+
+ derivedContext->id = getUuid();
+ derivedContext->transactionId = operationContext->transactionId;
+ return derivedContext;
+}
+
+/**
+ * Builds an OperationContext whose id is computed from an existing context rather than
+ * randomly generated: the new id is the source id, a '.', and the modifier.
+ * The transaction id is copied from the source context.
+ * @param sourceContext The context supplying the base id and the transaction id.
+ * @param modifier Text appended (after a '.') to the source id.
+ */
+OperationContextPtr calculateOperationContext(
+ const OperationContextPtr& sourceContext,
+ const std::string& modifier)
+{
+ return new OperationContext(sourceContext->id + "." + modifier, sourceContext->transactionId);
+}
+
+} // End namespace Operations
+} // End namespace AsteriskSCF
diff --git a/src/Operations/OperationContextCache.cpp b/src/Operations/OperationContextCache.cpp
new file mode 100644
index 0000000..03c0f25
--- /dev/null
+++ b/src/Operations/OperationContextCache.cpp
@@ -0,0 +1,340 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2012, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#include <boost/thread/locks.hpp>
+
+#include <AsteriskSCF/Operations/OperationContextCache.h>
+
+using namespace AsteriskSCF::System::V1;
+using namespace AsteriskSCF::System::Logging;
+
+namespace
+{
+Logger lg = AsteriskSCF::System::Logging::getLoggerFactory().getLogger("AsteriskSCF.Operations"); // File-local logger; the non-logging OperationContextCache constructor binds mLogger to it.
+}
+
+namespace AsteriskSCF
+{
+namespace Operations
+{
+
+/**
+ * Cache record that pairs an OperationContext (and optionally a cookie) with the
+ * time at which the record was created.
+ */
+class OperationContextCacheEntry : public IceUtil::Shared
+{
+public:
+ /**
+ * Creates an entry without a cookie; the creation time is taken from IceUtil::Time::now().
+ * @param context The context to wrap.
+ * @param ttl The time-to-live to use in isStale() tests.
+ */
+ OperationContextCacheEntry(const OperationContextPtr& context, const IceUtil::Time& ttl)
+ : mContext(context),
+ mTimestamp(IceUtil::Time::now()),
+ mTTL(ttl)
+ {
+ }
+
+ /**
+ * Creates an entry that also carries a cookie; the creation time is taken from IceUtil::Time::now().
+ * @param context The context to wrap.
+ * @param cookie The cookie to keep alongside the context.
+ * @param ttl The time-to-live to use in isStale() tests.
+ */
+ OperationContextCacheEntry(const OperationContextPtr& context, const OperationContextCookiePtr& cookie, const IceUtil::Time& ttl)
+ : mContext(context),
+ mCookie(cookie),
+ mTimestamp(IceUtil::Time::now()),
+ mTTL(ttl)
+ {
+ }
+
+ /** Returns the wrapped context. */
+ OperationContextPtr getContext()
+ {
+ return mContext;
+ }
+
+ /** Returns the cookie stored with the context (default-constructed when none was supplied). */
+ OperationContextCookiePtr getCookie()
+ {
+ return mCookie;
+ }
+
+ /**
+ * Tells whether this entry has outlived its time-to-live.
+ * @param now The time to compare the entry's timestamp against.
+ * @return true if the time elapsed since creation is strictly greater than the TTL.
+ */
+ bool isStale(const IceUtil::Time& now)
+ {
+ return (now - mTimestamp) > mTTL;
+ }
+
+private:
+ OperationContextPtr mContext;
+ OperationContextCookiePtr mCookie;
+ IceUtil::Time mTimestamp; // Creation time of this entry.
+ IceUtil::Time mTTL; // Maximum age before isStale() reports true.
+};
+
+/**
+ * A TimerTask to instigate the pruning of the cache.
+ * The OperationContextCache manages the life of this task and its
+ * internal timer.
+ *
+ * The constructor creates a dedicated IceUtil::Timer and schedules this task on it
+ * with ttlSeconds as the repeat interval. Each run calls prune() on the cache,
+ * provided the cache still exists.
+ */
+class OperationContextPruner : public IceUtil::TimerTask
+{
+public:
+ OperationContextPruner(const OperationContextCachePtr& cache, int ttlSeconds) :
+ mCache(cache),
+ mTimer(new IceUtil::Timer)
+ {
+ mTimer->scheduleRepeated(this, IceUtil::Time::seconds(ttlSeconds)); // NOTE(review): the timer is handed 'this' while the constructor is still running -- confirm the reference counting is safe here.
+ }
+
+ /**
+ * Override for the TimerTask interface.
+ * Prunes the cache if it can still be locked from the weak pointer;
+ * otherwise does nothing.
+ */
+ void runTimerTask()
+ {
+ if (boost::shared_ptr<OperationContextCache> cache = mCache.lock())
+ {
+ cache->prune();
+ }
+ }
+
+ void cancel()
+ {
+ mTimer->destroy(); // Called from ~OperationContextCache(). NOTE(review): presumably this stops all further runTimerTask() calls -- confirm against the IceUtil::Timer documentation.
+ }
+
+private:
+ boost::weak_ptr<OperationContextCache> mCache; // Weak pointer to avoid circular refs
+ IceUtil::TimerPtr mTimer; // Timer created by, and used only for, this task.
+};
+
+/**
+ * Factory method for a non-logging cache.
+ * Builds the cache and attaches the pruner that periodically prunes it.
+ * @param ttlSeconds Time-to-live for cached entries, in seconds; also used as the pruning interval.
+ */
+OperationContextCachePtr OperationContextCache::create(int ttlSeconds)
+{
+ OperationContextCachePtr newCache(new OperationContextCache(ttlSeconds));
+
+ // The pruner only keeps a weak reference back to the cache.
+ newCache->setPruner(new OperationContextPruner(newCache, ttlSeconds));
+
+ return newCache;
+}
+
+/**
+ * Factory method for a logging cache.
+ * Builds the cache and attaches the pruner that periodically prunes it.
+ * @param ttlSeconds Time-to-live for cached entries, in seconds; also used as the pruning interval.
+ * @param logger Logger the cache should use.
+ * @param label Label that identifies this cache in log output.
+ */
+OperationContextCachePtr OperationContextCache::create(int ttlSeconds,
+ const AsteriskSCF::System::Logging::Logger& logger,
+ const std::string& label)
+{
+ OperationContextCachePtr newCache(new OperationContextCache(ttlSeconds, logger, label));
+
+ // The pruner only keeps a weak reference back to the cache.
+ newCache->setPruner(new OperationContextPruner(newCache, ttlSeconds));
+
+ return newCache;
+}
+
+void OperationContextCache::setPruner(const OperationContextPrunerPtr& pruner)
+{
+ mPruner = pruner; // Kept so that the destructor can cancel() the pruner; the create() factories call this right after construction.
+}
+
+/**
+ * Constructs a cache with logging disabled: mLogger is bound to the file-local
+ * default logger and the logging label is empty.
+ * @param ttlSeconds The time to live for the cache, specified in seconds.
+ * This is a minimum time for an OperationContext to be cached. They
+ * may be cached longer.
+ */
+OperationContextCache::OperationContextCache(int ttlSeconds)
+ : mLogger(lg),
+ mLoggingEnabled(false),
+ mLoggingLabel(""),
+ mTTL(IceUtil::Time::seconds(ttlSeconds))
+{
+}
+
+/**
+ * Alternate constructor that enables logging (mLoggingEnabled is set to true).
+ * @param ttlSeconds The time to live for the cache, specified in seconds.
+ * This is a minimum time for an OperationContext to be cached. They
+ * may be cached longer.
+ * @param logger Logger to use.
+ * @param label Label to apply when logging to identify this cache.
+ */
+OperationContextCache::OperationContextCache(int ttlSeconds,
+ const Logger& logger,
+ const std::string& label)
+ : mLogger(logger),
+ mLoggingEnabled(true),
+ mLoggingLabel(label),
+ mTTL(IceUtil::Time::seconds(ttlSeconds))
+{
+}
+
+/**
+ * Cancels the pruner, if one was attached, when the cache is destroyed.
+ */
+OperationContextCache::~OperationContextCache()
+{
+ // mPruner is only assigned by setPruner(). If pruner construction fails inside
+ // create(), the cache is destroyed while mPruner is still null, and a null
+ // dereference in a destructor must be avoided.
+ if (mPruner)
+ {
+ mPruner->cancel();
+ }
+}
+
+/**
+ * Looks up the cache entry for a context by its id.
+ * Takes no lock itself, so that operations that already hold the lock can share it.
+ * @param operationContext The context whose id is used as the lookup key.
+ * @return The cached entry, or a null pointer when the id is not in the cache.
+ */
+OperationContextCacheEntryPtr OperationContextCache::get(const OperationContextPtr& operationContext)
+{
+ std::map<std::string, OperationContextCacheEntryPtr>::iterator found = mCache.find(operationContext->id);
+
+ return (found != mCache.end()) ? found->second : OperationContextCacheEntryPtr();
+}
+
+/**
+ * Stores a context in the cache unless an entry with the same id is already present.
+ *
+ * @param operationContext The context to add to the cache.
+ * @return true if a new entry was stored; false if the id was already cached, in which
+ * case the existing entry is left untouched.
+ *
+ * @note The return value reports insertion, not presence. Do not confuse it with the
+ * return value of the 'contains' operation.
+ */
+bool OperationContextCache::addOperationContext(const OperationContextPtr& operationContext)
+{
+ boost::unique_lock<boost::shared_mutex> guard(mLock);
+
+ if (get(operationContext))
+ {
+ // Already cached.
+ return false;
+ }
+
+ OperationContextCacheEntryPtr fresh(new OperationContextCacheEntry(operationContext, mTTL));
+ mCache[operationContext->id] = fresh;
+
+ return true;
+}
+
+/**
+ * Caches the specified context if it isn't already in the cache, and associate a cookie with it.
+ *
+ * @param operationContext The context to add to the cache.
+ * @param inCookie A cookie object to associate with this entry in the cache.
+ * @param existingCookie This value will be set by this method to the cookie of an existing
+ * operationContext, if there was already one in the cache with this id.
+ * @return true The context was added, which means it wasn't already in the cache.
+ *
+ * @note Make sure you don't confuse the return value of this operation with the return
+ * value of the 'contains' operation.
+ */
... 1057 lines suppressed ...
--
asterisk-scf/release/ice-util-cpp.git
More information about the asterisk-scf-commits
mailing list