[asterisk-scf-commits] asterisk-scf/integration/sip.git branch "sessiondefaults" created.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Thu Jul 28 16:40:11 CDT 2011
branch "sessiondefaults" has been created
at 379af249fae119e6f79df274d18ec29868af0f9b (commit)
- Log -----------------------------------------------------------------
commit 379af249fae119e6f79df274d18ec29868af0f9b
Author: Ken Hunt <ken.hunt at digium.com>
Date: Thu Jul 28 16:39:59 2011 -0500
Support for default SessionListeners and default SessionCookies.
diff --git a/slice/AsteriskSCF/Replication/SipSessionManager/SipStateReplicationIf.ice b/slice/AsteriskSCF/Replication/SipSessionManager/SipStateReplicationIf.ice
index 59e121c..e86b91c 100644
--- a/slice/AsteriskSCF/Replication/SipSessionManager/SipStateReplicationIf.ice
+++ b/slice/AsteriskSCF/Replication/SipSessionManager/SipStateReplicationIf.ice
@@ -163,6 +163,18 @@ module V1
AsteriskSCF::SessionCommunications::V1::SessionCookieDict mCookies;
};
+ class DefaultSessionListenerItem extends SipStateItem // Replicated state item describing one default session listener of an endpoint.
+ {
+ string endpointName; // Name of the endpoint that owns the listener; the replication listener uses it to find the endpoint.
+ AsteriskSCF::SessionCommunications::V1::SessionListener* listener; // Proxy of the default listener being added or removed.
+ };
+
+ class DefaultSessionCookieItem extends SipStateItem // Replicated state item describing one default session cookie of an endpoint.
+ {
+ string endpointName; // Name of the endpoint that owns the cookie; the replication listener uses it to find the endpoint.
+ AsteriskSCF::SessionCommunications::V1::SessionCookie cookie; // The default cookie being added or removed.
+ };
+
class SipRegistrarStateItem extends SipStateItem
{
/**
diff --git a/src/SipEndpoint.cpp b/src/SipEndpoint.cpp
index 8393972..59a2cb8 100644
--- a/src/SipEndpoint.cpp
+++ b/src/SipEndpoint.cpp
@@ -13,6 +13,8 @@
* the GNU General Public License Version 2. See the LICENSE.txt file
* at the top of the source tree.
*/
+#include <boost/thread/locks.hpp>
+
#include "PJSipManager.h"
#include "SipEndpointFactory.h"
#include "SipSession.h"
@@ -21,14 +23,25 @@
#include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.h>
#include <AsteriskSCF/logger.h>
+#include <AsteriskSCF/Collections/Set.h>
#include "NATOptions.h"
#include <AsteriskSCF/Media/MediaIf.h>
#include <AsteriskSCF/Media/SDP/MediaSDPIf.h>
+#include <AsteriskSCF/System/ExceptionsIf.h>
+#include <AsteriskSCF/Replication/SipSessionManager/SipStateReplicationIf.h>
+using namespace std;
using namespace AsteriskSCF::System::Logging;
+using namespace AsteriskSCF::System::V1;
using namespace AsteriskSCF::Media::V1;
using namespace AsteriskSCF::Media::SDP::V1;
using namespace AsteriskSCF::Core::Discovery::V1;
+using namespace AsteriskSCF::SessionCommunications::V1;
+using namespace AsteriskSCF::Replication::SipSessionManager::V1;
+using namespace AsteriskSCF::System::WorkQueue::V1;
+using namespace AsteriskSCF::WorkQueue;
+using namespace AsteriskSCF::Replication::SipSessionManager::V1;
+using namespace AsteriskSCF::Discovery;
namespace
{
@@ -40,6 +53,213 @@ namespace AsteriskSCF
namespace SipSessionManager
{
+/**
+ * Work item that publishes a newly added default session listener to the
+ * state replicator. The endpoint enqueues it on its work queue.
+ */
+class AddDefaultSessionListener : public Work
+{
+public:
+    /**
+     * @param stateKey Passed as the first field of the replicated item
+     *        (presumably its replication key; confirm against SipStateItem).
+     * @param endpointName Name of the endpoint the listener belongs to.
+     * @param listener The default listener proxy to replicate.
+     * @param stateReplicator Replicator that receives the item.
+     *
+     * NOTE(review): the empty string is the item's second base field; it
+     * looks like a session id that does not apply here - confirm.
+     */
+    AddDefaultSessionListener(const string& stateKey,
+        const string& endpointName,
+        const SessionListenerPrx& listener,
+        const SmartProxy<SipStateReplicatorPrx>& stateReplicator)
+        : mState(new DefaultSessionListenerItem(stateKey, "", endpointName, listener)),
+          mStateReplicator(stateReplicator)
+    {
+    }
+
+    /**
+     * Sends the item with setState(), preferring a oneway proxy and falling
+     * back to the two-way proxy when oneway cannot be used.
+     */
+    void execute()
+    {
+        SipStateItemSeq pending;
+        pending.push_back(mState);
+
+        SipStateReplicatorPrx onewayReplicator;
+        try
+        {
+            onewayReplicator = SipStateReplicatorPrx::uncheckedCast(mStateReplicator->ice_oneway());
+        }
+        catch (const Ice::NoEndpointException&)
+        {
+            lg(Error) << "No endpoint for oneway invocation of setState() for state replication. Using two-way proxy.";
+            mStateReplicator->setState(pending);
+            return;
+        }
+
+        try
+        {
+            onewayReplicator->setState(pending);
+        }
+        catch (const Ice::TwowayOnlyException&)
+        {
+            lg(Error) << "setState() is not oneway. Using two-way.";
+            mStateReplicator->setState(pending);
+        }
+    }
+
+private:
+    DefaultSessionListenerItemPtr mState;
+    SmartProxy<SipStateReplicatorPrx> mStateReplicator;
+};
+
+/**
+ * Work item that publishes the removal of a default session listener to the
+ * state replicator. The endpoint enqueues it on its work queue.
+ */
+class RemoveDefaultSessionListener : public Work
+{
+public:
+    /**
+     * @param stateKey Passed as the first field of the replicated item
+     *        (presumably its replication key; confirm against SipStateItem).
+     * @param endpointName Name of the endpoint the listener belonged to.
+     * @param listener The default listener proxy that was removed.
+     * @param stateReplicator Replicator that receives the removal.
+     */
+    RemoveDefaultSessionListener(const string& stateKey,
+        const string& endpointName,
+        const SessionListenerPrx& listener,
+        const SmartProxy<SipStateReplicatorPrx>& stateReplicator)
+        : mState(new DefaultSessionListenerItem(stateKey, "", endpointName, listener)),
+          mStateReplicator(stateReplicator)
+    {
+    }
+
+    /**
+     * Sends the item with removeStateForItems(), preferring a oneway proxy
+     * and falling back to the two-way proxy when oneway cannot be used.
+     */
+    void execute()
+    {
+        SipStateItemSeq pending;
+        pending.push_back(mState);
+
+        SipStateReplicatorPrx onewayReplicator;
+        try
+        {
+            onewayReplicator = SipStateReplicatorPrx::uncheckedCast(mStateReplicator->ice_oneway());
+        }
+        catch (const Ice::NoEndpointException&)
+        {
+            lg(Error) << "No endpoint for oneway invocation of removeState() for state replication. Using two-way proxy.";
+            mStateReplicator->removeStateForItems(pending);
+            return;
+        }
+
+        try
+        {
+            onewayReplicator->removeStateForItems(pending);
+        }
+        catch (const Ice::TwowayOnlyException&)
+        {
+            lg(Error) << "removeState() is not oneway. Using two-way.";
+            mStateReplicator->removeStateForItems(pending);
+        }
+    }
+
+private:
+    DefaultSessionListenerItemPtr mState;
+    SmartProxy<SipStateReplicatorPrx> mStateReplicator;
+};
+
+/**
+ * Builds the key used for a replicated default session cookie:
+ * "<endpointName>::<type id of the cookie>".
+ *
+ * Only the cookie's type id takes part in the key, so two cookies of the
+ * same type on one endpoint map to the same key.
+ *
+ * @param endpointName Name of the endpoint that owns the cookie. Taken by
+ *        const reference: the name is only read, and const or temporary
+ *        strings must be accepted.
+ * @param cookie The cookie; must not be a null handle.
+ */
+string cookieKey(const string& endpointName, const SessionCookiePtr& cookie)
+{
+    return endpointName + "::" + cookie->ice_id();
+}
+
+/**
+ * Work item that publishes a batch of default session cookies for one
+ * endpoint to the state replicator.
+ */
+class AddDefaultCookies : public Work
+{
+public:
+    /**
+     * @param endpointName Name of the endpoint the cookies belong to.
+     * @param cookies The default cookies to replicate (copied).
+     * @param stateReplicator Replicator that receives the items.
+     */
+    AddDefaultCookies(const string& endpointName,
+        const SessionCookies& cookies,
+        const SmartProxy<SipStateReplicatorPrx>& stateReplicator)
+        : mEndpointName(endpointName),
+          mCookies(cookies),
+          mStateReplicator(stateReplicator)
+    {
+    }
+
+    /**
+     * Wraps every cookie in a DefaultSessionCookieItem and sends the batch
+     * with setState(), preferring a oneway proxy and falling back to the
+     * two-way proxy when oneway cannot be used.
+     */
+    void execute()
+    {
+        SipStateItemSeq items;
+
+        for(SessionCookies::iterator i=mCookies.begin(); i != mCookies.end(); ++i)
+        {
+            DefaultSessionCookieItemPtr cookieItem =
+                new DefaultSessionCookieItem(cookieKey(mEndpointName, *i),
+                    "",
+                    mEndpointName,
+                    (*i));
+
+            // The item has to join the batch; without this the replicator
+            // is sent an empty sequence and no cookie is ever replicated.
+            items.push_back(cookieItem);
+        }
+
+        Ice::ObjectPrx oneway;
+        try
+        {
+            oneway = mStateReplicator->ice_oneway();
+        }
+        catch (const Ice::NoEndpointException&)
+        {
+            lg(Error) << "No endpoint for oneway invocation of setState() for state replication. Using two-way proxy.";
+            mStateReplicator->setState(items);
+            return;
+        }
+
+        SipStateReplicatorPrx oneWayStateReplicator = SipStateReplicatorPrx::uncheckedCast(oneway);
+
+        try
+        {
+            oneWayStateReplicator->setState(items);
+        }
+        catch (const Ice::TwowayOnlyException&)
+        {
+            lg(Error) << "setState() is not oneway. Using two-way.";
+            mStateReplicator->setState(items);
+        }
+    }
+
+private:
+    string mEndpointName;
+    SessionCookies mCookies;
+    SmartProxy<SipStateReplicatorPrx> mStateReplicator;
+};
+
+/**
+ * Work item that publishes the removal of a batch of default session
+ * cookies for one endpoint to the state replicator.
+ */
+class RemoveDefaultCookies : public Work
+{
+public:
+    /**
+     * @param endpointName Name of the endpoint the cookies belonged to.
+     * @param cookies The default cookies that were removed (copied).
+     * @param stateReplicator Replicator that receives the removal.
+     */
+    RemoveDefaultCookies(const string& endpointName,
+        const SessionCookies& cookies,
+        const SmartProxy<SipStateReplicatorPrx>& stateReplicator)
+        : mEndpointName(endpointName),
+          mCookies(cookies),
+          mStateReplicator(stateReplicator)
+    {
+    }
+
+    /**
+     * Wraps every cookie in a DefaultSessionCookieItem and sends the batch
+     * with removeStateForItems(), preferring a oneway proxy and falling back
+     * to the two-way proxy when oneway cannot be used.
+     */
+    void execute()
+    {
+        SipStateItemSeq items;
+
+        for(SessionCookies::iterator i=mCookies.begin(); i != mCookies.end(); ++i)
+        {
+            DefaultSessionCookieItemPtr cookieItem =
+                new DefaultSessionCookieItem(cookieKey(mEndpointName, *i),
+                    "",
+                    mEndpointName,
+                    (*i));
+
+            // The item has to join the batch; without this the replicator
+            // is sent an empty sequence and no removal is ever replicated.
+            items.push_back(cookieItem);
+        }
+
+        Ice::ObjectPrx oneway;
+        try
+        {
+            oneway = mStateReplicator->ice_oneway();
+        }
+        catch (const Ice::NoEndpointException&)
+        {
+            lg(Error) << "No endpoint for oneway invocation of removeState() for state replication. Using two-way proxy.";
+            mStateReplicator->removeStateForItems(items);
+            return;
+        }
+
+        SipStateReplicatorPrx oneWayStateReplicator = SipStateReplicatorPrx::uncheckedCast(oneway);
+
+        try
+        {
+            oneWayStateReplicator->removeStateForItems(items);
+        }
+        catch (const Ice::TwowayOnlyException&)
+        {
+            lg(Error) << "removeState() is not oneway. Using two-way.";
+            mStateReplicator->removeStateForItems(items);
+        }
+    }
+
+private:
+    string mEndpointName;
+    SessionCookies mCookies;
+    SmartProxy<SipStateReplicatorPrx> mStateReplicator;
+};
/**
* Class used to store information about the configured formats.
*/
@@ -164,15 +384,27 @@ public:
/**
* Constructor for the SipEndpointImplPriv class.
*/
- SipEndpointImplPriv(const Ice::ObjectAdapterPtr& adapter, const boost::shared_ptr<SipEndpointFactory>& factory,
- const std::string& name, const PJSipManagerPtr& manager,
- const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
- const AsteriskSCF::System::Component::V1::ReplicaPtr& replica) :
+ SipEndpointImplPriv(const Ice::ObjectAdapterPtr& adapter,
+ const boost::shared_ptr<SipEndpointFactory>& factory,
+ const std::string& name,
+ const PJSipManagerPtr& manager,
+ const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
+ const AsteriskSCF::System::Component::V1::ReplicaPtr& replica,
+ const AsteriskSCF::Discovery::SmartProxy<AsteriskSCF::Replication::SipSessionManager::V1::SipStateReplicatorPrx>& stateReplicator,
+ const AsteriskSCF::System::WorkQueue::V1::QueuePtr& workQueue) :
mName(name), mAdapter(adapter), mEndpointFactory(factory), mManager(manager), mServiceLocator(serviceLocator),
- mReplica(replica)
+ mReplica(replica), mStateReplicator(stateReplicator), mWorkQueue(workQueue),
+ mDefaultListeners(adapter, lg, "Default Session Listeners"),
+ mDefaultSessionCookies(adapter, lg, "Default Session Cookies")
{
};
+ /**
+  * Builds the key used when replicating a default listener: the endpoint
+  * name and the stringified identity of the listener proxy, joined by "::".
+  */
+ std::string replicaKeyName(SessionListenerPrx listener)
+ {
+     const Ice::Identity listenerId = listener->ice_getIdentity();
+     return mName + "::" + mAdapter->getCommunicator()->identityToString(listenerId);
+ }
+
/**
* The name of the endpoint.
*/
@@ -222,18 +454,32 @@ public:
*
*/
AsteriskSCF::System::Component::V1::ReplicaPtr mReplica;
+
+ // A WorkQueue for state replication.
+ AsteriskSCF::System::WorkQueue::V1::QueuePtr mWorkQueue;
+
+ // The state replicator for this component.
+ AsteriskSCF::Discovery::SmartProxy<
+ AsteriskSCF::Replication::SipSessionManager::V1::SipStateReplicatorPrx> mStateReplicator;
+
+ AsteriskSCF::Collections::ProxySet<SessionListenerPrx> mDefaultListeners;
+ AsteriskSCF::Collections::HandleSet<SessionCookiePtr> mDefaultSessionCookies;
};
SipEndpoint::SipEndpoint(const Ice::ObjectAdapterPtr& adapter,
- boost::shared_ptr<SipEndpointFactory> factory, std::string name,
- const PJSipManagerPtr& manager,
- const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
- const AsteriskSCF::System::Component::V1::ReplicaPtr replica)
- : mImplPriv(new SipEndpointImplPriv(adapter, factory, name, manager, serviceLocator, replica))
+ boost::shared_ptr<SipEndpointFactory> factory,
+ std::string name,
+ const PJSipManagerPtr& manager,
+ const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
+ const AsteriskSCF::System::Component::V1::ReplicaPtr replica,
+ const AsteriskSCF::Discovery::SmartProxy<AsteriskSCF::Replication::SipSessionManager::V1::SipStateReplicatorPrx>& stateReplicator,
+ const AsteriskSCF::System::WorkQueue::V1::QueuePtr& workQueue)
+ : mImplPriv(new SipEndpointImplPriv(adapter, factory, name, manager, serviceLocator, replica, stateReplicator, workQueue))
{
lg(Debug) << "Constructing SIP endpoint " << name;
- mImplPriv->mEndpointProxy = AsteriskSCF::SessionCommunications::V1::SessionEndpointPrx::uncheckedCast(mImplPriv->mAdapter->addWithUUID(this));
+ mImplPriv->mEndpointProxy = AsteriskSCF::SessionCommunications::V1::SessionEndpointPrx::uncheckedCast(
+ mImplPriv->mAdapter->addWithUUID(this));
}
void SipEndpoint::removeFromAdapter()
@@ -344,10 +590,12 @@ std::string SipEndpoint::getId(const Ice::Current&)
return mImplPriv->mEndpointProxy->ice_getIdentity().name;
}
+/**
+ * This implements the slice-defined interface for the session factory method.
+ */
AsteriskSCF::SessionCommunications::V1::SessionPrx SipEndpoint::createSession(const std::string& destination,
- const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener, const Ice::Current&)
+ const SessionListenerPrx& listener, const Ice::Current&)
{
-
std::cout << "Got call over Ice to create a session for endpoint " << mImplPriv->mName << std::endl;
if (mImplPriv->mConfig.sessionConfig.callDirection != BOTH &&
mImplPriv->mConfig.sessionConfig.callDirection != INBOUND)
@@ -356,7 +604,12 @@ AsteriskSCF::SessionCommunications::V1::SessionPrx SipEndpoint::createSession(co
return 0;
}
- SipSessionPtr session = new SipSession(mImplPriv->mAdapter, this, destination, listener, mImplPriv->mManager,
+ // Start from the endpoint's default listeners, then add the caller's own.
+ vector<SessionListenerPrx> listeners = mImplPriv->mDefaultListeners.getAll();
+ if (listener != 0)
+ {
+     // A caller may pass no listener; a null proxy must not enter the list.
+     listeners.push_back(listener);
+ }
+
+ SessionCookies defaultCookies = mImplPriv->mDefaultSessionCookies.getAll();
+
+ SipSessionPtr session = new SipSession(mImplPriv->mAdapter, this, destination, listeners, defaultCookies, mImplPriv->mManager,
mImplPriv->mServiceLocator, mImplPriv->mReplica, mImplPriv->mConfig.sessionConfig.rtpOverIPv6, true,
NATEndpointOptions(mImplPriv->mConfig.sessionConfig.rtpOverICE, mImplPriv->mConfig.sessionConfig.rtpICEIncludeTURN,
mImplPriv->mConfig.transportConfig.enableNAT));
@@ -367,7 +620,10 @@ AsteriskSCF::SessionCommunications::V1::SessionPrx SipEndpoint::createSession(co
AsteriskSCF::SipSessionManager::SipSessionPtr SipEndpoint::createSession(const std::string& destination)
{
- SipSessionPtr session = new SipSession(mImplPriv->mAdapter, this, destination, 0, mImplPriv->mManager,
+ vector<SessionListenerPrx> defaultListeners = mImplPriv->mDefaultListeners.getAll();
+ SessionCookies defaultCookies = mImplPriv->mDefaultSessionCookies.getAll();
+
+ SipSessionPtr session = new SipSession(mImplPriv->mAdapter, this, destination, defaultListeners, defaultCookies, mImplPriv->mManager,
mImplPriv->mServiceLocator, mImplPriv->mReplica, mImplPriv->mConfig.sessionConfig.rtpOverIPv6, false,
NATEndpointOptions(mImplPriv->mConfig.sessionConfig.rtpOverICE, mImplPriv->mConfig.sessionConfig.rtpICEIncludeTURN,
mImplPriv->mConfig.transportConfig.enableNAT)
@@ -376,11 +632,15 @@ AsteriskSCF::SipSessionManager::SipSessionPtr SipEndpoint::createSession(const s
return session;
}
+/**
+ * This factory method is used to create sessions in a standby component.
+ */
AsteriskSCF::SipSessionManager::SipSessionPtr SipEndpoint::createSession(const std::string& destination,
- const Ice::Identity& sessionid, const Ice::Identity& mediaid,
- const AsteriskSCF::Replication::SipSessionManager::V1::RTPMediaSessionSeq& mediasessions,
- const AsteriskSCF::Media::V1::StreamSourceSeq& sources,
- const AsteriskSCF::Media::V1::StreamSinkSeq& sinks)
+ const Ice::Identity& sessionid,
+ const Ice::Identity& mediaid,
+ const AsteriskSCF::Replication::SipSessionManager::V1::RTPMediaSessionSeq& mediasessions,
+ const AsteriskSCF::Media::V1::StreamSourceSeq& sources,
+ const AsteriskSCF::Media::V1::StreamSinkSeq& sinks)
{
SipSessionPtr session = new SipSession(mImplPriv->mAdapter, this, destination, sessionid, mediaid, mediasessions,
sources, sinks, mImplPriv->mManager, mImplPriv->mServiceLocator, mImplPriv->mReplica, false,
@@ -420,6 +680,77 @@ std::string SipEndpoint::getName()
return mImplPriv->mName;
};
+/**
+ * Adds a default session listener to this endpoint and, when this replica
+ * is active, queues replication of the addition.
+ *
+ * The Ice::Current argument is unused and therefore left unnamed, like in
+ * the other default listener/cookie operations.
+ */
+void SipEndpoint::addDefaultSessionListener(const SessionListenerPrx& listener, const Ice::Current&)
+{
+    mImplPriv->mDefaultListeners.add(listener);
+
+    if (mImplPriv->mReplica->isActive())
+    {
+        // Replicate this information.
+        WorkPtr work = new AddDefaultSessionListener(
+            mImplPriv->replicaKeyName(listener),
+            mImplPriv->mName,
+            listener,
+            mImplPriv->mStateReplicator);
+
+        mImplPriv->mWorkQueue->enqueueWork(work);
+    }
+}
+
+/**
+ * Removes a default session listener from this endpoint and, when this
+ * replica is active, queues replication of the removal.
+ */
+void SipEndpoint::removeDefaultSessionListener(const SessionListenerPrx& listener, const Ice::Current&)
+{
+    mImplPriv->mDefaultListeners.remove(listener);
+
+    if (!mImplPriv->mReplica->isActive())
+    {
+        // A replica that is not active only records the change locally.
+        return;
+    }
+
+    WorkPtr removal = new RemoveDefaultSessionListener(
+        mImplPriv->replicaKeyName(listener),
+        mImplPriv->mName,
+        listener,
+        mImplPriv->mStateReplicator);
+    mImplPriv->mWorkQueue->enqueueWork(removal);
+}
+
+/**
+ * Adds a batch of default session cookies to this endpoint and, when this
+ * replica is active, queues replication of the addition.
+ */
+void SipEndpoint::addDefaultSessionCookies(const SessionCookies& cookies, const Ice::Current&)
+{
+    mImplPriv->mDefaultSessionCookies.add(cookies);
+
+    if (!mImplPriv->mReplica->isActive())
+    {
+        // A replica that is not active only records the change locally.
+        return;
+    }
+
+    WorkPtr addition = new AddDefaultCookies(mImplPriv->mName, cookies, mImplPriv->mStateReplicator);
+    mImplPriv->mWorkQueue->enqueueWork(addition);
+}
+
+/**
+ * Removes a batch of default session cookies from this endpoint and, when
+ * this replica is active, queues replication of the removal.
+ */
+void SipEndpoint::removeDefaultSessionCookies(const SessionCookies& cookies, const Ice::Current&)
+{
+    mImplPriv->mDefaultSessionCookies.remove(cookies);
+
+    if (!mImplPriv->mReplica->isActive())
+    {
+        // A replica that is not active only records the change locally.
+        return;
+    }
+
+    WorkPtr removal = new RemoveDefaultCookies(mImplPriv->mName, cookies, mImplPriv->mStateReplicator);
+    mImplPriv->mWorkQueue->enqueueWork(removal);
+}
+
+void SipEndpoint::addDefaultSessionCookie(const SessionCookiePtr& cookie) // Local-only variant: stores one cookie and queues no replication work.
+{
+ mImplPriv->mDefaultSessionCookies.add(cookie);
+}
+
+void SipEndpoint::removeDefaultSessionCookie(const SessionCookiePtr& cookie) // Local-only variant: drops one cookie and queues no replication work.
+{
+ mImplPriv->mDefaultSessionCookies.remove(cookie);
+
+}
+
AsteriskSCF::SessionCommunications::V1::SessionEndpointPrx SipEndpoint::getEndpointProxy()
{
return mImplPriv->mEndpointProxy;
diff --git a/src/SipEndpoint.h b/src/SipEndpoint.h
index 2ce13ff..a4dfa1f 100644
--- a/src/SipEndpoint.h
+++ b/src/SipEndpoint.h
@@ -33,6 +33,9 @@
#include <AsteriskSCF/Media/MediaIf.h>
#include <AsteriskSCF/Media/RTP/MediaRTPIf.h>
#include <AsteriskSCF/Media/SDP/MediaSDPIf.h>
+#include <AsteriskSCF/WorkQueue/WorkQueue.h>
+#include <AsteriskSCF/Discovery/SmartProxy.h>
+#include "SipStateReplicationIf.h"
#include "SipSession.h"
@@ -266,9 +269,14 @@ class SipEndpoint : public AsteriskSCF::SessionCommunications::V1::SessionEndpoi
{
public:
- SipEndpoint(const Ice::ObjectAdapterPtr& adapter, boost::shared_ptr<SipEndpointFactory> factory, std::string name,
- const PJSipManagerPtr& manager, const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
- const AsteriskSCF::System::Component::V1::ReplicaPtr replica);
+ SipEndpoint(const Ice::ObjectAdapterPtr& adapter,
+ boost::shared_ptr<SipEndpointFactory> factory,
+ std::string name,
+ const PJSipManagerPtr& manager,
+ const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
+ const AsteriskSCF::System::Component::V1::ReplicaPtr replica,
+ const AsteriskSCF::Discovery::SmartProxy<AsteriskSCF::Replication::SipSessionManager::V1::SipStateReplicatorPrx>& stateReplicator,
+ const AsteriskSCF::System::WorkQueue::V1::QueuePtr& workQueue);
bool operator==(const std::string &name) const;
@@ -279,6 +287,18 @@ public:
AsteriskSCF::SessionCommunications::V1::SessionPrx createSession(const std::string&,
const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx&, const Ice::Current&);
AsteriskSCF::SessionCommunications::V1::SessionSeq getSessions(const Ice::Current&);
+ void addDefaultSessionListener(
+ const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
+ const Ice::Current& = Ice::Current());
+ void removeDefaultSessionListener(
+ const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
+ const Ice::Current& = Ice::Current());
+ void addDefaultSessionCookies(
+ const AsteriskSCF::SessionCommunications::V1::SessionCookies& cookies,
+ const Ice::Current&);
+ void removeDefaultSessionCookies(
+ const AsteriskSCF::SessionCommunications::V1::SessionCookies& cookies,
+ const Ice::Current&);
/**
* Implementation specific.
@@ -336,6 +356,11 @@ public:
void setSRTPOptions(const SipEndpointMediaSRTPConfig& srtpConfig);
void setSignalingNATOptions(bool enable);
+ void addDefaultSessionCookie(
+ const AsteriskSCF::SessionCommunications::V1::SessionCookiePtr& cookie);
+
+ void removeDefaultSessionCookie(
+ const AsteriskSCF::SessionCommunications::V1::SessionCookiePtr& cookie);
private:
/**
* Private implementation details.
diff --git a/src/SipEndpointFactory.cpp b/src/SipEndpointFactory.cpp
index 49724d6..0c5a024 100644
--- a/src/SipEndpointFactory.cpp
+++ b/src/SipEndpointFactory.cpp
@@ -16,10 +16,17 @@
#include <pjlib.h>
#include <AsteriskSCF/logger.h>
+#include <AsteriskSCF/WorkQueue/WorkQueue.h>
+#include <AsteriskSCF/WorkQueue/DefaultQueueListener.h>
+
#include "SipEndpoint.h"
#include "SipEndpointFactory.h"
using namespace AsteriskSCF::System::Logging;
+using namespace AsteriskSCF::System::WorkQueue::V1;
+using namespace AsteriskSCF::WorkQueue;
+using namespace AsteriskSCF::Discovery;
+using namespace AsteriskSCF::Replication::SipSessionManager::V1;
namespace
{
@@ -31,9 +38,27 @@ namespace AsteriskSCF
namespace SipSessionManager
{
+SipEndpointFactory::SipEndpointFactory(const Ice::ObjectAdapterPtr& adapter,
+ const PJSipManagerPtr& manager,
+ const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
+ const AsteriskSCF::System::Component::V1::ReplicaPtr& replica,
+ const AsteriskSCF::Discovery::SmartProxy<AsteriskSCF::Replication::SipSessionManager::V1::SipStateReplicatorPrx>& replicator)
+ : mAdapter(adapter),
+ mManager(manager),
+ mServiceLocator(serviceLocator),
+ mReplica(replica),
+ mStateReplicator(replicator)
+{ // Besides storing its collaborators, the factory builds the single work queue it passes to every endpoint it creates.
+ mQueue = new AsteriskSCF::WorkQueue::WorkQueue();
+ AsteriskSCF::WorkQueue::DefaultQueueListenerPtr listener(new AsteriskSCF::WorkQueue::DefaultQueueListener(mQueue, 0)); // NOTE(review): the meaning of the second argument (0) is not visible here - confirm.
+ mQueue->setListener(listener); // Presumably the listener is what gets queued work executed - verify against DefaultQueueListener.
+}
+
SipEndpointPtr SipEndpointFactory::createEndpoint(std::string endpointName)
{
- SipEndpointPtr endpoint = new SipEndpoint(mAdapter, shared_from_this(), endpointName, mManager, mServiceLocator, mReplica);
+ SipEndpointPtr endpoint = new SipEndpoint(mAdapter, shared_from_this(),
+ endpointName, mManager, mServiceLocator, mReplica, mStateReplicator, mQueue);
+
mEndpoints.push_back(endpoint);
return endpoint;
}
diff --git a/src/SipEndpointFactory.h b/src/SipEndpointFactory.h
index 09e02d9..e852864 100644
--- a/src/SipEndpointFactory.h
+++ b/src/SipEndpointFactory.h
@@ -22,6 +22,9 @@
#include <AsteriskSCF/Core/Endpoint/EndpointIf.h>
#include <AsteriskSCF/SIP/SIPRegistrarIf.h>
+#include <AsteriskSCF/WorkQueue/WorkQueue.h>
+#include <AsteriskSCF/Discovery/SmartProxy.h>
+#include "SipStateReplicationIf.h"
namespace AsteriskSCF
{
@@ -36,10 +39,11 @@ namespace SipSessionManager
class SipEndpointFactory : public boost::enable_shared_from_this<SipEndpointFactory>
{
public:
- SipEndpointFactory(const Ice::ObjectAdapterPtr& adapter, const PJSipManagerPtr& manager,
+ SipEndpointFactory(const Ice::ObjectAdapterPtr& adapter,
+ const PJSipManagerPtr& manager,
const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
- const AsteriskSCF::System::Component::V1::ReplicaPtr& replica) :
- mAdapter(adapter), mManager(manager), mServiceLocator(serviceLocator), mReplica(replica) { };
+ const AsteriskSCF::System::Component::V1::ReplicaPtr& replica,
+ const AsteriskSCF::Discovery::SmartProxy<AsteriskSCF::Replication::SipSessionManager::V1::SipStateReplicatorPrx>& stateReplicator);
SipEndpointPtr createEndpoint(std::string);
@@ -74,6 +78,12 @@ private:
* A pointer to the replica information.
*/
AsteriskSCF::System::Component::V1::ReplicaPtr mReplica;
+
+ // The state replicator for this component.
+ AsteriskSCF::Discovery::SmartProxy<AsteriskSCF::Replication::SipSessionManager::V1::SipStateReplicatorPrx> mStateReplicator;
+
+ // A work queue used by endpoints to replicate state.
+ AsteriskSCF::System::WorkQueue::V1::QueuePtr mQueue;
};
}; // end SipSessionManager
diff --git a/src/SipSession.cpp b/src/SipSession.cpp
index 044455b..7f76c84 100755
--- a/src/SipSession.cpp
+++ b/src/SipSession.cpp
@@ -32,6 +32,7 @@
#include <AsteriskSCF/Media/SDP/MediaSDPIf.h>
#include "NATOptions.h"
+using namespace AsteriskSCF::SessionCommunications::V1;
using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::System::NAT::V1;
using namespace AsteriskSCF::Media::RTP::V1;
@@ -414,17 +415,23 @@ void SipSession::initializePJSIPStructs()
}
/**
- * Default constructor.
+ * Standard constructor.
*/
-SipSession::SipSession(const Ice::ObjectAdapterPtr& adapter, const SipEndpointPtr& endpoint,
- const std::string& destination, const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
+SipSession::SipSession(const Ice::ObjectAdapterPtr& adapter,
+ const SipEndpointPtr& endpoint,
+ const std::string& destination,
+ const vector<SessionListenerPrx>& listeners,
+ const SessionCookies& cookies,
const PJSipManagerPtr& manager, const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
- const AsteriskSCF::System::Component::V1::ReplicaPtr& replica, bool ipv6, bool isUAC, const NATEndpointOptions& natOptions)
+ const AsteriskSCF::System::Component::V1::ReplicaPtr& replica,
+ bool ipv6, bool isUAC, const NATEndpointOptions& natOptions)
: mImplPriv(new SipSessionPriv(adapter, endpoint, destination, manager, serviceLocator, replica, natOptions))
{
- if (listener != 0)
+ mImplPriv->mListeners.insert(mImplPriv->mListeners.end(), listeners.begin(), listeners.end());
+
+ for(SessionCookies::const_iterator i = cookies.begin(); i != cookies.end(); ++i)
{
- mImplPriv->mListeners.push_back(listener);
+ mImplPriv->mSessionCookies[(*i)->ice_id()] = (*i);
}
mImplPriv->mSessionProxy =
diff --git a/src/SipSession.h b/src/SipSession.h
index 60d800b..35ba4c1 100644
--- a/src/SipSession.h
+++ b/src/SipSession.h
@@ -105,7 +105,9 @@ class SipSession : public AsteriskSCF::SessionCommunications::V1::Session
{
public:
SipSession(const Ice::ObjectAdapterPtr&, const SipEndpointPtr&, const std::string&,
- const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx&, const PJSipManagerPtr& manager,
+ const std::vector<AsteriskSCF::SessionCommunications::V1::SessionListenerPrx>&,
+ const AsteriskSCF::SessionCommunications::V1::SessionCookies&,
+ const PJSipManagerPtr& manager,
const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
const AsteriskSCF::System::Component::V1::ReplicaPtr& replica,
bool ipv6, bool isUAC, const NATEndpointOptions& natOptions);
diff --git a/src/SipSessionManagerApp.cpp b/src/SipSessionManagerApp.cpp
index 64aa0bb..f28affb 100644
--- a/src/SipSessionManagerApp.cpp
+++ b/src/SipSessionManagerApp.cpp
@@ -544,20 +544,23 @@ void SipSessionManager::initialize(const string& appName, const Ice::Communicato
// it now.
mServiceLocator = ServiceLocatorPrx::checkedCast(mCommunicator->propertyToProxy("LocatorService.Proxy"));
+ // Locate the State Replicator so we can fail over!
+ locateStateReplicator();
+
// Create and publish our Replica interface support.
mReplicaService = new ReplicaImpl(mLocalAdapter);
mLocalAdapter->add(mReplicaService, mCommunicator->stringToIdentity(ReplicaServiceId));
lg(Debug) << "Created SIP Replica Implementation";
- mEndpointFactory.reset(new SipEndpointFactory(mGlobalAdapter, mPJSipManager, mServiceLocator, mReplicaService));
+ mEndpointFactory.reset(new SipEndpointFactory(mGlobalAdapter, mPJSipManager, mServiceLocator, mReplicaService, mStateReplicator));
lg(Debug) << "Created SIP endpoint factory";
mRegistrarListener = new SipDefaultRegistrarListener(mEndpointFactory);
mGlobalAdapter->add(mRegistrarListener, mCommunicator->stringToIdentity(RegistrarListenerId));
lg(Debug) << "Added default registrar listener to object adapter";
- // Locate the Routing Service so that we can do routing. This is done here so it can be passed to the configuration service.
- locateRoutingService();
+ // Locate the Routing Service so that we can do routing. This is done here so it can be passed to the configuration service.
+ locateRoutingService();
// Create and publish our Configuration interface support.
mConfigurationService = createConfigurationServant(mPJSipManager, mEndpointFactory, mRoutingId, mRoutingServiceLocatorRegistry);
@@ -610,9 +613,6 @@ void SipSessionManager::start(const string& name, const Ice::CommunicatorPtr& ic
// Locate the Session Router so we can REALLY do routing.
locateSessionRouter();
- // Locate the State Replicator so we can fail over!
- locateStateReplicator();
-
registerPJSipModules();
// Register with the state replicator if we are a listener
diff --git a/src/SipStateReplicator.h b/src/SipStateReplicator.h
index d53bc81..8856d48 100644
--- a/src/SipStateReplicator.h
+++ b/src/SipStateReplicator.h
@@ -39,6 +39,8 @@ typedef IceUtil::Handle<SipStateReplicatorI> SipStateReplicatorIPtr;
//
struct SipStateReplicatorListenerImpl;
+class SipEndpointFactory;
+
class SipStateReplicatorListenerI : public AsteriskSCF::Replication::SipSessionManager::V1::SipStateReplicatorListener
{
public:
diff --git a/src/SipStateReplicatorListener.cpp b/src/SipStateReplicatorListener.cpp
index ed9fd83..0ddeccb 100644
--- a/src/SipStateReplicatorListener.cpp
+++ b/src/SipStateReplicatorListener.cpp
@@ -137,6 +137,9 @@ public:
for (SipStateItemSeq::const_iterator iter = items.begin(); iter != items.end(); ++iter)
{
SipRegistrarStateItemPtr regItem;
+ DefaultSessionListenerItemPtr defaultListenerItem;
+ DefaultSessionCookieItemPtr defaultCookieItem;
+
if ((regItem = SipRegistrarStateItemPtr::dynamicCast((*iter))))
{
PJSipRegistrarModulePtr regModule;
@@ -149,6 +152,30 @@ public:
}
removeRegistrationState(regModule, regItem);
}
+ else if(defaultListenerItem = DefaultSessionListenerItemPtr::dynamicCast((*iter)))
+ {
+ SipEndpointPtr endpoint = mEndpointFactory->findByName(defaultListenerItem->endpointName);
+
+ // If we lack the endpoint (due to misconfiguration) we can't proceed
+ if (endpoint == 0)
+ {
+ continue;
+ }
+
+ endpoint->removeDefaultSessionListener(defaultListenerItem->listener);
+ }
+ else if(defaultCookieItem = DefaultSessionCookieItemPtr::dynamicCast((*iter)))
+ {
+ SipEndpointPtr endpoint = mEndpointFactory->findByName(defaultCookieItem->endpointName);
+
+ // If we lack the endpoint (due to misconfiguration) we can't proceed
+ if (endpoint == 0)
+ {
+ continue;
+ }
+
+ endpoint->removeDefaultSessionCookie(defaultCookieItem->cookie);
+ }
}
}
@@ -239,6 +266,8 @@ public:
SipTransactionStateItemPtr transaction;
SipRegistrarStateItemPtr regItem;
boost::shared_ptr<SipStateReplicatorItem> localitem;
+ DefaultSessionListenerItemPtr defaultListenerItem;
+ DefaultSessionCookieItemPtr defaultCookieItem;
// Depending on the type of state item we apply it differently
if ((session = SipSessionStateItemPtr::dynamicCast((*item))))
@@ -271,7 +300,7 @@ public:
localitem->getSession()->setListeners(session->mListeners);
localitem->getSession()->setBridge(session->mBridge);
- localitem->getSession()->setCookies(session->mCookies);
+ localitem->getSession()->setCookies(session->mCookies);
}
else if ((dialog = SipDialogStateItemPtr::dynamicCast((*item))))
{
@@ -430,6 +459,30 @@ public:
}
setRegistrationState(regModule, regItem);
}
+ else if (defaultListenerItem = DefaultSessionListenerItemPtr::dynamicCast((*item)))
+ {
+ SipEndpointPtr endpoint = mEndpointFactory->findByName(defaultListenerItem->endpointName);
+
+ // If we lack the endpoint (due to misconfiguration) we can't proceed
+ if (endpoint == 0)
+ {
+ continue;
+ }
+
+ endpoint->addDefaultSessionListener(defaultListenerItem->listener);
+ }
+ else if (defaultCookieItem = DefaultSessionCookieItemPtr::dynamicCast((*item)))
+ {
+ SipEndpointPtr endpoint = mEndpointFactory->findByName(defaultCookieItem->endpointName);
+
+ // If we lack the endpoint (due to misconfiguration) we can't proceed
+ if (endpoint == 0)
+ {
+ continue;
+ }
+
+ endpoint->addDefaultSessionCookie(defaultCookieItem->cookie);
+ }
}
}
std::string mId;
@@ -454,7 +507,7 @@ void SipStateReplicatorListenerI::stateRemoved(const Ice::StringSeq& itemKeys, c
mImpl->removeStateNoticeImpl(itemKeys);
}
-void SipStateReplicatorListenerI::stateSet(const SipStateItemSeq& items, const Ice::Current&)
+void SipStateReplicatorListenerI::stateSet(const SipStateItemSeq& items, const Ice::Current& current)
{
mImpl->setStateNoticeImpl(items);
}
-----------------------------------------------------------------------
--
asterisk-scf/integration/sip.git
More information about the asterisk-scf-commits
mailing list