[asterisk-scf-commits] asterisk-scf/integration/sip.git branch "threading" created.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Wed May 4 17:18:57 CDT 2011
branch "threading" has been created
at 5620383d390adef3a55d7a81234f1e1a8ff03430 (commit)
- Log -----------------------------------------------------------------
commit 5620383d390adef3a55d7a81234f1e1a8ff03430
Author: Mark Michelson <mmichelson at digium.com>
Date: Wed May 4 17:18:10 2011 -0500
Add some of those stubs for some early thread pool work.
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 254d404..755ab2f 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -4,6 +4,8 @@ endif()
include_directories(${logger_dir}/include)
include_directories(${utils_dir}/StateReplicator/include)
include_directories(${utils_dir}/SmartProxy/include)
+include_directories(${utils_dir}/WorkQueue/include)
+include_directories(${utils_dir}/ThreadPool/include)
include_directories(${API_INCLUDE_DIR})
asterisk_scf_slice_include_directories(${API_SLICE_DIR})
diff --git a/src/PJSipSessionModule.cpp b/src/PJSipSessionModule.cpp
index 868670a..d2c8b87 100644
--- a/src/PJSipSessionModule.cpp
+++ b/src/PJSipSessionModule.cpp
@@ -51,6 +51,8 @@ using namespace AsteriskSCF::Core::Endpoint::V1;
using namespace AsteriskSCF::SessionCommunications::V1;
using namespace AsteriskSCF::Media::V1;
using namespace AsteriskSCF::SIP::V1;
+using namespace AsteriskSCF::System::ThreadPool::V1;
+using namespace AsteriskSCF::System::WorkQueue::V1;
class RouteSessionCallback : public IceUtil::Shared
{
@@ -1234,5 +1236,49 @@ pjsip_dialog *PJSipSessionModule::uaOnDialogForked(pjsip_dialog*, pjsip_rx_data*
return NULL;
}
+PJSipSessionModuleThreadPoolListener::PJSipSessionModuleThreadPoolListener()
+{
+}
+
+void PJSipSessionModuleThreadPoolListener::stateChanged(const PoolPtr&, int, int, int)
+{
+ //stub
+}
+
+void PJSipSessionModuleThreadPoolListener::queueWorkAdded(const PoolPtr&, int, bool)
+{
+ //stub
+}
+
+void PJSipSessionModuleThreadPoolListener::queueEmptied(const PoolPtr&)
+{
+ //stub
+}
+
+SessionWork::SessionWork(const QueuePtr&)
+{
+ //stub
+}
+
+void SessionWork::workAdded(int, bool)
+{
+ //stub
+}
+
+void SessionWork::emptied()
+{
+ //stub
+}
+
+void SessionWork::execute()
+{
+ //stub
+}
+
+void SessionWork::enqueueWork(const SuspendableWorkPtr&)
+{
+ //stub
+}
+
}; //end namespace SipSessionManager
}; //end namespace AsteriskSCF
diff --git a/src/PJSipSessionModule.h b/src/PJSipSessionModule.h
index e22c31d..e680471 100644
--- a/src/PJSipSessionModule.h
+++ b/src/PJSipSessionModule.h
@@ -25,6 +25,8 @@
#include <AsteriskSCF/SmartProxy.h>
#include <AsteriskSCF/System/Component/ReplicaIf.h>
+#include <AsteriskSCF/System/ThreadPool/ThreadPoolIf.h>
+#include <AsteriskSCF/System/WorkQueue/WorkQueueIf.h>
#include "SipStateReplicator.h"
#include "SipSession.h"
@@ -54,6 +56,60 @@ private:
SipSessionPtr mSession;
};
+class PJSipSessionModuleThreadPoolListenerPriv;
+
+/**
+ * Listens to the PJSipSessionModule's thread pool.
+ *
+ * It is responsible for all decisions regarding the pool. This mostly means
+ * that it decides when the pool should have greater or fewer threads.
+ */
+class PJSipSessionModuleThreadPoolListener : public AsteriskSCF::System::ThreadPool::V1::PoolListener
+{
+public:
+ PJSipSessionModuleThreadPoolListener();
+ void stateChanged(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool, int active, int idle, int zombie);
+ void queueWorkAdded(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool, int count, bool wasEmpty);
+ void queueEmptied(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool);
+private:
+ boost::shared_ptr<PJSipSessionModuleThreadPoolListenerPriv> mPriv;
+};
+
+typedef IceUtil::Handle<PJSipSessionModuleThreadPoolListener> PJSipSessionModuleThreadPoolListenerPtr;
+
+class SessionWorkPriv;
+
+/**
+ * This is where session related work items get enqueued.
+ *
+ * Each session will have its own SessionWork object to which it will queue suspendable work.
+ */
+class SessionWork : public AsteriskSCF::System::WorkQueue::V1::Work, public AsteriskSCF::System::WorkQueue::V1::QueueListener
+{
+public:
+ /**
+ * Overrides of QueueListener interface
+ */
+ SessionWork(const AsteriskSCF::System::WorkQueue::V1::QueuePtr& threadPoolQueue);
+ void workAdded(int newWork, bool wasEmpty);
+ void emptied();
+
+ /**
+ * Override of Work interface
+ */
+ void execute();
+
+ /**
+ * This is the method that will be called within the SIP session gateway that will
+ * result in work making it into the session gateway's thread pool.
+ */
+ void enqueueWork(const AsteriskSCF::System::WorkQueue::V1::SuspendableWorkPtr& work);
+private:
+ boost::shared_ptr<SessionWorkPriv> mPriv;
+};
+
+typedef IceUtil::Handle<SessionWork> SessionWorkPtr;
+
class PJSipSessionModule : public PJSipModule
{
public:
@@ -96,6 +152,9 @@ private:
AsteriskSCF::SmartProxy::SmartProxy<AsteriskSCF::SIP::V1::SipStateReplicatorPrx> mStateReplicator;
AsteriskSCF::System::Component::V1::ReplicaPtr mReplica;
pjsip_endpoint *mEndpoint;
+ AsteriskSCF::System::ThreadPool::V1::PoolPtr mPool;
+ AsteriskSCF::System::WorkQueue::V1::QueuePtr mPoolQueue;
+ AsteriskSCF::System::ThreadPool::V1::PoolListenerPtr mPoolListener;
};
}; //end namespace SipSessionManager
diff --git a/src/PJSipSessionModuleConstruction.cpp b/src/PJSipSessionModuleConstruction.cpp
index f78c59d..fb6d6b2 100644
--- a/src/PJSipSessionModuleConstruction.cpp
+++ b/src/PJSipSessionModuleConstruction.cpp
@@ -14,6 +14,9 @@
* at the top of the source tree.
*/
+#include <AsteriskSCF/WorkQueue.h>
+#include <AsteriskSCF/ThreadPool.h>
+
#include "PJSipSessionModule.h"
namespace AsteriskSCF
@@ -22,6 +25,9 @@ namespace AsteriskSCF
namespace SipSessionManager
{
+using namespace AsteriskSCF::WorkQueue;
+using namespace AsteriskSCF::ThreadPool;
+
static char moduleName[] = "PJSipSessionModule";
static PJSipSessionModule *sessionModule;
@@ -114,6 +120,12 @@ PJSipSessionModule::PJSipSessionModule(pjsip_endpoint *endpt,
mModule.on_tx_response = NULL;
mModule.on_tsx_state = NULL;
+ mPoolQueue = new AsteriskSCF::WorkQueue::WorkQueue();
+ mPoolListener = new PJSipSessionModuleThreadPoolListener();
+
+ ThreadPoolFactoryPtr factory(new ThreadPoolFactory);
+ mPool = factory->createPool(mPoolListener, mPoolQueue);
+
if (pjsip_ua_instance()->id == -1)
{
pj_bzero(&mUaParam, sizeof(&mUaParam));
-----------------------------------------------------------------------
--
asterisk-scf/integration/sip.git
More information about the asterisk-scf-commits
mailing list