[asterisk-scf-commits] asterisk-scf/integration/sip.git branch "sessiondefaults" created.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Thu Jul 28 16:40:11 CDT 2011


branch "sessiondefaults" has been created
        at  379af249fae119e6f79df274d18ec29868af0f9b (commit)

- Log -----------------------------------------------------------------
commit 379af249fae119e6f79df274d18ec29868af0f9b
Author: Ken Hunt <ken.hunt at digium.com>
Date:   Thu Jul 28 16:39:59 2011 -0500

    Support for default SessionListeners and default SessionCookies.

diff --git a/slice/AsteriskSCF/Replication/SipSessionManager/SipStateReplicationIf.ice b/slice/AsteriskSCF/Replication/SipSessionManager/SipStateReplicationIf.ice
index 59e121c..e86b91c 100644
--- a/slice/AsteriskSCF/Replication/SipSessionManager/SipStateReplicationIf.ice
+++ b/slice/AsteriskSCF/Replication/SipSessionManager/SipStateReplicationIf.ice
@@ -163,6 +163,18 @@ module V1
       AsteriskSCF::SessionCommunications::V1::SessionCookieDict mCookies;
    };
 
+   class DefaultSessionListenerItem extends SipStateItem // Replicated record of one default session listener.
+   {
+      string endpointName; // Name of the SIP endpoint the listener was registered on.
+      AsteriskSCF::SessionCommunications::V1::SessionListener* listener; // Proxy to that listener.
+   };
+
+   class DefaultSessionCookieItem extends SipStateItem // Replicated record of one default session cookie.
+   {
+      string endpointName; // Name of the SIP endpoint the cookie was registered on.
+      AsteriskSCF::SessionCommunications::V1::SessionCookie cookie; // The cookie object itself.
+   };
+
    class SipRegistrarStateItem extends SipStateItem
    {
       /**
diff --git a/src/SipEndpoint.cpp b/src/SipEndpoint.cpp
index 8393972..59a2cb8 100644
--- a/src/SipEndpoint.cpp
+++ b/src/SipEndpoint.cpp
@@ -13,6 +13,8 @@
  * the GNU General Public License Version 2. See the LICENSE.txt file
  * at the top of the source tree.
  */
+#include <boost/thread/locks.hpp>
+
 #include "PJSipManager.h"
 #include "SipEndpointFactory.h"
 #include "SipSession.h"
@@ -21,14 +23,25 @@
 
 #include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.h>
 #include <AsteriskSCF/logger.h>
+#include <AsteriskSCF/Collections/Set.h>
 #include "NATOptions.h"
 #include <AsteriskSCF/Media/MediaIf.h>
 #include <AsteriskSCF/Media/SDP/MediaSDPIf.h>
+#include <AsteriskSCF/System/ExceptionsIf.h>
+#include <AsteriskSCF/Replication/SipSessionManager/SipStateReplicationIf.h>
 
+using namespace std;
 using namespace AsteriskSCF::System::Logging;
+using namespace AsteriskSCF::System::V1;
 using namespace AsteriskSCF::Media::V1;
 using namespace AsteriskSCF::Media::SDP::V1;
 using namespace AsteriskSCF::Core::Discovery::V1;
+using namespace AsteriskSCF::SessionCommunications::V1;
+using namespace AsteriskSCF::Replication::SipSessionManager::V1;
+using namespace AsteriskSCF::System::WorkQueue::V1;
+using namespace AsteriskSCF::WorkQueue;
+using namespace AsteriskSCF::Replication::SipSessionManager::V1;
+using namespace AsteriskSCF::Discovery;
 
 namespace
 {
@@ -40,6 +53,213 @@ namespace AsteriskSCF
 namespace SipSessionManager
 {
 
+// Queued work that publishes one DefaultSessionListenerItem to the SIP state replicator via setState().
+class AddDefaultSessionListener : public Work
+{
+public:
+    // Builds the item to replicate (state key, empty second field, endpoint name, listener proxy).
+    AddDefaultSessionListener(const string& stateKey,
+                              const string& endpointName,
+                              const SessionListenerPrx& listener,
+                              const SmartProxy<SipStateReplicatorPrx>& stateReplicator)
+        : mState(new DefaultSessionListenerItem(stateKey, "", endpointName, listener)),
+          mStateReplicator(stateReplicator)
+    {}
+
+    // Sends the item oneway when a oneway proxy can be made and used; otherwise uses the two-way proxy.
+    void execute()
+    {
+        SipStateItemSeq batch;
+        batch.push_back(mState);
+
+        Ice::ObjectPrx onewayBase;
+        try
+        {
+            onewayBase = mStateReplicator->ice_oneway();
+        }
+        catch (const Ice::NoEndpointException&)
+        {
+            lg(Error) << "No endpoint for oneway invocation of setState() for state replication. Using two-way proxy.";
+            mStateReplicator->setState(batch);
+            return;
+        }
+
+        try
+        {
+            SipStateReplicatorPrx::uncheckedCast(onewayBase)->setState(batch);
+        }
+        catch (const Ice::TwowayOnlyException&)
+        {
+            lg(Error) << "setState() is not oneway. Using two-way.";
+            mStateReplicator->setState(batch);
+        }
+    }
+private:
+    DefaultSessionListenerItemPtr mState;                // Item sent to the replicator.
+    SmartProxy<SipStateReplicatorPrx> mStateReplicator;  // Replicator the item is sent to.
+};
+
+// Queued work that asks the SIP state replicator to drop one DefaultSessionListenerItem.
+class RemoveDefaultSessionListener : public Work
+{
+public:
+    // Builds the item to remove (state key, empty second field, endpoint name, listener proxy).
+    RemoveDefaultSessionListener(const string& stateKey,
+                                 const string& endpointName,
+                                 const SessionListenerPrx& listener,
+                                 const SmartProxy<SipStateReplicatorPrx>& stateReplicator)
+        : mState(new DefaultSessionListenerItem(stateKey, "", endpointName, listener)),
+          mStateReplicator(stateReplicator)
+    {}
+
+    // Calls removeStateForItems() oneway when possible; otherwise uses the two-way proxy.
+    void execute()
+    {
+        SipStateItemSeq batch;
+        batch.push_back(mState);
+
+        Ice::ObjectPrx onewayBase;
+        try
+        {
+            onewayBase = mStateReplicator->ice_oneway();
+        }
+        catch (const Ice::NoEndpointException&)
+        {
+            lg(Error) << "No endpoint for oneway invocation of removeState() for state replication. Using two-way proxy.";
+            mStateReplicator->removeStateForItems(batch);
+            return;
+        }
+
+        try
+        {
+            SipStateReplicatorPrx::uncheckedCast(onewayBase)->removeStateForItems(batch);
+        }
+        catch (const Ice::TwowayOnlyException&)
+        {
+            lg(Error) << "removeState() is not oneway. Using two-way.";
+            mStateReplicator->removeStateForItems(batch);
+        }
+    }
+private:
+    DefaultSessionListenerItemPtr mState;                // Item whose replicated state is removed.
+    SmartProxy<SipStateReplicatorPrx> mStateReplicator;  // Replicator the removal is sent to.
+};
+
+// Builds the replication key for a default cookie: "<endpointName>::<Ice type id of the cookie>".
+string cookieKey(const string& endpointName, const SessionCookiePtr& cookie)
+{
+    return endpointName + "::" + cookie->ice_id();
+}
+
+// Queued work that publishes an endpoint's default session cookies to the SIP state replicator.
+class AddDefaultCookies : public Work
+{
+public:
+    // Copies the endpoint name and the cookies so execute() can run after the caller has returned.
+    AddDefaultCookies(const string& endpointName, 
+                      const SessionCookies& cookies,
+                      const SmartProxy<SipStateReplicatorPrx>& stateReplicator)
+        : mEndpointName(endpointName),
+          mCookies(cookies),
+          mStateReplicator(stateReplicator)
+    {
+    }
+
+    // Wraps every cookie in a DefaultSessionCookieItem and sends the batch, oneway when possible.
+    void execute()
+    {
+        SipStateItemSeq items;
+        for(SessionCookies::iterator i=mCookies.begin(); i != mCookies.end(); ++i)
+        {
+            DefaultSessionCookieItemPtr cookieItem =
+                new DefaultSessionCookieItem(cookieKey(mEndpointName, *i), "", mEndpointName, (*i));
+            items.push_back(cookieItem); // The replicator only receives what is stored in 'items'.
+        }
+
+        Ice::ObjectPrx oneway;
+        try
+        {
+            oneway = mStateReplicator->ice_oneway();
+        }
+        catch (const Ice::NoEndpointException&)
+        {
+            lg(Error) << "No endpoint for oneway invocation of setState() for state replication. Using two-way proxy.";
+            mStateReplicator->setState(items);
+            return;
+        }
+
+        SipStateReplicatorPrx oneWayStateReplicator = SipStateReplicatorPrx::uncheckedCast(oneway);
+
+        try
+        {
+            oneWayStateReplicator->setState(items);
+        }
+        catch (const Ice::TwowayOnlyException&)
+        {
+            lg(Error) << "setState() is not oneway. Using two-way.";
+            mStateReplicator->setState(items);
+        }
+    }
+private:
+    string mEndpointName;                                // Endpoint the cookies belong to.
+    SessionCookies mCookies;                             // Cookies to replicate.
+    SmartProxy<SipStateReplicatorPrx> mStateReplicator;  // Replicator the items are sent to.
+};
+
+// Queued work that asks the SIP state replicator to drop an endpoint's default session cookies.
+class RemoveDefaultCookies : public Work
+{
+public:
+    // Copies the endpoint name and the cookies so execute() can run after the caller has returned.
+    RemoveDefaultCookies(const string& endpointName, 
+                         const SessionCookies& cookies,
+                         const SmartProxy<SipStateReplicatorPrx>& stateReplicator)
+        : mEndpointName(endpointName),
+          mCookies(cookies),
+          mStateReplicator(stateReplicator)
+    {
+    }
+
+    // Wraps every cookie in a DefaultSessionCookieItem and requests removal of the batch.
+    void execute()
+    {
+        SipStateItemSeq items;
+        for(SessionCookies::iterator i=mCookies.begin(); i != mCookies.end(); ++i)
+        {
+            DefaultSessionCookieItemPtr cookieItem =
+                new DefaultSessionCookieItem(cookieKey(mEndpointName, *i), "", mEndpointName, (*i));
+            items.push_back(cookieItem); // The replicator only receives what is stored in 'items'.
+        }
+
+        Ice::ObjectPrx oneway;
+        try
+        {
+            oneway = mStateReplicator->ice_oneway();
+        }
+        catch (const Ice::NoEndpointException&)
+        {
+            lg(Error) << "No endpoint for oneway invocation of removeState() for state replication. Using two-way proxy.";
+            mStateReplicator->removeStateForItems(items);
+            return;
+        }
+
+        SipStateReplicatorPrx oneWayStateReplicator = SipStateReplicatorPrx::uncheckedCast(oneway);
+
+        try
+        {
+            oneWayStateReplicator->removeStateForItems(items);
+        }
+        catch (const Ice::TwowayOnlyException&)
+        {
+            lg(Error) << "removeState() is not oneway. Using two-way.";
+            mStateReplicator->removeStateForItems(items);
+        }
+    }
+private:
+    string mEndpointName;                                // Endpoint the cookies belong to.
+    SessionCookies mCookies;                             // Cookies whose replicated state is removed.
+    SmartProxy<SipStateReplicatorPrx> mStateReplicator;  // Replicator the removal is sent to.
+};
 /**
  * Class used to store information about the configured formats.
  */
@@ -164,15 +384,27 @@ public:
     /**
      * Constructor for the SipEndpointImplPriv class.
      */
-    SipEndpointImplPriv(const Ice::ObjectAdapterPtr& adapter, const boost::shared_ptr<SipEndpointFactory>& factory,
-            const std::string& name, const PJSipManagerPtr& manager,
-            const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
-            const AsteriskSCF::System::Component::V1::ReplicaPtr& replica) :
+    SipEndpointImplPriv(const Ice::ObjectAdapterPtr& adapter, 
+                        const boost::shared_ptr<SipEndpointFactory>& factory,
+                        const std::string& name, 
+                        const PJSipManagerPtr& manager,
+                        const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
+                        const AsteriskSCF::System::Component::V1::ReplicaPtr& replica,
+                        const AsteriskSCF::Discovery::SmartProxy<AsteriskSCF::Replication::SipSessionManager::V1::SipStateReplicatorPrx>& stateReplicator,
+                        const AsteriskSCF::System::WorkQueue::V1::QueuePtr& workQueue) :
         mName(name), mAdapter(adapter), mEndpointFactory(factory), mManager(manager), mServiceLocator(serviceLocator),
-        mReplica(replica)
+        mReplica(replica), mStateReplicator(stateReplicator), mWorkQueue(workQueue),
+        mDefaultListeners(adapter, lg, "Default Session Listeners"),
+        mDefaultSessionCookies(adapter, lg, "Default Session Cookies")
     {
     };
     
+    // Replication key for a default listener: "<endpoint name>::<listener identity as a string>".
+    std::string replicaKeyName(const SessionListenerPrx& listener)
+    {
+        return mName + "::" + mAdapter->getCommunicator()->identityToString(listener->ice_getIdentity());
+    }
+     
     /**
      * The name of the endpoint.
      */
@@ -222,18 +454,32 @@ public:
      *
      */
     AsteriskSCF::System::Component::V1::ReplicaPtr mReplica;
+
+    // A WorkQueue for state replication. 
+    AsteriskSCF::System::WorkQueue::V1::QueuePtr mWorkQueue;
+
+    // The state replicator for this component.
+    AsteriskSCF::Discovery::SmartProxy<
+        AsteriskSCF::Replication::SipSessionManager::V1::SipStateReplicatorPrx> mStateReplicator;
+
+    AsteriskSCF::Collections::ProxySet<SessionListenerPrx> mDefaultListeners;
+    AsteriskSCF::Collections::HandleSet<SessionCookiePtr> mDefaultSessionCookies;
 };
 
 SipEndpoint::SipEndpoint(const Ice::ObjectAdapterPtr& adapter, 
-    boost::shared_ptr<SipEndpointFactory> factory, std::string name, 
-    const PJSipManagerPtr& manager,
-    const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator, 
-    const AsteriskSCF::System::Component::V1::ReplicaPtr replica)
-    : mImplPriv(new SipEndpointImplPriv(adapter, factory, name, manager, serviceLocator, replica))
+                         boost::shared_ptr<SipEndpointFactory> factory, 
+                         std::string name, 
+                         const PJSipManagerPtr& manager,
+                         const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator, 
+                         const AsteriskSCF::System::Component::V1::ReplicaPtr replica,
+                         const AsteriskSCF::Discovery::SmartProxy<AsteriskSCF::Replication::SipSessionManager::V1::SipStateReplicatorPrx>& stateReplicator,
+                         const AsteriskSCF::System::WorkQueue::V1::QueuePtr& workQueue)
+    : mImplPriv(new SipEndpointImplPriv(adapter, factory, name, manager, serviceLocator, replica, stateReplicator, workQueue))
 {
     lg(Debug) << "Constructing SIP endpoint " << name;
 
-    mImplPriv->mEndpointProxy = AsteriskSCF::SessionCommunications::V1::SessionEndpointPrx::uncheckedCast(mImplPriv->mAdapter->addWithUUID(this));
+    mImplPriv->mEndpointProxy = AsteriskSCF::SessionCommunications::V1::SessionEndpointPrx::uncheckedCast(
+        mImplPriv->mAdapter->addWithUUID(this));
 }
 
 void SipEndpoint::removeFromAdapter()
@@ -344,10 +590,12 @@ std::string SipEndpoint::getId(const Ice::Current&)
     return mImplPriv->mEndpointProxy->ice_getIdentity().name;
 }
 
