[asterisk-scf-commits] asterisk-scf/integration/routing.git branch "master" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Wed Oct 20 14:43:58 CDT 2010
branch "master" has been updated
via 29002b549efa656c4c283fe05f018ac6c1516fdd (commit)
via 8b15c984277b1a59cd8f1680faf129d513522938 (commit)
via 127302f2f760b07e45054bd309494f12bbc4ddc9 (commit)
from 8fceaf9ac7db7712612fb612b233a41297f18bb1 (commit)
Summary of changes:
src/EndpointRegistry.cpp | 94 +++++++++++++++++++++++++++++++++++++++-------
src/SessionRouter.cpp | 41 ++++++++++++++++----
2 files changed, 113 insertions(+), 22 deletions(-)
- Log -----------------------------------------------------------------
commit 29002b549efa656c4c283fe05f018ac6c1516fdd
Author: Ken Hunt <ken.hunt at digium.com>
Date: Wed Oct 20 14:42:55 2010 -0500
Added locking.
diff --git a/src/EndpointRegistry.cpp b/src/EndpointRegistry.cpp
index aec9d5e..19cce64 100644
--- a/src/EndpointRegistry.cpp
+++ b/src/EndpointRegistry.cpp
@@ -7,7 +7,7 @@
*/
#include <boost/regex.hpp>
#include <boost/thread/thread.hpp>
-#include <boost/thread/locks.hpp>
+#include <boost/thread/shared_mutex.hpp>
#include "RoutingServiceEventPublisher.h"
#include "EndpointRegistry.h"
@@ -97,7 +97,7 @@ public:
void getEndpointLocatorMapCopy( EndpointLocatorMap& destination)
{
- boost::unique_lock<boost::shared_mutex> lock(mLock);
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
destination.insert(mEndpointLocatorMap.begin(), mEndpointLocatorMap.end());
}
@@ -133,7 +133,7 @@ void EndpointRegistry::addEndpointLocator(const std::string& locatorId, const Re
EndpointLocatorMapIterator end(0);
{ // critical scope
- boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+ boost::shared_lock<boost::shared_mutex> lock(mImpl->mLock);
existing = mImpl->mEndpointLocatorMap.find(locatorId);
end = mImpl->mEndpointLocatorMap.end();
@@ -173,7 +173,7 @@ void EndpointRegistry::removeEndpointLocator(const std::string& locatorId, const
EndpointLocatorMapIterator end(0);
{ // critical scope
- boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+ boost::shared_lock<boost::shared_mutex> lock(mImpl->mLock);
existing = mImpl->mEndpointLocatorMap.find(locatorId);
end = mImpl->mEndpointLocatorMap.end();
@@ -214,7 +214,7 @@ void EndpointRegistry::setEndpointLocatorDestinationIds(const std::string& locat
EndpointLocatorMapIterator end(0);
{ // critical scope
- boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+ boost::shared_lock<boost::shared_mutex> lock(mImpl->mLock);
EndpointLocatorMapIterator existing = mImpl->mEndpointLocatorMap.find(locatorId);
end = mImpl->mEndpointLocatorMap.end();
}
diff --git a/src/SessionRouter.cpp b/src/SessionRouter.cpp
index 8d157da..b513020 100644
--- a/src/SessionRouter.cpp
+++ b/src/SessionRouter.cpp
@@ -6,7 +6,7 @@
* All rights reserved.
*/
#include <boost/thread/thread.hpp>
-#include <boost/thread/locks.hpp>
+#include <boost/thread/shared_mutex.hpp>
#include "SessionRouter.h"
#include "EndpointRegistry.h"
@@ -89,7 +89,7 @@ public:
SessionSeq cacheSessions;
{
- boost::unique_lock<boost::shared_mutex> lock(mLock);
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
cacheSessions = mSessions;
}
@@ -149,9 +149,11 @@ public:
}
}
}
- const SessionSeq& getSessions()
+
+ const int getNumSessions()
{
- return mSessions;
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+ return mSessions.size();
}
bool isTerminated() // Lots of shoring up to do for asynchronous operations!
@@ -166,7 +168,7 @@ public:
{
SessionSeq sessionsToCall;
{ // critical scope
- boost::unique_lock<boost::shared_mutex> lock(mLock);
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
sessionsToCall = mSessions;
}
@@ -385,7 +387,7 @@ void SessionRouter::routeSession(const AsteriskSCF::SessionCommunications::V1::S
}
}
- if (listener->getSessions().size() < 2)
+ if (listener->getNumSessions() < 2)
{
throw SessionCreationException(destination);
}
@@ -497,7 +499,7 @@ void SessionRouter::connectBridgedSessionsWithDestination(const SessionPrx& sess
}
}
- if (listener->getSessions().size() < 2)
+ if (listener->getNumSessions() < 2)
{
lg(Error) << "connectBridgedSessionsWithDestination(): Unable to create a new session for destination " << destination << " in connectBridgedSessionsWithDestination(). " ;
throw SessionCreationException(destination);
commit 8b15c984277b1a59cd8f1680faf129d513522938
Merge: 127302f 8fceaf9
Author: Ken Hunt <ken.hunt at digium.com>
Date: Wed Oct 20 09:42:52 2010 -0500
Merge branch 'master' of git.asterisk.org:asterisk-scf/integration/routing
commit 127302f2f760b07e45054bd309494f12bbc4ddc9
Author: Ken Hunt <ken.hunt at digium.com>
Date: Tue Oct 19 23:55:55 2010 -0500
Added mutex and locks on relevant data items.
diff --git a/src/EndpointRegistry.cpp b/src/EndpointRegistry.cpp
index 97842d8..aec9d5e 100644
--- a/src/EndpointRegistry.cpp
+++ b/src/EndpointRegistry.cpp
@@ -6,6 +6,8 @@
* All rights reserved.
*/
#include <boost/regex.hpp>
+#include <boost/thread/thread.hpp>
+#include <boost/thread/locks.hpp>
#include "RoutingServiceEventPublisher.h"
#include "EndpointRegistry.h"
@@ -54,6 +56,9 @@ public:
vector<boost::regex> regexList;
};
+typedef map<std::string, RegisteredLocator>::iterator EndpointLocatorMapIterator;
+typedef map<std::string, RegisteredLocator> EndpointLocatorMap;
+
/**
* Provides the private implementation of the EndpointRegistry.
*/
@@ -65,11 +70,43 @@ public:
{
}
+ void clearEndpointLocatorMap()
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ mEndpointLocatorMap.clear();
+ }
+
+ void eraseLocatorMapItem(EndpointLocatorMapIterator pos)
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ mEndpointLocatorMap.erase(pos);
+ }
+
+ void eraseLocatorMapItem(const string& locatorId)
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ mEndpointLocatorMap.erase(locatorId);
+ }
+
+
+ void insertLocatorMapItem(const string& key, const RegisteredLocator& locator)
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ mEndpointLocatorMap[key] = locator;
+ }
+
+ void getEndpointLocatorMapCopy( EndpointLocatorMap& destination)
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ destination.insert(mEndpointLocatorMap.begin(), mEndpointLocatorMap.end());
+ }
+
+ boost::shared_mutex mLock;
+
boost::shared_ptr<ScriptProcessor> mScriptProcessor;
- map<std::string, RegisteredLocator> mEndpointLocatorMap;
+ EndpointLocatorMap mEndpointLocatorMap;
const RoutingEventsPtr mEventPublisher;
};
-typedef map<std::string, RegisteredLocator>::iterator EndpointLocatorMapIterator;
/**
* Constructor.
@@ -91,16 +128,26 @@ void EndpointRegistry::addEndpointLocator(const std::string& locatorId, const Re
try
{
lg(Debug) << "EndpointRegistry::addEndpointLocator() adding locator " << locatorId;
- EndpointLocatorMapIterator existing = mImpl->mEndpointLocatorMap.find(locatorId);
- if (existing != mImpl->mEndpointLocatorMap.end())
+
+ EndpointLocatorMapIterator existing(0);
+ EndpointLocatorMapIterator end(0);
+
+ { // critical scope
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+
+ existing = mImpl->mEndpointLocatorMap.find(locatorId);
+ end = mImpl->mEndpointLocatorMap.end();
+ }
+
+ if (existing != end)
{
- mImpl->mEndpointLocatorMap.erase(existing);
+ mImpl->eraseLocatorMapItem(existing);
lg(Info) << "Received request to add endpoint with id " << locatorId << " which already exists. Replacing with new proxy.";
}
RegisteredLocator newLocator(locator, regexList);
+ mImpl->insertLocatorMapItem(locatorId, newLocator);
- mImpl->mEndpointLocatorMap[locatorId] = newLocator;
mImpl->mEventPublisher->addEndpointLocatorEvent(locatorId, regexList, Event::SUCCESS);
}
catch (...)
@@ -122,17 +169,25 @@ void EndpointRegistry::removeEndpointLocator(const std::string& locatorId, const
{
lg(Debug) << "EndpointRegistry::removeEndpointLocator() removing locator " << locatorId;
- EndpointLocatorMapIterator existing = mImpl->mEndpointLocatorMap.find(locatorId);
- if (existing == mImpl->mEndpointLocatorMap.end())
+ EndpointLocatorMapIterator existing(0);
+ EndpointLocatorMapIterator end(0);
+
+ { // critical scope
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+
+ existing = mImpl->mEndpointLocatorMap.find(locatorId);
+ end = mImpl->mEndpointLocatorMap.end();
+ }
+
+ if (existing == end)
{
lg(Warning) << "Received request to remove Endpoint Locator not currently registered. Id = " << locatorId;
-
mImpl->mEventPublisher->removeEndpointLocatorEvent(locatorId, Event::FAILURE);
return;
}
+ mImpl->eraseLocatorMapItem(locatorId);
- mImpl->mEndpointLocatorMap.erase(locatorId);
mImpl->mEventPublisher->removeEndpointLocatorEvent(locatorId, Event::SUCCESS);
lg(Info) << "Removed Endpoint Locator with Id = " << locatorId;
@@ -155,8 +210,16 @@ void EndpointRegistry::setEndpointLocatorDestinationIds(const std::string& locat
{
try
{
- EndpointLocatorMapIterator existing = mImpl->mEndpointLocatorMap.find(locatorId);
- if (existing == mImpl->mEndpointLocatorMap.end())
+ EndpointLocatorMapIterator existing(0);
+ EndpointLocatorMapIterator end(0);
+
+ { // critical scope
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+ EndpointLocatorMapIterator existing = mImpl->mEndpointLocatorMap.find(locatorId);
+ end = mImpl->mEndpointLocatorMap.end();
+ }
+
+ if (existing == end)
{
mImpl->mEventPublisher->setEndpointLocatorDestinationIdsEvent(locatorId, regExList, Event::FAILURE);
throw DestinationNotFoundException(locatorId);
@@ -198,7 +261,10 @@ AsteriskSCF::Core::Endpoint::V1::EndpointSeq EndpointRegistry::lookup(const std:
}
}
- for(EndpointLocatorMapIterator entry = mImpl->mEndpointLocatorMap.begin(); entry != mImpl->mEndpointLocatorMap.end(); ++entry)
+ EndpointLocatorMap locatorMap;
+ mImpl->getEndpointLocatorMapCopy(locatorMap);
+
+ for(EndpointLocatorMapIterator entry = locatorMap.begin(); entry != locatorMap.end(); ++entry)
{
// Test to see if the destination matches any of this entry's regular expressions.
for(vector<boost::regex>::iterator reg = entry->second.regexList.begin(); reg != entry->second.regexList.end(); ++reg)
@@ -253,7 +319,7 @@ void EndpointRegistry::setScriptProcessor(ScriptProcessor* scriptProcessor)
*/
void EndpointRegistry::clearEndpointLocators()
{
- mImpl->mEndpointLocatorMap.clear();
+ mImpl->clearEndpointLocatorMap();
mImpl->mEventPublisher->clearEndpointLocatorsEvent();
}
diff --git a/src/SessionRouter.cpp b/src/SessionRouter.cpp
index 0bb619d..8d157da 100644
--- a/src/SessionRouter.cpp
+++ b/src/SessionRouter.cpp
@@ -5,6 +5,8 @@
*
* All rights reserved.
*/
+#include <boost/thread/thread.hpp>
+#include <boost/thread/locks.hpp>
#include "SessionRouter.h"
#include "EndpointRegistry.h"
@@ -85,8 +87,14 @@ public:
{
mTerminated = true;
+ SessionSeq cacheSessions;
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ cacheSessions = mSessions;
+ }
+
// Forward the stop message to all sessions other than the one that originally sent it.
- for(SessionSeq::iterator s = mSessions.begin(); s != mSessions.end(); ++s)
+ for(SessionSeq::iterator s = cacheSessions.begin(); s != cacheSessions.end(); ++s)
{
try
{
@@ -114,6 +122,7 @@ public:
*/
void addSession(SessionPrx session)
{
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
mSessions.push_back(session);
}
@@ -122,7 +131,10 @@ public:
*/
void addSessionAndListen(SessionPrx session)
{
- mSessions.push_back(session);
+ { // critical scope
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ mSessions.push_back(session);
+ }
if (mListenerPrx != 0)
{
@@ -152,7 +164,13 @@ public:
*/
void unregister()
{
- for(SessionSeq::iterator s=mSessions.begin(); s != mSessions.end(); ++s)
+ SessionSeq sessionsToCall;
+ { // critical scope
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ sessionsToCall = mSessions;
+ }
+
+ for(SessionSeq::iterator s=sessionsToCall.begin(); s != sessionsToCall.end(); ++s)
{
try
{
@@ -166,7 +184,10 @@ public:
}
// Since we're through listening to them, we should just drop our references to them.
- mSessions.clear();
+ { // critical scope
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ mSessions.clear();
+ }
}
SessionListenerPrx getProxy()
@@ -175,6 +196,8 @@ public:
}
private:
+ boost::shared_mutex mLock;
+
Ice::ObjectAdapterPtr mAdapter;
SessionSeq mSessions;
bool mTerminated;
-----------------------------------------------------------------------
--
asterisk-scf/integration/routing.git
More information about the asterisk-scf-commits
mailing list