[asterisk-scf-commits] asterisk-scf/integration/sip.git branch "threading" created.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Wed May 4 17:18:57 CDT 2011


branch "threading" has been created
        at  5620383d390adef3a55d7a81234f1e1a8ff03430 (commit)

- Log -----------------------------------------------------------------
commit 5620383d390adef3a55d7a81234f1e1a8ff03430
Author: Mark Michelson <mmichelson at digium.com>
Date:   Wed May 4 17:18:10 2011 -0500

    Add some of those stubs for some early thread pool work.

diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 254d404..755ab2f 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -4,6 +4,8 @@ endif()
 include_directories(${logger_dir}/include)
 include_directories(${utils_dir}/StateReplicator/include)
 include_directories(${utils_dir}/SmartProxy/include)
+include_directories(${utils_dir}/WorkQueue/include)
+include_directories(${utils_dir}/ThreadPool/include)
 include_directories(${API_INCLUDE_DIR})
 
 asterisk_scf_slice_include_directories(${API_SLICE_DIR})
diff --git a/src/PJSipSessionModule.cpp b/src/PJSipSessionModule.cpp
index 868670a..d2c8b87 100644
--- a/src/PJSipSessionModule.cpp
+++ b/src/PJSipSessionModule.cpp
@@ -51,6 +51,8 @@ using namespace AsteriskSCF::Core::Endpoint::V1;
 using namespace AsteriskSCF::SessionCommunications::V1;
 using namespace AsteriskSCF::Media::V1;
 using namespace AsteriskSCF::SIP::V1;
+using namespace AsteriskSCF::System::ThreadPool::V1;
+using namespace AsteriskSCF::System::WorkQueue::V1;
 
 class RouteSessionCallback : public IceUtil::Shared
 {
@@ -1234,5 +1236,49 @@ pjsip_dialog *PJSipSessionModule::uaOnDialogForked(pjsip_dialog*, pjsip_rx_data*
     return NULL;
 }
 
+PJSipSessionModuleThreadPoolListener::PJSipSessionModuleThreadPoolListener()
+{
+}
+
+void PJSipSessionModuleThreadPoolListener::stateChanged(const PoolPtr&, int, int, int)
+{
+    //stub
+}
+
+void PJSipSessionModuleThreadPoolListener::queueWorkAdded(const PoolPtr&, int, bool)
+{
+    //stub
+}
+
+void PJSipSessionModuleThreadPoolListener::queueEmptied(const PoolPtr&)
+{
+    //stub
+}
+
+SessionWork::SessionWork(const QueuePtr&)
+{
+    //stub
+}
+
+void SessionWork::workAdded(int, bool)
+{
+    //stub
+}
+
+void SessionWork::emptied()
+{
+    //stub
+}
+
+void SessionWork::execute()
+{
+    //stub
+}
+
+void SessionWork::enqueueWork(const SuspendableWorkPtr&)
+{
+    //stub
+}
+
 }; //end namespace SipSessionManager
 }; //end namespace AsteriskSCF
diff --git a/src/PJSipSessionModule.h b/src/PJSipSessionModule.h
index e22c31d..e680471 100644
--- a/src/PJSipSessionModule.h
+++ b/src/PJSipSessionModule.h
@@ -25,6 +25,8 @@
 
 #include <AsteriskSCF/SmartProxy.h>
 #include <AsteriskSCF/System/Component/ReplicaIf.h>
+#include <AsteriskSCF/System/ThreadPool/ThreadPoolIf.h>
+#include <AsteriskSCF/System/WorkQueue/WorkQueueIf.h>
 
 #include "SipStateReplicator.h"
 #include "SipSession.h"
@@ -54,6 +56,60 @@ private:
     SipSessionPtr mSession;
 };
 
