[asterisk-scf-commits] asterisk-scf/integration/routing.git branch "route_async" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Mon Mar 7 01:15:44 CST 2011
branch "route_async" has been updated
via 87c4e76713e5abd9cecae5175f7e2612842bda2e (commit)
from 0f1310bcaa67642b0bc73bea931dfab7a673c49c (commit)
Summary of changes:
CMakeLists.txt | 1 +
config/routing-state-replicator.conf | 18 ++
local-slice/BasicRoutingStateReplicationIf.ice | 206 +++++++++++++++++
src/BasicRoutingServiceApp.cpp | 224 +++++++++++++++++--
src/BasicRoutingStateReplicator.h | 54 +++++
src/BasicRoutingStateReplicatorApp.cpp | 234 ++++++++++++++++++++
src/BasicRoutingStateReplicatorListener.cpp | 112 ++++++++++
src/CMakeLists.txt | 40 ++++-
src/ConnectBridgedSessionsOperation.cpp | 9 +-
src/ConnectBridgedSessionsOperation.h | 21 ++-
...nectBridgedSessionsWithDestinationOperation.cpp | 29 ++-
...onnectBridgedSessionsWithDestinationOperation.h | 31 +++-
src/EndpointRegistry.cpp | 26 ++-
src/EndpointRegistry.h | 10 +-
src/RouteSessionOperation.cpp | 29 ++-
src/RouteSessionOperation.h | 35 +++-
src/RoutingServiceEventPublisher.cpp | 101 +++++++--
src/RoutingServiceEventPublisher.h | 17 ++-
src/SessionListener.cpp | 12 +-
src/SessionListener.h | 7 +-
src/SessionRouter.cpp | 13 +-
src/SessionRouter.h | 4 +-
src/SessionRouterOperation.h | 68 +++---
23 files changed, 1185 insertions(+), 116 deletions(-)
create mode 100644 config/routing-state-replicator.conf
create mode 100644 local-slice/BasicRoutingStateReplicationIf.ice
create mode 100644 src/BasicRoutingStateReplicator.h
create mode 100644 src/BasicRoutingStateReplicatorApp.cpp
create mode 100644 src/BasicRoutingStateReplicatorListener.cpp
- Log -----------------------------------------------------------------
commit 87c4e76713e5abd9cecae5175f7e2612842bda2e
Author: Ken Hunt <ken.hunt at digium.com>
Date: Thu Mar 3 10:51:39 2011 -0600
Changes to support replication.
diff --git a/CMakeLists.txt b/CMakeLists.txt
index a0fd807..66e9657 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -28,6 +28,7 @@ if(NOT integrated_build STREQUAL "true")
set(integrated_build false)
endif()
+add_subdirectory(local-slice)
add_subdirectory(src)
add_subdirectory(config)
add_subdirectory(test)
diff --git a/config/routing-state-replicator.conf b/config/routing-state-replicator.conf
new file mode 100644
index 0000000..c1c67ed
--- /dev/null
+++ b/config/routing-state-replicator.conf
@@ -0,0 +1,18 @@
+# Adapter parameters for this component
+RoutingStateReplicator.Endpoints=default
+RoutingStateReplicator.ThreadPool.Size=4
+
+# A proxy to the IceStorm topic manager
+TopicManager.Proxy=AsteriskSCFIceStorm/TopicManager:default -p 10000
+
+# A proxy to the service locator management service
+LocatorServiceManagement.Proxy=LocatorServiceManagement:tcp -p 4422
+
+# A proxy to the service locator service
+LocatorService.Proxy=LocatorService:tcp -p 4411
+
+# The name of the state replicator
+RoutingStateReplicator.Name=default
+
+IceBox.InheritProperties = 1
+IceBox.Service.RoutingStateReplicator=RoutingStateReplicator:create
diff --git a/local-slice/BasicRoutingStateReplicationIf.ice b/local-slice/BasicRoutingStateReplicationIf.ice
new file mode 100644
index 0000000..75cebde
--- /dev/null
+++ b/local-slice/BasicRoutingStateReplicationIf.ice
@@ -0,0 +1,206 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010-2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#pragma once
+#include <Ice/BuiltinSequences.ice>
+#include <Ice/Identity.ice>
+#include <Core/Discovery/ServiceLocatorIf.ice>
+#include <SessionCommunications/SessionCommunicationsIf.ice>
+
+module AsteriskSCF
+{
+module BasicRoutingService
+{
+["suppress"]
+module V1
+{
+ const string StateReplicatorComponentCategory = "RoutingStateReplicatorComponent";
+ const string StateReplicatorDiscoveryCategory = "RoutingStateReplicator";
+
+ /**
+ * Service locator parameters used when looking up a routing state replicator.
+ * Extends the generic ServiceLocatorParams with the name of the replicator instance.
+ */
+ class RoutingStateReplicatorParams extends AsteriskSCF::Core::Discovery::V1::ServiceLocatorParams
+ {
+ // Name of the replicator instance being looked up.
+ string name;
+ };
+
+ /**
+ * Base class of every replicated routing state item.
+ * Derived items document what their key holds (a transactionId or a locatorId).
+ */
+ class RoutingStateItem
+ {
+ // Identifies this item; its meaning is defined by each derived class.
+ string key;
+ };
+
+ sequence<RoutingStateItem> RoutingStateItemSeq;
+
+ /**
+ * Callback interface for receiving state changes from a RoutingStateReplicator.
+ */
+ interface RoutingStateReplicatorListener
+ {
+ // Notification that the items with the given keys were removed.
+ void stateRemoved(Ice::StringSeq itemKeys);
+ // Notification that the given items were set.
+ void stateSet(RoutingStateItemSeq items);
+ };
+
+ /**
+ * Interface of the routing state replicator.
+ * NOTE(review): the servant is an instantiation of the generic StateReplicator template, which
+ * is not visible here; presumably setState/removeState are relayed to registered listeners as
+ * stateSet/stateRemoved -- confirm against that template.
+ */
+ interface RoutingStateReplicator
+ {
+ // Register / unregister a listener proxy.
+ void addListener(RoutingStateReplicatorListener *listener);
+ void removeListener(RoutingStateReplicatorListener *listener);
+ // Set the given state items.
+ void setState (RoutingStateItemSeq items);
+ // Remove state items; the strings are item keys.
+ void removeState(Ice::StringSeq items);
+ // Retrieve the items for the given keys, or every item.
+ idempotent RoutingStateItemSeq getState(Ice::StringSeq itemKeys);
+ idempotent RoutingStateItemSeq getAllState();
+ };
+
+/***
+ enum RouteSessionOpState
+ {
+ START,
+ WAITLOOKUP,
+ BRIDGING,
+ COMPLETE
+ };
+
+ enum ConnectBridgedSessionsWithDestinationOpState
+ {
+ START,
+ WAIT_LOOKUP_RESULTS,
+ BRIDGING,
+ COMPLETE
+ };
+
+ enum ConnectBridgedSessionsOpState
+ {
+ START,
+ COMPLETE
+ };
+
+***/
+
+ ///////////////////////////////////
+ // These state items represent the state transitions of the RouteSession operation.
+
+ /**
+ * Indicates the RouteSessionOperation started.
+ * The key (in the base state item) is the transactionId of this operation.
+ */
+ class RouteSessionOpStart extends RoutingStateItem
+ {
+ string transactionId;
+ AsteriskSCF::SessionCommunications::V1::Session *source;
+ string destination;
+ };
+
+
+ /**
+ * State item for a RouteSession operation identified by transactionId.
+ * NOTE(review): presumably marks the operation as waiting for its destination lookup -- confirm.
+ */
+ class RouteSessionOpWaitLookupState extends RoutingStateItem
+ {
+ string transactionId;
+ };
+
+
+ /**
+ * State item for a RouteSession operation identified by transactionId, carrying a set of endpoints.
+ * NOTE(review): presumably the endpoints are the lookup results about to be bridged -- confirm.
+ */
+ class RouteSessionOpBridgingState extends RoutingStateItem
+ {
+ string transactionId;
+ AsteriskSCF::Core::Endpoint::V1::EndpointSeq endpoints;
+ };
+
+ /**
+ * Indicates the RouteSessionOperation completed.
+ * The key (in the base state item) is the transactionId of this operation.
+ */
+ class RouteSessionOpComplete extends RoutingStateItem
+ {
+ string transactionId;
+ };
+
+
+ /////////////////////////////////////
+ // These state items represent the state transitions of the ConnectBridgedSessionsWithDestination operation.
+
+ /**
+ * Indicates the ConnectBridgedSessionsWithDestinationOperation started.
+ * The key (in the base state item) is the transactionId of this operation.
+ */
+ class ConnectBridgedSessionsWithDestinationOpStart extends RoutingStateItem
+ {
+     // Transaction id of the operation.
+     string transactionId;
+     // Proxy to the session named sessionToReplace by the operation.
+     AsteriskSCF::SessionCommunications::V1::Session *sessionToReplace;
+     // Destination string of the operation.
+     string destination;
+ };
+
+ /**
+ * State item for a ConnectBridgedSessionsWithDestination operation identified by transactionId.
+ * NOTE(review): presumably marks the operation as waiting for its destination lookup -- confirm.
+ */
+ class ConnectBridgedSessionsWithDestinationOpWaitLookupState extends RoutingStateItem
+ {
+ string transactionId;
+ };
+
+
+ /**
+ * State item for a ConnectBridgedSessionsWithDestination operation identified by transactionId,
+ * carrying a set of endpoints.
+ * NOTE(review): presumably the endpoints are the lookup results about to be bridged -- confirm.
+ */
+ class ConnectBridgedSessionsWithDestinationOpBridgingState extends RoutingStateItem
+ {
+ string transactionId;
+ AsteriskSCF::Core::Endpoint::V1::EndpointSeq endpoints;
+ };
+
+ /**
+ * Indicates the ConnectBridgedSessionsWithDestinationOperation completed.
+ * The key (in the base state item) is the transactionId of this operation.
+ */
+ class ConnectBridgedSessionsWithDestinationOpComplete extends RoutingStateItem
+ {
+ string transactionId;
+ };
+
+ /////////////////////////////////////
+ // These state items represent the state transitions of the ConnectBridgedSessions operation.
+
+ /**
+ * Indicates the ConnectBridgedSessionsOperation started.
+ * The key (in the base state item) is the transactionId of this operation.
+ */
+ class ConnectBridgedSessionsOpStart extends RoutingStateItem
+ {
+ string transactionId;
+ AsteriskSCF::SessionCommunications::V1::Session *sessionToReplace;
+ AsteriskSCF::SessionCommunications::V1::Session *bridgedSession;
+
+ string destination;
+ };
+
+ /**
+ * Indicates the ConnectBridgedSessionsOperation completed.
+ * The key (in the base state item) is the transactionId of this operation.
+ */
+ class ConnectBridgedSessionsOpComplete extends RoutingStateItem
+ {
+ string transactionId;
+ };
+
+ ///////////////////////////////////
+ // Endpoint locator state items
+
+ /**
+ * Represents an added endpoint locator.
+ * The key (in the base state item) is the locatorId.
+ */
+ class EndpointLocatorAdd extends RoutingStateItem
+ {
+ // Regular expressions registered together with the locator.
+ AsteriskSCF::Core::Routing::V1::RegExSeq regExList;
+ // Proxy to the endpoint locator being added.
+ AsteriskSCF::Core::Routing::V1::EndpointLocator *locator;
+ };
+
+ /**
+ * Represents a removed endpoint locator.
+ * The key (in the base state item) is the locatorId.
+ */
+ class EndpointLocatorRemove extends RoutingStateItem
+ {
+ };
+
+}; //module V1
+}; //module BasicRouting
+}; //module Asterisk SCF
diff --git a/src/BasicRoutingServiceApp.cpp b/src/BasicRoutingServiceApp.cpp
index 5a5bc78..b572f43 100644
--- a/src/BasicRoutingServiceApp.cpp
+++ b/src/BasicRoutingServiceApp.cpp
@@ -20,17 +20,20 @@
#include <boost/shared_ptr.hpp>
#include "SmartProxy.h"
+#include "ReplicaIf.h"
#include "Core/Routing/RoutingIf.h"
#include "Core/Discovery/ServiceLocatorIf.h"
#include "SessionCommunications/SessionCommunicationsIf.h"
#include "System/Component/ComponentServiceIf.h"
+#include "BasicRoutingStateReplicationIf.h"
+#include "BasicRoutingStateReplicator.h"
#include "LuaScriptProcessor.h"
#include "RoutingServiceEventPublisher.h"
#include "EndpointRegistry.h"
#include "RoutingAdmin.h"
#include "SessionRouter.h"
-#include "AsteriskSCF/WorkQueue/SimpleWorkQueue.h"
+#include "AsteriskSCF/Threading/SimpleWorkQueue.h"
#include "IceLogger.h"
#include "logger.h"
@@ -42,6 +45,7 @@ using namespace AsteriskSCF::Core::Discovery::V1;
using namespace AsteriskSCF::System::Component::V1;
using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::SmartProxy;
+using namespace AsteriskSCF::BasicRoutingService::V1;
namespace
{
@@ -61,7 +65,7 @@ public:
: mDone(false),
mInitialized(false),
mRunning(false),
- mWorkQueue( new SimpleWorkQueue("SessionRouterWorkQueue", lg))
+ mWorkQueue( new AsteriskSCF::Threading::SimpleWorkQueue("SessionRouterWorkQueue", lg))
{
}
@@ -79,20 +83,28 @@ public:
void resume();
void suspend();
+ void activated();
+ void onStandby();
+
public: // Overrides of IceBox::Service
virtual void start(const string& name, const Ice::CommunicatorPtr& ic, const Ice::StringSeq& args);
virtual void stop();
private:
void initialize();
+ void locateBridgeManager();
+ void locateStateReplicator();
void registerWithServiceLocator();
void deregisterFromServiceLocator();
void setCategory(const Discovery::V1::ServiceManagementPrx& serviceManagement, const string& category);
+ void listenToStateReplicator();
+ void stopListeningToStateReplicator();
+ void pushStateReplicator();
bool mDone;
bool mInitialized;
bool mRunning;
- boost::shared_ptr<SimpleWorkQueue> mWorkQueue;
+ boost::shared_ptr<AsteriskSCF::Threading::SimpleWorkQueue> mWorkQueue;
std::string mAppName;
ServiceLocatorManagementPrx mServiceLocatorManagement;
@@ -108,9 +120,15 @@ private:
RoutingServiceAdminPtr mAdminInteface;
ComponentServicePtr mComponentService;
AsteriskSCF::SmartProxy::SmartProxy<BridgeManagerPrx> mBridgeManager;
- RoutingEventsPtr mEventPublisher;
+ RoutingServiceEventPublisherPtr mEventPublisher;
EndpointRegistryPtr mEndpointRegistry;
+ // Replication support
+ ReplicaPtr mReplicaService;
+ AsteriskSCF::SmartProxy::SmartProxy<RoutingStateReplicatorPrx> mStateReplicator;
+ RoutingStateReplicatorListenerPtr mReplicatorListener;
+ RoutingStateReplicatorListenerPrx mReplicatorListenerProxy;
+
// Implementation
Ice::ObjectAdapterPtr mAdapter;
Ice::CommunicatorPtr mCommunicator;
@@ -120,6 +138,7 @@ static const string RegistryLocatorObjectId("RoutingServiceLocatorRegistry");
static const string RoutingAdminObjectId("RoutingAdmin");
static const string ComponentServiceId("BasicRoutingComponent");
static const string SessionRouterObjectId("SessionRouter");
+static const string ReplicaServiceId("BasicRoutingServiceReplica");
/**
* This class provides implementation for the ComponentService interface.
@@ -154,6 +173,133 @@ private:
};
/**
+ * This class provides implementation for the Replica interface.
+ * It also tracks the active/standby state of this component.
+ */
+class ReplicaImpl : public Replica
+{
+public:
+    /**
+     * @param app Owning application; it is called back on every activate/standby transition.
+     * @param adapter Object adapter used to build the proxy handed to replica listeners.
+     * @param active Initial state: true for active, false for standby.
+     *
+     * The initial state is applied through activate()/standby(), so the application's
+     * activated()/onStandby() is invoked from inside this constructor.
+     */
+    ReplicaImpl(BasicRoutingServiceApp &app, Ice::ObjectAdapterPtr adapter, bool active)
+        : mAdapter(adapter), mPaused(false), mActive(active), mApp(app) // same order as the member declarations
+    {
+        if (mActive)
+        {
+            activate();
+        }
+        else
+        {
+            standby();
+        }
+    }
+
+    bool isActive(const Ice::Current&)
+    {
+        return mActive;
+    }
+
+    /**
+     * Local (non-dispatched) query of the active/standby state.
+     */
+    bool isActive()
+    {
+        return mActive;
+    }
+
+    /**
+     * Switch to active: record the state, notify the application, then notify each listener.
+     * @return Always true.
+     */
+    bool activate(const Ice::Current& = ::Ice::Current())
+    {
+        mActive = true;
+        mApp.activated();
+
+        if (!mListeners.empty())
+        {
+            // The proxy is the same for every listener, so build it once.
+            const ReplicaPrx self = selfProxy();
+            for (vector<AsteriskSCF::System::Component::V1::ReplicaListenerPrx>::const_iterator listener = mListeners.begin(); listener != mListeners.end(); ++listener)
+            {
+                (*listener)->activated(self);
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * Switch to standby: record the state, notify the application, then notify each listener.
+     */
+    void standby(const Ice::Current& = ::Ice::Current())
+    {
+        mActive = false;
+        mApp.onStandby();
+
+        if (!mListeners.empty())
+        {
+            // The proxy is the same for every listener, so build it once.
+            const ReplicaPrx self = selfProxy();
+            for (vector<AsteriskSCF::System::Component::V1::ReplicaListenerPrx>::const_iterator listener = mListeners.begin(); listener != mListeners.end(); ++listener)
+            {
+                (*listener)->onStandby(self);
+            }
+        }
+    }
+
+    void addListener(const AsteriskSCF::System::Component::V1::ReplicaListenerPrx& listener, const Ice::Current&)
+    {
+        mListeners.push_back(listener);
+    }
+
+    void removeListener(const AsteriskSCF::System::Component::V1::ReplicaListenerPrx& listener, const Ice::Current&)
+    {
+        mListeners.erase(std::remove(mListeners.begin(), mListeners.end(), listener), mListeners.end());
+    }
+
+private:
+    /**
+     * Build the direct proxy, under the ReplicaServiceId identity, that is passed to listeners.
+     */
+    ReplicaPrx selfProxy() const
+    {
+        return ReplicaPrx::uncheckedCast(mAdapter->createDirectProxy(mAdapter->getCommunicator()->stringToIdentity(ReplicaServiceId)));
+    }
+
+    /**
+     * Pointer to the object adapter we exist on.
+     */
+    Ice::ObjectAdapterPtr mAdapter;
+
+    /**
+     * Listeners that we need to push state change notifications out to.
+     */
+    vector<AsteriskSCF::System::Component::V1::ReplicaListenerPrx> mListeners;
+
+    // Initialised to false and not otherwise used in this class.
+    bool mPaused;
+    // True while this component is the active replica.
+    bool mActive;
+
+    BasicRoutingServiceApp& mApp;
+};
+
+/**
+ * Called by ReplicaImpl::activate() when this component becomes the active replica.
+ * Marks the event publisher and endpoint registry active, then stops listening to the state replicator.
+ *
+ * NOTE(review): ReplicaImpl::activate() sets its active flag before calling this, and
+ * stopListeningToStateReplicator() returns early when isActive() is true, so the listener
+ * removal looks unreachable on this path -- confirm the intended behavior.
+ */
+void BasicRoutingServiceApp::activated()
+{
+ mEventPublisher->setActive(true);
+ mEndpointRegistry->setActive(true);
+ stopListeningToStateReplicator();
+}
+
+/**
+ * Called by ReplicaImpl::standby() when this component is placed in standby.
+ * Marks the event publisher and endpoint registry inactive, then starts listening to the state replicator.
+ */
+void BasicRoutingServiceApp::onStandby()
+{
+ mEventPublisher->setActive(false);
+ mEndpointRegistry->setActive(false);
+ listenToStateReplicator();
+}
+
+/**
+ * Subscribe our listener proxy to the state replicator.
+ * A standby component uses this subscription to follow the dynamic state
+ * changes made by the active component. Nothing happens when no replicator
+ * reference is held or when this component is active.
+ */
+void BasicRoutingServiceApp::listenToStateReplicator()
+{
+    // Without a state replicator there is nothing to subscribe to.
+    if (mStateReplicator == 0)
+    {
+        return;
+    }
+
+    // Only a standby component needs the updates.
+    if (mReplicaService->isActive())
+    {
+        return;
+    }
+
+    mStateReplicator->addListener(mReplicatorListenerProxy);
+}
+
+/**
+ * Unsubscribe our listener proxy from the state replicator.
+ * Does nothing when the replica service or the replicator reference is not yet
+ * available, or when this component is active.
+ *
+ * NOTE(review): ReplicaImpl::activate() sets its active flag before calling activated(),
+ * so on the activation path the isActive() check below returns before the listener is
+ * removed -- confirm whether the listener should be removed on activation.
+ */
+void BasicRoutingServiceApp::stopListeningToStateReplicator()
+{
+    // ReplicaImpl's constructor calls activated() (and therefore this function) before
+    // mReplicaService has been assigned; no listener has been registered at that point.
+    if (!mReplicaService)
+    {
+        return;
+    }
+
+    // Same guard as listenToStateReplicator(): without a replicator there is nothing to remove.
+    if (mStateReplicator == 0)
+    {
+        return;
+    }
+
+    if (mReplicaService->isActive() == true)
+    {
+        return;
+    }
+
+    mStateReplicator->removeListener(mReplicatorListenerProxy);
+}
+
+/**
* Helper function to add some parameters to one of our registered interfaces in the ServiceLocator, so that
* other components can look up our interfaces.
*/
@@ -242,6 +388,41 @@ void BasicRoutingServiceApp::deregisterFromServiceLocator()
}
/**
+ * Locate the BridgeManager using the Service Locator.
+ */
+void BasicRoutingServiceApp::locateBridgeManager()
+{
+ // Build a smart proxy that looks the bridge manager up by its discovery category.
+ mBridgeManager = AsteriskSCF::SmartProxy::SmartProxy<BridgeManagerPrx>(
+ mServiceLocator,
+ new ServiceLocatorParams(BridgeServiceDiscoveryCategory),
+ lg);
+ // The session router gets the smart proxy whether or not it is initialized yet.
+ mSessionRouter->setBridgeManager(mBridgeManager);
+
+ if (!mBridgeManager.isInitialized())
+ {
+ lg(Debug) << "Probabaly safe to ignore ServiceNotFound during startup. Will attempt to locate Bridge Service again when it is needed.";
+ }
+}
+
+/**
+ * Locate our State Replicator using the Service Locator.
+ * The lookup uses the StateReplicatorDiscoveryCategory plus a replicator name read
+ * from configuration (default "default"); the result is stored in mStateReplicator.
+ */
+void BasicRoutingServiceApp::locateStateReplicator()
+{
+ BasicRoutingService::V1::RoutingStateReplicatorParamsPtr replicatorParams = new BasicRoutingService::V1::RoutingStateReplicatorParams();
+ replicatorParams->category = BasicRoutingService::V1::StateReplicatorDiscoveryCategory;
+ // NOTE(review): the property key carries a "Sip." prefix although this is the routing service;
+ // the replicator itself reads "RoutingStateReplicator.Name". Looks like a copy/paste leftover -- confirm the intended key.
+ replicatorParams->name = mCommunicator->getProperties()->getPropertyWithDefault("Sip.StateReplicatorName", "default");
+
+ AsteriskSCF::SmartProxy::SmartProxy<RoutingStateReplicatorPrx> pw(mServiceLocator, replicatorParams, lg);
+ mStateReplicator = pw;
+}
+
+/**
+ * Hand the state replicator smart proxy to the event publisher.
+ * NOTE(review): no caller of this function is visible in this part of the change -- confirm it is invoked.
+ */
+void BasicRoutingServiceApp::pushStateReplicator()
+{
+ mEventPublisher->setStateReplicator(mStateReplicator);
+}
+
+/**
* Create the primary functional objects of this component.
* @param appName Name of the application or component.
*/
@@ -252,7 +433,9 @@ void BasicRoutingServiceApp::initialize()
// Create the adapter.
mAdapter = mCommunicator->createObjectAdapter("BasicRoutingServiceAdapter");
- mEventPublisher = new RoutingServiceEventPublisher(mAdapter);
+ bool isActive = !(mCommunicator->getProperties()->getPropertyWithDefault("BasicRoutingService.Standby", "no") == "yes");
+
+ mEventPublisher = new RoutingServiceEventPublisher(mAdapter, isActive);
// setup the logger
ConfiguredIceLoggerPtr mIceLogger = createIceLogger(mAdapter);
@@ -260,7 +443,7 @@ void BasicRoutingServiceApp::initialize()
// Create and configure the EndpointRegistry.
ScriptProcessor* scriptProcesor(new LuaScriptProcessor());
- mEndpointRegistry = new EndpointRegistry(scriptProcesor, mEventPublisher);
+ mEndpointRegistry = new EndpointRegistry(scriptProcesor, mEventPublisher, isActive);
// Publish the LocatorRegistry interface.
mAdapter->add(mEndpointRegistry, mCommunicator->stringToIdentity(RegistryLocatorObjectId));
@@ -279,6 +462,15 @@ void BasicRoutingServiceApp::initialize()
mComponentService = new ComponentServiceImpl(*this);
mAdapter->add(mComponentService, mCommunicator->stringToIdentity(ComponentServiceId));
+ // Create and publish our Replica interface support. This interface allows this component
+ // to be activated or placed in standby mode.
+ mReplicaService = new ReplicaImpl(*this, mAdapter, isActive);
+ mAdapter->add(mReplicaService, mCommunicator->stringToIdentity(ReplicaServiceId));
+
+ // Create and publish our state replicator listener interface.
+ mReplicatorListener = new RoutingStateReplicatorListenerI(mEndpointRegistry);
+ mReplicatorListenerProxy = RoutingStateReplicatorListenerPrx::uncheckedCast(mAdapter->addWithUUID(mReplicatorListener));
+
mAdapter->activate();
// Get a proxy to the interface for the Service Locator.
@@ -291,16 +483,9 @@ void BasicRoutingServiceApp::initialize()
lg(Error) << "Problems in " << mAppName << " initialization(): " << exception.what();
}
- mBridgeManager = AsteriskSCF::SmartProxy::SmartProxy<BridgeManagerPrx>(
- mServiceLocator,
- new ServiceLocatorParams(BridgeServiceDiscoveryCategory),
- lg);
- mSessionRouter->setBridgeManager(mBridgeManager);
+ locateBridgeManager();
- if (!mBridgeManager.isInitialized())
- {
- lg(Debug) << "Probabaly safe to ignore ServiceNotFound during startup. Will attempt to locate Bridge Service again when it is needed.";
- }
+ locateStateReplicator();
}
/**
@@ -325,6 +510,11 @@ void BasicRoutingServiceApp::start(const string& name, const Ice::CommunicatorPt
// Plug back into the Asterisk SCF discovery system so that the interfaces we provide
// can be located.
registerWithServiceLocator();
+
+ // Register with the state replicator in case we are in standby mode.
+ // This is done here because during initialize(), the reference to the State Replicator isn't
+ // yet available when then ReplicaImpl is created.
+ listenToStateReplicator();
mRunning = true;
lg(Info) << "Started";
@@ -362,6 +552,10 @@ void BasicRoutingServiceApp::stop()
if (mRunning)
{
deregisterFromServiceLocator();
+
+ // Remove our interfaces from the state replicator.
+ stopListeningToStateReplicator();
+
mAdapter->deactivate();
}
diff --git a/src/BasicRoutingStateReplicator.h b/src/BasicRoutingStateReplicator.h
new file mode 100644
index 0000000..3d915c8
--- /dev/null
+++ b/src/BasicRoutingStateReplicator.h
@@ -0,0 +1,54 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010-2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#pragma once
+
+#include <Ice/Ice.h>
+#include "BasicRoutingStateReplicationIf.h"
+#include "StateReplicator.h"
+#include "EndpointRegistry.h"
+
+namespace AsteriskSCF
+{
+namespace BasicRoutingService
+{
+/**
+ * Replicator servant type: the generic StateReplicator template instantiated for the routing
+ * service. The arguments are the replicator interface, the state item handle, std::string and
+ * the listener proxy type (presumably interface / item / key / listener -- confirm against StateReplicator.h).
+ */
+typedef AsteriskSCF::StateReplication::StateReplicator< AsteriskSCF::BasicRoutingService::V1::RoutingStateReplicator,
+ AsteriskSCF::BasicRoutingService::V1::RoutingStateItemPtr,
+ std::string,
+ AsteriskSCF::BasicRoutingService::V1::RoutingStateReplicatorListenerPrx> RoutingStateReplicatorI;
+// Reference-counted handle to the replicator servant.
+typedef IceUtil::Handle<RoutingStateReplicatorI> RoutingStateReplicatorIPtr;
+
+/**
+ * Our RoutingStateReplicatorListener implementation.
+ * This object listens for updates from the state replicator when
+ * this service is in standby mode, and ensures that this instance of the
+ * service has the latest dynamic state in case it is activated.
+ */
+class RoutingStateReplicatorListenerI : public AsteriskSCF::BasicRoutingService::V1::RoutingStateReplicatorListener
+{
+public:
+ // Takes the endpoint registry of this service instance.
+ RoutingStateReplicatorListenerI(const EndpointRegistryPtr& registry);
+ ~RoutingStateReplicatorListenerI();
+ // RoutingStateReplicatorListener overrides.
+ void stateRemoved(const Ice::StringSeq&, const Ice::Current&);
+ void stateSet(const AsteriskSCF::BasicRoutingService::V1::RoutingStateItemSeq&, const Ice::Current&);
+ bool operator==(RoutingStateReplicatorListenerI &rhs);
+
+private:
+ // Hidden implementation; the struct is defined in the .cpp.
+ // NOTE(review): raw pointer with no copy control declared here -- confirm ownership/cleanup in the .cpp.
+ struct RoutingStateReplicatorListenerImpl *mImpl;
+};
+
+};
+};
diff --git a/src/BasicRoutingStateReplicatorApp.cpp b/src/BasicRoutingStateReplicatorApp.cpp
new file mode 100644
index 0000000..d102102
--- /dev/null
+++ b/src/BasicRoutingStateReplicatorApp.cpp
@@ -0,0 +1,234 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#include <Ice/Ice.h>
+#include <IceUtil/UUID.h>
+#include <IceStorm/IceStorm.h>
+#include <IceBox/IceBox.h>
+
+#include "ServiceLocatorIf.h"
+#include "ComponentServiceIf.h"
+#include "BasicRoutingStateReplicationIf.h"
+#include "BasicRoutingStateReplicator.h"
+#include "IceLogger.h"
+#include "logger.h"
+
+using namespace std;
+using namespace AsteriskSCF::Core;
+using namespace AsteriskSCF::Core::Discovery::V1;
+using namespace AsteriskSCF::System::Component::V1;
+using namespace AsteriskSCF::System::Logging;
+using namespace AsteriskSCF::BasicRoutingService;
+using namespace AsteriskSCF::BasicRoutingService::V1;
+using namespace AsteriskSCF::Core::Routing::V1;
+
+namespace
+{
+Logger &lg = getLoggerFactory().getLogger("AsteriskSCF.BasicRoutingServiceStateReplicator");
+}
+
+/**
+ * IceBox service that hosts the routing state replicator servant and its
+ * ComponentService interface, and registers both with the service locator.
+ */
+class BasicRoutingStateReplicatorService : public IceBox::Service
+{
+public:
+ BasicRoutingStateReplicatorService() { };
+ ~BasicRoutingStateReplicatorService()
+ {
+ // Release the references held by this service.
+ mComponentService = 0;
+ mAdapter = 0;
+ mStateReplicator = 0;
+ };
+ // IceBox::Service overrides.
+ virtual void start(const string &name, const Ice::CommunicatorPtr& ic, const Ice::StringSeq& args);
+ virtual void stop();
+
+private:
+ // Creates the adapter, logger and servants, then activates the adapter.
+ void initialize(std::string appName, const Ice::CommunicatorPtr& ic);
+ void registerWithServiceLocator(const Ice::CommunicatorPtr& ic);
+ void deregisterFromServiceLocator();
+ // Name passed to start(); used in log messages.
+ std::string mAppName;
+ //vector<BasicRoutingStateReplicatorListenerPrx> mListeners;
+ Ice::ObjectAdapterPtr mAdapter;
+ ServiceLocatorManagementPrx mServiceLocatorManagement;
+ // Service locator registration of the ComponentService interface.
+ Discovery::V1::ServiceManagementPrx mComponentServiceManagement;
+ // Service locator registration of the state replicator interface.
+ Discovery::V1::ServiceManagementPrx mStateReplicationManagement;
+ ConfiguredIceLoggerPtr mIceLogger;
+ ComponentServicePtr mComponentService;
+ AsteriskSCF::BasicRoutingService::RoutingStateReplicatorIPtr mStateReplicator;
+};
+
+static const string ComponentServiceId("BasicRoutingStateReplicatorComponent");
+static const string ServiceDiscoveryId("BasicRoutingStateReplicatorService");
+
+/**
+ * This class provides implementation for the ComponentService interface, which
+ * every Asterisk SCF component is expected to publish.
+ */
+class ComponentServiceImpl : public ComponentService
+{
+public:
+ ComponentServiceImpl(BasicRoutingStateReplicatorService &service) : mService(service) {}
+
+public: // Overrides of the ComponentService interface.
+ // All three operations are currently empty placeholders.
+ virtual void suspend(const ::Ice::Current& = ::Ice::Current())
+ {
+ // TBD
+ }
+
+ virtual void resume(const ::Ice::Current& = ::Ice::Current())
+ {
+ // TBD
+ }
+
+ virtual void shutdown(const ::Ice::Current& = ::Ice::Current())
+ {
+ // TBD
+ }
+
+private:
+ // Service this component interface belongs to; not used by the placeholder operations.
+ BasicRoutingStateReplicatorService& mService;
+};
+
+/**
+ * Comparator registered with the service locator so that a lookup only matches
+ * this replicator when the requested replicator name equals ours.
+ */
+class BasicRoutingStateReplicatorCompare : public ServiceLocatorParamsCompare
+{
+public:
+    BasicRoutingStateReplicatorCompare(string name) : mName(name) {}
+
+    /**
+     * @param params Lookup parameters supplied by the service locator.
+     * @return true only when params is a RoutingStateReplicatorParams whose name equals ours.
+     */
+    bool isSupported(const ServiceLocatorParamsPtr &params, const Ice::Current &)
+    {
+        RoutingStateReplicatorParamsPtr routingParams = RoutingStateReplicatorParamsPtr::dynamicCast(params);
+        // dynamicCast yields a null handle when params is of another type; that is simply "no match".
+        if (!routingParams)
+        {
+            return false;
+        }
+        return routingParams->name == mName;
+    }
+private:
+    // Name of the replicator this comparator answers for.
+    string mName;
+};
+
+typedef IceUtil::Handle<BasicRoutingStateReplicatorCompare> BasicRoutingStateReplicatorComparePtr;
+
+/**
+ * Register this component's primary public interfaces with the Service Locator.
+ * This enables other Asterisk SCF components to locate our interfaces.
+ *
+ * Two services are added: the ComponentService interface and the state replicator
+ * interface. The replicator is registered with a name comparator so that lookups
+ * can select a replicator by name. Failures are logged and not propagated.
+ *
+ * @param ic Communicator used for configuration properties and identity conversion.
+ */
+void BasicRoutingStateReplicatorService::registerWithServiceLocator(const Ice::CommunicatorPtr& ic)
+{
+    try
+    {
+        // Get a proxy to the management interface for the Service Locator, so we can add ourselves into the system discovery mechanisms.
+        mServiceLocatorManagement = ServiceLocatorManagementPrx::checkedCast(ic->propertyToProxy("LocatorServiceManagement.Proxy"));
+
+        if (mServiceLocatorManagement == 0)
+        {
+            lg(Error) << "Unable to obtain proxy to ServiceLocatorManagement interface. Check config file. This component can't be found until this is corrected." << endl;
+            return;
+        }
+
+        // Get a proxy to our ComponentService interface and add it to the Service Locator.
+        Ice::ObjectPrx componentServiceObjectPrx = mAdapter->createDirectProxy(ic->stringToIdentity(ComponentServiceId));
+        ComponentServicePrx componentServicePrx = ComponentServicePrx::checkedCast(componentServiceObjectPrx);
+
+        // The GUID passed in to add service needs to be unique for reporting.
+        string componentServiceGuid(AsteriskSCF::BasicRoutingService::V1::StateReplicatorComponentCategory);
+        mComponentServiceManagement = ServiceManagementPrx::uncheckedCast(mServiceLocatorManagement->addService(componentServicePrx, componentServiceGuid));
+
+        // Add category as a parameter to enable other components look this component up.
+        ServiceLocatorParamsPtr genericparams = new ServiceLocatorParams();
+        genericparams->category = AsteriskSCF::BasicRoutingService::V1::StateReplicatorComponentCategory;
+
+        mComponentServiceManagement->addLocatorParams(genericparams, "");
+
+        // Same again for the state replicator interface itself.
+        Ice::ObjectPrx stateReplicatorObjectPrx = mAdapter->createDirectProxy(ic->stringToIdentity(ServiceDiscoveryId));
+        RoutingStateReplicatorPrx stateReplicatorPrx = RoutingStateReplicatorPrx::checkedCast(stateReplicatorObjectPrx);
+
+        string stateReplicationGuid(AsteriskSCF::BasicRoutingService::V1::StateReplicatorDiscoveryCategory);
+        mStateReplicationManagement = ServiceManagementPrx::uncheckedCast(mServiceLocatorManagement->addService(stateReplicatorPrx, stateReplicationGuid));
+
+        ServiceLocatorParamsPtr discoveryParams = new ServiceLocatorParams();
+        discoveryParams->category = AsteriskSCF::BasicRoutingService::V1::StateReplicatorDiscoveryCategory;
+
+        // Publish a comparator servant so lookups can be matched on the configured replicator name.
+        string replicatorName = ic->getProperties()->getPropertyWithDefault("RoutingStateReplicator.Name", "default");
+        BasicRoutingStateReplicatorCompare* nameCompare = new BasicRoutingStateReplicatorCompare(replicatorName);
+        ServiceLocatorParamsComparePrx compareProxy = ServiceLocatorParamsComparePrx::uncheckedCast(mAdapter->addWithUUID(nameCompare));
+
+        string compareGuid = IceUtil::generateUUID();
+        mServiceLocatorManagement->addCompare(compareGuid, compareProxy);
+        mStateReplicationManagement->addLocatorParams(discoveryParams, compareGuid);
+
+        // TBD... We may have other interfaces to publish to the Service Locator.
+    }
+    catch(const Ice::Exception& e)
+    {
+        // Include the exception text so a failed registration can be diagnosed from the log.
+        lg(Error) << "Exception in " << mAppName << " registerWithServiceLocator(): " << e.what() << endl;
+    }
+    catch(...)
+    {
+        lg(Error) << "Exception in " << mAppName << " registerWithServiceLocator()" << endl;
+    }
+}
+
+/**
+ * Deregister this component's primary public interfaces from the Service Locator.
+ * This is done at shutdown, and whenever we want to keep other services from locating
+ * our interfaces.
+ *
+ * Both registrations made by registerWithServiceLocator() are removed. Each one is
+ * attempted independently so a failure on the first does not leave the second behind.
+ * A registration that was never made (null proxy) is skipped.
+ *
+ * NOTE(review): the comparator added with addCompare() is not removed here; its GUID
+ * is local to registerWithServiceLocator().
+ */
+void BasicRoutingStateReplicatorService::deregisterFromServiceLocator()
+{
+    try
+    {
+        if (mComponentServiceManagement)
+        {
+            mComponentServiceManagement->unregister();
+        }
+    }
+    catch(...)
+    {
+        lg(Error) << "Exception in deregisterFromServiceLocator()." << endl;
+    }
+
+    try
+    {
+        if (mStateReplicationManagement)
+        {
+            mStateReplicationManagement->unregister();
+        }
+    }
+    catch(...)
+    {
+        lg(Error) << "Exception in deregisterFromServiceLocator()." << endl;
+    }
+}
+
+/**
+ * Create the object adapter, set up logging, create and publish the ComponentService
+ * and state replicator servants, then activate the adapter.
+ *
+ * @param appName Name of this service; stored for use in log messages.
+ * @param ic Communicator used to create the adapter and convert identities.
+ */
+void BasicRoutingStateReplicatorService::initialize(const std::string appName, const Ice::CommunicatorPtr& ic)
+{
+ // NOTE(review): the adapter is named "BasicRoutingStateReplicator", while
+ // config/routing-state-replicator.conf in this change sets RoutingStateReplicator.Endpoints and
+ // RoutingStateReplicator.ThreadPool.Size -- confirm the adapter name and property prefix are meant to match.
+ mAdapter = ic->createObjectAdapter("BasicRoutingStateReplicator");
+
+ // setup logging client
+ mIceLogger = createIceLogger(mAdapter);
+ getLoggerFactory().setLogOutput(mIceLogger->getLogger());
+
+ mAppName = appName;
+ // Create and publish our ComponentService interface support.
+ mComponentService = new ComponentServiceImpl(*this);
+ mAdapter->add(mComponentService, ic->stringToIdentity(ComponentServiceId));
+ // Create and publish the state replicator servant.
+ mStateReplicator = new RoutingStateReplicatorI();
+ mAdapter->add(mStateReplicator, ic->stringToIdentity(ServiceDiscoveryId));
+
+ mAdapter->activate();
+}
+
+/**
+ * IceBox entry point: initialize the service, then register it with the service locator.
+ * The args parameter is not used.
+ */
+void BasicRoutingStateReplicatorService::start(const string &name, const Ice::CommunicatorPtr& ic, const Ice::StringSeq& args)
+{
+ initialize(name, ic);
+ // Plug into the Asterisk SCF discovery system so that the interfaces we provide
+ // can be located.
+ registerWithServiceLocator(ic);
+}
+
+/**
+ * IceBox exit point: deregister from the service locator.
+ */
+void BasicRoutingStateReplicatorService::stop()
+{
+ // Remove our interfaces from the service locator.
+ deregisterFromServiceLocator();
+}
+
+// C-linkage factory that returns a new service instance; the config file in this change
+// names it as the IceBox entry point ("RoutingStateReplicator:create").
+// The communicator argument is not used.
+extern "C"
+{
+ASTERISK_SCF_ICEBOX_EXPORT IceBox::Service* create(Ice::CommunicatorPtr communicator)
+{
+ return new BasicRoutingStateReplicatorService;
+}
+}
+
diff --git a/src/BasicRoutingStateReplicatorListener.cpp b/src/BasicRoutingStateReplicatorListener.cpp
new file mode 100644
index 0000000..d8271fa
--- /dev/null
+++ b/src/BasicRoutingStateReplicatorListener.cpp
@@ -0,0 +1,112 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010-2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#include <IceUtil/UUID.h>
+
+#include <boost/thread.hpp>
+#include <boost/shared_ptr.hpp>
+
+#include "BasicRoutingStateReplicator.h"
+
+using namespace AsteriskSCF::BasicRoutingService::V1;
+
+namespace AsteriskSCF
+{
+namespace BasicRoutingService
+{
+
+class RoutingStateReplicatorItem // Empty class: it has no data members, and its constructor and destructor do nothing.
+{
+public:
+ RoutingStateReplicatorItem() { }
+ ~RoutingStateReplicatorItem()
+ {
+
+ }
+
+private:
+
+};
+
+/**
+ * Hidden details of our RoutingStateReplicatorListener implementation.
+ * This object listens for updates from the state replicator when
+ * this service is in standby mode, and ensures that this instance of the
+ * service is prepared to be activated.
+ */
+struct RoutingStateReplicatorListenerImpl
+{
+public:
+    /**
+     * @param registry Endpoint registry that replicated locator changes are applied to.
+     */
+    RoutingStateReplicatorListenerImpl(const EndpointRegistryPtr& registry)
+        : mId(IceUtil::generateUUID()),
+          mEndpointRegistry(registry) // must be kept: setStateNoticeImpl() dereferences it for every item
+    {
+    }
+
+    /**
+     * Notification that the items with the given keys were removed.
+     * Nothing is done with it yet; the body is empty.
+     */
+    void removeStateNoticeImpl(const Ice::StringSeq& itemKeys, const Ice::Current& current)
+    {
+    }
+
+    /**
+     * Apply a batch of replicated state items to the endpoint registry.
+     * An EndpointLocatorAdd item registers its locator; an EndpointLocatorRemove
+     * item unregisters the locator with that key. Items of any other type are ignored.
+     *
+     * @param items Replicated state items to apply, in order.
+     * @param current Ice call context, passed through to the registry.
+     */
+    void setStateNoticeImpl(const RoutingStateItemSeq& items, const Ice::Current& current)
+    {
+        for (RoutingStateItemSeq::const_iterator item = items.begin(); item != items.end(); ++item)
+        {
+            // Depending on the type of state item we apply it differently
+            const EndpointLocatorAddPtr locatorAdd = EndpointLocatorAddPtr::dynamicCast(*item);
+            if (locatorAdd)
+            {
+                mEndpointRegistry->addEndpointLocator(locatorAdd->key, locatorAdd->regExList, locatorAdd->locator, current);
+                continue;
+            }
+
+            const EndpointLocatorRemovePtr locatorRemove = EndpointLocatorRemovePtr::dynamicCast(*item);
+            if (locatorRemove)
+            {
+                // The key comes from the remove item itself; locatorAdd is null on this path.
+                mEndpointRegistry->removeEndpointLocator(locatorRemove->key, current);
+            }
+        }
+    }
+
+    std::string mId;                       // UUID string identifying this listener instance
+    EndpointRegistryPtr mEndpointRegistry; // registry that receives the replicated changes
+};
+
+RoutingStateReplicatorListenerI::RoutingStateReplicatorListenerI(const EndpointRegistryPtr& registry)
+ : mImpl(new RoutingStateReplicatorListenerImpl(registry)) // allocated with new here; the destructor deletes it
+{
+}
+
+RoutingStateReplicatorListenerI::~RoutingStateReplicatorListenerI()
+{
+ delete mImpl; // releases the implementation object allocated with new in the constructor
+}
+
+void RoutingStateReplicatorListenerI::stateRemoved(const Ice::StringSeq& itemKeys, const Ice::Current& current)
+{
+ mImpl->removeStateNoticeImpl(itemKeys, current); // pure forwarding; the implementation method currently has an empty body
+}
+
+void RoutingStateReplicatorListenerI::stateSet(const RoutingStateItemSeq& items, const Ice::Current& current)
+{
+ mImpl->setStateNoticeImpl(items, current); // pure forwarding; the implementation applies add/remove locator items to the endpoint registry
+}
+
+bool RoutingStateReplicatorListenerI::operator==(RoutingStateReplicatorListenerI &rhs) // NOTE(review): neither the operator nor rhs is const-qualified — confirm that is intended
+{
+ return mImpl->mId == rhs.mImpl->mId; // equal when both implementations carry the same id string (a UUID generated at construction)
+}
+
+};
+};
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 00fe756..592a0b3 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -7,6 +7,9 @@ asterisk_scf_component_add_slice(BasicRoutingService ServiceLocatorIf)
asterisk_scf_component_add_slice(BasicRoutingService EndpointIf)
asterisk_scf_component_add_slice(BasicRoutingService SessionCommunicationsIf)
asterisk_scf_component_add_slice(BasicRoutingService ComponentServiceIf)
+asterisk_scf_component_add_slice(BasicRoutingService ../local-slice/BasicRoutingStateReplicationIf.ice)
+asterisk_scf_component_add_slice(BasicRoutingService ReplicaIf)
+
asterisk_scf_component_add_file(BasicRoutingService BasicRoutingServiceApp.cpp)
asterisk_scf_component_add_file(BasicRoutingService SessionRouter.cpp)
asterisk_scf_component_add_file(BasicRoutingService SessionRouter.h)
@@ -30,6 +33,8 @@ asterisk_scf_component_add_file(BasicRoutingService ConnectBridgedSessionsOperat
asterisk_scf_component_add_file(BasicRoutingService ConnectBridgedSessionsOperation.cpp)
asterisk_scf_component_add_file(BasicRoutingService SessionListener.h)
asterisk_scf_component_add_file(BasicRoutingService SessionListener.cpp)
+asterisk_scf_component_add_file(BasicRoutingService BasicRoutingStateReplicator.h)
+asterisk_scf_component_add_file(BasicRoutingService BasicRoutingStateReplicatorListener.cpp)
asterisk_scf_component_add_ice_libraries(BasicRoutingService IceStorm)
asterisk_scf_component_add_boost_libraries(BasicRoutingService thread date_time core regex)
@@ -42,11 +47,42 @@ include_directories(${logger_dir}/client/src)
include_directories(${utils_dir}/SmartProxy/src)
-include_directories(${util_cpp_dir}/WorkQueue/include)
+include_directories(${util_cpp_dir}/Threading/include)
+include_directories(${util_cpp_dir}/StateMachine/include)
asterisk_scf_component_build_icebox(BasicRoutingService)
target_link_libraries(BasicRoutingService ${LUA_LIBRARIES})
target_link_libraries(BasicRoutingService logging-client)
-target_link_libraries(BasicRoutingService WorkQueue)
+target_link_libraries(BasicRoutingService Threading)
#asterisk_scf_component_install(BasicRoutingService RUNTIME bin "Basic Routing Service" Core)
+
+########################################
+# Basic Routing State replicator
+
+asterisk_scf_component_init(BasicRoutingStateReplicator CXX)
+
+asterisk_scf_component_add_slice(BasicRoutingStateReplicator ServiceLocatorIf)
+asterisk_scf_component_add_slice(BasicRoutingStateReplicator ComponentServiceIf)
+asterisk_scf_component_add_slice(BasicRoutingStateReplicator BasicRoutingStateReplicationIf)
+asterisk_scf_component_add_slice(BasicRoutingStateReplicator RoutingIf)
+
+# This line allows us to use the templated state replicator
+# code. This statement is not the most
+# permanent of changes and assumes the directories are
+# structured in the way that gitall structures them.
+
+include_directories(${utils_dir}/StateReplicator/src)
+include_directories(${utils_dir}/SmartProxy/src)
+
+asterisk_scf_component_add_file(BasicRoutingStateReplicator BasicRoutingStateReplicatorApp.cpp)
+asterisk_scf_component_add_file(BasicRoutingStateReplicator BasicRoutingStateReplicator.h)
+
+asterisk_scf_component_add_ice_libraries(BasicRoutingStateReplicator IceStorm)
+
+asterisk_scf_component_build_icebox(BasicRoutingStateReplicator)
+
+target_link_libraries(BasicRoutingStateReplicator logging-client)
+
+#asterisk_scf_component_install(BasicRoutingStateReplicator RUNTIME bin "Basic Routing Service State Replicator" BasicRoutingStateReplicator)
+
diff --git a/src/ConnectBridgedSessionsOperation.cpp b/src/ConnectBridgedSessionsOperation.cpp
index 86bdd22..8bca93d 100644
--- a/src/ConnectBridgedSessionsOperation.cpp
+++ b/src/ConnectBridgedSessionsOperation.cpp
@@ -52,15 +52,16 @@ ConnectBridgedSessionsOperation::ConnectBridgedSessionsOperation(const AMD_Sessi
const ::Ice::Current& current,
const SessionContext& context,
OperationsManager* const listener)
- : SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsPtr>(cb,
- context,
- listener,
- boost::bind(&ConnectBridgedSessionsOperation::connectBridgedSessionsState, this)),
+ : SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsPtr, ConnectBridgedSessionsOp::OperationState>(cb,
+ context,
+ listener,
+ ConnectBridgedSessionsOp::STATE_CONNECT),
mInitiatorCallback(cb),
mSessionToReplace(sessionToReplace),
mBridgedSession(bridgedSession),
mIceCurrent(current)
{
+ mStateMachine.addState(ConnectBridgedSessionsOp::STATE_CONNECT, boost::bind(&ConnectBridgedSessionsOperation::connectBridgedSessionsState, this));
}
ConnectBridgedSessionsOperation::~ConnectBridgedSessionsOperation()
diff --git a/src/ConnectBridgedSessionsOperation.h b/src/ConnectBridgedSessionsOperation.h
index 17908b3..bd96b3d 100644
--- a/src/ConnectBridgedSessionsOperation.h
+++ b/src/ConnectBridgedSessionsOperation.h
@@ -15,8 +15,11 @@
*/
#pragma once
+#include <boost/function.hpp>
+
#include "RoutingIf.h"
#include "SessionCommunications/SessionCommunicationsIf.h"
+#include "AsteriskSCF/StateMachine/SimpleStateMachine.h"
#include "SessionRouterOperation.h"
@@ -25,6 +28,21 @@ namespace AsteriskSCF
namespace BasicRoutingService
{
+// This namespace exists solely to avoid enumerators from different types
+// colliding.
+namespace ConnectBridgedSessionsOp
+{
+ /**
+ * This enum defines the states for the operation class below.
+ * It would have been defined within the class except that it is needed as a template
+ * parameter to the base class. (C++0x enum class will alleviate such nonsense.)
+ */
+ enum OperationState
+ {
+ STATE_CONNECT // registered via addState() with ConnectBridgedSessionsOperation::connectBridgedSessionsState()
+ };
+}
+
/**
* This class represents an operation to replace one session in a Bridge with sessions
* from another bridge. No routing is actually performed. This operation exists here for consistency.
@@ -35,7 +53,8 @@ namespace BasicRoutingService
*
* This object is an instance of WorkQueue::Work so that it can be enqueued to a worker thread.
*/
-class ConnectBridgedSessionsOperation : public SessionRouterOperation<AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsPtr>
+class ConnectBridgedSessionsOperation : public SessionRouterOperation<AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsPtr,
+ ConnectBridgedSessionsOp::OperationState>
{
public:
ConnectBridgedSessionsOperation(const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsPtr& cb,
diff --git a/src/ConnectBridgedSessionsWithDestinationOperation.cpp b/src/ConnectBridgedSessionsWithDestinationOperation.cpp
index f2ea73e..718d348 100644
--- a/src/ConnectBridgedSessionsWithDestinationOperation.cpp
+++ b/src/ConnectBridgedSessionsWithDestinationOperation.cpp
@@ -50,15 +50,19 @@ ConnectBridgedSessionsWithDestinationOperation::ConnectBridgedSessionsWithDestin
const ::Ice::Current& current,
const SessionContext& context,
OperationsManager* const listener)
- : SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr>(cb,
+ : SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr, ConnectBridgedSessionsWithDestinationOp::OperationState>(cb,
context,
- listener,
- boost::bind(&ConnectBridgedSessionsWithDestinationOperation::lookupState, this)),
+ listener,
+ ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP),
mInitiatorCallback(cb),
mSessionToReplace(sessionToReplace),
mDestination(destination),
mIceCurrent(current)
{
+ mStateMachine.addState(ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP, boost::bind(&ConnectBridgedSessionsWithDestinationOperation::lookupState, this));
+ mStateMachine.addState(ConnectBridgedSessionsWithDestinationOp::STATE_WAIT_LOOKUP_RESULTS, boost::bind(&ConnectBridgedSessionsWithDestinationOperation::waitOnLookupState, this));
+ mStateMachine.addState(ConnectBridgedSessionsWithDestinationOp::STATE_BRIDGING, boost::bind(&ConnectBridgedSessionsWithDestinationOperation::establishBridgeState, this));
+
}
ConnectBridgedSessionsWithDestinationOperation::~ConnectBridgedSessionsWithDestinationOperation()
@@ -103,8 +107,7 @@ void ConnectBridgedSessionsWithDestinationOperation::lookupState()
// Route the destination
lg(Debug) << "connectBridgedSessionsWithDestination(): Routing destination " << mDestination;
- // Set the state to exectute after lookup.
- setState(boost::bind(&ConnectBridgedSessionsWithDestinationOperation::establishBridgeState, this), "establishBridgeState");
+ mStateMachine.setNextState(ConnectBridgedSessionsWithDestinationOp::STATE_WAIT_LOOKUP_RESULTS);
// Lookup the destination. This will use AMI, and the callback should
// schedule us to execute again.
@@ -112,6 +115,22 @@ void ConnectBridgedSessionsWithDestinationOperation::lookupState()
lookupEndpoints(mDestination, mIceCurrent);
}
+/**
+ * This is a state handler for one of this operation's states.
+ * Entering this state, the AMI call(s) to EndpointLocators have been made,
+ * and we are waiting for the results.
+ * If the operation has already finished (mFinished is set) nothing is done;
+ * otherwise the state machine's next state is set to STATE_BRIDGING.
+ */
+void ConnectBridgedSessionsWithDestinationOperation::waitOnLookupState()
+{
+ if (mFinished)
+ {
+ return;
+ }
+
+ // Select the state to execute once we've looked up our endpoints.
+ mStateMachine.setNextState(ConnectBridgedSessionsWithDestinationOp::STATE_BRIDGING);
+}
+
/**
* This is a state handler for one of this operation's states.
* Entering this state, the destination endpoint has been obtained.
diff --git a/src/ConnectBridgedSessionsWithDestinationOperation.h b/src/ConnectBridgedSessionsWithDestinationOperation.h
index 95655c5..06ef0a8 100644
--- a/src/ConnectBridgedSessionsWithDestinationOperation.h
+++ b/src/ConnectBridgedSessionsWithDestinationOperation.h
@@ -15,8 +15,11 @@
*/
#pragma once
+#include <boost/function.hpp>
+
#include "RoutingIf.h"
#include "SessionCommunications/SessionCommunicationsIf.h"
+#include "AsteriskSCF/StateMachine/SimpleStateMachine.h"
#include "SessionRouterOperation.h"
@@ -25,6 +28,23 @@ namespace AsteriskSCF
namespace BasicRoutingService
{
+// This namespace exists solely to avoid enumerators from different types
+// colliding.
+namespace ConnectBridgedSessionsWithDestinationOp
+{
+ /**
+ * This enum defines the states for the operation class below.
+ * It would have been defined within the class except that it is needed as a template
+ * parameter to the base class. (C++0x enum class will alleviate such nonsense.)
+ */
+ enum OperationState
+ {
+ STATE_LOOKUP, // registered with lookupState(), which sets the next state to STATE_WAIT_LOOKUP_RESULTS before starting the lookup
+ STATE_WAIT_LOOKUP_RESULTS, // registered with waitOnLookupState(), which sets the next state to STATE_BRIDGING unless mFinished is set
+ STATE_BRIDGING // registered with establishBridgeState()
+ };
+}
+
/**
* This operation replaces one session in a Bridge with a new session routable by the
* destination param. This is a specialization of SessionRouterOperation<T> that handles the
@@ -34,7 +54,8 @@ namespace BasicRoutingService
*
* This object is an instance of WorkQueue::Work so that it can be enqueued to a worker thread.
*/
- class ConnectBridgedSessionsWithDestinationOperation : public SessionRouterOperation<AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr>
+ class ConnectBridgedSessionsWithDestinationOperation : public SessionRouterOperation<AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr,
+ ConnectBridgedSessionsWithDestinationOp::OperationState>
{
public:
ConnectBridgedSessionsWithDestinationOperation(const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr& cb,
@@ -46,12 +67,20 @@ public:
virtual ~ConnectBridgedSessionsWithDestinationOperation();
+
private:
/**
* This is a state handler for one of this operation's states.
*/
void lookupState();
+ /**
+ * This is a state handler for one of this operation's states.
+ * Entering this state, the AMI call(s) to EndpointLocators have been made,
+ * and we are waiting for the results.
+ */
+ void waitOnLookupState();
+
/**
* This is a state handler for one of this operation's states.
* Entering this state, the destination endpoint has been obtained.
diff --git a/src/EndpointRegistry.cpp b/src/EndpointRegistry.cpp
index 542a1b0..d784770 100644
--- a/src/EndpointRegistry.cpp
+++ b/src/EndpointRegistry.cpp
@@ -17,9 +17,12 @@
#include <boost/thread/thread.hpp>
#include <boost/thread/shared_mutex.hpp>
-#include "RoutingServiceEventPublisher.h"
#include "EndpointRegistry.h"
+#include "SmartProxy.h"
+#include "BasicRoutingStateReplicator.h"
+#include "RoutingServiceEventPublisher.h"
#include "ScriptProcessor.h"
+
#include "logger.h"
using namespace ::AsteriskSCF::Core::Endpoint::V1;
@@ -75,8 +78,8 @@ typedef map<std::string, RegisteredLocator> EndpointLocatorMap;
class EndpointRegistryPriv
{
public:
- EndpointRegistryPriv(ScriptProcessor* scriptProcessor, const RoutingEventsPtr& eventPublisher) :
- mScriptProcessor(scriptProcessor), mEventPublisher(eventPublisher)
+ EndpointRegistryPriv(ScriptProcessor* scriptProcessor, const RoutingEventsPtr& eventPublisher, bool active) :
+ mScriptProcessor(scriptProcessor), mEventPublisher(eventPublisher), mActive(active)
{
}
@@ -116,6 +119,7 @@ public:
boost::shared_ptr<ScriptProcessor> mScriptProcessor;
EndpointLocatorMap mEndpointLocatorMap;
const RoutingEventsPtr mEventPublisher;
+ bool mActive;
};
/**
@@ -216,12 +220,16 @@ typedef IceUtil::Handle<LookupResultCollector> LookupResultCollectorPtr;
/**
* Constructor.
*/
-EndpointRegistry::EndpointRegistry(ScriptProcessor* scriptProcessor, const RoutingEventsPtr& eventPublisher) :
- mImpl(new EndpointRegistryPriv(scriptProcessor, eventPublisher))
+EndpointRegistry::EndpointRegistry(ScriptProcessor* scriptProcessor, const RoutingEventsPtr& eventPublisher, bool active) :
+ mImpl(new EndpointRegistryPriv(scriptProcessor, eventPublisher, active))
+{
+}
+
+void EndpointRegistry::setActive(bool isActive)
{
+ mImpl->mActive = isActive;
}
-
/**
* Returns the endpoints that match the specified destination id.
* @param id String identifier of the destination.
@@ -332,13 +340,13 @@ void EndpointRegistry::addEndpointLocator(const std::string& locatorId, const Re
RegisteredLocator newLocator(locator, regexList);
mImpl->insertLocatorMapItem(locatorId, newLocator);
- mImpl->mEventPublisher->addEndpointLocatorEvent(locatorId, regexList, Event::SUCCESS);
+ mImpl->mEventPublisher->addEndpointLocatorEvent(locatorId, regexList, locator, Event::SUCCESS);
}
catch (...)
{
lg(Error) << "Exception adding EndpointLocator.";
- mImpl->mEventPublisher->addEndpointLocatorEvent(locatorId, regexList, Event::FAILURE);
+ mImpl->mEventPublisher->addEndpointLocatorEvent(locatorId, regexList, locator, Event::FAILURE);
return;
}
}
@@ -356,7 +364,7 @@ void EndpointRegistry::removeEndpointLocator(const std::string& locatorId, const
EndpointLocatorMapIterator existing;
bool exists = locatorExists(mImpl->mEndpointLocatorMap, locatorId, mImpl->mLock, existing);
- if (exists)
+ if (!exists)
{
lg(Warning) << "Received request to remove Endpoint Locator not currently registered. Id = " << locatorId;
mImpl->mEventPublisher->removeEndpointLocatorEvent(locatorId, Event::FAILURE);
diff --git a/src/EndpointRegistry.h b/src/EndpointRegistry.h
index 775338e..466b9f4 100644
--- a/src/EndpointRegistry.h
+++ b/src/EndpointRegistry.h
@@ -18,6 +18,8 @@
#include <boost/shared_ptr.hpp>
#include "RoutingIf.h"
+#include "SmartProxy.h"
+#include "BasicRoutingStateReplicationIf.h"
namespace AsteriskSCF
{
@@ -30,7 +32,13 @@ class ScriptProcessor;
class EndpointRegistry : public AsteriskSCF::Core::Routing::V1::LocatorRegistry
{
public:
- EndpointRegistry(ScriptProcessor* scriptProcessor, const AsteriskSCF::Core::Routing::V1::Event::RoutingEventsPtr& eventPublisher );
+ EndpointRegistry(ScriptProcessor* scriptProcessor,
+ const AsteriskSCF::Core::Routing::V1::Event::RoutingEventsPtr& eventPublisher,
+ bool active);
+
+ void setActive(bool isActive);
+
+ void setStateReplicator(const AsteriskSCF::SmartProxy::SmartProxy<AsteriskSCF::BasicRoutingService::V1::RoutingStateReplicatorPrx>& replicator);
/**
* Configure the EndpointRegistry to use a different scriptProcessor than the
diff --git a/src/RouteSessionOperation.cpp b/src/RouteSessionOperation.cpp
index b3c8faf..94c82f2 100644
--- a/src/RouteSessionOperation.cpp
+++ b/src/RouteSessionOperation.cpp
@@ -50,15 +50,18 @@ RouteSessionOperation::RouteSessionOperation(const AMD_SessionRouter_routeSessio
const ::Ice::Current& current,
const SessionContext& context,
OperationsManager* const listener)
- : SessionRouterOperation<AMD_SessionRouter_routeSessionPtr>(cb,
- context,
- listener,
- boost::bind(&RouteSessionOperation::lookupState, this)),
+ : SessionRouterOperation<AMD_SessionRouter_routeSessionPtr, RouteSessionOp::OperationState>(cb,
+ context,
+ listener,
+ RouteSessionOp::STATE_LOOKUP),
mInitiatorCallback(cb),
mSource(source),
mDestination(destination),
mIceCurrent(current)
{
+ mStateMachine.addState(RouteSessionOp::STATE_LOOKUP, boost::bind(&RouteSessionOperation::lookupState, this));
+ mStateMachine.addState(RouteSessionOp::STATE_WAIT_LOOKUP_RESULTS, boost::bind(&RouteSessionOperation::waitOnLookupState, this));
+ mStateMachine.addState(RouteSessionOp::STATE_BRIDGING, boost::bind(&RouteSessionOperation::establishBridgeState, this));
}
RouteSessionOperation::~RouteSessionOperation()
@@ -91,7 +94,7 @@ void RouteSessionOperation::lookupState()
mListenerManager = listener;
// Set the state handler to execute once we've looked up our endpoints.
- setState(boost::bind(&RouteSessionOperation::establishBridgeState, this), "establishBridgeState");
+ mStateMachine.setNextState(RouteSessionOp::STATE_WAIT_LOOKUP_RESULTS);
// Lookup the destination. This will use AMI, and the callback will
// schedule us to execute again.
@@ -99,6 +102,22 @@ void RouteSessionOperation::lookupState()
lookupEndpoints(mDestination, mIceCurrent);
}
+/**
+ * This is a state handler for one of this operation's states.
+ * Entering this state, the AMI call(s) to EndpointLocators have been made,
+ * and we are waiting for the results.
+ * If the operation has already finished (mFinished is set) nothing is done;
+ * otherwise the state machine's next state is set to STATE_BRIDGING.
+ */
+void RouteSessionOperation::waitOnLookupState()
+{
+ if (mFinished)
+ {
+ return;
+ }
+
+ // Select the state to execute once we've looked up our endpoints.
+ mStateMachine.setNextState(RouteSessionOp::STATE_BRIDGING);
+}
+
/**
* This is a state handler for one of this operation's states.
* Entering this state, the destination endpoint has been obtained. This state
diff --git a/src/RouteSessionOperation.h b/src/RouteSessionOperation.h
index 2100853..47202a6 100644
--- a/src/RouteSessionOperation.h
+++ b/src/RouteSessionOperation.h
@@ -15,6 +15,8 @@
*/
#pragma once
+#include <boost/function.hpp>
+
#include "RoutingIf.h"
#include "SessionCommunications/SessionCommunicationsIf.h"
@@ -25,7 +27,24 @@ namespace AsteriskSCF
namespace BasicRoutingService
{
- /**
+// This namespace exists solely to avoid enumerators from different types
+// colliding.
+namespace RouteSessionOp
+{
+ /**
+ * This enum defines the states for the operation class below.
+ * It would have been defined within the class except that it is needed as a template
+ * parameter to the base class. (C++0x enum class will alleviate such nonsense.)
+ */
+ enum OperationState
+ {
+ STATE_LOOKUP, // registered with lookupState(), which sets the next state to STATE_WAIT_LOOKUP_RESULTS before starting the lookup
+ STATE_WAIT_LOOKUP_RESULTS, // registered with waitOnLookupState(), which sets the next state to STATE_BRIDGING unless mFinished is set
+ STATE_BRIDGING // registered with establishBridgeState()
+ };
+}
+
+/**
* This is a specialization of the SessionRouterOperation<T> to handle the
* routeSession() operation. The template parameter T is the type of the routeSession()
* AMD callback handler to allow this object to send results to the initiator of this
@@ -34,7 +53,9 @@ namespace BasicRoutingService
* This object is an instance of WorkQueue::Work so that
* it can be enqueued to a worker thread.
*/
-class RouteSessionOperation : public SessionRouterOperation<AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_routeSessionPtr>
+
+class RouteSessionOperation : public SessionRouterOperation<AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_routeSessionPtr,
+ RouteSessionOp::OperationState>
{
public:
RouteSessionOperation(const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_routeSessionPtr& cb,
@@ -47,6 +68,7 @@ public:
virtual ~RouteSessionOperation();
private:
+
/**
* This is a state handler for one of this operation's states.
* This method is called via mCurrentStateHandler when doWork() is executed from the
@@ -54,6 +76,13 @@ private:
*/
void lookupState();
+ /**
+ * This is a state handler for one of this operation's states.
+ * Entering this state, the AMI call(s) to EndpointLocators have been made,
+ * and we are waiting for the results.
+ */
+ void waitOnLookupState();
+
/**
* This is a state handler for one of this operation's states.
* Entering this state, the destination endpoint has been obtained. This state
@@ -70,7 +99,7 @@ private:
AsteriskSCF::SessionCommunications::V1::SessionPrx mSource;
std::string mDestination;
::Ice::Current mIceCurrent;
-
+
}; // class RouteSessionOperation
diff --git a/src/RoutingServiceEventPublisher.cpp b/src/RoutingServiceEventPublisher.cpp
index 99a132a..c3c4256 100644
--- a/src/RoutingServiceEventPublisher.cpp
+++ b/src/RoutingServiceEventPublisher.cpp
@@ -19,11 +19,13 @@
#include <boost/thread/mutex.hpp>
#include "RoutingServiceEventPublisher.h"
+#include "LocalReplicaListener.h"
#include "logger.h"
using namespace ::std;
using namespace ::AsteriskSCF::Core::Routing::V1;
using namespace ::AsteriskSCF::System::Logging;
+using namespace ::AsteriskSCF::BasicRoutingService::V1;
namespace
{
@@ -38,11 +40,11 @@ namespace BasicRoutingService
/**
* The private implementation of RoutingServiceEventPublisher.
*/
-class RoutingServiceEventPublisherPriv
+class RoutingServiceEventPublisherPriv
{
public:
- RoutingServiceEventPublisherPriv(const Ice::ObjectAdapterPtr& adapter) :
- mAdapter(adapter), mInitialized(false)
+ RoutingServiceEventPublisherPriv(const Ice::ObjectAdapterPtr& adapter, bool active) :
+ mAdapter(adapter), mInitialized(false), mActive(active)
{
boost::lock_guard<boost::mutex> lock(mLock);
initialize();
@@ -130,21 +132,45 @@ public:
return mInitialized;
}
+ void setActive(bool val)
+ {
+ mActive = val; // plain assignment; NOTE(review): mLock is not taken here — confirm callers cannot race with isActive()
+ }
+
+ bool isActive()
+ {
+ return mActive; // value passed to the constructor or to the most recent setActive(); read without taking mLock
+ }
+
public:
+ AsteriskSCF::SmartProxy::SmartProxy<AsteriskSCF::BasicRoutingService::V1::RoutingStateReplicatorPrx> mStateReplicator;
+
Event::RoutingEventsPrx mEventTopic; // Using one-way proxy.
boost::mutex mLock;
private:
Ice::ObjectAdapterPtr mAdapter;
bool mInitialized;
+ bool mActive;
+
};
/**
* Class constructor.
*/
-RoutingServiceEventPublisher::RoutingServiceEventPublisher(const Ice::ObjectAdapterPtr& adapter) :
- mImpl(new RoutingServiceEventPublisherPriv(adapter))
+RoutingServiceEventPublisher::RoutingServiceEventPublisher(const Ice::ObjectAdapterPtr& adapter, bool active) :
+ mImpl(new RoutingServiceEventPublisherPriv(adapter, active))
+{
+}
+
+void RoutingServiceEventPublisher::setActive(bool val)
{
+ mImpl->setActive(val);
+}
+
+void RoutingServiceEventPublisher::setStateReplicator(const AsteriskSCF::SmartProxy::SmartProxy<AsteriskSCF::BasicRoutingService::V1::RoutingStateReplicatorPrx>& replicator)
+{
+ mImpl->mStateReplicator = replicator;
}
/**
@@ -153,12 +179,9 @@ RoutingServiceEventPublisher::RoutingServiceEventPublisher(const Ice::ObjectAdap
void RoutingServiceEventPublisher::lookupEvent(const std::string& destination,
AsteriskSCF::Core::Routing::V1::Event::OperationResult result, const Ice::Current &)
{
- { // scope for the lock
- boost::lock_guard<boost::mutex> lock(mImpl->mLock);
- if (!mImpl->isInitialized())
- {
- return;
- }
+ if (!mImpl->isActive() || !mImpl->isInitialized())
+ {
+ return;
}
try
@@ -176,17 +199,40 @@ void RoutingServiceEventPublisher::lookupEvent(const std::string& destination,
* Send a message to the service's event topic to report the addEndpointLocator event.
*/
void RoutingServiceEventPublisher::addEndpointLocatorEvent(const std::string& locatorId,
- const ::AsteriskSCF::Core::Routing::V1::RegExSeq& regexList, AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
+ const ::AsteriskSCF::Core::Routing::V1::RegExSeq& regexList,
+ const AsteriskSCF::Core::Routing::V1::EndpointLocatorPrx& locator,
+ AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
const Ice::Current &)
{
- if (!mImpl->isInitialized())
+ if (mImpl->isActive() && result == Event::SUCCESS)
+ {
+ try
+ {
+ // Push this information to the state replicator.
+ RoutingStateItemSeq setItems;
+
+ EndpointLocatorAddPtr addEndpointItem(new EndpointLocatorAdd());
+ addEndpointItem->key = locatorId;
+ addEndpointItem->locator = locator;
+ addEndpointItem->regExList = regexList;
+
+ setItems.push_back(addEndpointItem);
+
+ mImpl->mStateReplicator->setState(setItems);
+ }
+ catch(...)
+ {
+ }
+ }
+
+ if (!mImpl->isActive() || !mImpl->isInitialized())
{
return;
}
try
{
- mImpl->mEventTopic->addEndpointLocatorEvent(locatorId, regexList, result);
+ mImpl->mEventTopic->addEndpointLocatorEvent(locatorId, regexList, locator, result);
}
catch(const Ice::Exception &e)
{
@@ -200,7 +246,26 @@ void RoutingServiceEventPublisher::addEndpointLocatorEvent(const std::string& lo
void RoutingServiceEventPublisher::removeEndpointLocatorEvent(const std::string& locatorId,
AsteriskSCF::Core::Routing::V1::Event::OperationResult result, const Ice::Current &)
{
- if (!mImpl->isInitialized())
+ if (mImpl->isActive() && result == Event::SUCCESS)
+ {
+ try
+ {
+ // Push this information to the state replicator.
+ RoutingStateItemSeq setItems;
+
+ EndpointLocatorRemovePtr removeEndpointItem(new EndpointLocatorRemove());
+ removeEndpointItem->key = locatorId;
+
+ setItems.push_back(removeEndpointItem);
+
+ mImpl->mStateReplicator->setState(setItems);
+ }
+ catch(...)
+ {
+ }
+ }
+
+ if (!mImpl->isActive() || !mImpl->isInitialized())
{
return;
}
@@ -222,7 +287,7 @@ void RoutingServiceEventPublisher::setEndpointLocatorDestinationIdsEvent(const s
const AsteriskSCF::Core::Routing::V1::RegExSeq& regexList, AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
const Ice::Current &)
{
- if (!mImpl->isInitialized())
+ if (!mImpl->isActive() || !mImpl->isInitialized())
{
return;
}
@@ -242,7 +307,7 @@ void RoutingServiceEventPublisher::setEndpointLocatorDestinationIdsEvent(const s
*/
void RoutingServiceEventPublisher::clearEndpointLocatorsEvent(const Ice::Current &)
{
- if (!mImpl->isInitialized())
+ if (!mImpl->isActive() || !mImpl->isInitialized())
{
return;
}
@@ -262,7 +327,7 @@ void RoutingServiceEventPublisher::clearEndpointLocatorsEvent(const Ice::Current
*/
void RoutingServiceEventPublisher::setPolicyEvent(const std::string& policy, const Ice::Current &)
{
- if (!mImpl->isInitialized())
+ if (!mImpl->isActive() || !mImpl->isInitialized())
{
return;
}
diff --git a/src/RoutingServiceEventPublisher.h b/src/RoutingServiceEventPublisher.h
index b3972b6..fc3a531 100644
--- a/src/RoutingServiceEventPublisher.h
+++ b/src/RoutingServiceEventPublisher.h
@@ -19,6 +19,8 @@
#include <boost/shared_ptr.hpp>
#include "RoutingIf.h"
+#include "SmartProxy.h"
+#include "BasicRoutingStateReplicationIf.h"
namespace AsteriskSCF
{
@@ -32,7 +34,11 @@ class RoutingServiceEventPublisherPriv;
class RoutingServiceEventPublisher : public ::AsteriskSCF::Core::Routing::V1::Event::RoutingEvents
{
public:
- RoutingServiceEventPublisher(const Ice::ObjectAdapterPtr& adapter);
+ RoutingServiceEventPublisher(const Ice::ObjectAdapterPtr& adapter, bool active);
+
+ void setActive(bool val);
+
+ void setStateReplicator(const AsteriskSCF::SmartProxy::SmartProxy<AsteriskSCF::BasicRoutingService::V1::RoutingStateReplicatorPrx>& replicator);
// Overrides
@@ -50,8 +56,11 @@ public:
* @param regexList List of regex strings used to identify the destinations available by this locator.
* @param result Informs event listeners of the operations success or failure.
*/
- void addEndpointLocatorEvent(const std::string& locatorId, const AsteriskSCF::Core::Routing::V1::RegExSeq& regexList,
- AsteriskSCF::Core::Routing::V1::Event::OperationResult result, const Ice::Current&);
+ void addEndpointLocatorEvent(const std::string& locatorId,
+ const AsteriskSCF::Core::Routing::V1::RegExSeq& regexList,
+ const AsteriskSCF::Core::Routing::V1::EndpointLocatorPrx& locator,
+ AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
+ const Ice::Current&);
/**
* Send a message to the service's event topic to report a removeEndpointLocator event.
@@ -85,5 +94,7 @@ private:
boost::shared_ptr<RoutingServiceEventPublisherPriv> mImpl; // pimpl idiom applied.
};
+typedef IceInternal::Handle<RoutingServiceEventPublisher> RoutingServiceEventPublisherPtr;
+
} // end BasicRoutingService
} // end AsteriskSCF
diff --git a/src/SessionListener.cpp b/src/SessionListener.cpp
index b39af59..c96d7ec 100644
--- a/src/SessionListener.cpp
+++ b/src/SessionListener.cpp
@@ -52,7 +52,7 @@ void SessionListenerImpl::stopped(const SessionPrx& session, const AsteriskSCF::
SessionSeq cacheSessions;
{
- boost::shared_lock<boost::shared_mutex> lock(mLock);
+ boost::shared_lock<boost::shared_mutex> lock(mSessionLock);
cacheSessions = mSessions;
}
@@ -76,7 +76,7 @@ void SessionListenerImpl::stopped(const SessionPrx& session, const AsteriskSCF::
void SessionListenerImpl::addSession(const SessionPrx& session)
{
- boost::unique_lock<boost::shared_mutex> lock(mLock);
+ boost::unique_lock<boost::shared_mutex> lock(mSessionLock);
mSessions.push_back(session);
}
@@ -86,7 +86,7 @@ void SessionListenerImpl::addSession(const SessionPrx& session)
void SessionListenerImpl::addSessionAndListen(SessionPrx session)
{
{ // critical scope
- boost::unique_lock<boost::shared_mutex> lock(mLock);
+ boost::unique_lock<boost::shared_mutex> lock(mSessionLock);
mSessions.push_back(session);
}
@@ -106,7 +106,7 @@ void SessionListenerImpl::addSessionAndListen(SessionPrx session)
const int SessionListenerImpl::getNumSessions()
{
- boost::shared_lock<boost::shared_mutex> lock(mLock);
+ boost::shared_lock<boost::shared_mutex> lock(mSessionLock);
return mSessions.size();
}
@@ -122,7 +122,7 @@ void SessionListenerImpl::unregister()
{
SessionSeq sessionsToCall;
{ // critical scope
- boost::shared_lock<boost::shared_mutex> lock(mLock);
+ boost::shared_lock<boost::shared_mutex> lock(mSessionLock);
sessionsToCall = mSessions;
}
@@ -143,7 +143,7 @@ void SessionListenerImpl::unregister()
// Since we're through listening to them, we should just drop our references to them.
{ // critical scope
- boost::unique_lock<boost::shared_mutex> lock(mLock);
+ boost::unique_lock<boost::shared_mutex> lock(mSessionLock);
mSessions.clear();
}
}
diff --git a/src/SessionListener.h b/src/SessionListener.h
index 0466b1b..5d3485f 100644
--- a/src/SessionListener.h
+++ b/src/SessionListener.h
@@ -27,8 +27,9 @@ namespace BasicRoutingService
{
/**
- * Listener used to monitor sessions during the routing process. Primarily used to
- * insure the relevant sessions haven't stopped prior to being bridged.
+ * Listener used to monitor sessions during the routing process.
+ * Primarily used to ensure the relevant sessions haven't stopped prior to being bridged.
+ * The sessions involved in an operation are maintained in this object.
*/
class SessionListenerImpl : public AsteriskSCF::SessionCommunications::V1::SessionListener
{
@@ -83,7 +84,7 @@ public: // Impl operations
void setProxy(const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& prx);
private:
- boost::shared_mutex mLock;
+ boost::shared_mutex mSessionLock;
AsteriskSCF::SessionCommunications::V1::SessionSeq mSessions;
bool mTerminated;
diff --git a/src/SessionRouter.cpp b/src/SessionRouter.cpp
index 5bba4a5..fc0c935 100644
--- a/src/SessionRouter.cpp
+++ b/src/SessionRouter.cpp
@@ -19,7 +19,7 @@
#include <boost/function.hpp>
#include <boost/bind.hpp>
-#include "AsteriskSCF/WorkQueue/WorkQueue.h"
+#include "AsteriskSCF/Threading/WorkQueue.h"
#include "SessionRouter.h"
#include "RouteSessionOperation.h"
@@ -39,6 +39,7 @@ using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::SessionCommunications::V1;
using namespace AsteriskSCF::Core::Routing::V1::Event;
using namespace AsteriskSCF::Core::Routing::V1;
+using namespace AsteriskSCF::Threading;
using namespace std;
namespace
@@ -51,7 +52,7 @@ namespace AsteriskSCF
namespace BasicRoutingService
{
-typedef map<WorkQueue::Work*, boost::shared_ptr<WorkQueue::Work> > OperationMap;
+typedef map<AsteriskSCF::Threading::WorkQueue::Work*, boost::shared_ptr<AsteriskSCF::Threading::WorkQueue::Work> > OperationMap;
/**
* Private operations and state of the SessionRouter.
@@ -62,7 +63,7 @@ public:
SessionRouterPriv(const Ice::ObjectAdapterPtr& objectAdapter,
const EndpointRegistryPtr& endpointRegistry,
const AsteriskSCF::Core::Routing::V1::Event::RoutingEventsPtr& eventPublisher,
- const boost::shared_ptr<WorkQueue>& workQueue) :
+ const boost::shared_ptr<AsteriskSCF::Threading::WorkQueue>& workQueue) :
mSessionContext(objectAdapter,
endpointRegistry,
eventPublisher,
@@ -86,7 +87,7 @@ public:
/**
* Enqueue the work to the WorkQueue.
*/
- void scheduleOperation(const boost::shared_ptr<WorkQueue::Work>& work)
+ void scheduleOperation(const boost::shared_ptr<AsteriskSCF::Threading::WorkQueue::Work>& work)
{
// Maintain refs to all ongoing operations.
mOngoingOperations[work.get()] = work;
@@ -101,7 +102,7 @@ public: // Overrides
* Handle a notice from an operation that it has completed.
* Remove our shared_ptr reference so that it will die.
*/
- virtual void finished(WorkQueue::Work* op)
+ virtual void finished(AsteriskSCF::Threading::WorkQueue::Work* op)
{
boost::lock_guard<boost::mutex> guard(mLock);
OperationMap::iterator kvp = mOngoingOperations.find(op);
@@ -118,7 +119,7 @@ public: // Overrides
* The operation doesn't have a shared_ptr to itself, so
* it can't do it internally.
*/
- virtual void reschedule(WorkQueue::Work *op)
+ virtual void reschedule(AsteriskSCF::Threading::WorkQueue::Work *op)
{
mSessionContext.workQueue->enqueue(getOngoingOperationSharedPointer(op));
}
diff --git a/src/SessionRouter.h b/src/SessionRouter.h
index 93d2801..0d775ea 100644
--- a/src/SessionRouter.h
+++ b/src/SessionRouter.h
@@ -18,7 +18,7 @@
#include <Ice/Ice.h>
#include <boost/shared_ptr.hpp>
-#include "AsteriskSCF/WorkQueue/WorkQueue.h"
+#include "AsteriskSCF/Threading/WorkQueue.h"
#include "SessionCommunications/SessionCommunicationsIf.h"
#include "SmartProxy.h"
@@ -40,7 +40,7 @@ public:
SessionRouter(const Ice::ObjectAdapterPtr& objectAdapter,
const EndpointRegistryPtr& endpointRegistry,
const AsteriskSCF::Core::Routing::V1::Event::RoutingEventsPtr& eventPublisher,
- const boost::shared_ptr<WorkQueue>& sessionRouterWorkQueue);
+ const boost::shared_ptr<AsteriskSCF::Threading::WorkQueue>& sessionRouterWorkQueue);
~SessionRouter();
void setBridgeManager(
diff --git a/src/SessionRouterOperation.h b/src/SessionRouterOperation.h
index fcc8003..047fb7b 100644
--- a/src/SessionRouterOperation.h
... 181 lines suppressed ...
--
asterisk-scf/integration/routing.git
More information about the asterisk-scf-commits
mailing list