+/**
+ * This implements the slice-defined interface for the session factory method. 
+ */
 AsteriskSCF::SessionCommunications::V1::SessionPrx SipEndpoint::createSession(const std::string& destination,
-        const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener, const Ice::Current&)
+        const SessionListenerPrx& listener, const Ice::Current&)
 {
-
     std::cout << "Got call over Ice to create a session for endpoint " << mImplPriv->mName << std::endl;
     if (mImplPriv->mConfig.sessionConfig.callDirection != BOTH &&
             mImplPriv->mConfig.sessionConfig.callDirection != INBOUND)
@@ -356,7 +604,12 @@ AsteriskSCF::SessionCommunications::V1::SessionPrx SipEndpoint::createSession(co
         return 0;
     }
 
-    SipSessionPtr session = new SipSession(mImplPriv->mAdapter, this, destination, listener, mImplPriv->mManager,
+    vector<SessionListenerPrx> listeners = mImplPriv->mDefaultListeners.getAll();
+    listeners.push_back(listener);
+
+    SessionCookies defaultCookies = mImplPriv->mDefaultSessionCookies.getAll();
+
+    SipSessionPtr session = new SipSession(mImplPriv->mAdapter, this, destination, listeners, defaultCookies, mImplPriv->mManager,
         mImplPriv->mServiceLocator, mImplPriv->mReplica, mImplPriv->mConfig.sessionConfig.rtpOverIPv6, true, 
         NATEndpointOptions(mImplPriv->mConfig.sessionConfig.rtpOverICE, mImplPriv->mConfig.sessionConfig.rtpICEIncludeTURN,
             mImplPriv->mConfig.transportConfig.enableNAT));
@@ -367,7 +620,10 @@ AsteriskSCF::SessionCommunications::V1::SessionPrx SipEndpoint::createSession(co
 
 AsteriskSCF::SipSessionManager::SipSessionPtr SipEndpoint::createSession(const std::string& destination)
 {
-    SipSessionPtr session = new SipSession(mImplPriv->mAdapter, this, destination, 0, mImplPriv->mManager,
+    vector<SessionListenerPrx> defaultListeners =  mImplPriv->mDefaultListeners.getAll();
+    SessionCookies defaultCookies = mImplPriv->mDefaultSessionCookies.getAll();
+
+    SipSessionPtr session = new SipSession(mImplPriv->mAdapter, this, destination, defaultListeners, defaultCookies, mImplPriv->mManager,
         mImplPriv->mServiceLocator, mImplPriv->mReplica, mImplPriv->mConfig.sessionConfig.rtpOverIPv6, false,
         NATEndpointOptions(mImplPriv->mConfig.sessionConfig.rtpOverICE, mImplPriv->mConfig.sessionConfig.rtpICEIncludeTURN,
             mImplPriv->mConfig.transportConfig.enableNAT)
@@ -376,11 +632,15 @@ AsteriskSCF::SipSessionManager::SipSessionPtr SipEndpoint::createSession(const s
     return session;
 }
 
+/**
+ * This factory method is used to create sessions in a standby component.  
+ */
 AsteriskSCF::SipSessionManager::SipSessionPtr SipEndpoint::createSession(const std::string& destination,
-                                                                         const Ice::Identity& sessionid, const Ice::Identity& mediaid,
-                                                                         const AsteriskSCF::Replication::SipSessionManager::V1::RTPMediaSessionSeq& mediasessions,
-                                                                         const AsteriskSCF::Media::V1::StreamSourceSeq& sources,
-                                                                         const AsteriskSCF::Media::V1::StreamSinkSeq& sinks)
+                                                    const Ice::Identity& sessionid, 
+                                                    const Ice::Identity& mediaid,
+                                                    const AsteriskSCF::Replication::SipSessionManager::V1::RTPMediaSessionSeq& mediasessions,
+                                                    const AsteriskSCF::Media::V1::StreamSourceSeq& sources,
+                                                    const AsteriskSCF::Media::V1::StreamSinkSeq& sinks)
 {
     SipSessionPtr session = new SipSession(mImplPriv->mAdapter, this, destination, sessionid, mediaid, mediasessions,
             sources, sinks, mImplPriv->mManager, mImplPriv->mServiceLocator, mImplPriv->mReplica, false,
@@ -420,6 +680,77 @@ std::string SipEndpoint::getName()
     return mImplPriv->mName;
 };
 
+// Adds a listener to this endpoint's default set, which createSession() hands to every new session.
+// When this replica is active, the addition is also queued for state replication.
+void SipEndpoint::addDefaultSessionListener(const SessionListenerPrx& listener, const Ice::Current&)
+{
+    mImplPriv->mDefaultListeners.add(listener);
+
+    if (mImplPriv->mReplica->isActive())
+    {
+        // The replicator call is handed to the work queue instead of being made inline.
+        WorkPtr work = new AddDefaultSessionListener(mImplPriv->replicaKeyName(listener),
+                                                     mImplPriv->mName,
+                                                     listener,
+                                                     mImplPriv->mStateReplicator);
+        mImplPriv->mWorkQueue->enqueueWork(work);
+    }
+}
+
+// Drops a listener from the default set; an active replica also queues removal of its replicated record.
+void SipEndpoint::removeDefaultSessionListener(const SessionListenerPrx& listener, const Ice::Current&)
+{
+    mImplPriv->mDefaultListeners.remove(listener);
+    if (!mImplPriv->mReplica->isActive())
+    {
+        return; // Nothing is queued unless this replica is active.
+    }
+
+    const std::string key = mImplPriv->replicaKeyName(listener);
+    WorkPtr removal = new RemoveDefaultSessionListener(key, mImplPriv->mName, listener,
+                                                       mImplPriv->mStateReplicator);
+    mImplPriv->mWorkQueue->enqueueWork(removal);
+}
+
+// Adds cookies to this endpoint's defaults; an active replica also queues them for state replication.
+void SipEndpoint::addDefaultSessionCookies(const SessionCookies& cookies, const Ice::Current&)
+{
+    mImplPriv->mDefaultSessionCookies.add(cookies);
+    if (!mImplPriv->mReplica->isActive())
+    {
+        return;
+    }
+
+    // Hand the replicator call to the work queue instead of making it inline.
+    WorkPtr replication = new AddDefaultCookies(mImplPriv->mName, cookies, mImplPriv->mStateReplicator);
+    mImplPriv->mWorkQueue->enqueueWork(replication);
+}
+
+// Removes cookies from this endpoint's defaults; an active replica also queues removal of their state.
+void SipEndpoint::removeDefaultSessionCookies(const SessionCookies& cookies, const Ice::Current&)
+{
+    mImplPriv->mDefaultSessionCookies.remove(cookies);
+    if (!mImplPriv->mReplica->isActive())
+    {
+        return;
+    }
+
+    // Hand the replicator call to the work queue instead of making it inline.
+    WorkPtr removal = new RemoveDefaultCookies(mImplPriv->mName, cookies, mImplPriv->mStateReplicator);
+    mImplPriv->mWorkQueue->enqueueWork(removal);
+}
+
+void SipEndpoint::addDefaultSessionCookie(const SessionCookiePtr& cookie)
+{
+    mImplPriv->mDefaultSessionCookies.add(cookie); // Local set only; this overload enqueues no replication work.
+}
+
+void SipEndpoint::removeDefaultSessionCookie(const SessionCookiePtr& cookie)
+{
+    mImplPriv->mDefaultSessionCookies.remove(cookie);
+    // Local set only; this overload enqueues no replication work.
+}
+
 AsteriskSCF::SessionCommunications::V1::SessionEndpointPrx SipEndpoint::getEndpointProxy()
 {
     return mImplPriv->mEndpointProxy;
diff --git a/src/SipEndpoint.h b/src/SipEndpoint.h
index 2ce13ff..a4dfa1f 100644
--- a/src/SipEndpoint.h
+++ b/src/SipEndpoint.h
@@ -33,6 +33,9 @@
 #include <AsteriskSCF/Media/MediaIf.h>
 #include <AsteriskSCF/Media/RTP/MediaRTPIf.h>
 #include <AsteriskSCF/Media/SDP/MediaSDPIf.h>
+#include <AsteriskSCF/WorkQueue/WorkQueue.h>
+#include <AsteriskSCF/Discovery/SmartProxy.h>
+#include "SipStateReplicationIf.h"
 
 #include "SipSession.h"
 
@@ -266,9 +269,14 @@ class SipEndpoint : public AsteriskSCF::SessionCommunications::V1::SessionEndpoi
 {
 public:
 
-    SipEndpoint(const Ice::ObjectAdapterPtr& adapter, boost::shared_ptr<SipEndpointFactory> factory, std::string name, 
-        const PJSipManagerPtr& manager, const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator, 
-        const AsteriskSCF::System::Component::V1::ReplicaPtr replica);
+    SipEndpoint(const Ice::ObjectAdapterPtr& adapter, 
+        boost::shared_ptr<SipEndpointFactory> factory, 
+        std::string name, 
+        const PJSipManagerPtr& manager, 
+        const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator, 
+        const AsteriskSCF::System::Component::V1::ReplicaPtr replica,
+        const AsteriskSCF::Discovery::SmartProxy<AsteriskSCF::Replication::SipSessionManager::V1::SipStateReplicatorPrx>& stateReplicator,
+        const AsteriskSCF::System::WorkQueue::V1::QueuePtr& workQueue);
 
     bool operator==(const std::string &name) const;
 
@@ -279,6 +287,18 @@ public:
     AsteriskSCF::SessionCommunications::V1::SessionPrx createSession(const std::string&,
             const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx&, const Ice::Current&);
     AsteriskSCF::SessionCommunications::V1::SessionSeq getSessions(const Ice::Current&);
+    void addDefaultSessionListener(
+        const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener, 
+        const Ice::Current& = Ice::Current());
+    void removeDefaultSessionListener(
+        const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
+         const Ice::Current& = Ice::Current());
+     void addDefaultSessionCookies(
+         const AsteriskSCF::SessionCommunications::V1::SessionCookies& cookies,
+         const Ice::Current&);
+     void removeDefaultSessionCookies(
+         const AsteriskSCF::SessionCommunications::V1::SessionCookies& cookies,
+         const Ice::Current&);
 
     /**
      * Implementation specific.
@@ -336,6 +356,11 @@ public:
     void setSRTPOptions(const SipEndpointMediaSRTPConfig& srtpConfig);
     void setSignalingNATOptions(bool enable);
 
+     void addDefaultSessionCookie(
+         const AsteriskSCF::SessionCommunications::V1::SessionCookiePtr& cookie);
+
+     void removeDefaultSessionCookie(
+         const AsteriskSCF::SessionCommunications::V1::SessionCookiePtr& cookie);
 private:
     /**
      * Private implementation details.
diff --git a/src/SipEndpointFactory.cpp b/src/SipEndpointFactory.cpp
index 49724d6..0c5a024 100644
--- a/src/SipEndpointFactory.cpp
+++ b/src/SipEndpointFactory.cpp
@@ -16,10 +16,17 @@
 
 #include <pjlib.h>
 #include <AsteriskSCF/logger.h>
+#include <AsteriskSCF/WorkQueue/WorkQueue.h>
+#include <AsteriskSCF/WorkQueue/DefaultQueueListener.h>
+
 #include "SipEndpoint.h"
 #include "SipEndpointFactory.h"
 
 using namespace AsteriskSCF::System::Logging;
+using namespace AsteriskSCF::System::WorkQueue::V1;
+using namespace AsteriskSCF::WorkQueue;
+using namespace AsteriskSCF::Discovery;
+using namespace AsteriskSCF::Replication::SipSessionManager::V1;
 
 namespace
 {
@@ -31,9 +38,27 @@ namespace AsteriskSCF
 namespace SipSessionManager
 {
 
+// Stores the factory's collaborators and creates the work queue handed to every endpoint it builds.
+SipEndpointFactory::SipEndpointFactory(const Ice::ObjectAdapterPtr& adapter,
+            const PJSipManagerPtr& manager,
+            const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
+            const AsteriskSCF::System::Component::V1::ReplicaPtr& replica,
+            const AsteriskSCF::Discovery::SmartProxy<AsteriskSCF::Replication::SipSessionManager::V1::SipStateReplicatorPrx>& replicator)
+      : mAdapter(adapter), mManager(manager), mServiceLocator(serviceLocator),
+        mReplica(replica), mStateReplicator(replicator),
+        mQueue(new AsteriskSCF::WorkQueue::WorkQueue())
+{
+    // Attach a DefaultQueueListener to the freshly created queue.
+    AsteriskSCF::WorkQueue::DefaultQueueListenerPtr queueListener(
+        new AsteriskSCF::WorkQueue::DefaultQueueListener(mQueue, 0));
+    mQueue->setListener(queueListener);
+}
+
 SipEndpointPtr SipEndpointFactory::createEndpoint(std::string endpointName)
 {
-    SipEndpointPtr endpoint = new SipEndpoint(mAdapter, shared_from_this(), endpointName, mManager, mServiceLocator, mReplica);
+    SipEndpointPtr endpoint = new SipEndpoint(mAdapter, shared_from_this(), 
+        endpointName, mManager, mServiceLocator, mReplica, mStateReplicator, mQueue);
+
     mEndpoints.push_back(endpoint);
     return endpoint;
 }
diff --git a/src/SipEndpointFactory.h b/src/SipEndpointFactory.h
index 09e02d9..e852864 100644
--- a/src/SipEndpointFactory.h
+++ b/src/SipEndpointFactory.h
@@ -22,6 +22,9 @@
 
 #include <AsteriskSCF/Core/Endpoint/EndpointIf.h>
 #include <AsteriskSCF/SIP/SIPRegistrarIf.h>
+#include <AsteriskSCF/WorkQueue/WorkQueue.h>
+#include <AsteriskSCF/Discovery/SmartProxy.h>
+#include "SipStateReplicationIf.h"
 
 namespace AsteriskSCF
 {
@@ -36,10 +39,11 @@ namespace SipSessionManager
 class SipEndpointFactory : public boost::enable_shared_from_this<SipEndpointFactory>
 {
 public:
-    SipEndpointFactory(const Ice::ObjectAdapterPtr& adapter, const PJSipManagerPtr& manager,
+    SipEndpointFactory(const Ice::ObjectAdapterPtr& adapter, 
+        const PJSipManagerPtr& manager,
         const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
-        const AsteriskSCF::System::Component::V1::ReplicaPtr& replica) :
-        mAdapter(adapter), mManager(manager), mServiceLocator(serviceLocator), mReplica(replica) { };
+        const AsteriskSCF::System::Component::V1::ReplicaPtr& replica,
+        const AsteriskSCF::Discovery::SmartProxy<AsteriskSCF::Replication::SipSessionManager::V1::SipStateReplicatorPrx>& stateReplicator);
 
     SipEndpointPtr createEndpoint(std::string);
 
@@ -74,6 +78,12 @@ private:
      * A pointer to the replica information.
      */
     AsteriskSCF::System::Component::V1::ReplicaPtr mReplica;
+
+    // The state replicator for this component.
+    AsteriskSCF::Discovery::SmartProxy<AsteriskSCF::Replication::SipSessionManager::V1::SipStateReplicatorPrx> mStateReplicator;
+
+    // A work queue used by endpoints to replicate state. 
+    AsteriskSCF::System::WorkQueue::V1::QueuePtr mQueue;
 };
 
 }; // end SipSessionManager
diff --git a/src/SipSession.cpp b/src/SipSession.cpp
index 044455b..7f76c84 100755
--- a/src/SipSession.cpp
+++ b/src/SipSession.cpp
@@ -32,6 +32,7 @@
 #include <AsteriskSCF/Media/SDP/MediaSDPIf.h>
 #include "NATOptions.h"
 
+using namespace AsteriskSCF::SessionCommunications::V1;
 using namespace AsteriskSCF::System::Logging;
 using namespace AsteriskSCF::System::NAT::V1;
 using namespace AsteriskSCF::Media::RTP::V1;
@@ -414,17 +415,23 @@ void SipSession::initializePJSIPStructs()
 }
 
 /**
- * Default constructor.
+ * Standard constructor.
  */
-SipSession::SipSession(const Ice::ObjectAdapterPtr& adapter, const SipEndpointPtr& endpoint,
-        const std::string& destination,  const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
+SipSession::SipSession(const Ice::ObjectAdapterPtr& adapter, 
+        const SipEndpointPtr& endpoint,
+        const std::string& destination,  
+        const vector<SessionListenerPrx>& listeners,
+        const SessionCookies& cookies,
         const PJSipManagerPtr& manager, const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
-        const AsteriskSCF::System::Component::V1::ReplicaPtr& replica, bool ipv6, bool isUAC, const NATEndpointOptions& natOptions)
+        const AsteriskSCF::System::Component::V1::ReplicaPtr& replica, 
+        bool ipv6, bool isUAC, const NATEndpointOptions& natOptions)
     : mImplPriv(new SipSessionPriv(adapter, endpoint, destination, manager, serviceLocator, replica, natOptions))
 {
-    if (listener != 0)
+    mImplPriv->mListeners.insert(mImplPriv->mListeners.end(), listeners.begin(), listeners.end());
+
+    for(SessionCookies::const_iterator i = cookies.begin(); i != cookies.end(); ++i)
     {
-        mImplPriv->mListeners.push_back(listener);
+        mImplPriv->mSessionCookies[(*i)->ice_id()] = (*i);
     }
 
     mImplPriv->mSessionProxy =
diff --git a/src/SipSession.h b/src/SipSession.h
index 60d800b..35ba4c1 100644
--- a/src/SipSession.h
+++ b/src/SipSession.h
@@ -105,7 +105,9 @@ class SipSession : public AsteriskSCF::SessionCommunications::V1::Session
 {
 public:
     SipSession(const Ice::ObjectAdapterPtr&, const SipEndpointPtr&, const std::string&,
-        const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx&, const PJSipManagerPtr& manager,
+        const std::vector<AsteriskSCF::SessionCommunications::V1::SessionListenerPrx>&,         
+        const AsteriskSCF::SessionCommunications::V1::SessionCookies&, 
+        const PJSipManagerPtr& manager,
         const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
         const AsteriskSCF::System::Component::V1::ReplicaPtr& replica,
         bool ipv6, bool isUAC, const NATEndpointOptions& natOptions);
diff --git a/src/SipSessionManagerApp.cpp b/src/SipSessionManagerApp.cpp
index 64aa0bb..f28affb 100644
--- a/src/SipSessionManagerApp.cpp
+++ b/src/SipSessionManagerApp.cpp
@@ -544,20 +544,23 @@ void SipSessionManager::initialize(const string& appName, const Ice::Communicato
         // it now.
         mServiceLocator = ServiceLocatorPrx::checkedCast(mCommunicator->propertyToProxy("LocatorService.Proxy"));
 
+        // Locate the State Replicator so we can fail over!
+        locateStateReplicator();
+
         // Create and publish our Replica interface support.
         mReplicaService = new ReplicaImpl(mLocalAdapter);
         mLocalAdapter->add(mReplicaService, mCommunicator->stringToIdentity(ReplicaServiceId));
         lg(Debug) << "Created SIP Replica Implementation";
 
-        mEndpointFactory.reset(new SipEndpointFactory(mGlobalAdapter, mPJSipManager, mServiceLocator, mReplicaService));
+        mEndpointFactory.reset(new SipEndpointFactory(mGlobalAdapter, mPJSipManager, mServiceLocator, mReplicaService, mStateReplicator));
         lg(Debug) << "Created SIP endpoint factory";
 
         mRegistrarListener = new SipDefaultRegistrarListener(mEndpointFactory);
         mGlobalAdapter->add(mRegistrarListener, mCommunicator->stringToIdentity(RegistrarListenerId));
         lg(Debug) << "Added default registrar listener to object adapter";
 
-	// Locate the Routing Service so that we can do routing. This is done here so it can be passed to the configuration service.
-	locateRoutingService();
+        // Locate the Routing Service so that we can do routing. This is done here so it can be passed to the configuration service.
+        locateRoutingService();
 
         // Create and publish our Configuration interface support.
         mConfigurationService = createConfigurationServant(mPJSipManager, mEndpointFactory, mRoutingId, mRoutingServiceLocatorRegistry);
@@ -610,9 +613,6 @@ void SipSessionManager::start(const string& name, const Ice::CommunicatorPtr& ic
     // Locate the Session Router so we can REALLY do routing.
     locateSessionRouter();
 
-    // Locate the State Replicator so we can fail over!
-    locateStateReplicator();
-
     registerPJSipModules();
 
     // Register with the state replicator if we are a listener
diff --git a/src/SipStateReplicator.h b/src/SipStateReplicator.h
index d53bc81..8856d48 100644
--- a/src/SipStateReplicator.h
+++ b/src/SipStateReplicator.h
@@ -39,6 +39,8 @@ typedef IceUtil::Handle<SipStateReplicatorI> SipStateReplicatorIPtr;
 //
 struct SipStateReplicatorListenerImpl; 
 
+class SipEndpointFactory;
+
 class SipStateReplicatorListenerI : public AsteriskSCF::Replication::SipSessionManager::V1::SipStateReplicatorListener
 {
 public:
diff --git a/src/SipStateReplicatorListener.cpp b/src/SipStateReplicatorListener.cpp
index ed9fd83..0ddeccb 100644
--- a/src/SipStateReplicatorListener.cpp
+++ b/src/SipStateReplicatorListener.cpp
@@ -137,6 +137,9 @@ public:
         for (SipStateItemSeq::const_iterator iter = items.begin(); iter != items.end(); ++iter)
         {
             SipRegistrarStateItemPtr regItem;
+            DefaultSessionListenerItemPtr defaultListenerItem;
+            DefaultSessionCookieItemPtr defaultCookieItem;
+
             if ((regItem = SipRegistrarStateItemPtr::dynamicCast((*iter))))
             {
                 PJSipRegistrarModulePtr regModule;
@@ -149,6 +152,30 @@ public:
                 }
                 removeRegistrationState(regModule, regItem);
             }
+            else if(defaultListenerItem = DefaultSessionListenerItemPtr::dynamicCast((*iter)))
+            {
+                SipEndpointPtr endpoint = mEndpointFactory->findByName(defaultListenerItem->endpointName);
+
+                // If we lack the endpoint (due to misconfiguration) we can't proceed
+                if (endpoint == 0)
+                {
+                    continue;
+                }
+
+                endpoint->removeDefaultSessionListener(defaultListenerItem->listener);
+            }
+            else if(defaultCookieItem = DefaultSessionCookieItemPtr::dynamicCast((*iter)))
+            {
+                SipEndpointPtr endpoint = mEndpointFactory->findByName(defaultCookieItem->endpointName);
+
+                // If we lack the endpoint (due to misconfiguration) we can't proceed
+                if (endpoint == 0)
+                {
+                    continue;
+                }
+
+                endpoint->removeDefaultSessionCookie(defaultCookieItem->cookie);
+            }
         }
     }
 
@@ -239,6 +266,8 @@ public:
             SipTransactionStateItemPtr transaction;
             SipRegistrarStateItemPtr regItem;
             boost::shared_ptr<SipStateReplicatorItem> localitem;
+            DefaultSessionListenerItemPtr defaultListenerItem;
+            DefaultSessionCookieItemPtr defaultCookieItem;
 
             // Depending on the type of state item we apply it differently
             if ((session = SipSessionStateItemPtr::dynamicCast((*item))))
@@ -271,7 +300,7 @@ public:
 
                 localitem->getSession()->setListeners(session->mListeners);
                 localitem->getSession()->setBridge(session->mBridge);
-		localitem->getSession()->setCookies(session->mCookies);
+                localitem->getSession()->setCookies(session->mCookies);
             }
             else if ((dialog = SipDialogStateItemPtr::dynamicCast((*item))))
             {
@@ -430,6 +459,30 @@ public:
                 }
                 setRegistrationState(regModule, regItem);
             }
+            else if (defaultListenerItem  = DefaultSessionListenerItemPtr::dynamicCast((*item)))
+            {
+                SipEndpointPtr endpoint = mEndpointFactory->findByName(defaultListenerItem->endpointName);
+
+                // If we lack the endpoint (due to misconfiguration) we can't proceed
+                if (endpoint == 0)
+                {
+                    continue;
+                }
+
+                endpoint->addDefaultSessionListener(defaultListenerItem->listener);
+            }
+            else if (defaultCookieItem  = DefaultSessionCookieItemPtr::dynamicCast((*item)))
+            {
+                SipEndpointPtr endpoint = mEndpointFactory->findByName(defaultCookieItem->endpointName);
+
+                // If we lack the endpoint (due to misconfiguration) we can't proceed
+                if (endpoint == 0)
+                {
+                    continue;
+                }
+
+                endpoint->addDefaultSessionCookie(defaultCookieItem->cookie);
+            }
         }
     }
     std::string mId;
@@ -454,7 +507,7 @@ void SipStateReplicatorListenerI::stateRemoved(const Ice::StringSeq& itemKeys, c
     mImpl->removeStateNoticeImpl(itemKeys);
 }
 
-void SipStateReplicatorListenerI::stateSet(const SipStateItemSeq& items, const Ice::Current&)
+void SipStateReplicatorListenerI::stateSet(const SipStateItemSeq& items, const Ice::Current& current)
 {
     mImpl->setStateNoticeImpl(items);
 }

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/sip.git



More information about the asterisk-scf-commits mailing list