+class PJSipSessionModuleThreadPoolListenerPriv;
+
+/**
+ * Listens to the PJSipSessionModule's thread pool.
+ *
+ * It is responsible for all decisions regarding the pool. This mostly means
+ * that it decides when the pool should have more or fewer threads.
+ */
+class PJSipSessionModuleThreadPoolListener : public AsteriskSCF::System::ThreadPool::V1::PoolListener
+{
+public:
+    PJSipSessionModuleThreadPoolListener();
+    void stateChanged(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool, int active, int idle, int zombie);
+    void queueWorkAdded(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool, int count, bool wasEmpty);
+    void queueEmptied(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool);
+private:
+    boost::shared_ptr<PJSipSessionModuleThreadPoolListenerPriv> mPriv;
+};
+
+typedef IceUtil::Handle<PJSipSessionModuleThreadPoolListener> PJSipSessionModuleThreadPoolListenerPtr;
+
+class SessionWorkPriv;
+
+/**
+ * This is where session related work items get enqueued.
+ *
+ * Each session will have its own SessionWork object to which it will queue suspendable work.
+ */
+class SessionWork : public AsteriskSCF::System::WorkQueue::V1::Work, public AsteriskSCF::System::WorkQueue::V1::QueueListener
+{
+public:
+    /**
+     * Constructor, followed by overrides of the QueueListener interface
+     */
+    SessionWork(const AsteriskSCF::System::WorkQueue::V1::QueuePtr& threadPoolQueue);
+    void workAdded(int newWork, bool wasEmpty);
+    void emptied();
+
+    /**
+     * Override of Work interface
+     */
+    void execute();
+
+    /**
+     * This is the method called within the SIP session gateway to submit work;
+     * the work ultimately makes it into the session gateway's thread pool.
+     */
+    void enqueueWork(const AsteriskSCF::System::WorkQueue::V1::SuspendableWorkPtr& work);
+private:
+    boost::shared_ptr<SessionWorkPriv> mPriv;
+};
+
+typedef IceUtil::Handle<SessionWork> SessionWorkPtr;
+
 class PJSipSessionModule : public PJSipModule
 {
 public:
@@ -96,6 +152,9 @@ private:
     AsteriskSCF::SmartProxy::SmartProxy<AsteriskSCF::SIP::V1::SipStateReplicatorPrx> mStateReplicator;
     AsteriskSCF::System::Component::V1::ReplicaPtr mReplica;
     pjsip_endpoint *mEndpoint;
+    AsteriskSCF::System::ThreadPool::V1::PoolPtr mPool;
+    AsteriskSCF::System::WorkQueue::V1::QueuePtr mPoolQueue;
+    AsteriskSCF::System::ThreadPool::V1::PoolListenerPtr mPoolListener;
 };
 
 }; //end namespace SipSessionManager
diff --git a/src/PJSipSessionModuleConstruction.cpp b/src/PJSipSessionModuleConstruction.cpp
index f78c59d..fb6d6b2 100644
--- a/src/PJSipSessionModuleConstruction.cpp
+++ b/src/PJSipSessionModuleConstruction.cpp
@@ -14,6 +14,9 @@
  * at the top of the source tree.
  */
 
+#include <AsteriskSCF/WorkQueue.h>
+#include <AsteriskSCF/ThreadPool.h>
+
 #include "PJSipSessionModule.h"
 
 namespace AsteriskSCF
@@ -22,6 +25,9 @@ namespace AsteriskSCF
 namespace SipSessionManager
 {
 
+using namespace AsteriskSCF::WorkQueue;
+using namespace AsteriskSCF::ThreadPool;
+
 static char moduleName[] = "PJSipSessionModule";
 
 static PJSipSessionModule *sessionModule;
@@ -114,6 +120,12 @@ PJSipSessionModule::PJSipSessionModule(pjsip_endpoint *endpt,
     mModule.on_tx_response = NULL;
     mModule.on_tsx_state = NULL;
 
+    mPoolQueue = new AsteriskSCF::WorkQueue::WorkQueue();
+    mPoolListener = new PJSipSessionModuleThreadPoolListener(); 
+
+    ThreadPoolFactoryPtr factory(new ThreadPoolFactory);
+    mPool = factory->createPool(mPoolListener, mPoolQueue);
+
     if (pjsip_ua_instance()->id == -1)
     {
         pj_bzero(&mUaParam, sizeof(&mUaParam));

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/sip.git



More information about the asterisk-scf-commits mailing list