[asterisk-scf-commits] team/ken.hunt/route_async_routing.git branch "route_async" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Wed Dec 22 19:43:49 UTC 2010
branch "route_async" has been updated
via fbb061f636eff41df0a87504b6fbb7cde81d6b37 (commit)
from d74afeba80c8b862e7021b8378bf38f92565d317 (commit)
Summary of changes:
src/EndpointRegistry.cpp | 10 ++++
src/RoutingServiceEventPublisher.cpp | 81 ++++++++++++++++++++-------------
src/SessionRouter.cpp | 37 ++++++++++-----
3 files changed, 84 insertions(+), 44 deletions(-)
- Log -----------------------------------------------------------------
commit fbb061f636eff41df0a87504b6fbb7cde81d6b37
Author: Ken Hunt <ken.hunt at digium.com>
Date: Wed Dec 22 13:43:11 2010 -0600
Fixed deadlock issue in Event Publisher.
diff --git a/src/EndpointRegistry.cpp b/src/EndpointRegistry.cpp
index 79b96b6..8c9ba89 100644
--- a/src/EndpointRegistry.cpp
+++ b/src/EndpointRegistry.cpp
@@ -145,6 +145,11 @@ public:
}
}
+ ~LookupResultCollector()
+ {
+ lg(Debug) << "LookupResultCollector being destroyed. ";
+ }
+
/**
* Collect results of AMI lookups from multiple EndpointLocators.
*/
@@ -221,6 +226,11 @@ public:
{
}
+ ~LookupCallback()
+ {
+ lg(Debug) << "LookupCallback being destroyed. ";
+ }
+
void lookupResult(const EndpointSeq& endpoints)
{
// delegation to thread safe object
diff --git a/src/RoutingServiceEventPublisher.cpp b/src/RoutingServiceEventPublisher.cpp
index 73cbc18..9bceb87 100644
--- a/src/RoutingServiceEventPublisher.cpp
+++ b/src/RoutingServiceEventPublisher.cpp
@@ -44,6 +44,7 @@ public:
RoutingServiceEventPublisherPriv(const Ice::ObjectAdapterPtr& adapter) :
mAdapter(adapter), mInitialized(false)
{
+ boost::lock_guard<boost::mutex> lock(mLock);
initialize();
}
@@ -91,7 +92,17 @@ public:
}
Ice::ObjectPrx publisher = topic->getPublisher();
- mEventTopic = Event::RoutingEventsPrx::uncheckedCast(publisher);
+ Ice::ObjectPrx oneway;
+ try
+ {
+ oneway = publisher->ice_oneway();
+ }
+ catch (const Ice::NoEndpointException&)
+ {
+ assert(0); // All operations callable via icestorm must support oneway.
+ }
+
+ mEventTopic = Event::RoutingEventsPrx::uncheckedCast(oneway);
mInitialized = true;
}
@@ -118,7 +129,7 @@ public:
}
public:
- Event::RoutingEventsPrx mEventTopic;
+ Event::RoutingEventsPrx mEventTopic; // Using one-way proxy.
boost::mutex mLock;
private:
@@ -140,11 +151,12 @@ RoutingServiceEventPublisher::RoutingServiceEventPublisher(const Ice::ObjectAdap
void RoutingServiceEventPublisher::lookupEvent(const std::string& destination,
AsteriskSCF::Core::Routing::V1::Event::OperationResult result, const Ice::Current &)
{
- boost::lock_guard<boost::mutex> lock(mImpl->mLock);
-
- if (!mImpl->isInitialized())
- {
- return;
+ { // scope for the lock
+ boost::lock_guard<boost::mutex> lock(mImpl->mLock);
+ if (!mImpl->isInitialized())
+ {
+ return;
+ }
}
try
@@ -165,11 +177,12 @@ void RoutingServiceEventPublisher::addEndpointLocatorEvent(const std::string& lo
const ::AsteriskSCF::Core::Routing::V1::RegExSeq& regexList, AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
const Ice::Current &)
{
- boost::lock_guard<boost::mutex> lock(mImpl->mLock);
-
- if (!mImpl->isInitialized())
- {
- return;
+ { // scope for the lock
+ boost::lock_guard<boost::mutex> lock(mImpl->mLock);
+ if (!mImpl->isInitialized())
+ {
+ return;
+ }
}
try
@@ -188,11 +201,12 @@ void RoutingServiceEventPublisher::addEndpointLocatorEvent(const std::string& lo
void RoutingServiceEventPublisher::removeEndpointLocatorEvent(const std::string& locatorId,
AsteriskSCF::Core::Routing::V1::Event::OperationResult result, const Ice::Current &)
{
- boost::lock_guard<boost::mutex> lock(mImpl->mLock);
-
- if (!mImpl->isInitialized())
- {
- return;
+ { // scope for the lock
+ boost::lock_guard<boost::mutex> lock(mImpl->mLock);
+ if (!mImpl->isInitialized())
+ {
+ return;
+ }
}
try
@@ -212,11 +226,12 @@ void RoutingServiceEventPublisher::setEndpointLocatorDestinationIdsEvent(const s
const AsteriskSCF::Core::Routing::V1::RegExSeq& regexList, AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
const Ice::Current &)
{
- boost::lock_guard<boost::mutex> lock(mImpl->mLock);
-
- if (!mImpl->isInitialized())
- {
- return;
+ { // scope for the lock
+ boost::lock_guard<boost::mutex> lock(mImpl->mLock);
+ if (!mImpl->isInitialized())
+ {
+ return;
+ }
}
try
@@ -234,11 +249,12 @@ void RoutingServiceEventPublisher::setEndpointLocatorDestinationIdsEvent(const s
*/
void RoutingServiceEventPublisher::clearEndpointLocatorsEvent(const Ice::Current &)
{
- boost::lock_guard<boost::mutex> lock(mImpl->mLock);
-
- if (!mImpl->isInitialized())
- {
- return;
+ { // scope for the lock
+ boost::lock_guard<boost::mutex> lock(mImpl->mLock);
+ if (!mImpl->isInitialized())
+ {
+ return;
+ }
}
try
@@ -256,11 +272,12 @@ void RoutingServiceEventPublisher::clearEndpointLocatorsEvent(const Ice::Current
*/
void RoutingServiceEventPublisher::setPolicyEvent(const std::string& policy, const Ice::Current &)
{
- boost::lock_guard<boost::mutex> lock(mImpl->mLock);
-
- if (!mImpl->isInitialized())
- {
- return;
+ { // scope for the lock
+ boost::lock_guard<boost::mutex> lock(mImpl->mLock);
+ if (!mImpl->isInitialized())
+ {
+ return;
+ }
}
try
diff --git a/src/SessionRouter.cpp b/src/SessionRouter.cpp
index b160069..fb10b26 100644
--- a/src/SessionRouter.cpp
+++ b/src/SessionRouter.cpp
@@ -396,12 +396,13 @@ struct SessionContext
{
public:
/**
- * Constructor. The BridgeManager isn't initialized, but configured via a setter.
+ * Constructor. The BridgeManager isn't initialized. It's configured later via a setter
+ * due to component initialization sequence.
*/
SessionContext(const Ice::ObjectAdapterPtr& adapter,
const EndpointRegistryPtr& registry,
const RoutingEventsPtr& publisher,
- const boost::shared_ptr<WorkQueue> &mWorkQueue)
+ const boost::shared_ptr<WorkQueue>& workQueue)
: adapter(adapter),
endpointRegistry(registry),
eventPublisher(publisher),
@@ -412,8 +413,8 @@ public:
Ice::ObjectAdapterPtr adapter;
EndpointRegistryPtr endpointRegistry;
RoutingEventsPtr eventPublisher;
- AsteriskSCF::SmartProxy::SmartProxy<BridgeManagerPrx> bridgeManager;
boost::shared_ptr<WorkQueue> workQueue;
+ AsteriskSCF::SmartProxy::SmartProxy<BridgeManagerPrx> bridgeManager;
};
/**
@@ -687,6 +688,12 @@ protected: // These protected operations are utiltity functions.
return removedSessions;
}
+ void setState(const boost::function<void ()>& stateHandler, std::string stateName)
+ {
+ lg(Debug) << "Operation setting new state handler " << stateName;
+ mCurrentStateHandler = stateHandler;
+ }
+
protected:
T mInitiatorCallback;
SessionContext mSessionContext;
@@ -696,6 +703,8 @@ protected:
EndpointSeq mLookupResult;
SessionListenerAllocatorPtr mListenerManager;
OperationsManager* mOperationsManager;
+
+private:
boost::function<void ()> mCurrentStateHandler; // Lightweight state machine. Current state handles doWork() for a given state.
}; // class SessionRouterOperation
@@ -756,7 +765,9 @@ private:
mListenerManager = listener;
// Set the state to exectute once we've looked up our endpoints.
- mCurrentStateHandler = boost::bind(&RouteSessionOperation::establishBridgeState, this);
+ setState(boost::bind(&RouteSessionOperation::establishBridgeState, this), "establishBridgeState");
+
+ // mCurrentStateHandler = boost::bind(&RouteSessionOperation::establishBridgeState, this);
// Lookup the destination. This will use AMI, and the callback should
// schedule us to execute again.
@@ -777,10 +788,10 @@ private:
return;
}
- assert(mEndpoints.size() > 0);
+ assert(mLookupResult.size() > 0);
// Add a session to the endpoints.
- SessionSeq newSessions = createSessionForEndpoints(mEndpoints, mDestination);
+ SessionSeq newSessions = createSessionForEndpoints(mLookupResult, mDestination);
if (mListenerManager->getListener()->getNumSessions() < 2)
{
@@ -834,9 +845,6 @@ private:
string mDestination;
::Ice::Current mIceCurrent;
- // Implementation state
- EndpointSeq mEndpoints;
-
}; // class RouteSessionOperation
/**
@@ -893,7 +901,8 @@ private:
lg(Debug) << "connectBridgedSessionsWithDestination(): Routing destination " << mDestination;
// Set the state to exectute after lookup.
- mCurrentStateHandler = boost::bind(&ConnectBridgedSessionsWithDestinationOperation::establishBridgeState, this);
+ setState(boost::bind(&ConnectBridgedSessionsWithDestinationOperation::establishBridgeState, this), "establishBridgeState");
+ // mCurrentStateHandler = boost::bind(&ConnectBridgedSessionsWithDestinationOperation::establishBridgeState, this);
// Lookup the destination. This will use AMI, and the callback should
// schedule us to execute again.
@@ -914,7 +923,7 @@ private:
}
// Add a session
- SessionSeq newSessions = createSessionForEndpoints(mEndpoints, mDestination);
+ SessionSeq newSessions = createSessionForEndpoints(mLookupResult, mDestination);
if (mListenerManager->getListener()->getNumSessions() < 2)
{
@@ -962,7 +971,6 @@ private:
::Ice::Current mIceCurrent;
// Implementation state
- EndpointSeq mEndpoints;
BridgePrx mBridge;
SessionSeq mRemainingSessions;
@@ -1077,6 +1085,11 @@ public:
{
}
+ ~LookupCallback()
+ {
+ lg(Debug) << "LookupCallback destroyed.";
+ }
+
public: // Overrides.
virtual void ice_exception(const ::std::exception& e)
-----------------------------------------------------------------------
--
team/ken.hunt/route_async_routing.git
More information about the asterisk-scf-commits
mailing list